Branch data Line data Source code
1 : : // *****************************************************************************
2 : : /*!
3 : : \file src/zmq_util.cpp
4 : : \copyright 2022-2025 J. Bakosi,
5 : : All rights reserved. See the LICENSE file for details.
6 : : \brief Piac ZeroMQ utilities
7 : : */
8 : : // *****************************************************************************
9 : :
10 : : #include <string>
11 : : #include <unistd.h>
12 : :
13 : : #include "zmq_util.hpp"
14 : : #include "logging_util.hpp"
15 : :
16 : : #define REQUEST_TIMEOUT 3000 // msecs, (> 1000!)
17 : : #define REQUEST_RETRIES 5 // before we abandon trying to send
18 : :
19 : : zmqpp::socket
20 : 87 : piac::daemon_socket( zmqpp::context& ctx,
21 : : const std::string& host,
22 : : const std::string& rpc_server_public_key,
23 : : const zmqpp::curve::keypair& client_keys )
24 : : // *****************************************************************************
25 : : // Create ZMQ socket
26 : : //! \param[in,out] ctx ZeroMQ socket context
27 : : //! \param[in] host Hostname or IP + port to send cmd to
28 : : //! \param[in] rpc_server_public_key CurveZMQ server public key to use
29 : : //! \param[in] client_keys CurveMQ client keypair to use
30 : : //! \return ZMQ socket created
31 : : //! \see http://curvezmq.org
32 : : // *****************************************************************************
33 : : {
34 [ + - ][ + - ]: 174 : MINFO( "Connecting to " << host );
35 : 87 : auto sock = zmqpp::socket( ctx, zmqpp::socket_type::req );
36 : :
37 [ + + ]: 87 : if (not rpc_server_public_key.empty()) {
38 : : try {
39 [ + - ]: 44 : sock.set( zmqpp::socket_option::curve_server_key, rpc_server_public_key );
40 [ + - ]: 44 : sock.set( zmqpp::socket_option::curve_public_key, client_keys.public_key );
41 : 44 : sock.set( zmqpp::socket_option::curve_secret_key, client_keys.secret_key );
42 [ - - ]: 0 : } catch ( zmqpp::exception& ) {}
43 : : }
44 : :
45 [ + - ][ + - ]: 87 : sock.connect( "tcp://" + host );
46 : : // configure socket to not wait at close time
47 [ + - ]: 87 : sock.set( zmqpp::socket_option::linger, 0 );
48 : 87 : return sock;
49 : : }
50 : :
51 : : std::string
52 [ + - ]: 43 : piac::pirate_send( const std::string& cmd,
53 : : zmqpp::context& ctx,
54 : : const std::string& host,
55 : : const std::string& rpc_server_public_key,
56 : : const zmqpp::curve::keypair& client_keys )
57 : : // *****************************************************************************
58 : : // Send message via a ZMQ socket with retries and time-out
59 : : //! \param[in] cmd Command to send
60 : : //! \param[in,out] ctx ZeroMQ socket context
61 : : //! \param[in] host Hostname or IP + port to send cmd to
62 : : //! \param[in] rpc_server_public_key CurveZMQ server public key to use
63 : : //! \param[in] client_keys CurveMQ client keypair to use
64 : : //! \return Response from remote host
65 : : //! \see https://zguide.zeromq.org/docs/chapter4/#Client-Side-Reliability-Lazy-Pirate-Pattern
66 : : //! \see https://github.com/zeromq/libzmq/issues/3495
67 : : // *****************************************************************************
68 : : {
69 : : std::string reply;
70 : : int retries = REQUEST_RETRIES;
71 [ + - ]: 86 : auto server = daemon_socket( ctx, host, rpc_server_public_key, client_keys );
72 : :
73 : : // send cmd from a Lazy Pirate client
74 [ + + ]: 86 : while (retries) {
75 : : std::string request = cmd;
76 [ + - ]: 43 : server.send( request );
77 [ - + ][ - - ]: 43 : if (retries != REQUEST_RETRIES) sleep(1);
78 : :
79 [ + - ]: 86 : zmqpp::poller poller;
80 [ + - ]: 43 : poller.add( server );
81 : : bool expect_reply = true;
82 : : while (expect_reply) {
83 [ + - ][ + + ]: 119 : if (poller.poll(REQUEST_TIMEOUT) && poller.has_input(server)) {
[ - + ]
84 [ + - ]: 32 : server.receive( reply );
85 [ + - ]: 32 : if (not reply.empty()) {
86 [ + - ][ + - ]: 96 : MDEBUG( "Recv reply: " + reply );
[ + - ][ + - ]
87 : : expect_reply = false;
88 : : retries = 0;
89 : : } else {
90 [ - - ][ - - ]: 0 : MERROR( "Recv empty msg from daemon" );
[ - - ]
91 : : }
92 [ + + ]: 55 : } else if (--retries == 0) {
93 [ + - ][ + - ]: 33 : MERROR( "Abandoning server at " + host );
[ + - ][ + - ]
94 : : reply = "No response from server";
95 : : expect_reply = false;
96 : : } else {
97 [ + - ][ + - ]: 132 : MWARNING( "No response from " + host + ", retrying: " << retries );
[ + - ][ + - ]
[ + - ][ - + ]
[ - - ]
98 [ + - ]: 44 : poller.remove( server );
99 [ + - ]: 44 : server = daemon_socket( ctx, host, rpc_server_public_key, client_keys );
100 [ + - ]: 44 : poller.add( server );
101 [ + - ]: 44 : server.send( request );
102 : : }
103 : : }
104 : : }
105 : :
106 [ + - ][ + - ]: 129 : MDEBUG( "Destroyed socket to " + host );
[ + - ][ + - ]
107 : 43 : return reply;
108 : : }
109 : :
110 : : void
111 : 22 : piac::try_bind( zmqpp::socket& sock, int& port, int range, bool use_strict_ports )
112 : : // *****************************************************************************
113 : : // Try to bind ZMQ socket, attempting unused ports
114 : : //! \param[in,out] sock Socket to use
115 : : //! \param[in,out] port Port to try first
116 : : //! \param[in] range Number of ports to attempt in increasing order: port+range
117 : : //! \param[in] use_strict_ports True to try only the default port
118 : : // *****************************************************************************
119 : : {
120 [ + - ]: 22 : if (use_strict_ports) {
121 [ + - ][ + - ]: 44 : sock.bind( "tcp://*:" + std::to_string(port) );
[ - + ][ - - ]
122 : 22 : return;
123 : : }
124 : :
125 : : do {
126 : : try {
127 [ - - ][ - - ]: 0 : sock.bind( "tcp://*:" + std::to_string(port) );
[ - - ][ - - ]
[ - - ]
128 : 0 : return;
129 : : }
130 [ - - ]: 0 : catch ( zmqpp::exception& ) {}
131 : 0 : ++port;
132 [ - - ]: 0 : } while (port < port+range );
133 : :
134 [ - - ][ - - ]: 0 : MERROR( "Could not bind to socket within range: ["
[ - - ][ - - ]
135 : : << port << ',' << port+range << ')' );
136 : : }
|