Skip to content

Commit 2690724

Browse files
committed
1 parent 99d5474 commit 2690724

17 files changed

+644
-545
lines changed

projects/client/Apigen/src/apigen/Apigen.cs

Lines changed: 321 additions & 346 deletions
Large diffs are not rendered by default.

projects/client/RabbitMQ.Client/src/client/impl/BasicPublishBatch.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ public void Add(string exchange, string routingKey, bool mandatory, IBasicProper
5959
var bp = basicProperties == null ? model.CreateBasicProperties() : basicProperties;
6060
var method = new BasicPublish
6161
{
62-
m_exchange = exchange,
63-
m_routingKey = routingKey,
64-
m_mandatory = mandatory
62+
_exchange = exchange,
63+
_routingKey = routingKey,
64+
_mandatory = mandatory
6565
};
6666

6767
commands.Add(new Command(method, (ContentHeaderBase)bp, body));
@@ -72,4 +72,4 @@ public void Publish()
7272
model.SendCommands(commands);
7373
}
7474
}
75-
}
75+
}

projects/client/RabbitMQ.Client/src/client/impl/CommandAssembler.cs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,8 @@ public Command HandleFrame(InboundFrame f)
8181
{
8282
throw new UnexpectedFrameException(f);
8383
}
84-
BinaryBufferReader reader = f.GetReader();
85-
m_method = m_protocol.DecodeMethodFrom(reader);
86-
m_state = m_method.HasContent
87-
? AssemblyState.ExpectingContentHeader
88-
: AssemblyState.Complete;
84+
m_method = m_protocol.DecodeMethodFrom(f.Payload.AsMemory(0, f.PayloadSize));
85+
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
8986
return CompletedCommand();
9087
}
9188
case AssemblyState.ExpectingContentHeader:
@@ -94,9 +91,9 @@ public Command HandleFrame(InboundFrame f)
9491
{
9592
throw new UnexpectedFrameException(f);
9693
}
97-
BinaryBufferReader reader = f.GetReader();
98-
m_header = m_protocol.DecodeContentHeaderFrom(reader);
99-
ulong totalBodyBytes = m_header.ReadFrom(reader);
94+
Memory<byte> memory = f.Payload.AsMemory(0, f.PayloadSize);
95+
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(memory));
96+
ulong totalBodyBytes = m_header.ReadFrom(memory.Slice(2));
10097
if (totalBodyBytes > MaxArrayOfBytesSize)
10198
{
10299
throw new UnexpectedFrameException(f);
@@ -114,10 +111,7 @@ public Command HandleFrame(InboundFrame f)
114111
}
115112
if (f.PayloadSize > m_remainingBodyBytes)
116113
{
117-
throw new MalformedFrameException
118-
(string.Format("Overlong content body received - {0} bytes remaining, {1} bytes received",
119-
m_remainingBodyBytes,
120-
f.PayloadSize));
114+
throw new MalformedFrameException($"Overlong content body received - {m_remainingBodyBytes} bytes remaining, {f.PayloadSize} bytes received");
121115
}
122116
f.PayloadSpan.CopyTo(UnwritedSpan);
123117
m_remainingBodyBytes -= f.PayloadSize;

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class Connection : IConnection
7272

7373
private IDictionary<string, object> m_clientProperties;
7474

75-
private volatile ShutdownEventArgs m_closeReason = null;
75+
private volatile ShutdownEventArgs _closeReason = null;
7676
private volatile bool m_closed = false;
7777

7878
private EventHandler<ConnectionBlockedEventArgs> m_connectionBlocked;
@@ -85,7 +85,7 @@ public class Connection : IConnection
8585
private Guid m_id = Guid.NewGuid();
8686
private ModelBase m_model0;
8787
private volatile bool m_running = true;
88-
private MainSession m_session0;
88+
private MainSession _session0;
8989
private SessionManager m_sessionManager;
9090

9191
private IList<ShutdownReportEntry> m_shutdownReport = new SynchronizedList<ShutdownReportEntry>(new List<ShutdownReportEntry>());
@@ -132,8 +132,8 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
132132
}
133133

134134
m_sessionManager = new SessionManager(this, 0);
135-
m_session0 = new MainSession(this) { Handler = NotifyReceivedCloseOk };
136-
m_model0 = (ModelBase)Protocol.CreateModel(m_session0);
135+
_session0 = new MainSession(this) { Handler = NotifyReceivedCloseOk };
136+
m_model0 = (ModelBase)Protocol.CreateModel(_session0);
137137

138138
StartMainLoop(factory.UseBackgroundThreadsForIO);
139139
Open(insist);
@@ -202,15 +202,15 @@ public event EventHandler<ShutdownEventArgs> ConnectionShutdown
202202
bool ok = false;
203203
lock (m_eventLock)
204204
{
205-
if (m_closeReason == null)
205+
if (_closeReason == null)
206206
{
207207
m_connectionShutdown += value;
208208
ok = true;
209209
}
210210
}
211211
if (!ok)
212212
{
213-
value(this, m_closeReason);
213+
value(this, _closeReason);
214214
}
215215
}
216216
remove
@@ -273,7 +273,7 @@ public IDictionary<string, object> ClientProperties
273273

274274
public ShutdownEventArgs CloseReason
275275
{
276-
get { return m_closeReason; }
276+
get { return _closeReason; }
277277
}
278278

279279
public AmqpTcpEndpoint Endpoint
@@ -386,19 +386,19 @@ public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
386386
{
387387
if (!abort)
388388
{
389-
throw new AlreadyClosedException(m_closeReason);
389+
throw new AlreadyClosedException(_closeReason);
390390
}
391391
}
392392
else
393393
{
394394
OnShutdown();
395-
m_session0.SetSessionClosing(false);
395+
_session0.SetSessionClosing(false);
396396

397397
try
398398
{
399399
// Try to send connection.close
400400
// Wait for CloseOk in the MainLoop
401-
m_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText));
401+
_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText));
402402
}
403403
catch (AlreadyClosedException ace)
404404
{
@@ -521,7 +521,7 @@ public void FinishClose()
521521
MaybeStopHeartbeatTimers();
522522

523523
m_frameHandler.Close();
524-
m_model0.SetCloseReason(m_closeReason);
524+
m_model0.SetCloseReason(_closeReason);
525525
m_model0.FinishClose();
526526
}
527527

@@ -552,10 +552,10 @@ public bool HardProtocolExceptionHandler(HardProtocolException hpe)
552552
if (SetCloseReason(hpe.ShutdownReason))
553553
{
554554
OnShutdown();
555-
m_session0.SetSessionClosing(false);
555+
_session0.SetSessionClosing(false);
556556
try
557557
{
558-
m_session0.Transmit(ConnectionCloseWrapper(
558+
_session0.Transmit(ConnectionCloseWrapper(
559559
hpe.ShutdownReason.ReplyCode,
560560
hpe.ShutdownReason.ReplyText));
561561
return true;
@@ -580,13 +580,13 @@ public void InternalClose(ShutdownEventArgs reason)
580580
{
581581
if (m_closed)
582582
{
583-
throw new AlreadyClosedException(m_closeReason);
583+
throw new AlreadyClosedException(_closeReason);
584584
}
585585
// We are quiescing, but still allow for server-close
586586
}
587587

588588
OnShutdown();
589-
m_session0.SetSessionClosing(true);
589+
_session0.SetSessionClosing(true);
590590
TerminateMainloop();
591591
}
592592

@@ -688,7 +688,7 @@ public async Task MainLoopIteration()
688688
// quiescing situation, even though technically we
689689
// should be ignoring everything except
690690
// connection.close-ok.
691-
m_session0.HandleFrame(frame);
691+
_session0.HandleFrame(frame);
692692
}
693693
else
694694
{
@@ -697,7 +697,7 @@ public async Task MainLoopIteration()
697697
// frames for non-zero channels (and any inbound
698698
// commands on channel zero that aren't
699699
// Connection.CloseOk) must be discarded.
700-
if (m_closeReason == null)
700+
if (_closeReason == null)
701701
{
702702
// No close reason, not quiescing the
703703
// connection. Handle the frame. (Of course, the
@@ -819,7 +819,7 @@ public void OnShutdown()
819819
lock (m_eventLock)
820820
{
821821
handler = m_connectionShutdown;
822-
reason = m_closeReason;
822+
reason = _closeReason;
823823
m_connectionShutdown = null;
824824
}
825825
if (handler != null)
@@ -929,9 +929,9 @@ public bool SetCloseReason(ShutdownEventArgs reason)
929929
{
930930
lock (m_eventLock)
931931
{
932-
if (m_closeReason == null)
932+
if (_closeReason == null)
933933
{
934-
m_closeReason = reason;
934+
_closeReason = reason;
935935
return true;
936936
}
937937
else

projects/client/RabbitMQ.Client/src/client/impl/ContentHeaderBase.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ public virtual object Clone()
6666
///<summary>
6767
/// Fill this instance from the given byte buffer stream.
6868
///</summary>
69-
public ulong ReadFrom(BinaryBufferReader reader)
69+
public ulong ReadFrom(Memory<byte> memory)
7070
{
71-
reader.ReadUInt16(); // weight - not currently used
72-
ulong bodySize = reader.ReadUInt64();
73-
var headerReader = new ContentHeaderPropertyReader(reader);
74-
ReadPropertiesFrom(ref headerReader);
71+
// Skipping the first two bytes since they arent used (weight - not currently used)
72+
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(memory.Slice(2));
73+
var reader = new ContentHeaderPropertyReader(memory.Slice(10));
74+
ReadPropertiesFrom(ref reader);
7575
return bodySize;
7676
}
7777

projects/client/RabbitMQ.Client/src/client/impl/ContentHeaderPropertyReader.cs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,16 @@ public struct ContentHeaderPropertyReader
4949
private ushort m_bitCount;
5050
private ushort m_flagWord;
5151

52-
public ContentHeaderPropertyReader(BinaryBufferReader reader)
52+
public ContentHeaderPropertyReader(Memory<byte> memory)
5353
{
54-
BaseReader = reader;
54+
_memory = memory;
5555
m_flagWord = 1; // just the continuation bit
5656
m_bitCount = 15; // the correct position to force a m_flagWord read
57+
_memoryOffset = 0;
5758
}
5859

59-
private BinaryBufferReader BaseReader;
60+
private Memory<byte> _memory;
61+
private int _memoryOffset;
6062

6163
public bool ContinuationBitSet
6264
{
@@ -82,28 +84,35 @@ public void ReadFlagWord()
8284
{
8385
throw new MalformedFrameException("Attempted to read flag word when none advertised");
8486
}
85-
m_flagWord = BaseReader.ReadUInt16();
87+
m_flagWord = NetworkOrderDeserializer.ReadUInt16(_memory.Slice(_memoryOffset));
88+
_memoryOffset += 2;
8689
m_bitCount = 0;
8790
}
8891

8992
public uint ReadLong()
9093
{
91-
return WireFormatting.ReadLong(BaseReader);
94+
uint result = NetworkOrderDeserializer.ReadUInt32(_memory.Slice(_memoryOffset));
95+
_memoryOffset += 4;
96+
return result;
9297
}
9398

9499
public ulong ReadLonglong()
95100
{
96-
return WireFormatting.ReadLonglong(BaseReader);
101+
ulong result = NetworkOrderDeserializer.ReadUInt64(_memory.Slice(_memoryOffset));
102+
_memoryOffset += 8;
103+
return result;
97104
}
98105

99106
public byte[] ReadLongstr()
100107
{
101-
return WireFormatting.ReadLongstr(BaseReader);
108+
byte[] result = WireFormatting.ReadLongstr(_memory.Slice(_memoryOffset));
109+
_memoryOffset += 4 + result.Length;
110+
return result;
102111
}
103112

104113
public byte ReadOctet()
105114
{
106-
return WireFormatting.ReadOctet(BaseReader);
115+
return _memory.Span[_memoryOffset++];
107116
}
108117

109118
public bool ReadPresence()
@@ -121,23 +130,31 @@ public bool ReadPresence()
121130

122131
public ushort ReadShort()
123132
{
124-
return WireFormatting.ReadShort(BaseReader);
133+
ushort result = NetworkOrderDeserializer.ReadUInt16(_memory.Slice(_memoryOffset));
134+
_memoryOffset += 2;
135+
return result;
125136
}
126137

127138
public string ReadShortstr()
128139
{
129-
return WireFormatting.ReadShortstr(BaseReader);
140+
string result = WireFormatting.ReadShortstr(_memory.Slice(_memoryOffset), out int bytesRead);
141+
_memoryOffset += bytesRead;
142+
return result;
130143
}
131144

132145
/// <returns>A type of <seealso cref="System.Collections.Generic.IDictionary{TKey,TValue}"/>.</returns>
133146
public IDictionary<string, object> ReadTable()
134147
{
135-
return WireFormatting.ReadTable(BaseReader);
148+
IDictionary<string, object> result = WireFormatting.ReadTable(_memory.Slice(_memoryOffset), out int bytesRead);
149+
_memoryOffset += bytesRead;
150+
return result;
136151
}
137152

138153
public AmqpTimestamp ReadTimestamp()
139154
{
140-
return WireFormatting.ReadTimestamp(BaseReader);
155+
AmqpTimestamp result = WireFormatting.ReadTimestamp(_memory.Slice(_memoryOffset));
156+
_memoryOffset += 8;
157+
return result;
141158
}
142159
}
143160
}

projects/client/RabbitMQ.Client/src/client/impl/MainSession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public override void HandleFrame(InboundFrame frame)
8585

8686
if (!m_closeServerInitiated && (frame.IsMethod()))
8787
{
88-
MethodBase method = Connection.Protocol.DecodeMethodFrom(frame.GetReader());
88+
MethodBase method = Connection.Protocol.DecodeMethodFrom(frame.Payload.AsMemory(0, frame.PayloadSize));
8989
if ((method.ProtocolClassId == m_closeClassId)
9090
&& (method.ProtocolMethodId == m_closeMethodId))
9191
{

0 commit comments

Comments
 (0)