3
3
// BSD-style license that can be found in the LICENSE file.
4
4
5
5
import 'dart:async' ;
6
- import 'dart:math' ;
7
6
import 'dart:typed_data' ;
8
7
9
8
/// Collects messages from an input stream of bytes.
@@ -14,18 +13,11 @@ class MessageGrouper {
14
13
/// The input bytes stream subscription.
15
14
late final StreamSubscription _inputStreamSubscription;
16
15
17
- /// The length of the current message to read, or `-1` if we are currently
18
- /// reading the length.
19
- int _length = - 1 ;
20
-
21
16
/// The buffer to store the length bytes in.
22
- BytesBuilder _lengthBuffer = new BytesBuilder ( );
17
+ final _FixedBuffer _lengthBuffer = new _FixedBuffer ( 4 );
23
18
24
19
/// If reading raw data, buffer for the data.
25
- Uint8List _messageBuffer = new Uint8List (0 );
26
-
27
- /// The position to write the next byte in [_messageBuffer] .
28
- int _messagePos = 0 ;
20
+ _FixedBuffer ? _messageBuffer;
29
21
30
22
late StreamController <Uint8List > _messageStreamController =
31
23
new StreamController <Uint8List >(onCancel: () {
@@ -38,45 +30,36 @@ class MessageGrouper {
38
30
}
39
31
40
32
void _handleBytes (List <int > bytes, [int offset = 0 ]) {
41
- if (_length == - 1 ) {
42
- while (_lengthBuffer.length < 4 && offset < bytes.length) {
33
+ final _FixedBuffer ? messageBuffer = _messageBuffer;
34
+ if (messageBuffer == null ) {
35
+ while (offset < bytes.length && ! _lengthBuffer.isReady) {
43
36
_lengthBuffer.addByte (bytes[offset++ ]);
44
37
}
45
- if (_lengthBuffer.length >= 4 ) {
46
- Uint8List lengthBytes = _lengthBuffer.takeBytes ();
47
- _length = lengthBytes[0 ] << 24 |
48
- lengthBytes[1 ] << 16 |
49
- lengthBytes[2 ] << 8 |
50
- lengthBytes[3 ];
38
+ if (_lengthBuffer.isReady) {
39
+ int length = _lengthBuffer[0 ] << 24 |
40
+ _lengthBuffer[1 ] << 16 |
41
+ _lengthBuffer[2 ] << 8 |
42
+ _lengthBuffer[3 ];
43
+ // Reset the length reading state.
44
+ _lengthBuffer.reset ();
45
+ // Switch to the message payload reading state.
46
+ _messageBuffer = new _FixedBuffer (length);
47
+ _handleBytes (bytes, offset);
48
+ } else {
49
+ // Continue reading the length.
50
+ return ;
51
+ }
52
+ } else {
53
+ // Read the data from `bytes`.
54
+ while (offset < bytes.length && ! messageBuffer.isReady) {
55
+ messageBuffer.addByte (bytes[offset++ ]);
51
56
}
52
- }
53
-
54
- // Just pass along `bytes` without a copy if we can, and reset our state
55
- if (offset == 0 && bytes.length == _length && bytes is Uint8List ) {
56
- _length = - 1 ;
57
- _messageStreamController.add (bytes);
58
- return ;
59
- }
60
-
61
- // Initialize a new buffer.
62
- if (_messagePos == 0 ) {
63
- _messageBuffer = new Uint8List (_length);
64
- }
65
-
66
- // Read the data from `bytes`.
67
- int lenToRead = min (_length - _messagePos, bytes.length - offset);
68
- while (lenToRead-- > 0 ) {
69
- _messageBuffer[_messagePos++ ] = bytes[offset++ ];
70
- }
71
-
72
- // If we completed a message, add it to the output stream, reset our state,
73
- // and call ourselves again if we have more data to read.
74
- if (_messagePos >= _length) {
75
- _messageStreamController.add (_messageBuffer);
76
- _length = - 1 ;
77
- _messagePos = 0 ;
78
57
79
- if (offset < bytes.length) {
58
+ // If we completed a message, add it to the output stream.
59
+ if (messageBuffer.isReady) {
60
+ _messageStreamController.add (messageBuffer.bytes);
61
+ // Switch to the length reading state.
62
+ _messageBuffer = null ;
80
63
_handleBytes (bytes, offset);
81
64
}
82
65
}
@@ -89,3 +72,27 @@ class MessageGrouper {
89
72
_messageStreamController.close ();
90
73
}
91
74
}
75
+
76
+ /// A buffer of fixed length.
77
+ class _FixedBuffer {
78
+ final Uint8List bytes;
79
+
80
+ /// The offset in [bytes] .
81
+ int _offset = 0 ;
82
+
83
+ _FixedBuffer (int length) : bytes = new Uint8List (length);
84
+
85
+ /// Return `true` when the required number of bytes added.
86
+ bool get isReady => _offset == bytes.length;
87
+
88
+ int operator [](int index) => bytes[index];
89
+
90
+ void addByte (int byte) {
91
+ bytes[_offset++ ] = byte;
92
+ }
93
+
94
+ /// Reset the number of added bytes to zero.
95
+ void reset () {
96
+ _offset = 0 ;
97
+ }
98
+ }
0 commit comments