Skip to content

Commit 5da0257

Browse files
nictasnebhale
authored andcommitted
Fix Direct Memory Leaks in Doppler Client
Previously the versions of Reactor that we used had a behavior where, in the case of a cancellation of a subscription, any element that was caching data wouldn't be notified and they'd be at risk of creating a memory leak (think buffers and streaming codecs). This change makes use of new Reactor features that allow notification of cancellation and cleanup of state). [resolves #1019] Signed-off-by: Ben Hale <[email protected]>
1 parent 4f007b9 commit 5da0257

File tree

2 files changed

+18
-3
lines changed

2 files changed

+18
-3
lines changed

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/doppler/MultipartCodec.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,26 @@
1616

1717
package org.cloudfoundry.reactor.doppler;
1818

19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
1922
import io.netty.buffer.Unpooled;
2023
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
2124
import io.netty.handler.codec.http.HttpHeaderNames;
2225
import reactor.core.publisher.Flux;
2326
import reactor.netty.ByteBufFlux;
2427
import reactor.netty.http.client.HttpClientResponse;
2528

29+
import java.io.IOException;
2630
import java.io.InputStream;
2731
import java.nio.charset.Charset;
2832
import java.util.regex.Matcher;
2933
import java.util.regex.Pattern;
3034

3135
final class MultipartCodec {
3236

37+
private static final Logger LOGGER = LoggerFactory.getLogger(MultipartCodec.class);
38+
3339
private static final Pattern BOUNDARY_PATTERN = Pattern.compile("multipart/.+; boundary=(.*)");
3440

3541
private static final int MAX_PAYLOAD_SIZE = 1024 * 1024;
@@ -49,7 +55,8 @@ static DelimiterBasedFrameDecoder createDecoder(HttpClientResponse response) {
4955

5056
static Flux<InputStream> decode(ByteBufFlux body) {
5157
return body.asInputStream()
52-
.skip(1);
58+
.skip(1)
59+
.doOnDiscard(InputStream.class, MultipartCodec::close);
5360
}
5461

5562
private static String extractMultipartBoundary(HttpClientResponse response) {
@@ -63,4 +70,12 @@ private static String extractMultipartBoundary(HttpClientResponse response) {
6370
}
6471
}
6572

73+
private static void close(InputStream in) {
74+
try {
75+
in.close();
76+
} catch (IOException e) {
77+
LOGGER.warn("Could not close input stream. This will cause a direct memory leak.", e);
78+
}
79+
}
80+
6681
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public <T> Flux<T> parseBodyToFlux(Function<HttpClientResponseWithBody, Publishe
164164
ByteBufFlux body = connection.inbound().receive();
165165

166166
return processResponse(response, body).flatMapMany(responseTransformer)
167-
.doOnTerminate(connection::dispose);
167+
.doFinally(signalType -> connection.dispose());
168168
});
169169
}
170170

@@ -257,7 +257,7 @@ private Publisher<InputStream> handleWebsocketCommunication(WebsocketInbound inb
257257
return inbound.aggregateFrames()
258258
.receive()
259259
.asInputStream()
260-
.doOnTerminate(outbound::sendClose);
260+
.doFinally(signalType -> outbound.sendClose());
261261
}
262262

263263
}

0 commit comments

Comments
 (0)