Piac test code coverage report
Current view: top level - src - daemon_db_thread.cpp (source / functions) Hit Total Coverage
Commit: Piac-DEBUG Lines: 147 155 94.8 %
Date: 2022-12-16 13:46:15 Functions: 4 4 100.0 %
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: 271 532 50.9 %

           Branch data     Line data    Source code
       1                 :            : // *****************************************************************************
       2                 :            : /*!
       3                 :            :   \file      src/daemon_db_thread.cpp
       4                 :            :   \copyright 2022-2025 J. Bakosi,
       5                 :            :              All rights reserved. See the LICENSE file for details.
       6                 :            :   \brief     Piac daemon interaction with database
       7                 :            : */
       8                 :            : // *****************************************************************************
       9                 :            : 
      10                 :            : #include <mutex>
      11                 :            : #include <condition_variable>
      12                 :            : 
      13                 :            : #include "db.hpp"
      14                 :            : #include "logging_util.hpp"
      15                 :            : #include "crypto_util.hpp"
      16                 :            : #include "zmq_util.hpp"
      17                 :            : #include "daemon_db_thread.hpp"
      18                 :            : 
      19                 :            : void
      20                 :         22 : piac::db_update_hashes( const std::string& db_name,
      21                 :            :                         std::unordered_set< std::string >& my_hashes )
      22                 :            : // *****************************************************************************
      23                 :            : //  Update advertisement database hashes
      24                 :            : //! \param[in] db_name The name of the database to query for the hashes
      25                 :            : //! \param[in,out] my_hashes Set of advertisement database hashes to update
      26                 :            : // *****************************************************************************
      27                 :            : {
      28         [ +  - ]:         44 :   auto hashes = piac::db_list_hash( db_name, /* inhex = */ false );
      29         [ +  - ]:         44 :   std::lock_guard lock( g_hashes_mtx );
      30                 :         22 :   g_hashes_access = false;
      31                 :         22 :   my_hashes.clear();
      32 [ +  + ][ +  - ]:         68 :   for (auto&& h : hashes) my_hashes.insert( std::move(h) );
      33                 :         22 :   g_hashes_access = true;
      34                 :         22 :   g_hashes_cv.notify_one();
      35 [ +  - ][ +  - ]:         22 :   MDEBUG( "Number of db hashes: " << my_hashes.size() );
         [ +  - ][ +  - ]
                 [ +  - ]
      36                 :         22 : }
      37                 :            : 
      38                 :            : void
      39                 :         30 : piac::db_client_op(
      40                 :            :   zmqpp::socket& client,
      41                 :            :   zmqpp::socket& db_p2p,
      42                 :            :   const std::string& db_name,
      43                 :            :   const std::unordered_map< std::string, zmqpp::socket >& my_peers,
      44                 :            :   std::unordered_set< std::string >& my_hashes,
      45                 :            :   zmqpp::message& msg )
      46                 :            : // *****************************************************************************
      47                 :            : //  Perform a database operation for a client
      48                 :            : //! \param[in,out] client ZMQ socket of the client
      49                 :            : //! \param[in,out] db_p2p ZMQ socket of the daemon's p2p thread
      50                 :            : //! \param[in] db_name The name of the database to operate on
      51                 :            : //! \param[in] my_peers List of this daemon's peers (address and socket)
      52                 :            : //! \param[in,out] my_hashes Set of this daemon's advertisement database hashes
      53                 :            : //! \param[in,out] msg Incoming message to answer
      54                 :            : // *****************************************************************************
      55                 :            : {
      56                 :         60 :   std::string cmd;
      57         [ +  - ]:         30 :   msg >> cmd;
      58                 :            : 
      59                 :            :   // extract hash of user auth from cmd if any, remove from cmd (and log)
      60                 :         60 :   std::string user;
      61         [ +  - ]:         30 :   auto u = cmd.rfind( "AUTH:" );
      62         [ +  + ]:         30 :   if (u != std::string::npos) {
      63         [ +  - ]:          9 :     user = cmd.substr( u + 5 );
      64         [ +  - ]:          9 :     cmd.erase( u - 1 );
      65                 :            :   }
      66                 :            : 
      67 [ +  - ][ +  - ]:         30 :   MDEBUG( "Recv msg " << cmd );
         [ +  - ][ +  - ]
                 [ +  - ]
      68                 :            : 
      69         [ -  + ]:         30 :   if (cmd == "connect") {
      70                 :            : 
      71         [ -  - ]:          0 :     zmqpp::message reply;
      72         [ -  - ]:          0 :     reply << "accept";
      73 [ -  - ][ -  - ]:          0 :     MDEBUG( "Sending reply " << cmd );
         [ -  - ][ -  - ]
                 [ -  - ]
      74         [ -  - ]:          0 :     client.send( reply );
      75                 :            : 
      76 [ +  - ][ +  + ]:         30 :   } else if (cmd[0]=='d' && cmd[1]=='b') {
         [ +  - ][ +  - ]
                 [ +  + ]
      77                 :            : 
      78         [ +  - ]:         22 :     cmd.erase( 0, 3 );
      79                 :         44 :     auto q = std::move( cmd );
      80                 :         44 :     std::string reply;
      81 [ +  - ][ +  + ]:         22 :     if (q[0]=='q' && q[1]=='u' && q[2]=='e' && q[3]=='r' && q[4]=='y') {
         [ +  - ][ +  - ]
         [ +  - ][ +  - ]
         [ +  - ][ +  - ]
         [ +  - ][ +  - ]
                 [ +  + ]
      82                 :            : 
      83         [ +  - ]:          1 :       q.erase( 0, 6 );
      84         [ +  - ]:          1 :       reply = piac::db_query( db_name, std::move(q) );
      85                 :            : 
      86 [ +  - ][ +  + ]:         21 :     } else if (q[0]=='a' && q[1]=='d' && q[2]=='d') {
         [ +  - ][ +  - ]
         [ +  - ][ +  - ]
                 [ +  + ]
      87                 :            : 
      88         [ +  - ]:          6 :       q.erase( 0, 4 );
      89         [ -  + ]:          6 :       assert( not user.empty() );
      90         [ +  - ]:          6 :       reply = piac::db_add( user, db_name, std::move(q), my_hashes );
      91 [ +  - ][ +  - ]:          6 :       MDEBUG( "Number of documents: " <<piac::get_doccount( db_name ) );
         [ +  - ][ +  - ]
         [ +  - ][ +  - ]
                 [ +  - ]
      92         [ +  - ]:          6 :       db_update_hashes( db_name, my_hashes );
      93         [ +  - ]:         12 :       zmqpp::message note;
      94         [ +  - ]:          6 :       note << "NEW";
      95         [ +  - ]:          6 :       db_p2p.send( note );
      96 [ +  - ][ +  - ]:          6 :       MDEBUG( "Sent note on new documents" );
         [ +  - ][ +  - ]
      97                 :            : 
      98 [ +  - ][ +  + ]:         15 :     } else if (q[0]=='r' && q[1]=='m') {
         [ +  - ][ +  - ]
                 [ +  + ]
      99                 :            : 
     100         [ +  - ]:          3 :       q.erase( 0, 3 );
     101         [ -  + ]:          3 :       assert( not user.empty() );
     102         [ +  - ]:          3 :       reply = piac::db_rm( user, db_name, std::move(q), my_hashes );
     103 [ +  - ][ +  - ]:          3 :       MDEBUG( "Number of documents: " << piac::get_doccount( db_name ) );
         [ +  - ][ +  - ]
         [ +  - ][ +  - ]
                 [ +  - ]
     104         [ +  - ]:          3 :       db_update_hashes( db_name, my_hashes );
     105         [ +  - ]:          6 :       zmqpp::message note;
     106         [ +  - ]:          3 :       note << "NEW";
     107         [ +  - ]:          3 :       db_p2p.send( note );
     108 [ +  - ][ +  - ]:          3 :       MDEBUG( "Sent note on removed documents" );
         [ +  - ][ +  - ]
     109                 :            : 
     110 [ +  - ][ +  - ]:         12 :     } else if (q[0]=='l' && q[1]=='i' && q[2]=='s' && q[3]=='t') {
         [ +  - ][ +  - ]
         [ +  - ][ +  - ]
         [ +  - ][ +  - ]
                 [ +  - ]
     111                 :            : 
     112         [ +  - ]:         12 :       q.erase( 0, 5 );
     113         [ +  - ]:         12 :       reply = piac::db_list( db_name, std::move(q) );
     114                 :            : 
     115                 :            :     } else {
     116                 :            : 
     117         [ -  - ]:          0 :       reply = "unknown command";
     118                 :            : 
     119                 :            :     }
     120         [ +  - ]:         22 :     client.send( reply );
     121                 :            : 
     122         [ +  - ]:          8 :   } else if (cmd == "peers") {
     123                 :            : 
     124         [ +  - ]:         16 :     zmqpp::message reply;
     125         [ +  + ]:          8 :     if (not my_peers.empty()) {
     126         [ +  - ]:          3 :       std::stringstream peers_list;
     127 [ +  + ][ +  - ]:          9 :       for (const auto& [addr,sock] : my_peers) peers_list << addr << ' ';
                 [ +  - ]
     128 [ +  - ][ +  - ]:          3 :       reply << peers_list.str();
     129                 :            :     } else {
     130         [ +  - ]:          5 :       reply << "No peers";
     131                 :            :     }
     132         [ +  - ]:          8 :     client.send( reply );
     133                 :            : 
     134                 :            :   } else {
     135                 :            : 
     136 [ -  - ][ -  - ]:          0 :     MERROR( "unknown command" );
         [ -  - ][ -  - ]
     137 [ -  - ][ -  - ]:          0 :     client.send( "unknown command" );
     138                 :            : 
     139                 :            :   }
     140                 :         30 : }
     141                 :            : 
     142                 :            : void
     143                 :          6 : piac::db_peer_op( const std::string& db_name,
     144                 :            :                   zmqpp::message& msg,
     145                 :            :                   zmqpp::socket& db_p2p,
     146                 :            :                   std::unordered_set< std::string >& my_hashes )
     147                 :            : // *****************************************************************************
     148                 :            : //  Perform an operation for a peer
     149                 :            : //! \param[in] db_name The name of the database to operate on
     150                 :            : //! \param[in,out] msg Incoming message to answer
     151                 :            : //! \param[in,out] db_p2p ZMQ socket of the daemon's p2p thread
     152                 :            : //! \param[in,out] my_hashes Set of this daemon's advertisement database hashes
     153                 :            : // *****************************************************************************
     154                 :            : {
     155                 :         12 :   std::string cmd;
     156         [ +  - ]:          6 :   msg >> cmd;
     157 [ +  - ][ +  - ]:          6 :   MDEBUG( "Recv msg: " << cmd );
         [ +  - ][ +  - ]
                 [ +  - ]
     158                 :            : 
     159         [ +  + ]:          6 :   if (cmd == "GET") {
     160                 :            : 
     161                 :          6 :     std::string addr, size;
     162 [ +  - ][ +  - ]:          3 :     msg >> addr >> size;
     163         [ +  - ]:          3 :     std::size_t num = stoul( size );
     164         [ -  + ]:          3 :     assert( num > 0 );
     165                 :          6 :     std::vector< std::string > hashes;
     166         [ +  + ]:          6 :     while (num-- != 0) {
     167                 :          6 :       std::string h;
     168         [ +  - ]:          3 :       msg >> h;
     169         [ +  - ]:          3 :       hashes.emplace_back( std::move(h) );
     170                 :            :     }
     171                 :            : 
     172         [ +  - ]:          6 :     auto docs = piac::db_get_docs( db_name, hashes );
     173 [ +  - ][ +  - ]:          3 :     MDEBUG( "Looked up " << docs.size() << " hashes" );
         [ +  - ][ +  - ]
         [ +  - ][ +  - ]
     174                 :            : 
     175         [ +  - ]:          6 :     zmqpp::message reply;
     176 [ +  - ][ +  - ]:          3 :     reply << "PUT" << addr << std::to_string( docs.size() );
         [ +  - ][ +  - ]
     177 [ +  + ][ +  - ]:          6 :     for (const auto& d : docs) reply << d;
     178         [ +  - ]:          3 :     db_p2p.send( reply );
     179 [ +  - ][ +  - ]:          3 :     MDEBUG( "Sending " << docs.size() << " entries" );
         [ +  - ][ +  - ]
         [ +  - ][ +  - ]
     180                 :            : 
     181         [ +  - ]:          3 :   } else if (cmd == "INS") {
     182                 :            : 
     183                 :          6 :     std::string size;
     184         [ +  - ]:          3 :     msg >> size;
     185         [ +  - ]:          3 :     std::size_t num = stoul( size );
     186         [ -  + ]:          3 :     assert( num > 0 );
     187                 :          6 :     std::vector< std::string > docs;
     188         [ +  + ]:          6 :     while (num-- != 0) {
     189                 :          6 :       std::string doc;
     190         [ +  - ]:          3 :       msg >> doc;
     191         [ +  - ]:          6 :       auto hash = sha256( doc );
     192 [ +  - ][ +  + ]:          3 :       if (my_hashes.find(hash) == end(my_hashes)) {
     193         [ +  - ]:          2 :         docs.emplace_back( std::move(doc) );
     194                 :            :       }
     195                 :            :     }
     196         [ +  + ]:          3 :     if (not docs.empty()) {
     197         [ +  - ]:          2 :       piac::db_put_docs( db_name, docs );
     198 [ +  - ][ +  - ]:          2 :       MDEBUG(  "Inserted " << docs.size() << " entries to db" );
         [ +  - ][ +  - ]
         [ +  - ][ +  - ]
     199 [ +  - ][ +  - ]:          2 :       auto ndoc = piac::get_doccount( db_name );
     200 [ +  - ][ +  - ]:          2 :       MDEBUG( "Number of documents: " << ndoc );
         [ +  - ][ +  - ]
                 [ +  - ]
     201         [ +  - ]:          2 :       db_update_hashes( db_name, my_hashes );
     202         [ +  - ]:          4 :       zmqpp::message reply;
     203         [ +  - ]:          2 :       reply << "NEW";
     204         [ +  - ]:          2 :       db_p2p.send( reply );
     205 [ +  - ][ +  - ]:          2 :       MDEBUG( "Sent note on new documents" );
         [ +  - ][ +  - ]
     206                 :            :     }
     207                 :            : 
     208                 :            :   } else {
     209                 :            : 
     210 [ -  - ][ -  - ]:          0 :     MERROR( "unknown cmd" );
         [ -  - ][ -  - ]
     211                 :            : 
     212                 :            :   }
     213                 :          6 : }
     214                 :            : 
     215                 :            : [[noreturn]] void
     216                 :         11 : piac::db_thread(
     217                 :            :   zmqpp::context& ctx_db,
     218                 :            :   const std::string& db_name,
     219                 :            :   int rpc_port,
     220                 :            :   bool use_strict_ports,
     221                 :            :   const std::unordered_map< std::string, zmqpp::socket >& my_peers,
     222                 :            :   std::unordered_set< std::string >& my_hashes,
     223                 :            :   int rpc_secure,
     224                 :            :   const zmqpp::curve::keypair& rpc_server_keys,
     225                 :            :   const std::vector< std::string >& rpc_authorized_clients )
     226                 :            : // *****************************************************************************
     227                 :            : //  Entry point to thread to perform database operations
     228                 :            : //! \param[in,out] ctx_db ZMQ context used for communication with the db thread
     229                 :            : //! \param[in] db_name The name of the database to operate on
     230                 :            : //! \param[in] rpc_port Port to use for client communication
     231                 :            : //! \param[in] use_strict_ports True to try only the default port
     232                 :            : //! \param[in] my_peers List of this daemon's peers (address and socket)
     233                 :            : //! \param[in,out] my_hashes Set of this daemon's advertisement database hashes
     234                 :            : //! \param[in] rpc_secure Non-zero to use secure client communication
     235                 :            : //! \param[in] rpc_server_keys CurveMQ keypair to use for secure client comm.
     236                 :            : //! \param[in] rpc_authorized_clients Only communicate with these clients if
     237                 :            : //!    secure communication is used to talk to clients
     238                 :            : //! \see http://curvezmq.org
     239                 :            : //! \see http://www.evilpaul.org/wp/2017/05/02/authentication-encryption-zeromq
     240                 :            : // *****************************************************************************
     241                 :            : {
     242 [ +  - ][ +  - ]:         11 :   MLOG_SET_THREAD_NAME( "db" );
     243 [ +  - ][ +  - ]:         11 :   MINFO( "db thread initialized" );
         [ +  - ][ +  - ]
     244 [ +  - ][ +  - ]:         11 :   MINFO( "Using database: " << db_name );
         [ +  - ][ +  - ]
                 [ +  - ]
     245                 :            : 
     246                 :            :   // initially optionally populate database
     247 [ +  - ][ +  - ]:         11 :   auto ndoc = piac::get_doccount( db_name );
     248 [ +  - ][ +  - ]:         11 :   MINFO( "Initial number of documents: " << ndoc );
         [ +  - ][ +  - ]
                 [ +  - ]
     249                 :            : 
     250                 :            :   // initially query hashes of db entries
     251         [ +  - ]:         11 :   db_update_hashes( db_name, my_hashes );
     252                 :            : 
     253         [ +  - ]:         11 :   zmqpp::context ctx_rpc;
     254                 :            : 
     255                 :            :   // configure secure socket that will listen to clients and bind to RPC port
     256         [ +  - ]:         11 :   zmqpp::auth authenticator( ctx_rpc );
     257         [ +  - ]:         11 :   authenticator.set_verbose( true );
     258 [ +  - ][ +  - ]:         11 :   authenticator.configure_domain( "*" );
     259 [ +  - ][ +  - ]:         11 :   authenticator.allow( "127.0.0.1" );
     260         [ +  + ]:         11 :   if (rpc_secure) {
     261         [ +  + ]:          3 :     if (rpc_authorized_clients.empty()) {    // stonehouse
     262 [ +  - ][ +  - ]:          2 :       authenticator.configure_curve( "CURVE_ALLOW_ANY" );
     263                 :            :     } else {                                  // ironhouse
     264         [ +  + ]:          3 :       for (const auto& cpk : rpc_authorized_clients) {
     265         [ +  - ]:          2 :         authenticator.configure_curve( cpk );
     266                 :            :       }
     267                 :            :     }
     268                 :            :   }
     269                 :            :   // create socket that will listen to clients via RPC
     270         [ +  - ]:         11 :   zmqpp::socket client( ctx_rpc, zmqpp::socket_type::reply );
     271         [ +  + ]:         11 :   if (rpc_secure) {
     272         [ +  - ]:          3 :     client.set( zmqpp::socket_option::identity, "IDENT" );
     273                 :          3 :     int as_server = 1;
     274         [ +  - ]:          3 :     client.set( zmqpp::socket_option::curve_server, as_server );
     275         [ +  - ]:          3 :     client.set( zmqpp::socket_option::curve_secret_key,
     276         [ +  - ]:          6 :                 rpc_server_keys.secret_key );
     277                 :            :   }
     278         [ +  - ]:         11 :   try_bind( client, rpc_port, 10, use_strict_ports );
     279                 :            : 
     280 [ +  - ][ +  - ]:         11 :   MINFO( "Bound to RPC port " << rpc_port );
         [ +  - ][ +  - ]
                 [ +  - ]
     281         [ +  - ]:         11 :   epee::set_console_color( epee::console_color_yellow, /* bright = */ false );
     282 [ +  - ][ +  - ]:         11 :   std::cout << "Bound to RPC port " << rpc_port << '\n';
                 [ +  - ]
     283         [ +  - ]:         11 :   epee::set_console_color( epee::console_color_default, /* bright = */ false );
     284                 :            : 
     285                 :            :   // create socket that will listen to requests for db lookups from peers
     286         [ +  - ]:         11 :   zmqpp::socket db_p2p( ctx_db, zmqpp::socket_type::pair );
     287 [ +  - ][ +  - ]:         11 :   db_p2p.bind( "inproc://db_p2p" );
     288 [ +  - ][ +  - ]:         11 :   MDEBUG( "Bound to inproc:://db_p2p" );
         [ +  - ][ +  - ]
     289                 :            : 
     290                 :            :   // listen to messages
     291         [ +  - ]:         11 :   zmqpp::poller poller;
     292         [ +  - ]:         11 :   poller.add( db_p2p );
     293                 :            :   while (1) {
     294                 :            : 
     295         [ +  - ]:      12655 :     zmqpp::message msg;
     296 [ +  - ][ +  + ]:       6333 :     if (client.receive( msg, /* dont_block = */ true )) {
     297         [ +  - ]:         30 :       db_client_op( client, db_p2p, db_name, my_peers, my_hashes, msg );
     298                 :            :     }
     299                 :            : 
     300 [ +  - ][ +  + ]:       6333 :     if (poller.poll(100)) {
     301 [ +  - ][ +  - ]:          6 :       if (poller.has_input( db_p2p )) {
     302         [ +  - ]:         12 :         zmqpp::message m;
     303         [ +  - ]:          6 :         db_p2p.receive( m );
     304         [ +  - ]:          6 :         db_peer_op( db_name, m, db_p2p, my_hashes );
     305                 :            :       }
     306                 :            :     }
     307                 :       6322 :   }
     308                 :            : }

Generated by: LCOV version 1.14