Skip to content

Commit 86b740d

Browse files
authored
Pass binary data through rest proxy. (#29084)
* wip * more tests. * pass binary data. * wip * wip. * more testing. * stream v1 * tests. * more testing. * more testing. * modify this test. * check if http clients hit eof * static logger.
1 parent 4a2e40a commit 86b740d

File tree

10 files changed

+782
-112
lines changed

10 files changed

+782
-112
lines changed

sdk/core/azure-core-test/src/main/java/com/azure/core/test/implementation/RestProxyTests.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import reactor.core.publisher.Mono;
5050
import reactor.test.StepVerifier;
5151

52+
import java.io.ByteArrayInputStream;
5253
import java.nio.ByteBuffer;
5354
import java.nio.channels.AsynchronousFileChannel;
5455
import java.nio.charset.StandardCharsets;
@@ -446,6 +447,13 @@ HttpBinJSON putBodyAndContentLength(@BodyParam(ContentType.APPLICATION_OCTET_STR
446447
Mono<HttpBinJSON> putAsyncBodyAndContentLength(@BodyParam(ContentType.APPLICATION_OCTET_STREAM) Flux<ByteBuffer> body,
447448
@HeaderParam("Content-Length") long contentLength);
448449

450+
@Put("put")
451+
@ExpectedResponses({200})
452+
@UnexpectedResponseExceptionType(MyRestException.class)
453+
Mono<HttpBinJSON> putAsyncBodyAndContentLength(
454+
@BodyParam(ContentType.APPLICATION_OCTET_STREAM) BinaryData body,
455+
@HeaderParam("Content-Length") long contentLength);
456+
449457
@Put("put")
450458
@ExpectedResponses({201})
451459
HttpBinJSON putWithUnexpectedResponse(@BodyParam(ContentType.APPLICATION_OCTET_STREAM) String putBody);
@@ -592,6 +600,88 @@ public void asyncPutRequestWithBodyAndMoreThanContentLength() {
592600
});
593601
}
594602

603+
@Test
604+
public void asyncPutRequestWithBinaryDataBodyAndEqualContentLength() {
605+
Mono<BinaryData> bodyMono = BinaryData.fromFlux(
606+
Flux.just(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8))));
607+
StepVerifier.create(
608+
bodyMono.flatMap(body ->
609+
createService(Service9.class).putAsyncBodyAndContentLength(body, 4L)))
610+
.assertNext(json -> {
611+
assertEquals("test", json.data());
612+
assertEquals(ContentType.APPLICATION_OCTET_STREAM, json.getHeaderValue("Content-Type"));
613+
assertEquals("4", json.getHeaderValue("Content-Length"));
614+
}).verifyComplete();
615+
}
616+
617+
@Test
618+
public void asyncPutRequestWithBinaryDataBodyAndLessThanContentLength() {
619+
Mono<BinaryData> bodyMono = BinaryData.fromFlux(
620+
Flux.just(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8))));
621+
StepVerifier.create(
622+
bodyMono.flatMap(body ->
623+
createService(Service9.class).putAsyncBodyAndContentLength(body, 5L)))
624+
.verifyErrorSatisfies(exception -> {
625+
assertTrue(exception instanceof UnexpectedLengthException
626+
|| (exception.getSuppressed().length > 0
627+
&& exception.getSuppressed()[0] instanceof UnexpectedLengthException));
628+
assertTrue(exception.getMessage().contains("less than"));
629+
});
630+
}
631+
632+
/**
633+
* LengthValidatingInputStream in rest proxy relies on reader
634+
* reaching EOF. This test specifically targets InputStream to assert this behavior.
635+
*/
636+
@Test
637+
public void asyncPutRequestWithStreamBinaryDataBodyAndLessThanContentLength() {
638+
Mono<BinaryData> bodyMono = Mono.just(BinaryData.fromStream(
639+
new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8))));
640+
StepVerifier.create(
641+
bodyMono.flatMap(body ->
642+
createService(Service9.class).putAsyncBodyAndContentLength(body, 5L)))
643+
.verifyErrorSatisfies(exception -> {
644+
assertTrue(exception instanceof UnexpectedLengthException
645+
|| (exception.getSuppressed().length > 0
646+
&& exception.getSuppressed()[0] instanceof UnexpectedLengthException));
647+
assertTrue(exception.getMessage().contains("less than"));
648+
});
649+
}
650+
651+
@Test
652+
public void asyncPutRequestWithBinaryDataBodyAndMoreThanContentLength() {
653+
Mono<BinaryData> bodyMono = BinaryData.fromFlux(
654+
Flux.just(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8))));
655+
StepVerifier.create(
656+
bodyMono.flatMap(body ->
657+
createService(Service9.class).putAsyncBodyAndContentLength(body, 3L)))
658+
.verifyErrorSatisfies(exception -> {
659+
assertTrue(exception instanceof UnexpectedLengthException
660+
|| (exception.getSuppressed().length > 0
661+
&& exception.getSuppressed()[0] instanceof UnexpectedLengthException));
662+
assertTrue(exception.getMessage().contains("more than"));
663+
});
664+
}
665+
666+
/**
667+
* LengthValidatingInputStream in rest proxy relies on reader
668+
* reaching EOF. This test specifically targets InputStream to assert this behavior.
669+
*/
670+
@Test
671+
public void asyncPutRequestWithStreamBinaryDataBodyAndMoreThanContentLength() {
672+
Mono<BinaryData> bodyMono = Mono.just(BinaryData.fromStream(
673+
new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8))));
674+
StepVerifier.create(
675+
bodyMono.flatMap(body ->
676+
createService(Service9.class).putAsyncBodyAndContentLength(body, 3L)))
677+
.verifyErrorSatisfies(exception -> {
678+
assertTrue(exception instanceof UnexpectedLengthException
679+
|| (exception.getSuppressed().length > 0
680+
&& exception.getSuppressed()[0] instanceof UnexpectedLengthException));
681+
assertTrue(exception.getMessage().contains("more than"));
682+
});
683+
}
684+
595685
@Test
596686
public void syncPutRequestWithUnexpectedResponse() {
597687
HttpResponseException e = assertThrows(HttpResponseException.class,
@@ -1593,6 +1683,36 @@ public void segmentUploadTest() throws Exception {
15931683
assertEquals("quick brown fox", response.getValue().data());
15941684
}
15951685

1686+
@Host("http://localhost")
1687+
@ServiceInterface(name = "FluxUploadService")
1688+
interface BinaryDataUploadService {
1689+
@Put("/put")
1690+
Response<HttpBinJSON> put(@BodyParam("text/plain") BinaryData content,
1691+
@HeaderParam("Content-Length") long contentLength);
1692+
}
1693+
1694+
@Test
1695+
public void binaryDataUploadTest() throws Exception {
1696+
Path filePath = Paths.get(getClass().getClassLoader().getResource("upload.txt").toURI());
1697+
BinaryData data = BinaryData.fromFile(filePath);
1698+
1699+
final HttpClient httpClient = createHttpClient();
1700+
// Scenario: Log the body so that body buffering/replay behavior is exercised.
1701+
//
1702+
// Order in which policies applied will be the order in which they added to builder
1703+
//
1704+
final HttpPipeline httpPipeline = new HttpPipelineBuilder()
1705+
.httpClient(httpClient)
1706+
.policies(new PortPolicy(getWireMockPort(), true),
1707+
new HttpLoggingPolicy(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS)))
1708+
.build();
1709+
//
1710+
Response<HttpBinJSON> response = RestProxy
1711+
.create(BinaryDataUploadService.class, httpPipeline).put(data, Files.size(filePath));
1712+
1713+
assertEquals("The quick brown fox jumps over the lazy dog", response.getValue().data());
1714+
}
1715+
15961716
@Host("{url}")
15971717
@ServiceInterface(name = "Service22")
15981718
interface Service22 {

sdk/core/azure-core/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
<jacoco.min.branchcoverage>0.60</jacoco.min.branchcoverage>
5050
<javaModulesSurefireArgLine>
5151
--add-exports com.azure.core/com.azure.core.implementation.http=ALL-UNNAMED
52+
--add-exports com.azure.core/com.azure.core.implementation.http.rest=ALL-UNNAMED
5253
--add-exports com.azure.core/com.azure.core.implementation.serializer=ALL-UNNAMED
5354
--add-exports com.azure.core/com.azure.core.implementation.jackson=ALL-UNNAMED
5455

sdk/core/azure-core/src/main/java/com/azure/core/http/rest/RequestOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public RequestOptions addRequestCallback(Consumer<HttpRequest> requestCallback)
237237
*/
238238
public RequestOptions setBody(BinaryData requestBody) {
239239
Objects.requireNonNull(requestBody, "'requestBody' cannot be null.");
240-
this.requestCallback = this.requestCallback.andThen(request -> request.setBody(requestBody.toBytes()));
240+
this.requestCallback = this.requestCallback.andThen(request -> request.setBody(requestBody));
241241
return this;
242242
}
243243

sdk/core/azure-core/src/main/java/com/azure/core/http/rest/RestProxy.java

Lines changed: 5 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import com.azure.core.exception.ResourceModifiedException;
1111
import com.azure.core.exception.ResourceNotFoundException;
1212
import com.azure.core.exception.TooManyRedirectsException;
13-
import com.azure.core.exception.UnexpectedLengthException;
1413
import com.azure.core.http.ContentType;
1514
import com.azure.core.http.HttpHeaders;
1615
import com.azure.core.http.HttpMethod;
@@ -23,6 +22,7 @@
2322
import com.azure.core.http.policy.UserAgentPolicy;
2423
import com.azure.core.implementation.TypeUtil;
2524
import com.azure.core.implementation.http.UnexpectedExceptionInformation;
25+
import com.azure.core.implementation.http.rest.RestProxyUtils;
2626
import com.azure.core.implementation.serializer.HttpResponseDecoder;
2727
import com.azure.core.implementation.serializer.HttpResponseDecoder.HttpDecodedResponse;
2828
import com.azure.core.util.Base64Url;
@@ -63,9 +63,6 @@
6363
* as asynchronous Single objects that resolve to a deserialized Java object.
6464
*/
6565
public final class RestProxy implements InvocationHandler {
66-
private static final ByteBuffer VALIDATION_BUFFER = ByteBuffer.allocate(0);
67-
private static final String BODY_TOO_LARGE = "Request body emitted %d bytes, more than the expected %d bytes.";
68-
private static final String BODY_TOO_SMALL = "Request body emitted %d bytes, less than the expected %d bytes.";
6966
private static final String MUST_IMPLEMENT_PAGE_ERROR =
7067
"Unable to create PagedResponse<T>. Body must be of a type that implements: " + Page.class;
7168

@@ -140,11 +137,9 @@ public Object invoke(Object proxy, final Method method, Object[] args) {
140137
options.getRequestCallback().accept(request);
141138
}
142139

143-
if (request.getBody() != null) {
144-
request.setBody(validateLength(request));
145-
}
146-
147-
final Mono<HttpResponse> asyncResponse = send(request, context);
140+
Context finalContext = context;
141+
final Mono<HttpResponse> asyncResponse = RestProxyUtils.validateLengthAsync(request)
142+
.flatMap(r -> send(r, finalContext));
148143

149144
Mono<HttpDecodedResponse> asyncDecodedResponse = this.decoder.decode(asyncResponse, methodParser);
150145

@@ -177,43 +172,6 @@ static Context mergeRequestOptionsContext(Context context, RequestOptions option
177172
return context;
178173
}
179174

180-
static Flux<ByteBuffer> validateLength(final HttpRequest request) {
181-
final Flux<ByteBuffer> bbFlux = request.getBody();
182-
if (bbFlux == null) {
183-
return Flux.empty();
184-
}
185-
186-
final long expectedLength = Long.parseLong(request.getHeaders().getValue("Content-Length"));
187-
188-
return Flux.defer(() -> {
189-
final long[] currentTotalLength = new long[1];
190-
return Flux.concat(bbFlux, Flux.just(VALIDATION_BUFFER)).handle((buffer, sink) -> {
191-
if (buffer == null) {
192-
return;
193-
}
194-
195-
if (buffer == VALIDATION_BUFFER) {
196-
if (expectedLength != currentTotalLength[0]) {
197-
sink.error(new UnexpectedLengthException(String.format(BODY_TOO_SMALL,
198-
currentTotalLength[0], expectedLength), currentTotalLength[0], expectedLength));
199-
} else {
200-
sink.complete();
201-
}
202-
return;
203-
}
204-
205-
currentTotalLength[0] += buffer.remaining();
206-
if (currentTotalLength[0] > expectedLength) {
207-
sink.error(new UnexpectedLengthException(String.format(BODY_TOO_LARGE,
208-
currentTotalLength[0], expectedLength), currentTotalLength[0], expectedLength));
209-
return;
210-
}
211-
212-
sink.next(buffer);
213-
});
214-
});
215-
}
216-
217175
/**
218176
* Starts the tracing span for the current service call, additionally set metadata attributes on the span by passing
219177
* additional context information.
@@ -321,7 +279,7 @@ private HttpRequest configRequest(final HttpRequest request, final SwaggerMethod
321279
// sending the request to the service. There is no memory copy that happens here. Sources like
322280
// InputStream, File and Flux<ByteBuffer> will not be eagerly copied into memory until it's required
323281
// by the HttpClient implementations.
324-
request.setBody(binaryData.toFluxByteBuffer());
282+
request.setBody(binaryData);
325283
return request;
326284
}
327285

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.core.implementation.http.rest;
4+
5+
import com.azure.core.exception.UnexpectedLengthException;
6+
import com.azure.core.util.logging.ClientLogger;
7+
8+
import java.io.IOException;
9+
import java.io.InputStream;
10+
import java.util.Objects;
11+
12+
import static com.azure.core.implementation.http.rest.RestProxyUtils.BODY_TOO_LARGE;
13+
import static com.azure.core.implementation.http.rest.RestProxyUtils.BODY_TOO_SMALL;
14+
15+
/**
16+
* An {@link InputStream} decorator that tracks the number of bytes read from an inner {@link InputStream} and throws
17+
* an exception if the number of bytes read doesn't match what was expected.
18+
*
19+
* This implementation assumes that reader is going to read until EOF.
20+
*/
21+
final class LengthValidatingInputStream extends InputStream {
22+
23+
private static final ClientLogger LOGGER = new ClientLogger(LengthValidatingInputStream.class);
24+
25+
private final InputStream inner;
26+
private final long expectedReadSize;
27+
28+
private long position;
29+
private long mark = -1;
30+
31+
/**
32+
* Creates a new {@link LengthValidatingInputStream}.
33+
*
34+
* @param inputStream The {@link InputStream} being decorated.
35+
* @param expectedReadSize The expected number of bytes to be read from the inner {@code inputStream}.
36+
*/
37+
LengthValidatingInputStream(InputStream inputStream, long expectedReadSize) {
38+
this.inner = Objects.requireNonNull(inputStream, "'inputStream' cannot be null.");
39+
40+
if (expectedReadSize < 0) {
41+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("'expectedReadSize' cannot be less than 0."));
42+
}
43+
44+
this.expectedReadSize = expectedReadSize;
45+
}
46+
47+
@Override
48+
public synchronized int read(byte[] b, int off, int len) throws IOException {
49+
int readSize = inner.read(b, off, len);
50+
validateLength(readSize);
51+
52+
return readSize;
53+
}
54+
55+
@Override
56+
public synchronized long skip(long n) throws IOException {
57+
long skipped = inner.skip(n);
58+
position += skipped;
59+
return skipped;
60+
}
61+
62+
@Override
63+
public int available() throws IOException {
64+
return inner.available();
65+
}
66+
67+
@Override
68+
public void close() throws IOException {
69+
inner.close();
70+
}
71+
72+
@Override
73+
public synchronized void mark(int readlimit) {
74+
inner.mark(readlimit);
75+
mark = position;
76+
}
77+
78+
@Override
79+
public synchronized void reset() throws IOException {
80+
inner.reset();
81+
position = mark;
82+
}
83+
84+
@Override
85+
public boolean markSupported() {
86+
return inner.markSupported();
87+
}
88+
89+
@Override
90+
public synchronized int read() throws IOException {
91+
int read = inner.read();
92+
validateLength(read == -1 ? -1 : 1);
93+
94+
return read;
95+
}
96+
97+
private void validateLength(int readSize) {
98+
if (readSize == -1) {
99+
// If the inner InputStream has reached termination validate that the read bytes matches what was expected.
100+
if (position > expectedReadSize) {
101+
throw new UnexpectedLengthException(String.format(BODY_TOO_LARGE,
102+
position, expectedReadSize), position, expectedReadSize);
103+
} else if (position < expectedReadSize) {
104+
throw new UnexpectedLengthException(String.format(BODY_TOO_SMALL,
105+
position, expectedReadSize), position, expectedReadSize);
106+
}
107+
} else {
108+
position += readSize;
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)