@@ -151,6 +151,9 @@ struct CSerializedNetMsg {
151
151
152
152
std::vector<unsigned char > data;
153
153
std::string m_type;
154
+
155
+ /* * Compute total memory usage of this object (own memory + any dynamic memory). */
156
+ size_t GetMemoryUsage () const noexcept ;
154
157
};
155
158
156
159
/* * Different types of connections to a peer. This enum encapsulates the
@@ -350,42 +353,105 @@ class CNetMessage {
350
353
}
351
354
};
352
355
353
- /* * The TransportDeserializer takes care of holding and deserializing the
354
- * network receive buffer. It can deserialize the network buffer into a
355
- * transport protocol agnostic CNetMessage (message type & payload)
356
- */
357
- class TransportDeserializer {
356
+ /* * The Transport converts one connection's sent messages to wire bytes, and received bytes back. */
357
+ class Transport {
358
358
public:
359
- // returns true if the current deserialization is complete
360
- virtual bool Complete () const = 0;
361
- // set the serialization context version
362
- virtual void SetVersion (int version) = 0;
363
- /* * read and deserialize data, advances msg_bytes data pointer */
364
- virtual int Read (Span<const uint8_t >& msg_bytes) = 0;
365
- // decomposes a message from the context
366
- virtual CNetMessage GetMessage (std::chrono::microseconds time, bool & reject_message) = 0;
367
- virtual ~TransportDeserializer () {}
359
+ virtual ~Transport () {}
360
+
361
+ // 1. Receiver side functions, for decoding bytes received on the wire into transport protocol
362
+ // agnostic CNetMessage (message type & payload) objects.
363
+
364
+ /* * Returns true if the current message is complete (so GetReceivedMessage can be called). */
365
+ virtual bool ReceivedMessageComplete () const = 0;
366
+ /* * Set the deserialization context version for objects returned by GetReceivedMessage. */
367
+ virtual void SetReceiveVersion (int version) = 0;
368
+
369
+ /* * Feed wire bytes to the transport.
370
+ *
371
+ * @return false if some bytes were invalid, in which case the transport can't be used anymore.
372
+ *
373
+ * Consumed bytes are chopped off the front of msg_bytes.
374
+ */
375
+ virtual bool ReceivedBytes (Span<const uint8_t >& msg_bytes) = 0;
376
+
377
+ /* * Retrieve a completed message from transport.
378
+ *
379
+ * This can only be called when ReceivedMessageComplete() is true.
380
+ *
381
+ * If reject_message=true is returned the message itself is invalid, but (other than false
382
+ * returned by ReceivedBytes) the transport is not in an inconsistent state.
383
+ */
384
+ virtual CNetMessage GetReceivedMessage (std::chrono::microseconds time, bool & reject_message) = 0;
385
+
386
+ // 2. Sending side functions, for converting messages into bytes to be sent over the wire.
387
+
388
+ /* * Set the next message to send.
389
+ *
390
+ * If no message can currently be set (perhaps because the previous one is not yet done being
391
+ * sent), returns false, and msg will be unmodified. Otherwise msg is enqueued (and
392
+ * possibly moved-from) and true is returned.
393
+ */
394
+ virtual bool SetMessageToSend (CSerializedNetMsg& msg) noexcept = 0;
395
+
396
+ /* * Return type for GetBytesToSend, consisting of:
397
+ * - Span<const uint8_t> to_send: span of bytes to be sent over the wire (possibly empty).
398
+ * - bool more: whether there will be more bytes to be sent after the ones in to_send are
399
+ * all sent (as signaled by MarkBytesSent()).
400
+ * - const std::string& m_type: message type on behalf of which this is being sent.
401
+ */
402
+ using BytesToSend = std::tuple<
403
+ Span<const uint8_t > /* to_send*/ ,
404
+ bool /* more*/ ,
405
+ const std::string& /* m_type*/
406
+ >;
407
+
408
+ /* * Get bytes to send on the wire.
409
+ *
410
+ * As a const function, it does not modify the transport's observable state, and is thus safe
411
+ * to be called multiple times.
412
+ *
413
+ * The bytes returned by this function act as a stream which can only be appended to. This
414
+ * means that with the exception of MarkBytesSent, operations on the transport can only append
415
+ * to what is being returned.
416
+ *
417
+ * Note that m_type and to_send refer to data that is internal to the transport, and calling
418
+ * any non-const function on this object may invalidate them.
419
+ */
420
+ virtual BytesToSend GetBytesToSend () const noexcept = 0;
421
+
422
+ /* * Report how many bytes returned by the last GetBytesToSend() have been sent.
423
+ *
424
+ * bytes_sent cannot exceed to_send.size() of the last GetBytesToSend() result.
425
+ *
426
+ * If bytes_sent=0, this call has no effect.
427
+ */
428
+ virtual void MarkBytesSent (size_t bytes_sent) noexcept = 0;
429
+
430
+ /* * Return the memory usage of this transport attributable to buffered data to send. */
431
+ virtual size_t GetSendMemoryUsage () const noexcept = 0;
368
432
};
369
433
370
- class V1TransportDeserializer final : public TransportDeserializer
434
+ class V1Transport final : public Transport
371
435
{
372
436
private:
373
- const CChainParams& m_chain_params ;
437
+ CMessageHeader::MessageStartChars m_magic_bytes ;
374
438
const NodeId m_node_id; // Only for logging
375
- mutable CHash256 hasher;
376
- mutable uint256 data_hash;
377
- bool in_data; // parsing header (false) or data (true)
378
- CDataStream hdrbuf; // partially received header
379
- CMessageHeader hdr; // complete header
380
- CDataStream vRecv; // received message data
381
- unsigned int nHdrPos;
382
- unsigned int nDataPos;
383
-
384
- const uint256& GetMessageHash () const ;
385
- int readHeader (Span<const uint8_t > msg_bytes);
386
- int readData (Span<const uint8_t > msg_bytes);
387
-
388
- void Reset () {
439
+ mutable Mutex m_recv_mutex; // !< Lock for receive state
440
+ mutable CHash256 hasher GUARDED_BY (m_recv_mutex);
441
+ mutable uint256 data_hash GUARDED_BY (m_recv_mutex);
442
+ bool in_data GUARDED_BY (m_recv_mutex); // parsing header (false) or data (true)
443
+ CDataStream hdrbuf GUARDED_BY (m_recv_mutex); // partially received header
444
+ CMessageHeader hdr GUARDED_BY (m_recv_mutex); // complete header
445
+ CDataStream vRecv GUARDED_BY (m_recv_mutex); // received message data
446
+ unsigned int nHdrPos GUARDED_BY (m_recv_mutex);
447
+ unsigned int nDataPos GUARDED_BY (m_recv_mutex);
448
+
449
+ const uint256& GetMessageHash () const EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);
450
+ int readHeader (Span<const uint8_t > msg_bytes) EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);
451
+ int readData (Span<const uint8_t > msg_bytes) EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);
452
+
453
+ void Reset () EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex) {
454
+ AssertLockHeld (m_recv_mutex);
389
455
vRecv.clear ();
390
456
hdrbuf.clear ();
391
457
hdrbuf.resize (24 );
@@ -396,52 +462,60 @@ class V1TransportDeserializer final : public TransportDeserializer
396
462
hasher.Reset ();
397
463
}
398
464
399
- public:
400
- V1TransportDeserializer (const CChainParams& chain_params, const NodeId node_id, int nTypeIn, int nVersionIn)
401
- : m_chain_params(chain_params),
402
- m_node_id (node_id),
403
- hdrbuf(nTypeIn, nVersionIn),
404
- vRecv(nTypeIn, nVersionIn)
465
+ bool CompleteInternal () const noexcept EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex)
405
466
{
406
- Reset ();
467
+ AssertLockHeld (m_recv_mutex);
468
+ if (!in_data) return false ;
469
+ return hdr.nMessageSize == nDataPos;
407
470
}
408
471
409
- bool Complete () const override
472
+ /* * Lock for sending state. */
473
+ mutable Mutex m_send_mutex;
474
+ /* * The header of the message currently being sent. */
475
+ std::vector<uint8_t > m_header_to_send GUARDED_BY (m_send_mutex);
476
+ /* * The data of the message currently being sent. */
477
+ CSerializedNetMsg m_message_to_send GUARDED_BY (m_send_mutex);
478
+ /* * Whether we're currently sending header bytes or message bytes. */
479
+ bool m_sending_header GUARDED_BY (m_send_mutex) {false };
480
+ /* * How many bytes have been sent so far (from m_header_to_send, or from m_message_to_send.data). */
481
+ size_t m_bytes_sent GUARDED_BY (m_send_mutex) {0 };
482
+
483
+ public:
484
+ V1Transport (const NodeId node_id, int nTypeIn, int nVersionIn) noexcept ;
485
+
486
+ bool ReceivedMessageComplete () const override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
410
487
{
411
- if (!in_data)
412
- return false ;
413
- return (hdr.nMessageSize == nDataPos);
488
+ AssertLockNotHeld (m_recv_mutex);
489
+ return WITH_LOCK (m_recv_mutex, return CompleteInternal ());
414
490
}
415
- void SetVersion (int nVersionIn) override
491
+
492
+ void SetReceiveVersion (int nVersionIn) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
416
493
{
494
+ AssertLockNotHeld (m_recv_mutex);
495
+ LOCK (m_recv_mutex);
417
496
hdrbuf.SetVersion (nVersionIn);
418
497
vRecv.SetVersion (nVersionIn);
419
498
}
420
- int Read (Span<const uint8_t >& msg_bytes) override
499
+
500
+ bool ReceivedBytes (Span<const uint8_t >& msg_bytes) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
421
501
{
502
+ AssertLockNotHeld (m_recv_mutex);
503
+ LOCK (m_recv_mutex);
422
504
int ret = in_data ? readData (msg_bytes) : readHeader (msg_bytes);
423
505
if (ret < 0 ) {
424
506
Reset ();
425
507
} else {
426
508
msg_bytes = msg_bytes.subspan (ret);
427
509
}
428
- return ret;
510
+ return ret >= 0 ;
429
511
}
430
- CNetMessage GetMessage (std::chrono::microseconds time, bool & reject_message) override ;
431
- };
432
512
433
- /* * The TransportSerializer prepares messages for the network transport
434
- */
435
- class TransportSerializer {
436
- public:
437
- // prepare message for transport (header construction, error-correction computation, payload encryption, etc.)
438
- virtual void prepareForTransport (CSerializedNetMsg& msg, std::vector<unsigned char >& header) const = 0;
439
- virtual ~TransportSerializer () {}
440
- };
513
+ CNetMessage GetReceivedMessage (std::chrono::microseconds time, bool & reject_message) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex);
441
514
442
- class V1TransportSerializer : public TransportSerializer {
443
- public:
444
- void prepareForTransport (CSerializedNetMsg& msg, std::vector<unsigned char >& header) const override ;
515
+ bool SetMessageToSend (CSerializedNetMsg& msg) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
516
+ BytesToSend GetBytesToSend () const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
517
+ void MarkBytesSent (size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
518
+ size_t GetSendMemoryUsage () const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
445
519
};
446
520
447
521
/* * Information about a peer */
@@ -451,8 +525,9 @@ class CNode
451
525
friend struct ConnmanTestMsg ;
452
526
453
527
public:
454
- const std::unique_ptr<TransportDeserializer> m_deserializer; // Used only by SocketHandler thread
455
- const std::unique_ptr<const TransportSerializer> m_serializer;
528
+ /* * Transport serializer/deserializer. The receive side functions are only called under cs_vRecv, while
529
+ * the sending side functions are only called under cs_vSend. */
530
+ const std::unique_ptr<Transport> m_transport;
456
531
457
532
NetPermissionFlags m_permissionFlags{NetPermissionFlags::None}; // treated as const outside of fuzz tester
458
533
@@ -466,12 +541,12 @@ class CNode
466
541
*/
467
542
std::shared_ptr<Sock> m_sock GUARDED_BY (m_sock_mutex);
468
543
469
- /* * Total size of all vSendMsg entries */
470
- size_t nSendSize GUARDED_BY (cs_vSend){0 };
471
- /* * Offset inside the first vSendMsg already sent */
472
- size_t nSendOffset GUARDED_BY (cs_vSend){0 };
544
+ /* * Sum of GetMemoryUsage of all vSendMsg entries. */
545
+ size_t m_send_memusage GUARDED_BY (cs_vSend){0 };
546
+ /* * Total number of bytes sent on the wire to this peer. */
473
547
uint64_t nSendBytes GUARDED_BY (cs_vSend){0 };
474
- std::deque<std::vector<unsigned char >> vSendMsg GUARDED_BY (cs_vSend);
548
+ /* * Messages still to be fed to m_transport->SetMessageToSend. */
549
+ std::deque<CSerializedNetMsg> vSendMsg GUARDED_BY (cs_vSend);
475
550
std::atomic<size_t > nSendMsgSize{0 };
476
551
Mutex cs_vSend;
477
552
Mutex m_sock_mutex;
0 commit comments