Skip to content

Commit 5b751da

Browse files
committed
provides tests for maxInboundPayloadSize check and related fixes (#908)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent cc16c0a commit 5b751da

13 files changed

+474
-57
lines changed

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetResponderSubscriber.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,21 @@ public void onComplete() {}
8989

9090
@Override
9191
public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLastPayload) {
92-
final CompositeByteBuf frames =
93-
ReassemblyUtils.addFollowingFrame(this.frames, followingFrame, this.maxInboundPayloadSize);
92+
final CompositeByteBuf frames;
93+
try {
94+
frames =
95+
ReassemblyUtils.addFollowingFrame(
96+
this.frames, followingFrame, this.maxInboundPayloadSize);
97+
} catch (IllegalStateException t) {
98+
this.requesterResponderSupport.remove(this.streamId, this);
99+
100+
CompositeByteBuf framesToRelease = this.frames;
101+
this.frames = null;
102+
framesToRelease.release();
103+
104+
logger.debug("Reassembly has failed", t);
105+
return;
106+
}
94107

95108
if (!hasFollows) {
96109
this.requesterResponderSupport.remove(this.streamId, this);

rsocket-core/src/main/java/io/rsocket/core/FragmentationUtils.java

+14-9
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,14 @@ static ByteBuf encodeFirstFragment(
117117
int remaining = mtu - FRAME_OFFSET;
118118

119119
ByteBuf metadataFragment = hasMetadata ? Unpooled.EMPTY_BUFFER : null;
120-
if (metadata.isReadable()) {
120+
if (hasMetadata) {
121121
// subtract the metadata frame length
122122
remaining -= FrameLengthCodec.FRAME_LENGTH_SIZE;
123-
int r = Math.min(remaining, metadata.readableBytes());
124-
remaining -= r;
125-
metadataFragment = metadata.readRetainedSlice(r);
123+
if (metadata.isReadable()) {
124+
int r = Math.min(remaining, metadata.readableBytes());
125+
remaining -= r;
126+
metadataFragment = metadata.readRetainedSlice(r);
127+
}
126128
}
127129

128130
ByteBuf dataFragment = Unpooled.EMPTY_BUFFER;
@@ -166,18 +168,21 @@ static ByteBuf encodeFirstFragment(
166168
long initialRequestN,
167169
FrameType frameType,
168170
int streamId,
171+
boolean hasMetadata,
169172
ByteBuf metadata,
170173
ByteBuf data) {
171174
// subtract the header bytes + frame length bytes + initial requestN bytes
172175
int remaining = mtu - FRAME_OFFSET_WITH_INITIAL_REQUEST_N;
173176

174-
ByteBuf metadataFragment = null;
175-
if (metadata.isReadable()) {
177+
ByteBuf metadataFragment = hasMetadata ? Unpooled.EMPTY_BUFFER : null;
178+
if (hasMetadata) {
176179
// subtract the metadata frame length
177180
remaining -= FrameLengthCodec.FRAME_LENGTH_SIZE;
178-
int r = Math.min(remaining, metadata.readableBytes());
179-
remaining -= r;
180-
metadataFragment = metadata.readRetainedSlice(r);
181+
if (metadata.isReadable()) {
182+
int r = Math.min(remaining, metadata.readableBytes());
183+
remaining -= r;
184+
metadataFragment = metadata.readRetainedSlice(r);
185+
}
181186
}
182187

183188
ByteBuf dataFragment = Unpooled.EMPTY_BUFFER;

rsocket-core/src/main/java/io/rsocket/core/ReassemblyUtils.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.rsocket.Payload;
3131
import io.rsocket.frame.FrameHeaderCodec;
3232
import io.rsocket.frame.FrameLengthCodec;
33+
import io.rsocket.frame.FrameType;
3334
import io.rsocket.frame.decoder.PayloadDecoder;
3435
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
3536
import org.reactivestreams.Subscription;
@@ -118,8 +119,6 @@ static <T extends RequesterFrameHandler> void handleNextSupport(
118119
return;
119120
}
120121

121-
instance.setFrames(null);
122-
123122
// sends cancel frame to prevent any further frames
124123
subscription.cancel();
125124
// terminates downstream
@@ -168,7 +167,6 @@ static CompositeByteBuf addFollowingFrame(
168167
} else if (maxInboundPayloadSize != Integer.MAX_VALUE
169168
&& readableBytes + followingFrame.readableBytes() - FrameHeaderCodec.size()
170169
> maxInboundPayloadSize) {
171-
frames.release();
172170
throw new IllegalStateException(
173171
String.format(ILLEGAL_REASSEMBLED_PAYLOAD_SIZE, maxInboundPayloadSize));
174172
}
@@ -181,14 +179,19 @@ static CompositeByteBuf addFollowingFrame(
181179
// if has metadata, then we have to increase metadata length in containing frames
182180
// CompositeByteBuf
183181
if (hasMetadata) {
184-
frames.markReaderIndex().skipBytes(FrameHeaderCodec.size());
182+
final FrameType frameType = FrameHeaderCodec.frameType(frames);
183+
final int lengthFieldPosition =
184+
FrameHeaderCodec.size() + (frameType.hasInitialRequestN() ? Integer.BYTES : 0);
185+
186+
frames.markReaderIndex();
187+
frames.skipBytes(lengthFieldPosition);
185188

186189
final int nextMetadataLength = decodeLength(frames) + decodeLength(followingFrame);
187190

188191
frames.resetReaderIndex();
189192

190193
frames.markWriterIndex();
191-
frames.writerIndex(FrameHeaderCodec.size());
194+
frames.writerIndex(lengthFieldPosition);
192195

193196
encodeLength(frames, nextMetadataLength);
194197

rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,12 @@ final long tryTerminate(boolean isFromInbound) {
340340
}
341341
}
342342

343-
this.outboundSubscription.cancel();
343+
final Subscription outboundSubscription = this.outboundSubscription;
344+
if (outboundSubscription == null) {
345+
return previousState;
346+
}
347+
348+
outboundSubscription.cancel();
344349

345350
if (!isSubscribed(previousState)) {
346351
final Payload firstPayload = this.firstPayload;
@@ -492,7 +497,7 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
492497
} else {
493498
try {
494499
frames = ReassemblyUtils.addFollowingFrame(frames, frame, this.maxInboundPayloadSize);
495-
} catch (IllegalReferenceCountException e) {
500+
} catch (IllegalStateException e) {
496501
if (isTerminated(this.state)) {
497502
return;
498503
}
@@ -503,8 +508,6 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
503508
return;
504509
}
505510

506-
this.frames = null;
507-
508511
this.outboundDone = true;
509512
// send error to terminate interaction
510513
final ByteBuf errorFrame =

rsocket-core/src/main/java/io/rsocket/core/RequestResponseResponderSubscriber.java

+30-3
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,29 @@ public void handleCancel() {
210210

211211
@Override
212212
public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload) {
213-
final CompositeByteBuf frames =
214-
ReassemblyUtils.addFollowingFrame(this.frames, frame, this.maxInboundPayloadSize);
213+
final CompositeByteBuf frames;
214+
try {
215+
frames = ReassemblyUtils.addFollowingFrame(this.frames, frame, this.maxInboundPayloadSize);
216+
} catch (IllegalStateException t) {
217+
S.lazySet(this, Operators.cancelledSubscription());
218+
219+
this.requesterResponderSupport.remove(this.streamId, this);
220+
221+
CompositeByteBuf framesToRelease = this.frames;
222+
this.frames = null;
223+
framesToRelease.release();
224+
225+
logger.debug("Reassembly has failed", t);
226+
227+
// sends error frame from the responder side to tell that something went wrong
228+
final ByteBuf errorFrame =
229+
ErrorFrameCodec.encode(
230+
this.allocator,
231+
this.streamId,
232+
new CanceledException("Failed to reassemble payload. Cause: " + t.getMessage()));
233+
this.sendProcessor.onNext(errorFrame);
234+
return;
235+
}
215236

216237
if (!hasFollows) {
217238
this.frames = null;
@@ -220,14 +241,20 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
220241
payload = this.payloadDecoder.apply(frames);
221242
frames.release();
222243
} catch (Throwable t) {
244+
S.lazySet(this, Operators.cancelledSubscription());
245+
246+
this.requesterResponderSupport.remove(this.streamId, this);
247+
223248
ReferenceCountUtil.safeRelease(frames);
249+
224250
logger.debug("Reassembly has failed", t);
251+
225252
// sends error frame from the responder side to tell that something went wrong
226253
final ByteBuf errorFrame =
227254
ErrorFrameCodec.encode(
228255
this.allocator,
229256
this.streamId,
230-
new CanceledException("Failed to reassemble payload. Cause" + t.getMessage()));
257+
new CanceledException("Failed to reassemble payload. Cause: " + t.getMessage()));
231258
this.sendProcessor.onNext(errorFrame);
232259
return;
233260
}

rsocket-core/src/main/java/io/rsocket/core/RequestStreamResponderSubscriber.java

+34-5
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,33 @@ public final void handleCancel() {
250250

251251
@Override
252252
public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLastPayload) {
253-
final CompositeByteBuf frames =
254-
ReassemblyUtils.addFollowingFrame(this.frames, followingFrame, this.maxInboundPayloadSize);
253+
final CompositeByteBuf frames;
254+
try {
255+
frames =
256+
ReassemblyUtils.addFollowingFrame(
257+
this.frames, followingFrame, this.maxInboundPayloadSize);
258+
} catch (IllegalStateException t) {
259+
// if subscription is null, it means that streams has not yet reassembled all the fragments
260+
// and fragmentation of the first frame was cancelled before
261+
S.lazySet(this, Operators.cancelledSubscription());
262+
263+
this.requesterResponderSupport.remove(this.streamId, this);
264+
265+
CompositeByteBuf framesToRelease = this.frames;
266+
this.frames = null;
267+
framesToRelease.release();
268+
269+
logger.debug("Reassembly has failed", t);
270+
271+
// sends error frame from the responder side to tell that something went wrong
272+
final ByteBuf errorFrame =
273+
ErrorFrameCodec.encode(
274+
this.allocator,
275+
this.streamId,
276+
new CanceledException("Failed to reassemble payload. Cause: " + t.getMessage()));
277+
this.sendProcessor.onNext(errorFrame);
278+
return;
279+
}
255280

256281
if (!hasFollows) {
257282
this.frames = null;
@@ -260,17 +285,21 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
260285
payload = this.payloadDecoder.apply(frames);
261286
frames.release();
262287
} catch (Throwable t) {
288+
S.lazySet(this, Operators.cancelledSubscription());
289+
this.done = true;
290+
291+
this.requesterResponderSupport.remove(this.streamId, this);
292+
263293
ReferenceCountUtil.safeRelease(frames);
294+
264295
logger.debug("Reassembly has failed", t);
265296

266-
S.lazySet(this, Operators.cancelledSubscription());
267-
this.done = true;
268297
// sends error frame from the responder side to tell that something went wrong
269298
final ByteBuf errorFrame =
270299
ErrorFrameCodec.encode(
271300
this.allocator,
272301
this.streamId,
273-
new CanceledException("Failed to reassemble payload. Cause" + t.getMessage()));
302+
new CanceledException("Failed to reassemble payload. Cause: " + t.getMessage()));
274303
this.sendProcessor.onNext(errorFrame);
275304
return;
276305
}

rsocket-core/src/main/java/io/rsocket/core/SendUtils.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,14 @@ static void sendReleasingPayload(
195195
try {
196196
first =
197197
FragmentationUtils.encodeFirstFragment(
198-
allocator, mtu, initialRequestN, frameType, streamId, slicedMetadata, slicedData);
198+
allocator,
199+
mtu,
200+
initialRequestN,
201+
frameType,
202+
streamId,
203+
hasMetadata,
204+
slicedMetadata,
205+
slicedData);
199206
} catch (IllegalReferenceCountException e) {
200207
sendTerminalFrame(streamId, frameType, sendProcessor, allocator, true, false, e);
201208
throw e;

rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java

+6
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public abstract class AbstractSocketRule<T extends RSocket> extends ExternalReso
3535
protected T socket;
3636
protected LeaksTrackingByteBufAllocator allocator;
3737
protected int maxFrameLength = FRAME_LENGTH_MASK;
38+
protected int maxInboundPayloadSize = Integer.MAX_VALUE;
3839

3940
@Override
4041
public Statement apply(final Statement base, Description description) {
@@ -60,6 +61,11 @@ protected void init() {
6061
socket = newRSocket();
6162
}
6263

64+
public void setMaxInboundPayloadSize(int maxInboundPayloadSize) {
65+
this.maxInboundPayloadSize = maxInboundPayloadSize;
66+
init();
67+
}
68+
6369
public void setMaxFrameLength(int maxFrameLength) {
6470
this.maxFrameLength = maxFrameLength;
6571
init();

rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ protected RSocketRequester newRSocket() {
540540
StreamIdSupplier.clientSupplier(),
541541
0,
542542
maxFrameLength,
543-
Integer.MAX_VALUE,
543+
maxInboundPayloadSize,
544544
Integer.MAX_VALUE,
545545
Integer.MAX_VALUE,
546546
null,

0 commit comments

Comments
 (0)