Piac test code coverage report
Current view: top level - src - zmq_util.cpp (source / functions) Hit Total Coverage
Commit: Piac-RELEASE Lines: 34 42 81.0 %
Date: 2022-12-16 13:44:03 Functions: 3 3 100.0 %
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: 51 130 39.2 %

           Branch data     Line data    Source code
       1                 :            : // *****************************************************************************
       2                 :            : /*!
       3                 :            :   \file      src/zmq_util.cpp
       4                 :            :   \copyright 2022-2025 J. Bakosi,
       5                 :            :              All rights reserved. See the LICENSE file for details.
       6                 :            :   \brief     Piac ZeroMQ utilities
       7                 :            : */
       8                 :            : // *****************************************************************************
       9                 :            : 
      10                 :            : #include <string>
      11                 :            : #include <unistd.h>
      12                 :            : 
      13                 :            : #include "zmq_util.hpp"
      14                 :            : #include "logging_util.hpp"
      15                 :            : 
      16                 :            : #define REQUEST_TIMEOUT     3000   //  msecs, (> 1000!)
      17                 :            : #define REQUEST_RETRIES     5      //  before we abandon trying to send
      18                 :            : 
      19                 :            : zmqpp::socket
      20                 :         87 : piac::daemon_socket( zmqpp::context& ctx,
      21                 :            :                      const std::string& host,
      22                 :            :                      const std::string& rpc_server_public_key,
      23                 :            :                      const zmqpp::curve::keypair& client_keys )
      24                 :            : // *****************************************************************************
      25                 :            : //  Create ZMQ socket
      26                 :            : //! \param[in,out] ctx ZeroMQ socket context
      27                 :            : //! \param[in] host Hostname or IP + port to send cmd to
      28                 :            : //! \param[in] rpc_server_public_key CurveZMQ server public key to use
      29                 :            : //! \param[in] client_keys CurveMQ client keypair to use
      30                 :            : //! \return ZMQ socket created
      31                 :            : //! \see http://curvezmq.org
      32                 :            : // *****************************************************************************
      33                 :            : {
      34 [ +  - ][ +  - ]:        174 :   MINFO( "Connecting to " << host );
      35                 :         87 :   auto sock = zmqpp::socket( ctx, zmqpp::socket_type::req );
      36                 :            : 
      37         [ +  + ]:         87 :   if (not rpc_server_public_key.empty()) {
      38                 :            :     try {
      39         [ +  - ]:         44 :       sock.set( zmqpp::socket_option::curve_server_key, rpc_server_public_key );
      40         [ +  - ]:         44 :       sock.set( zmqpp::socket_option::curve_public_key, client_keys.public_key );
      41                 :         44 :       sock.set( zmqpp::socket_option::curve_secret_key, client_keys.secret_key );
      42         [ -  - ]:          0 :     } catch ( zmqpp::exception& ) {}
      43                 :            :   }
      44                 :            : 
      45 [ +  - ][ +  - ]:         87 :   sock.connect( "tcp://" + host );
      46                 :            :   //  configure socket to not wait at close time
      47         [ +  - ]:         87 :   sock.set( zmqpp::socket_option::linger, 0 );
      48                 :         87 :   return sock;
      49                 :            : }
      50                 :            : 
      51                 :            : std::string
      52         [ +  - ]:         43 : piac::pirate_send( const std::string& cmd,
      53                 :            :                    zmqpp::context& ctx,
      54                 :            :                    const std::string& host,
      55                 :            :                    const std::string& rpc_server_public_key,
      56                 :            :                    const zmqpp::curve::keypair& client_keys )
      57                 :            : // *****************************************************************************
      58                 :            : //  Send message via a ZMQ socket with retries and time-out
      59                 :            : //! \param[in] cmd Command to send
      60                 :            : //! \param[in,out] ctx ZeroMQ socket context
      61                 :            : //! \param[in] host Hostname or IP + port to send cmd to
      62                 :            : //! \param[in] rpc_server_public_key CurveZMQ server public key to use
      63                 :            : //! \param[in] client_keys CurveMQ client keypair to use
      64                 :            : //! \return Response from remote host
      65                 :            : //! \see https://zguide.zeromq.org/docs/chapter4/#Client-Side-Reliability-Lazy-Pirate-Pattern
      66                 :            : //! \see https://github.com/zeromq/libzmq/issues/3495
      67                 :            : // *****************************************************************************
      68                 :            : {
      69                 :            :   std::string reply;
      70                 :            :   int retries = REQUEST_RETRIES;
      71         [ +  - ]:         86 :   auto server = daemon_socket( ctx, host, rpc_server_public_key, client_keys );
      72                 :            : 
      73                 :            :   // send cmd from a Lazy Pirate client
      74         [ +  + ]:         86 :   while (retries) {
      75                 :            :     std::string request = cmd;
      76         [ +  - ]:         43 :     server.send( request );
      77 [ -  + ][ -  - ]:         43 :     if (retries != REQUEST_RETRIES) sleep(1);
      78                 :            : 
      79         [ +  - ]:         86 :     zmqpp::poller poller;
      80         [ +  - ]:         43 :     poller.add( server );
      81                 :            :     bool expect_reply = true;
      82                 :            :     while (expect_reply) {
      83 [ +  - ][ +  + ]:        119 :       if (poller.poll(REQUEST_TIMEOUT) && poller.has_input(server)) {
                 [ -  + ]
      84         [ +  - ]:         32 :         server.receive( reply );
      85         [ +  - ]:         32 :         if (not reply.empty()) {
      86 [ +  - ][ +  - ]:         96 :           MDEBUG( "Recv reply: " + reply );
         [ +  - ][ +  - ]
      87                 :            :           expect_reply = false;
      88                 :            :           retries = 0;
      89                 :            :         } else {
      90 [ -  - ][ -  - ]:          0 :           MERROR( "Recv empty msg from daemon" );
                 [ -  - ]
      91                 :            :         }
      92         [ +  + ]:         55 :       } else if (--retries == 0) {
      93 [ +  - ][ +  - ]:         33 :         MERROR( "Abandoning server at " + host );
         [ +  - ][ +  - ]
      94                 :            :         reply = "No response from server";
      95                 :            :         expect_reply = false;
      96                 :            :       } else {
      97 [ +  - ][ +  - ]:        132 :         MWARNING( "No response from " + host + ", retrying: " << retries );
         [ +  - ][ +  - ]
         [ +  - ][ -  + ]
                 [ -  - ]
      98         [ +  - ]:         44 :         poller.remove( server );
      99         [ +  - ]:         44 :         server = daemon_socket( ctx, host, rpc_server_public_key, client_keys );
     100         [ +  - ]:         44 :         poller.add( server );
     101         [ +  - ]:         44 :         server.send( request );
     102                 :            :       }
     103                 :            :     }
     104                 :            :   }
     105                 :            : 
     106 [ +  - ][ +  - ]:        129 :   MDEBUG( "Destroyed socket to " + host );
         [ +  - ][ +  - ]
     107                 :         43 :   return reply;
     108                 :            : }
     109                 :            : 
     110                 :            : void
     111                 :         22 : piac::try_bind( zmqpp::socket& sock, int& port, int range, bool use_strict_ports )
     112                 :            : // *****************************************************************************
     113                 :            : //  Try to bind ZMQ socket, attempting unused ports
     114                 :            : //! \param[in,out] sock Socket to use
     115                 :            : //! \param[in,out] port Port to try first
     116                 :            : //! \param[in] range Number of ports to attempt in increasing order: port+range
     117                 :            : //! \param[in] use_strict_ports True to try only the default port
     118                 :            : // *****************************************************************************
     119                 :            : {
     120         [ +  - ]:         22 :   if (use_strict_ports) {
     121 [ +  - ][ +  - ]:         44 :     sock.bind( "tcp://*:" + std::to_string(port) );
         [ -  + ][ -  - ]
     122                 :         22 :     return;
     123                 :            :   }
     124                 :            : 
     125                 :            :   do {
     126                 :            :     try {
     127 [ -  - ][ -  - ]:          0 :       sock.bind( "tcp://*:" + std::to_string(port) );
         [ -  - ][ -  - ]
                 [ -  - ]
     128                 :          0 :       return;
     129                 :            :     }
     130         [ -  - ]:          0 :     catch ( zmqpp::exception& ) {}
     131                 :          0 :     ++port;
     132         [ -  - ]:          0 :   } while (port < port+range );
     133                 :            : 
     134 [ -  - ][ -  - ]:          0 :   MERROR( "Could not bind to socket within range: ["
         [ -  - ][ -  - ]
     135                 :            :           << port << ',' << port+range << ')' );
     136                 :            : }

Generated by: LCOV version 1.14