Branch data Line data Source code
1 : : // *****************************************************************************
2 : : /*!
3 : : \file src/daemon_db_thread.cpp
4 : : \copyright 2022-2025 J. Bakosi,
5 : : All rights reserved. See the LICENSE file for details.
6 : : \brief Piac daemon interaction with database
7 : : */
8 : : // *****************************************************************************
9 : :
10 : : #include <mutex>
11 : : #include <condition_variable>
12 : :
13 : : #include "db.hpp"
14 : : #include "logging_util.hpp"
15 : : #include "crypto_util.hpp"
16 : : #include "zmq_util.hpp"
17 : : #include "daemon_db_thread.hpp"
18 : :
19 : : void
20 : 22 : piac::db_update_hashes( const std::string& db_name,
21 : : std::unordered_set< std::string >& my_hashes )
22 : : // *****************************************************************************
23 : : // Update advertisement database hashes
24 : : //! \param[in] db_name The name of the database to query for the hashes
25 : : //! \param[in] my_hashes Set of advertisement database hashes to update
26 : : // *****************************************************************************
27 : : {
28 [ + - ]: 44 : auto hashes = piac::db_list_hash( db_name, /* inhex = */ false );
29 [ + - ]: 44 : std::lock_guard lock( g_hashes_mtx );
30 : 22 : g_hashes_access = false;
31 : 22 : my_hashes.clear();
32 [ + + ][ + - ]: 68 : for (auto&& h : hashes) my_hashes.insert( std::move(h) );
33 : 22 : g_hashes_access = true;
34 : 22 : g_hashes_cv.notify_one();
35 [ + - ][ + - ]: 22 : MDEBUG( "Number of db hashes: " << my_hashes.size() );
[ + - ][ + - ]
[ + - ]
36 : 22 : }
37 : :
38 : : void
39 : 30 : piac::db_client_op(
40 : : zmqpp::socket& client,
41 : : zmqpp::socket& db_p2p,
42 : : const std::string& db_name,
43 : : const std::unordered_map< std::string, zmqpp::socket >& my_peers,
44 : : std::unordered_set< std::string >& my_hashes,
45 : : zmqpp::message& msg )
46 : : // *****************************************************************************
47 : : // Perform a database operation for a client
48 : : //! \param[in,out] client ZMQ socket of the client
49 : : //! \param[in,out] db_p2p ZMQ socket of the daemon's p2p thread
50 : : //! \param[in] db_name The name of the database to operate on
51 : : //! \param[in] my_peers List of this daemon's peers (address and socket)
52 : : //! \param[in,out] my_hashes Set of this daemon's advertisement database hashes
53 : : //! \param[in,out] msg Incoming message to answer
54 : : // *****************************************************************************
55 : : {
56 : 60 : std::string cmd;
57 [ + - ]: 30 : msg >> cmd;
58 : :
59 : : // extract hash of user auth from cmd if any, remove from cmd (and log)
60 : 60 : std::string user;
61 [ + - ]: 30 : auto u = cmd.rfind( "AUTH:" );
62 [ + + ]: 30 : if (u != std::string::npos) {
63 [ + - ]: 9 : user = cmd.substr( u + 5 );
64 [ + - ]: 9 : cmd.erase( u - 1 );
65 : : }
66 : :
67 [ + - ][ + - ]: 30 : MDEBUG( "Recv msg " << cmd );
[ + - ][ + - ]
[ + - ]
68 : :
69 [ - + ]: 30 : if (cmd == "connect") {
70 : :
71 [ - - ]: 0 : zmqpp::message reply;
72 [ - - ]: 0 : reply << "accept";
73 [ - - ][ - - ]: 0 : MDEBUG( "Sending reply " << cmd );
[ - - ][ - - ]
[ - - ]
74 [ - - ]: 0 : client.send( reply );
75 : :
76 [ + - ][ + + ]: 30 : } else if (cmd[0]=='d' && cmd[1]=='b') {
[ + - ][ + - ]
[ + + ]
77 : :
78 [ + - ]: 22 : cmd.erase( 0, 3 );
79 : 44 : auto q = std::move( cmd );
80 : 44 : std::string reply;
81 [ + - ][ + + ]: 22 : if (q[0]=='q' && q[1]=='u' && q[2]=='e' && q[3]=='r' && q[4]=='y') {
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ][ + - ]
[ + + ]
82 : :
83 [ + - ]: 1 : q.erase( 0, 6 );
84 [ + - ]: 1 : reply = piac::db_query( db_name, std::move(q) );
85 : :
86 [ + - ][ + + ]: 21 : } else if (q[0]=='a' && q[1]=='d' && q[2]=='d') {
[ + - ][ + - ]
[ + - ][ + - ]
[ + + ]
87 : :
88 [ + - ]: 6 : q.erase( 0, 4 );
89 [ - + ]: 6 : assert( not user.empty() );
90 [ + - ]: 6 : reply = piac::db_add( user, db_name, std::move(q), my_hashes );
91 [ + - ][ + - ]: 6 : MDEBUG( "Number of documents: " <<piac::get_doccount( db_name ) );
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ]
92 [ + - ]: 6 : db_update_hashes( db_name, my_hashes );
93 [ + - ]: 12 : zmqpp::message note;
94 [ + - ]: 6 : note << "NEW";
95 [ + - ]: 6 : db_p2p.send( note );
96 [ + - ][ + - ]: 6 : MDEBUG( "Sent note on new documents" );
[ + - ][ + - ]
97 : :
98 [ + - ][ + + ]: 15 : } else if (q[0]=='r' && q[1]=='m') {
[ + - ][ + - ]
[ + + ]
99 : :
100 [ + - ]: 3 : q.erase( 0, 3 );
101 [ - + ]: 3 : assert( not user.empty() );
102 [ + - ]: 3 : reply = piac::db_rm( user, db_name, std::move(q), my_hashes );
103 [ + - ][ + - ]: 3 : MDEBUG( "Number of documents: " << piac::get_doccount( db_name ) );
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ]
104 [ + - ]: 3 : db_update_hashes( db_name, my_hashes );
105 [ + - ]: 6 : zmqpp::message note;
106 [ + - ]: 3 : note << "NEW";
107 [ + - ]: 3 : db_p2p.send( note );
108 [ + - ][ + - ]: 3 : MDEBUG( "Sent note on removed documents" );
[ + - ][ + - ]
109 : :
110 [ + - ][ + - ]: 12 : } else if (q[0]=='l' && q[1]=='i' && q[2]=='s' && q[3]=='t') {
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ]
111 : :
112 [ + - ]: 12 : q.erase( 0, 5 );
113 [ + - ]: 12 : reply = piac::db_list( db_name, std::move(q) );
114 : :
115 : : } else {
116 : :
117 [ - - ]: 0 : reply = "unknown command";
118 : :
119 : : }
120 [ + - ]: 22 : client.send( reply );
121 : :
122 [ + - ]: 8 : } else if (cmd == "peers") {
123 : :
124 [ + - ]: 16 : zmqpp::message reply;
125 [ + + ]: 8 : if (not my_peers.empty()) {
126 [ + - ]: 3 : std::stringstream peers_list;
127 [ + + ][ + - ]: 9 : for (const auto& [addr,sock] : my_peers) peers_list << addr << ' ';
[ + - ]
128 [ + - ][ + - ]: 3 : reply << peers_list.str();
129 : : } else {
130 [ + - ]: 5 : reply << "No peers";
131 : : }
132 [ + - ]: 8 : client.send( reply );
133 : :
134 : : } else {
135 : :
136 [ - - ][ - - ]: 0 : MERROR( "unknown command" );
[ - - ][ - - ]
137 [ - - ][ - - ]: 0 : client.send( "unknown command" );
138 : :
139 : : }
140 : 30 : }
141 : :
142 : : void
143 : 6 : piac::db_peer_op( const std::string& db_name,
144 : : zmqpp::message& msg,
145 : : zmqpp::socket& db_p2p,
146 : : std::unordered_set< std::string >& my_hashes )
147 : : // *****************************************************************************
148 : : // Perform an operation for a peer
149 : : //! \param[in] db_name The name of the database to operate on
150 : : //! \param[in,out] msg Incoming message to answer
151 : : //! \param[in,out] db_p2p ZMQ socket of the daemon's p2p thread
152 : : //! \param[in,out] my_hashes Set of this daemon's advertisement database hashes
153 : : // *****************************************************************************
154 : : {
155 : 12 : std::string cmd;
156 [ + - ]: 6 : msg >> cmd;
157 [ + - ][ + - ]: 6 : MDEBUG( "Recv msg: " << cmd );
[ + - ][ + - ]
[ + - ]
158 : :
159 [ + + ]: 6 : if (cmd == "GET") {
160 : :
161 : 6 : std::string addr, size;
162 [ + - ][ + - ]: 3 : msg >> addr >> size;
163 [ + - ]: 3 : std::size_t num = stoul( size );
164 [ - + ]: 3 : assert( num > 0 );
165 : 6 : std::vector< std::string > hashes;
166 [ + + ]: 6 : while (num-- != 0) {
167 : 6 : std::string h;
168 [ + - ]: 3 : msg >> h;
169 [ + - ]: 3 : hashes.emplace_back( std::move(h) );
170 : : }
171 : :
172 [ + - ]: 6 : auto docs = piac::db_get_docs( db_name, hashes );
173 [ + - ][ + - ]: 3 : MDEBUG( "Looked up " << docs.size() << " hashes" );
[ + - ][ + - ]
[ + - ][ + - ]
174 : :
175 [ + - ]: 6 : zmqpp::message reply;
176 [ + - ][ + - ]: 3 : reply << "PUT" << addr << std::to_string( docs.size() );
[ + - ][ + - ]
177 [ + + ][ + - ]: 6 : for (const auto& d : docs) reply << d;
178 [ + - ]: 3 : db_p2p.send( reply );
179 [ + - ][ + - ]: 3 : MDEBUG( "Sending " << docs.size() << " entries" );
[ + - ][ + - ]
[ + - ][ + - ]
180 : :
181 [ + - ]: 3 : } else if (cmd == "INS") {
182 : :
183 : 6 : std::string size;
184 [ + - ]: 3 : msg >> size;
185 [ + - ]: 3 : std::size_t num = stoul( size );
186 [ - + ]: 3 : assert( num > 0 );
187 : 6 : std::vector< std::string > docs;
188 [ + + ]: 6 : while (num-- != 0) {
189 : 6 : std::string doc;
190 [ + - ]: 3 : msg >> doc;
191 [ + - ]: 6 : auto hash = sha256( doc );
192 [ + - ][ + + ]: 3 : if (my_hashes.find(hash) == end(my_hashes)) {
193 [ + - ]: 2 : docs.emplace_back( std::move(doc) );
194 : : }
195 : : }
196 [ + + ]: 3 : if (not docs.empty()) {
197 [ + - ]: 2 : piac::db_put_docs( db_name, docs );
198 [ + - ][ + - ]: 2 : MDEBUG( "Inserted " << docs.size() << " entries to db" );
[ + - ][ + - ]
[ + - ][ + - ]
199 [ + - ][ + - ]: 2 : auto ndoc = piac::get_doccount( db_name );
200 [ + - ][ + - ]: 2 : MDEBUG( "Number of documents: " << ndoc );
[ + - ][ + - ]
[ + - ]
201 [ + - ]: 2 : db_update_hashes( db_name, my_hashes );
202 [ + - ]: 4 : zmqpp::message reply;
203 [ + - ]: 2 : reply << "NEW";
204 [ + - ]: 2 : db_p2p.send( reply );
205 [ + - ][ + - ]: 2 : MDEBUG( "Sent note on new documents" );
[ + - ][ + - ]
206 : : }
207 : :
208 : : } else {
209 : :
210 [ - - ][ - - ]: 0 : MERROR( "unknown cmd" );
[ - - ][ - - ]
211 : :
212 : : }
213 : 6 : }
214 : :
215 : : [[noreturn]] void
216 : 11 : piac::db_thread(
217 : : zmqpp::context& ctx_db,
218 : : const std::string& db_name,
219 : : int rpc_port,
220 : : bool use_strict_ports,
221 : : const std::unordered_map< std::string, zmqpp::socket >& my_peers,
222 : : std::unordered_set< std::string >& my_hashes,
223 : : int rpc_secure,
224 : : const zmqpp::curve::keypair& rpc_server_keys,
225 : : const std::vector< std::string >& rpc_authorized_clients )
226 : : // *****************************************************************************
227 : : // Entry point to thread to perform database operations
228 : : //! \param[in,out] ctx_db ZMQ context used for communication with the db thread
229 : : //! \param[in] db_name The name of the database to operate on
230 : : //! \param[in] rpc_port Port to use for client communication
231 : : //! \param[in] use_strict_ports True to try only the default port
232 : : //! \param[in] my_peers List of this daemon's peers (address and socket)
233 : : //! \param[in,out] my_hashes Set of this daemon's advertisement database hashes
234 : : //! \param[in] rpc_secure Non-zero to use secure client communication
235 : : //! \param[in] rpc_server_keys CurveMQ keypair to use for secure client comm.
236 : : //! \param[in] rpc_authorized_clients Only communicate with these clients if
237 : : //! secure communication is used to talk to clients
238 : : //! \see http://curvezmq.org
239 : : //! \see http://www.evilpaul.org/wp/2017/05/02/authentication-encryption-zeromq
240 : : // *****************************************************************************
241 : : {
242 [ + - ][ + - ]: 11 : MLOG_SET_THREAD_NAME( "db" );
243 [ + - ][ + - ]: 11 : MINFO( "db thread initialized" );
[ + - ][ + - ]
244 [ + - ][ + - ]: 11 : MINFO( "Using database: " << db_name );
[ + - ][ + - ]
[ + - ]
245 : :
246 : : // initially optionally populate database
247 [ + - ][ + - ]: 11 : auto ndoc = piac::get_doccount( db_name );
248 [ + - ][ + - ]: 11 : MINFO( "Initial number of documents: " << ndoc );
[ + - ][ + - ]
[ + - ]
249 : :
250 : : // initially query hashes of db entries
251 [ + - ]: 11 : db_update_hashes( db_name, my_hashes );
252 : :
253 [ + - ]: 11 : zmqpp::context ctx_rpc;
254 : :
255 : : // configure secure socket that will listen to clients and bind to RPC port
256 [ + - ]: 11 : zmqpp::auth authenticator( ctx_rpc );
257 [ + - ]: 11 : authenticator.set_verbose( true );
258 [ + - ][ + - ]: 11 : authenticator.configure_domain( "*" );
259 [ + - ][ + - ]: 11 : authenticator.allow( "127.0.0.1" );
260 [ + + ]: 11 : if (rpc_secure) {
261 [ + + ]: 3 : if (rpc_authorized_clients.empty()) { // stonehouse
262 [ + - ][ + - ]: 2 : authenticator.configure_curve( "CURVE_ALLOW_ANY" );
263 : : } else { // ironhouse
264 [ + + ]: 3 : for (const auto& cpk : rpc_authorized_clients) {
265 [ + - ]: 2 : authenticator.configure_curve( cpk );
266 : : }
267 : : }
268 : : }
269 : : // create socket that will listen to clients via RPC
270 [ + - ]: 11 : zmqpp::socket client( ctx_rpc, zmqpp::socket_type::reply );
271 [ + + ]: 11 : if (rpc_secure) {
272 [ + - ]: 3 : client.set( zmqpp::socket_option::identity, "IDENT" );
273 : 3 : int as_server = 1;
274 [ + - ]: 3 : client.set( zmqpp::socket_option::curve_server, as_server );
275 [ + - ]: 3 : client.set( zmqpp::socket_option::curve_secret_key,
276 [ + - ]: 6 : rpc_server_keys.secret_key );
277 : : }
278 [ + - ]: 11 : try_bind( client, rpc_port, 10, use_strict_ports );
279 : :
280 [ + - ][ + - ]: 11 : MINFO( "Bound to RPC port " << rpc_port );
[ + - ][ + - ]
[ + - ]
281 [ + - ]: 11 : epee::set_console_color( epee::console_color_yellow, /* bright = */ false );
282 [ + - ][ + - ]: 11 : std::cout << "Bound to RPC port " << rpc_port << '\n';
[ + - ]
283 [ + - ]: 11 : epee::set_console_color( epee::console_color_default, /* bright = */ false );
284 : :
285 : : // create socket that will listen to requests for db lookups from peers
286 [ + - ]: 11 : zmqpp::socket db_p2p( ctx_db, zmqpp::socket_type::pair );
287 [ + - ][ + - ]: 11 : db_p2p.bind( "inproc://db_p2p" );
288 [ + - ][ + - ]: 11 : MDEBUG( "Bound to inproc:://db_p2p" );
[ + - ][ + - ]
289 : :
290 : : // listen to messages
291 [ + - ]: 11 : zmqpp::poller poller;
292 [ + - ]: 11 : poller.add( db_p2p );
293 : : while (1) {
294 : :
295 [ + - ]: 12655 : zmqpp::message msg;
296 [ + - ][ + + ]: 6333 : if (client.receive( msg, /* dont_block = */ true )) {
297 [ + - ]: 30 : db_client_op( client, db_p2p, db_name, my_peers, my_hashes, msg );
298 : : }
299 : :
300 [ + - ][ + + ]: 6333 : if (poller.poll(100)) {
301 [ + - ][ + - ]: 6 : if (poller.has_input( db_p2p )) {
302 [ + - ]: 12 : zmqpp::message m;
303 [ + - ]: 6 : db_p2p.receive( m );
304 [ + - ]: 6 : db_peer_op( db_name, m, db_p2p, my_hashes );
305 : : }
306 : : }
307 : 6322 : }
308 : : }
|