Piac test code coverage report
Current view: top level - src - daemon_p2p_thread.cpp (source / functions) Hit Total Coverage
Commit: Piac-RELEASE Lines: 119 123 96.7 %
Date: 2022-12-16 13:44:03 Functions: 7 7 100.0 %
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: 192 370 51.9 %

           Branch data     Line data    Source code
       1                 :            : // *****************************************************************************
       2                 :            : /*!
       3                 :            :   \file      src/daemon_p2p_thread.cpp
       4                 :            :   \copyright 2022-2025 J. Bakosi,
       5                 :            :              All rights reserved. See the LICENSE file for details.
       6                 :            :   \brief     Piac daemon peer-to-peer communication
       7                 :            : */
       8                 :            : // *****************************************************************************
       9                 :            : 
      10                 :            : #include <mutex>
      11                 :            : #include <condition_variable>
      12                 :            : 
      13                 :            : #include "logging_util.hpp"
      14                 :            : #include "crypto_util.hpp"
      15                 :            : #include "zmq_util.hpp"
      16                 :            : #include "daemon_p2p_thread.hpp"
      17                 :            : 
      18                 :            : namespace piac {
      19                 :            : 
      20                 :            : extern std::mutex g_hashes_mtx;
      21                 :            : extern std::condition_variable g_hashes_cv;
      22                 :            : extern bool g_hashes_access;
      23                 :            : 
      24                 :            : } // piac::
      25                 :            : 
      26                 :            : zmqpp::socket
      27                 :         12 : piac::p2p_connect_peer( zmqpp::context& ctx, const std::string& addr )
      28                 :            : // *****************************************************************************
      29                 :            : //  Create ZeroMQ socket and onnect to peer piac daemon
      30                 :            : //! \param[in,out] ctx ZeroMQ socket contex
      31                 :            : //! \param[in] addr Address (hostname or IP + port) of peer to connect to
      32                 :            : //! \return ZeroMQ socket created
      33                 :            : // *****************************************************************************
      34                 :            : {
      35                 :            :   // create socket to connect to peer
      36                 :         12 :   zmqpp::socket dealer( ctx, zmqpp::socket_type::dealer );
      37 [ +  - ][ +  - ]:         12 :   dealer.connect( "tcp://" + addr );
      38 [ +  - ][ +  - ]:         36 :   MDEBUG( "Connecting to peer at " + addr );
         [ +  - ][ +  - ]
      39                 :         12 :   return dealer;
      40                 :            : }
      41                 :            : 
      42                 :            : void
      43                 :         72 : piac::p2p_bcast_peers(
      44                 :            :   int p2p_port,
      45                 :            :   std::unordered_map< std::string, zmqpp::socket >& my_peers,
      46                 :            :   bool& to_bcast_peers )
      47                 :            : // *****************************************************************************
      48                 :            : //  Broadcast to peers
      49                 :            : //! \param[in] p2p_port Peer-to-peer port to use
      50                 :            : //! \param[in] my_peers List of peers (address and socket) to broadcast to
      51                 :            : //! \param[in,out] to_bcast_peers True to broadcast, false to not
      52                 :            : // *****************************************************************************
      53                 :            : {
      54         [ +  + ]:         72 :   if (not to_bcast_peers) return;
      55                 :            : 
      56         [ +  + ]:         37 :   for (auto& [addr,sock] : my_peers) {
      57                 :         36 :     zmqpp::message msg;
      58         [ +  - ]:         18 :     msg << "PEER";
      59 [ +  - ][ +  - ]:         18 :     msg << std::to_string( my_peers.size() + 1 );
      60 [ +  - ][ +  - ]:         36 :     msg << "localhost:" + std::to_string(p2p_port);
         [ +  - ][ -  + ]
                 [ -  - ]
      61 [ +  + ][ +  - ]:         48 :     for (const auto& [taddr,sock] : my_peers) msg << taddr;
      62         [ +  - ]:         18 :     sock.send( msg );
      63                 :            :   }
      64                 :            : 
      65                 :         19 :   to_bcast_peers = false;
      66                 :            : }
      67                 :            : 
      68                 :            : void
      69                 :         72 : piac::p2p_bcast_hashes(
      70                 :            :   int p2p_port,
      71                 :            :   std::unordered_map< std::string, zmqpp::socket >& my_peers,
      72                 :            :   const std::unordered_set< std::string >& my_hashes,
      73                 :            :   bool& to_bcast_hashes )
      74                 :            : // *****************************************************************************
      75                 :            : //  Broadcast advertisement database hashes to peers
      76                 :            : //! \param[in] p2p_port Peer-to-peer port to use
      77                 :            : //! \param[in,out] my_peers List of  peers (address and socket) to broadcast to
      78                 :            : //! \param[in] my_hashes Set of advertisement database hashes to broadcast
      79                 :            : //! \param[in,out] to_bcast_hashes True to broadcast, false to not
      80                 :            : // *****************************************************************************
      81                 :            : {
      82         [ +  + ]:         72 :   if (not to_bcast_hashes) return;
      83                 :            : 
      84                 :            :   {
      85                 :            :     std::unique_lock lock( g_hashes_mtx );
      86         [ -  + ]:         30 :     g_hashes_cv.wait( lock, []{ return g_hashes_access; } );
      87                 :            :   }
      88                 :            : 
      89         [ +  + ]:         54 :   for (auto& [addr,sock] : my_peers) {
      90                 :         48 :     zmqpp::message msg;
      91         [ +  - ]:         24 :     msg << "HASH";
      92 [ +  - ][ +  - ]:         48 :     msg << "localhost:" + std::to_string(p2p_port);
         [ +  - ][ -  + ]
         [ +  - ][ -  - ]
      93 [ +  - ][ +  - ]:         48 :     msg << std::to_string( my_hashes.size() );
      94 [ +  + ][ +  - ]:         30 :     for (const auto& h : my_hashes) msg << h;
      95         [ +  - ]:         24 :     sock.send( msg );
      96 [ +  - ][ +  - ]:         48 :     MDEBUG( "Broadcasting " << my_hashes.size() << " hashes to " << addr );
                 [ +  - ]
      97                 :            :   }
      98                 :            : 
      99                 :         30 :   to_bcast_hashes = false;
     100                 :            : }
     101                 :            : 
     102                 :            : void
     103                 :         72 : piac::p2p_send_db_requests(
     104                 :            :   int p2p_port,
     105                 :            :   std::unordered_map< std::string, zmqpp::socket >& my_peers,
     106                 :            :   std::unordered_map< std::string,
     107                 :            :     std::unordered_set< std::string > >& db_requests,
     108                 :            :   bool& to_send_db_requests )
     109                 :            : // *****************************************************************************
     110                 :            : //  Send requests for advertisement database entries to peers
     111                 :            : //! \param[in] p2p_port Peer-to-peer port to use
     112                 :            : //! \param[in,out] my_peers List of  peers (address and socket) to broadcast to
     113                 :            : //! \param[in,out] db_requests Multiple ad requests from multiple peers
     114                 :            : //! \param[in,out] to_send_db_requests True to send requests, false to not
     115                 :            : // *****************************************************************************
     116                 :            : {
     117         [ +  + ]:         72 :   if (not to_send_db_requests) return;
     118                 :            : 
     119         [ +  + ]:          6 :   for (auto&& [addr,hashes] : db_requests) {
     120                 :          6 :     zmqpp::message msg;
     121         [ +  - ]:          3 :     msg << "REQ";
     122 [ +  - ][ +  - ]:          6 :     msg << "localhost:" + std::to_string(p2p_port);
         [ +  - ][ -  + ]
         [ +  - ][ -  - ]
     123 [ +  - ][ +  - ]:          6 :     msg << std::to_string( hashes.size() );
     124 [ +  + ][ +  - ]:          6 :     for (const auto& h : hashes) msg << h;
     125                 :            :     hashes.clear();
     126         [ +  - ]:          3 :     my_peers.at( addr ).send( msg );
     127                 :            :   }
     128                 :            : 
     129                 :            :   db_requests.clear();
     130                 :          3 :   to_send_db_requests = false;
     131                 :            : }
     132                 :            : 
     133                 :            : void
     134         [ +  - ]:         48 : piac::p2p_answer_p2p(
     135                 :            :   zmqpp::context& ctx_p2p,
     136                 :            :   zmqpp::socket& db_p2p,
     137                 :            :   zmqpp::message& msg,
     138                 :            :   std::unordered_map< std::string, zmqpp::socket >& my_peers,
     139                 :            :   const std::unordered_set< std::string >& my_hashes,
     140                 :            :   std::unordered_map< std::string,
     141                 :            :     std::unordered_set< std::string > >& db_requests,
     142                 :            :   int p2p_port,
     143                 :            :   bool& to_bcast_peers,
     144                 :            :   bool& to_bcast_hashes,
     145                 :            :   bool& to_send_db_requests )
     146                 :            : // *****************************************************************************
     147                 :            : //  Answer peer's request
     148                 :            : //! \param[in,out] ctx_p2p ZMQ context used for peer-to-peer communication
     149                 :            : //! \param[in,out] db_p2p ZMQ context used for communication with the db thread
     150                 :            : //! \param[in,out] msg Incoming message to answer
     151                 :            : //! \param[in,out] my_peers List of this daemon's peers (address and socket)
     152                 :            : //! \param[in] my_hashes This daemon's set of advertisement database hashes
     153                 :            : //! \param[in,out] db_requests Store multiple ad requests from multiple peers
     154                 :            : //! \param[in] p2p_port Peer-to-peer port to use
     155                 :            : //! \param[in,out] to_bcast_peers True to broadcast to peers next, false to not
     156                 :            : //! \param[in,out] to_bcast_hashes True to broadcast hashes next, false to not
     157                 :            : //! \param[in,out] to_send_db_requests True to send db requests next, false: not
     158                 :            : // *****************************************************************************
     159                 :            : {
     160                 :            :   std::string id, cmd;
     161                 :            :   msg >> id >> cmd;
     162 [ +  - ][ +  - ]:         96 :   MDEBUG( "Recv msg: " << cmd );
                 [ +  - ]
     163                 :            : 
     164         [ +  + ]:         48 :   if (cmd == "PEER") {
     165                 :            : 
     166                 :            :     std::string size;
     167                 :            :     msg >> size;
     168                 :            :     std::size_t num = stoul( size );
     169         [ +  + ]:         66 :     while (num-- != 0) {
     170                 :            :       std::string addr;
     171                 :            :       msg >> addr;
     172 [ +  - ][ +  - ]:        126 :       if (addr != "localhost:" + std::to_string(p2p_port) &&
         [ +  + ][ +  + ]
         [ -  + ][ +  + ]
                 [ -  - ]
     173                 :            :           my_peers.find(addr) == end(my_peers))
     174                 :            :       {
     175 [ +  - ][ -  - ]:         16 :         my_peers.emplace( addr, p2p_connect_peer( ctx_p2p, addr ) );
     176                 :          8 :         to_bcast_peers = true;
     177                 :          8 :         to_bcast_hashes = true;
     178                 :            :       }
     179                 :            :     }
     180 [ +  - ][ +  - ]:         36 :     MDEBUG( "Number of peers: " << my_peers.size() );
         [ +  - ][ -  - ]
     181                 :            : 
     182         [ +  + ]:         30 :   } else if (cmd == "HASH") {
     183                 :            : 
     184                 :            :     {
     185                 :            :       std::unique_lock lock( g_hashes_mtx );
     186         [ -  + ]:         24 :       g_hashes_cv.wait( lock, []{ return g_hashes_access; } );
     187                 :            :     }
     188                 :            :     std::string from, size;
     189                 :            :     msg >> from >> size;
     190                 :            :     std::size_t num = stoul( size );
     191         [ +  + ]:         30 :     while (num-- != 0) {
     192                 :            :       std::string hash;
     193                 :            :       msg >> hash;
     194         [ +  + ]:          6 :       if (my_hashes.find(hash) == end(my_hashes)) {
     195                 :            :         db_requests[ from ].insert( hash );
     196                 :          3 :         to_send_db_requests = true;
     197                 :            :       }
     198                 :            :     }
     199                 :            :     auto r = db_requests.find( from );
     200 [ +  - ][ +  - ]:         72 :     MDEBUG( "Recv " << (r != end(db_requests) ? r->second.size() : 0)
         [ +  - ][ +  + ]
                 [ -  - ]
     201                 :            :             << " hashes from " << from );
     202                 :            : 
     203         [ +  + ]:          6 :   } else if (cmd == "REQ") {
     204                 :            : 
     205                 :            :     std::string addr, size;
     206                 :            :     msg >> addr >> size;
     207                 :            :     std::size_t num = stoul( size );
     208                 :            :     assert( num > 0 );
     209         [ +  - ]:          6 :     zmqpp::message req;
     210 [ +  - ][ +  - ]:          3 :     req << "GET" << addr << size;
                 [ +  - ]
     211         [ +  + ]:          6 :     while (num-- != 0) {
     212                 :            :       std::string hash;
     213                 :            :       msg >> hash;
     214         [ +  - ]:          3 :       req << hash;
     215                 :            :     }
     216         [ +  - ]:          3 :     db_p2p.send( req );
     217 [ +  - ][ +  - ]:          6 :     MDEBUG( "Will prepare " << size << " db entries for " << addr );
                 [ +  - ]
     218                 :            : 
     219         [ +  - ]:          3 :   } else if (cmd == "DOC") {
     220                 :            : 
     221                 :            :     {
     222                 :            :       std::unique_lock lock( g_hashes_mtx );
     223         [ -  + ]:          3 :       g_hashes_cv.wait( lock, []{ return g_hashes_access; } );
     224                 :            :     }
     225                 :            :     std::string size;
     226                 :            :     msg >> size;
     227 [ +  - ][ +  - ]:          6 :     MDEBUG( "Recv " << size << " db entries" );
         [ +  - ][ -  - ]
     228                 :            :     std::size_t num = stoul( size );
     229                 :            :     assert( num > 0 );
     230                 :          3 :     std::vector< std::string > docs_to_insert;
     231         [ +  + ]:          6 :     while (num-- != 0) {
     232                 :            :       std::string doc;
     233                 :            :       msg >> doc;
     234         [ +  - ]:          3 :       auto hash = sha256( doc );
     235         [ +  + ]:          3 :       if (my_hashes.find(hash) == end(my_hashes)) {
     236         [ +  - ]:          2 :         docs_to_insert.emplace_back( std::move(doc) );
     237                 :            :       }
     238                 :            :     }
     239         [ +  - ]:          6 :     zmqpp::message req;
     240 [ +  - ][ +  - ]:          6 :     req << "INS" << std::to_string( docs_to_insert.size() );
                 [ +  - ]
     241 [ +  + ][ +  - ]:          5 :     for (const auto& doc : docs_to_insert) req << doc;
     242         [ +  - ]:          3 :     db_p2p.send( req );
     243 [ +  - ][ +  - ]:          6 :     MDEBUG( "Attempting to insert " << docs_to_insert.size() << " db entries" );
                 [ +  - ]
     244                 :            : 
     245                 :            :   } else {
     246                 :            : 
     247 [ -  - ][ -  - ]:          0 :     MERROR( "unknown cmd" );
         [ -  - ][ -  - ]
     248                 :            : 
     249                 :            :   }
     250                 :         48 : }
     251                 :            : 
     252                 :            : void
     253         [ +  - ]:         14 : piac::p2p_answer_db( zmqpp::message& msg,
     254                 :            :                      std::unordered_map< std::string, zmqpp::socket >& my_peers,
     255                 :            :                      bool& to_bcast_hashes )
     256                 :            : // *****************************************************************************
     257                 :            : //  Answer request from db thread
     258                 :            : //! \param[in,out] msg Incoming message to answer
     259                 :            : //! \param[in,out] my_peers List of this daemon's peers (address and socket)
     260                 :            : //! \param[in,out] to_bcast_hashes True to broadcast hashes next, false to not
     261                 :            : // *****************************************************************************
     262                 :            : {
     263                 :            :   std::string cmd;
     264                 :            :   msg >> cmd;
     265 [ +  - ][ +  - ]:         28 :   MDEBUG( "Recv msg: " << cmd );
                 [ +  - ]
     266                 :            : 
     267         [ +  + ]:         14 :   if (cmd == "PUT") {
     268                 :            : 
     269                 :            :     std::string addr, size;
     270                 :            :     msg >> addr >> size;
     271 [ +  - ][ +  - ]:          6 :     MDEBUG( "Prepared " << size << " db entries for " << addr );
         [ +  - ][ -  - ]
     272                 :            :     std::size_t num = stoul( size );
     273                 :            :     assert( num > 0 );
     274         [ +  - ]:          6 :     zmqpp::message rep;
     275 [ +  - ][ +  - ]:          3 :     rep << "DOC" << size;
     276         [ +  + ]:          6 :     while (num-- != 0) {
     277                 :            :       std::string doc;
     278                 :            :       msg >> doc;
     279         [ +  - ]:          3 :       rep << doc;
     280                 :            :     }
     281         [ +  - ]:          3 :     my_peers.at( addr ).send( rep );
     282 [ +  - ][ +  - ]:          6 :     MDEBUG( "Sent back " << size << " db entries to " << addr );
                 [ +  - ]
     283                 :            : 
     284         [ +  - ]:         11 :   } else if (cmd == "NEW") {
     285                 :            : 
     286                 :         11 :     to_bcast_hashes = true;
     287                 :            : 
     288                 :            :   } else {
     289                 :            : 
     290 [ -  - ][ -  - ]:          0 :     MERROR( "unknown cmd" );
         [ -  - ][ -  - ]
     291                 :            : 
     292                 :            :   }
     293                 :         14 : }
     294                 :            : 
     295                 :            : [[noreturn]] void
     296                 :         11 : piac::p2p_thread( zmqpp::context& ctx_p2p,
     297                 :            :                   zmqpp::context& ctx_db,
     298                 :            :                   std::unordered_map< std::string, zmqpp::socket >& my_peers,
     299                 :            :                   const std::unordered_set< std::string >& my_hashes,
     300                 :            :                   int default_p2p_port,
     301                 :            :                   int p2p_port,
     302                 :            :                   bool use_strict_ports )
     303                 :            : // *****************************************************************************
     304                 :            : //  Entry point to thread to communicate with peers
     305                 :            : //! \param[in,out] ctx_p2p ZMQ context used for peer-to-peer communication
     306                 :            : //! \param[in,out] ctx_db ZMQ context used for communication with the db thread
     307                 :            : //! \param[in,out] my_peers List of this daemon's peers (address and socket)
     308                 :            : //! \param[in] my_hashes This daemon's set of advertisement database hashes
     309                 :            : //! \param[in] default_p2p_port Port to use by default for peer communication
     310                 :            : //! \param[in] p2p_port Port that is used for peer communication
     311                 :            : //! \param[in] use_strict_ports True to try only the default port
     312                 :            : // *****************************************************************************
     313                 :            : {
     314                 :         22 :   MLOG_SET_THREAD_NAME( "p2p" );
     315 [ +  - ][ +  - ]:         22 :   MINFO( "p2p thread initialized" );
     316                 :            : 
     317                 :            :   // create socket that will listen to peers and bind to p2p port
     318                 :         11 :   zmqpp::socket router( ctx_p2p, zmqpp::socket_type::router );
     319         [ +  - ]:         11 :   try_bind( router, p2p_port, 10, use_strict_ports );
     320 [ +  - ][ +  - ]:         33 :   MINFO( "Bound to P2P port " << p2p_port );
         [ +  - ][ +  - ]
     321                 :            : 
     322                 :            :   // remove our address from peer list
     323 [ +  - ][ +  - ]:         22 :   my_peers.erase( "localhost:" + std::to_string(p2p_port) );
                 [ -  + ]
     324                 :            :   // add default peers
     325         [ -  + ]:         11 :   for (int p = default_p2p_port; p < p2p_port; ++p)
     326 [ -  - ][ -  - ]:          0 :     my_peers.emplace( "localhost:" + std::to_string( p ),
         [ -  - ][ -  - ]
     327         [ -  - ]:          0 :                       zmqpp::socket( ctx_p2p, zmqpp::socket_type::dealer ) );
     328                 :            :   // initially connect to peers
     329 [ +  + ][ +  - ]:         15 :   for (auto& [addr,sock] : my_peers) sock = p2p_connect_peer( ctx_p2p, addr );
     330 [ +  - ][ +  - ]:         22 :   MDEBUG( "Initial number of peers: " << my_peers.size() );
                 [ +  - ]
     331                 :            : 
     332                 :            :   { // log initial number of hashes (populated by db thread)
     333                 :            :     std::unique_lock lock( g_hashes_mtx );
     334         [ +  + ]:         18 :     g_hashes_cv.wait( lock, []{ return g_hashes_access; } );
     335                 :            :   }
     336 [ +  - ][ +  - ]:         22 :   MDEBUG( "Initial number of db hashes: " << my_hashes.size() );
                 [ +  - ]
     337                 :            : 
     338                 :            :   // create socket to send requests for db lookups from peers
     339         [ +  - ]:         11 :   zmqpp::socket db_p2p( ctx_db, zmqpp::socket_type::pair );
     340 [ +  - ][ +  - ]:         22 :   db_p2p.connect( "inproc://db_p2p" );
                 [ +  - ]
     341 [ +  - ][ +  - ]:         22 :   MDEBUG( "Connected to inproc:://db_p2p" );
                 [ +  - ]
     342                 :            : 
     343                 :            :   std::unordered_map< std::string, std::unordered_set< std::string > >
     344                 :            :     db_requests;
     345                 :            : 
     346                 :            :   // listen to peers
     347         [ +  - ]:         11 :   zmqpp::poller poller;
     348         [ +  - ]:         11 :   poller.add( router );
     349         [ +  - ]:         11 :   poller.add( db_p2p );
     350                 :         11 :   bool to_bcast_peers = true;
     351                 :         11 :   bool to_bcast_hashes = true;
     352                 :         11 :   bool to_send_db_requests = false;
     353                 :            : 
     354                 :            :   while (1) {
     355         [ +  - ]:         72 :     p2p_bcast_peers( p2p_port, my_peers, to_bcast_peers );
     356         [ +  - ]:         72 :     p2p_bcast_hashes( p2p_port, my_peers, my_hashes, to_bcast_hashes );
     357         [ +  - ]:         72 :     p2p_send_db_requests( p2p_port, my_peers, db_requests,
     358                 :            :                           to_send_db_requests );
     359                 :            : 
     360 [ +  - ][ -  + ]:         72 :     if (poller.poll()) {
     361         [ +  + ]:         61 :       if (poller.has_input( router )) {
     362         [ +  - ]:         96 :         zmqpp::message msg;
     363         [ +  - ]:         48 :         router.receive( msg );
     364         [ +  - ]:         48 :         p2p_answer_p2p( ctx_p2p, db_p2p, msg, my_peers, my_hashes, db_requests,
     365                 :            :                         p2p_port, to_bcast_peers, to_bcast_hashes,
     366                 :            :                         to_send_db_requests );
     367                 :            :       }
     368         [ +  + ]:         61 :       if (poller.has_input( db_p2p )) {
     369         [ +  - ]:         28 :         zmqpp::message msg;
     370         [ +  - ]:         14 :         db_p2p.receive( msg );
     371         [ +  - ]:         14 :         p2p_answer_db( msg, my_peers, to_bcast_hashes );
     372                 :            :       }
     373                 :            :     }
     374                 :            :   }
     375                 :            : }

Generated by: LCOV version 1.14