Branch data Line data Source code
1 : : // *****************************************************************************
2 : : /*!
3 : : \file src/daemon_p2p_thread.cpp
4 : : \copyright 2022-2025 J. Bakosi,
5 : : All rights reserved. See the LICENSE file for details.
6 : : \brief Piac daemon peer-to-peer communication
7 : : */
8 : : // *****************************************************************************
9 : :
10 : : #include <mutex>
11 : : #include <condition_variable>
12 : :
13 : : #include "logging_util.hpp"
14 : : #include "crypto_util.hpp"
15 : : #include "zmq_util.hpp"
16 : : #include "daemon_p2p_thread.hpp"
17 : :
18 : : namespace piac {
19 : :
20 : : extern std::mutex g_hashes_mtx;
21 : : extern std::condition_variable g_hashes_cv;
22 : : extern bool g_hashes_access;
23 : :
24 : : } // piac::
25 : :
26 : : zmqpp::socket
27 : 12 : piac::p2p_connect_peer( zmqpp::context& ctx, const std::string& addr )
28 : : // *****************************************************************************
29 : : // Create ZeroMQ socket and onnect to peer piac daemon
30 : : //! \param[in,out] ctx ZeroMQ socket contex
31 : : //! \param[in] addr Address (hostname or IP + port) of peer to connect to
32 : : //! \return ZeroMQ socket created
33 : : // *****************************************************************************
34 : : {
35 : : // create socket to connect to peer
36 : 12 : zmqpp::socket dealer( ctx, zmqpp::socket_type::dealer );
37 [ + - ][ + - ]: 12 : dealer.connect( "tcp://" + addr );
38 [ + - ][ + - ]: 36 : MDEBUG( "Connecting to peer at " + addr );
[ + - ][ + - ]
39 : 12 : return dealer;
40 : : }
41 : :
42 : : void
43 : 72 : piac::p2p_bcast_peers(
44 : : int p2p_port,
45 : : std::unordered_map< std::string, zmqpp::socket >& my_peers,
46 : : bool& to_bcast_peers )
47 : : // *****************************************************************************
48 : : // Broadcast to peers
49 : : //! \param[in] p2p_port Peer-to-peer port to use
50 : : //! \param[in] my_peers List of peers (address and socket) to broadcast to
51 : : //! \param[in,out] to_bcast_peers True to broadcast, false to not
52 : : // *****************************************************************************
53 : : {
54 [ + + ]: 72 : if (not to_bcast_peers) return;
55 : :
56 [ + + ]: 37 : for (auto& [addr,sock] : my_peers) {
57 : 36 : zmqpp::message msg;
58 [ + - ]: 18 : msg << "PEER";
59 [ + - ][ + - ]: 18 : msg << std::to_string( my_peers.size() + 1 );
60 [ + - ][ + - ]: 36 : msg << "localhost:" + std::to_string(p2p_port);
[ + - ][ - + ]
[ - - ]
61 [ + + ][ + - ]: 48 : for (const auto& [taddr,sock] : my_peers) msg << taddr;
62 [ + - ]: 18 : sock.send( msg );
63 : : }
64 : :
65 : 19 : to_bcast_peers = false;
66 : : }
67 : :
68 : : void
69 : 72 : piac::p2p_bcast_hashes(
70 : : int p2p_port,
71 : : std::unordered_map< std::string, zmqpp::socket >& my_peers,
72 : : const std::unordered_set< std::string >& my_hashes,
73 : : bool& to_bcast_hashes )
74 : : // *****************************************************************************
75 : : // Broadcast advertisement database hashes to peers
76 : : //! \param[in] p2p_port Peer-to-peer port to use
77 : : //! \param[in,out] my_peers List of peers (address and socket) to broadcast to
78 : : //! \param[in] my_hashes Set of advertisement database hashes to broadcast
79 : : //! \param[in,out] to_bcast_hashes True to broadcast, false to not
80 : : // *****************************************************************************
81 : : {
82 [ + + ]: 72 : if (not to_bcast_hashes) return;
83 : :
84 : : {
85 : : std::unique_lock lock( g_hashes_mtx );
86 [ - + ]: 30 : g_hashes_cv.wait( lock, []{ return g_hashes_access; } );
87 : : }
88 : :
89 [ + + ]: 54 : for (auto& [addr,sock] : my_peers) {
90 : 48 : zmqpp::message msg;
91 [ + - ]: 24 : msg << "HASH";
92 [ + - ][ + - ]: 48 : msg << "localhost:" + std::to_string(p2p_port);
[ + - ][ - + ]
[ + - ][ - - ]
93 [ + - ][ + - ]: 48 : msg << std::to_string( my_hashes.size() );
94 [ + + ][ + - ]: 30 : for (const auto& h : my_hashes) msg << h;
95 [ + - ]: 24 : sock.send( msg );
96 [ + - ][ + - ]: 48 : MDEBUG( "Broadcasting " << my_hashes.size() << " hashes to " << addr );
[ + - ]
97 : : }
98 : :
99 : 30 : to_bcast_hashes = false;
100 : : }
101 : :
102 : : void
103 : 72 : piac::p2p_send_db_requests(
104 : : int p2p_port,
105 : : std::unordered_map< std::string, zmqpp::socket >& my_peers,
106 : : std::unordered_map< std::string,
107 : : std::unordered_set< std::string > >& db_requests,
108 : : bool& to_send_db_requests )
109 : : // *****************************************************************************
110 : : // Send requests for advertisement database entries to peers
111 : : //! \param[in] p2p_port Peer-to-peer port to use
112 : : //! \param[in,out] my_peers List of peers (address and socket) to broadcast to
113 : : //! \param[in,out] db_requests Multiple ad requests from multiple peers
114 : : //! \param[in,out] to_send_db_requests True to send requests, false to not
115 : : // *****************************************************************************
116 : : {
117 [ + + ]: 72 : if (not to_send_db_requests) return;
118 : :
119 [ + + ]: 6 : for (auto&& [addr,hashes] : db_requests) {
120 : 6 : zmqpp::message msg;
121 [ + - ]: 3 : msg << "REQ";
122 [ + - ][ + - ]: 6 : msg << "localhost:" + std::to_string(p2p_port);
[ + - ][ - + ]
[ + - ][ - - ]
123 [ + - ][ + - ]: 6 : msg << std::to_string( hashes.size() );
124 [ + + ][ + - ]: 6 : for (const auto& h : hashes) msg << h;
125 : : hashes.clear();
126 [ + - ]: 3 : my_peers.at( addr ).send( msg );
127 : : }
128 : :
129 : : db_requests.clear();
130 : 3 : to_send_db_requests = false;
131 : : }
132 : :
133 : : void
134 [ + - ]: 48 : piac::p2p_answer_p2p(
135 : : zmqpp::context& ctx_p2p,
136 : : zmqpp::socket& db_p2p,
137 : : zmqpp::message& msg,
138 : : std::unordered_map< std::string, zmqpp::socket >& my_peers,
139 : : const std::unordered_set< std::string >& my_hashes,
140 : : std::unordered_map< std::string,
141 : : std::unordered_set< std::string > >& db_requests,
142 : : int p2p_port,
143 : : bool& to_bcast_peers,
144 : : bool& to_bcast_hashes,
145 : : bool& to_send_db_requests )
146 : : // *****************************************************************************
147 : : // Answer peer's request
148 : : //! \param[in,out] ctx_p2p ZMQ context used for peer-to-peer communication
149 : : //! \param[in,out] db_p2p ZMQ context used for communication with the db thread
150 : : //! \param[in,out] msg Incoming message to answer
151 : : //! \param[in,out] my_peers List of this daemon's peers (address and socket)
152 : : //! \param[in] my_hashes This daemon's set of advertisement database hashes
153 : : //! \param[in,out] db_requests Store multiple ad requests from multiple peers
154 : : //! \param[in] p2p_port Peer-to-peer port to use
155 : : //! \param[in,out] to_bcast_peers True to broadcast to peers next, false to not
156 : : //! \param[in,out] to_bcast_hashes True to broadcast hashes next, false to not
157 : : //! \param[in,out] to_send_db_requests True to send db requests next, false: not
158 : : // *****************************************************************************
159 : : {
160 : : std::string id, cmd;
161 : : msg >> id >> cmd;
162 [ + - ][ + - ]: 96 : MDEBUG( "Recv msg: " << cmd );
[ + - ]
163 : :
164 [ + + ]: 48 : if (cmd == "PEER") {
165 : :
166 : : std::string size;
167 : : msg >> size;
168 : : std::size_t num = stoul( size );
169 [ + + ]: 66 : while (num-- != 0) {
170 : : std::string addr;
171 : : msg >> addr;
172 [ + - ][ + - ]: 126 : if (addr != "localhost:" + std::to_string(p2p_port) &&
[ + + ][ + + ]
[ - + ][ + + ]
[ - - ]
173 : : my_peers.find(addr) == end(my_peers))
174 : : {
175 [ + - ][ - - ]: 16 : my_peers.emplace( addr, p2p_connect_peer( ctx_p2p, addr ) );
176 : 8 : to_bcast_peers = true;
177 : 8 : to_bcast_hashes = true;
178 : : }
179 : : }
180 [ + - ][ + - ]: 36 : MDEBUG( "Number of peers: " << my_peers.size() );
[ + - ][ - - ]
181 : :
182 [ + + ]: 30 : } else if (cmd == "HASH") {
183 : :
184 : : {
185 : : std::unique_lock lock( g_hashes_mtx );
186 [ - + ]: 24 : g_hashes_cv.wait( lock, []{ return g_hashes_access; } );
187 : : }
188 : : std::string from, size;
189 : : msg >> from >> size;
190 : : std::size_t num = stoul( size );
191 [ + + ]: 30 : while (num-- != 0) {
192 : : std::string hash;
193 : : msg >> hash;
194 [ + + ]: 6 : if (my_hashes.find(hash) == end(my_hashes)) {
195 : : db_requests[ from ].insert( hash );
196 : 3 : to_send_db_requests = true;
197 : : }
198 : : }
199 : : auto r = db_requests.find( from );
200 [ + - ][ + - ]: 72 : MDEBUG( "Recv " << (r != end(db_requests) ? r->second.size() : 0)
[ + - ][ + + ]
[ - - ]
201 : : << " hashes from " << from );
202 : :
203 [ + + ]: 6 : } else if (cmd == "REQ") {
204 : :
205 : : std::string addr, size;
206 : : msg >> addr >> size;
207 : : std::size_t num = stoul( size );
208 : : assert( num > 0 );
209 [ + - ]: 6 : zmqpp::message req;
210 [ + - ][ + - ]: 3 : req << "GET" << addr << size;
[ + - ]
211 [ + + ]: 6 : while (num-- != 0) {
212 : : std::string hash;
213 : : msg >> hash;
214 [ + - ]: 3 : req << hash;
215 : : }
216 [ + - ]: 3 : db_p2p.send( req );
217 [ + - ][ + - ]: 6 : MDEBUG( "Will prepare " << size << " db entries for " << addr );
[ + - ]
218 : :
219 [ + - ]: 3 : } else if (cmd == "DOC") {
220 : :
221 : : {
222 : : std::unique_lock lock( g_hashes_mtx );
223 [ - + ]: 3 : g_hashes_cv.wait( lock, []{ return g_hashes_access; } );
224 : : }
225 : : std::string size;
226 : : msg >> size;
227 [ + - ][ + - ]: 6 : MDEBUG( "Recv " << size << " db entries" );
[ + - ][ - - ]
228 : : std::size_t num = stoul( size );
229 : : assert( num > 0 );
230 : 3 : std::vector< std::string > docs_to_insert;
231 [ + + ]: 6 : while (num-- != 0) {
232 : : std::string doc;
233 : : msg >> doc;
234 [ + - ]: 3 : auto hash = sha256( doc );
235 [ + + ]: 3 : if (my_hashes.find(hash) == end(my_hashes)) {
236 [ + - ]: 2 : docs_to_insert.emplace_back( std::move(doc) );
237 : : }
238 : : }
239 [ + - ]: 6 : zmqpp::message req;
240 [ + - ][ + - ]: 6 : req << "INS" << std::to_string( docs_to_insert.size() );
[ + - ]
241 [ + + ][ + - ]: 5 : for (const auto& doc : docs_to_insert) req << doc;
242 [ + - ]: 3 : db_p2p.send( req );
243 [ + - ][ + - ]: 6 : MDEBUG( "Attempting to insert " << docs_to_insert.size() << " db entries" );
[ + - ]
244 : :
245 : : } else {
246 : :
247 [ - - ][ - - ]: 0 : MERROR( "unknown cmd" );
[ - - ][ - - ]
248 : :
249 : : }
250 : 48 : }
251 : :
252 : : void
253 [ + - ]: 14 : piac::p2p_answer_db( zmqpp::message& msg,
254 : : std::unordered_map< std::string, zmqpp::socket >& my_peers,
255 : : bool& to_bcast_hashes )
256 : : // *****************************************************************************
257 : : // Answer request from db thread
258 : : //! \param[in,out] msg Incoming message to answer
259 : : //! \param[in,out] my_peers List of this daemon's peers (address and socket)
260 : : //! \param[in,out] to_bcast_hashes True to broadcast hashes next, false to not
261 : : // *****************************************************************************
262 : : {
263 : : std::string cmd;
264 : : msg >> cmd;
265 [ + - ][ + - ]: 28 : MDEBUG( "Recv msg: " << cmd );
[ + - ]
266 : :
267 [ + + ]: 14 : if (cmd == "PUT") {
268 : :
269 : : std::string addr, size;
270 : : msg >> addr >> size;
271 [ + - ][ + - ]: 6 : MDEBUG( "Prepared " << size << " db entries for " << addr );
[ + - ][ - - ]
272 : : std::size_t num = stoul( size );
273 : : assert( num > 0 );
274 [ + - ]: 6 : zmqpp::message rep;
275 [ + - ][ + - ]: 3 : rep << "DOC" << size;
276 [ + + ]: 6 : while (num-- != 0) {
277 : : std::string doc;
278 : : msg >> doc;
279 [ + - ]: 3 : rep << doc;
280 : : }
281 [ + - ]: 3 : my_peers.at( addr ).send( rep );
282 [ + - ][ + - ]: 6 : MDEBUG( "Sent back " << size << " db entries to " << addr );
[ + - ]
283 : :
284 [ + - ]: 11 : } else if (cmd == "NEW") {
285 : :
286 : 11 : to_bcast_hashes = true;
287 : :
288 : : } else {
289 : :
290 [ - - ][ - - ]: 0 : MERROR( "unknown cmd" );
[ - - ][ - - ]
291 : :
292 : : }
293 : 14 : }
294 : :
295 : : [[noreturn]] void
296 : 11 : piac::p2p_thread( zmqpp::context& ctx_p2p,
297 : : zmqpp::context& ctx_db,
298 : : std::unordered_map< std::string, zmqpp::socket >& my_peers,
299 : : const std::unordered_set< std::string >& my_hashes,
300 : : int default_p2p_port,
301 : : int p2p_port,
302 : : bool use_strict_ports )
303 : : // *****************************************************************************
304 : : // Entry point to thread to communicate with peers
305 : : //! \param[in,out] ctx_p2p ZMQ context used for peer-to-peer communication
306 : : //! \param[in,out] ctx_db ZMQ context used for communication with the db thread
307 : : //! \param[in,out] my_peers List of this daemon's peers (address and socket)
308 : : //! \param[in] my_hashes This daemon's set of advertisement database hashes
309 : : //! \param[in] default_p2p_port Port to use by default for peer communication
310 : : //! \param[in] p2p_port Port that is used for peer communication
311 : : //! \param[in] use_strict_ports True to try only the default port
312 : : // *****************************************************************************
313 : : {
314 : 22 : MLOG_SET_THREAD_NAME( "p2p" );
315 [ + - ][ + - ]: 22 : MINFO( "p2p thread initialized" );
316 : :
317 : : // create socket that will listen to peers and bind to p2p port
318 : 11 : zmqpp::socket router( ctx_p2p, zmqpp::socket_type::router );
319 [ + - ]: 11 : try_bind( router, p2p_port, 10, use_strict_ports );
320 [ + - ][ + - ]: 33 : MINFO( "Bound to P2P port " << p2p_port );
[ + - ][ + - ]
321 : :
322 : : // remove our address from peer list
323 [ + - ][ + - ]: 22 : my_peers.erase( "localhost:" + std::to_string(p2p_port) );
[ - + ]
324 : : // add default peers
325 [ - + ]: 11 : for (int p = default_p2p_port; p < p2p_port; ++p)
326 [ - - ][ - - ]: 0 : my_peers.emplace( "localhost:" + std::to_string( p ),
[ - - ][ - - ]
327 [ - - ]: 0 : zmqpp::socket( ctx_p2p, zmqpp::socket_type::dealer ) );
328 : : // initially connect to peers
329 [ + + ][ + - ]: 15 : for (auto& [addr,sock] : my_peers) sock = p2p_connect_peer( ctx_p2p, addr );
330 [ + - ][ + - ]: 22 : MDEBUG( "Initial number of peers: " << my_peers.size() );
[ + - ]
331 : :
332 : : { // log initial number of hashes (populated by db thread)
333 : : std::unique_lock lock( g_hashes_mtx );
334 [ + + ]: 18 : g_hashes_cv.wait( lock, []{ return g_hashes_access; } );
335 : : }
336 [ + - ][ + - ]: 22 : MDEBUG( "Initial number of db hashes: " << my_hashes.size() );
[ + - ]
337 : :
338 : : // create socket to send requests for db lookups from peers
339 [ + - ]: 11 : zmqpp::socket db_p2p( ctx_db, zmqpp::socket_type::pair );
340 [ + - ][ + - ]: 22 : db_p2p.connect( "inproc://db_p2p" );
[ + - ]
341 [ + - ][ + - ]: 22 : MDEBUG( "Connected to inproc:://db_p2p" );
[ + - ]
342 : :
343 : : std::unordered_map< std::string, std::unordered_set< std::string > >
344 : : db_requests;
345 : :
346 : : // listen to peers
347 [ + - ]: 11 : zmqpp::poller poller;
348 [ + - ]: 11 : poller.add( router );
349 [ + - ]: 11 : poller.add( db_p2p );
350 : 11 : bool to_bcast_peers = true;
351 : 11 : bool to_bcast_hashes = true;
352 : 11 : bool to_send_db_requests = false;
353 : :
354 : : while (1) {
355 [ + - ]: 72 : p2p_bcast_peers( p2p_port, my_peers, to_bcast_peers );
356 [ + - ]: 72 : p2p_bcast_hashes( p2p_port, my_peers, my_hashes, to_bcast_hashes );
357 [ + - ]: 72 : p2p_send_db_requests( p2p_port, my_peers, db_requests,
358 : : to_send_db_requests );
359 : :
360 [ + - ][ - + ]: 72 : if (poller.poll()) {
361 [ + + ]: 61 : if (poller.has_input( router )) {
362 [ + - ]: 96 : zmqpp::message msg;
363 [ + - ]: 48 : router.receive( msg );
364 [ + - ]: 48 : p2p_answer_p2p( ctx_p2p, db_p2p, msg, my_peers, my_hashes, db_requests,
365 : : p2p_port, to_bcast_peers, to_bcast_hashes,
366 : : to_send_db_requests );
367 : : }
368 [ + + ]: 61 : if (poller.has_input( db_p2p )) {
369 [ + - ]: 28 : zmqpp::message msg;
370 [ + - ]: 14 : db_p2p.receive( msg );
371 [ + - ]: 14 : p2p_answer_db( msg, my_peers, to_bcast_hashes );
372 : : }
373 : : }
374 : : }
375 : : }
|