Skip to content

Commit 784260c

Browse files
committed
provides fixes for WrappedDirectBufferByteBuf
uncomments largePayload tests Signed-off-by: Oleh Dokuka <[email protected]>
1 parent cdabb2d commit 784260c

File tree

4 files changed

+60
-55
lines changed

4 files changed

+60
-55
lines changed

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 49 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -168,18 +168,18 @@ default void fireAndForget10() {
168168
.isTrue();
169169
}
170170

171-
// @DisplayName("makes 10 fireAndForget with Large Payload in Requests")
172-
// @Test
173-
// default void largePayloadFireAndForget10() {
174-
// Flux.range(1, 10)
175-
// .flatMap(i -> getClient().fireAndForget(LARGE_PAYLOAD.retain()))
176-
// .as(StepVerifier::create)
177-
// .expectComplete()
178-
// .verify(getTimeout());
179-
//
180-
// Assertions.assertThat(getTransportPair().responder.awaitUntilObserved(10, getTimeout()))
181-
// .isTrue();
182-
// }
171+
@DisplayName("makes 10 fireAndForget with Large Payload in Requests")
172+
@Test
173+
default void largePayloadFireAndForget10() {
174+
Flux.range(1, 10)
175+
.flatMap(i -> getClient().fireAndForget(LARGE_PAYLOAD.retain()))
176+
.as(StepVerifier::create)
177+
.expectComplete()
178+
.verify(getTimeout());
179+
180+
Assertions.assertThat(getTransportPair().responder.awaitUntilObserved(10, getTimeout()))
181+
.isTrue();
182+
}
183183

184184
@DisplayName("makes 10 metadataPush requests")
185185
@Test
@@ -195,19 +195,19 @@ default void metadataPush10() {
195195
.isTrue();
196196
}
197197

198-
// @DisplayName("makes 10 metadataPush with Large Metadata in requests")
199-
// @Test
200-
// default void largePayloadMetadataPush10() {
201-
// Assumptions.assumeThat(getTransportPair().withResumability).isFalse();
202-
// Flux.range(1, 10)
203-
// .flatMap(i -> getClient().metadataPush(ByteBufPayload.create("", LARGE_DATA)))
204-
// .as(StepVerifier::create)
205-
// .expectComplete()
206-
// .verify(getTimeout());
207-
//
208-
// Assertions.assertThat(getTransportPair().responder.awaitUntilObserved(10, getTimeout()))
209-
// .isTrue();
210-
// }
198+
@DisplayName("makes 10 metadataPush with Large Metadata in requests")
199+
@Test
200+
default void largePayloadMetadataPush10() {
201+
Assumptions.assumeThat(getTransportPair().withResumability).isFalse();
202+
Flux.range(1, 10)
203+
.flatMap(i -> getClient().metadataPush(ByteBufPayload.create("", LARGE_DATA)))
204+
.as(StepVerifier::create)
205+
.expectComplete()
206+
.verify(getTimeout());
207+
208+
Assertions.assertThat(getTransportPair().responder.awaitUntilObserved(10, getTimeout()))
209+
.isTrue();
210+
}
211211

212212
@DisplayName("makes 1 requestChannel request with 0 payloads")
213213
@Test
@@ -250,19 +250,19 @@ default void requestChannel200_000() {
250250
.verify(getTimeout());
251251
}
252252

253-
// @DisplayName("makes 1 requestChannel request with 50 large payloads")
254-
// @Test
255-
// default void largePayloadRequestChannel50() {
256-
// Flux<Payload> payloads = Flux.range(0, 50).map(__ -> LARGE_PAYLOAD.retain());
257-
//
258-
// getClient()
259-
// .requestChannel(payloads)
260-
// .doOnNext(Payload::release)
261-
// .as(StepVerifier::create)
262-
// .expectNextCount(50)
263-
// .expectComplete()
264-
// .verify(getTimeout());
265-
// }
253+
@DisplayName("makes 1 requestChannel request with 50 large payloads")
254+
@Test
255+
default void largePayloadRequestChannel50() {
256+
Flux<Payload> payloads = Flux.range(0, 50).map(__ -> LARGE_PAYLOAD.retain());
257+
258+
getClient()
259+
.requestChannel(payloads)
260+
.doOnNext(Payload::release)
261+
.as(StepVerifier::create)
262+
.expectNextCount(50)
263+
.expectComplete()
264+
.verify(getTimeout());
265+
}
266266

267267
@DisplayName("makes 1 requestChannel request with 20,000 payloads")
268268
@Test
@@ -388,17 +388,17 @@ default void requestResponse100() {
388388
.verify(getTimeout());
389389
}
390390

391-
// @DisplayName("makes 50 requestResponse requests")
392-
// @Test
393-
// default void largePayloadRequestResponse50() {
394-
// Flux.range(1, 50)
395-
// .flatMap(
396-
// i -> getClient().requestResponse(LARGE_PAYLOAD.retain()).doOnNext(Payload::release))
397-
// .as(StepVerifier::create)
398-
// .expectNextCount(50)
399-
// .expectComplete()
400-
// .verify(getTimeout());
401-
// }
391+
@DisplayName("makes 50 requestResponse requests")
392+
@Test
393+
default void largePayloadRequestResponse50() {
394+
Flux.range(1, 50)
395+
.flatMap(
396+
i -> getClient().requestResponse(LARGE_PAYLOAD.retain()).doOnNext(Payload::release))
397+
.as(StepVerifier::create)
398+
.expectNextCount(50)
399+
.expectComplete()
400+
.verify(getTimeout());
401+
}
402402

403403
@DisplayName("makes 10,000 requestResponse requests")
404404
@Test

rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronClientTransport.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,9 @@ public Mono<DuplexConnection> connect() {
117117

118118
return Mono.error(
119119
new TimeoutException(
120-
"Timeout on send SetupFrame { connection: [{}]; stream: [{}]; channel: [{}] }"));
120+
String.format(
121+
"Timeout on send SetupFrame { connection: [%s]; stream: [%s]; channel: [%s] }",
122+
connectionId, streamId, channel)));
121123
}
122124

123125
CloseHelper.quietClose(serverManagementPublication);
@@ -170,8 +172,7 @@ public void onAvailableImage(Image image) {
170172

171173
final NanoClock nanoClock = aeron.context().nanoClock();
172174
final long nowNs = nanoClock.nanoTime();
173-
final long deadlineNs =
174-
nowNs + aeron.context().keepAliveIntervalNs() * 1000;
175+
final long deadlineNs = nowNs + timeoutNs;
175176

176177
idleStrategy.reset();
177178
for (; ; ) {
@@ -213,7 +214,12 @@ public void onAvailableImage(Image image) {
213214
public static AeronClientTransport createUdp(
214215
Aeron aeron, String host, int port, EventLoopGroup resources) {
215216
final Supplier<IdleStrategy> idleStrategySupplier =
216-
() -> new BackoffIdleStrategy(/* maxSpins */ 100, /* maxYields */ 1000, /* minParkPeriodNs */ 10000, /* maxParkPeriodNs */100000);
217+
() ->
218+
new BackoffIdleStrategy(
219+
/* maxSpins */ 100, /* maxYields */
220+
1000, /* minParkPeriodNs */
221+
10000, /* maxParkPeriodNs */
222+
100000);
217223
return new AeronClientTransport(
218224
aeron,
219225
new ChannelUriStringBuilder()

rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/WrappedDirectBufferByteBuf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,10 +230,10 @@ public int nioBufferCount() {
230230

231231
@Override
232232
public ByteBuffer nioBuffer(int index, int length) {
233-
// FIXME: works incorrectly
234233
final ByteBuffer byteBuffer =
235234
BufferUtil.allocateDirectAligned(length, BitUtil.CACHE_LINE_LENGTH);
236235
directBuffer.getBytes(index, byteBuffer, length);
236+
byteBuffer.flip();
237237
return byteBuffer;
238238
}
239239

rsocket-transport-aeron/src/test/resources/logback-test.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
<logger name="io.rsocket.transport.netty" level="INFO"/>
2727
<logger name="io.rsocket.FrameLogger" level="INFO"/>
28-
<logger name="io.rsocket.fragmentation.FragmentationDuplexConnection" level="INFO"/>
2928
<logger name="io.rsocket.transport.netty" level="INFO"/>
3029
<logger name="io.rsocket.core.RSocketRequester" level="DEBUG"/>
3130
<logger name="io.rsocket.core.RSocketResponder" level="DEBUG"/>

0 commit comments

Comments
 (0)