Skip to content

Remove Stream#shouldSupportAdditionalTimeout method #1227

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,6 @@ public ByteBuf read(final int numBytes) throws IOException {
return handler.getRead();
}

@Override
public boolean supportsAdditionalTimeout() {
return true;
}

@Override
public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOException {
FutureAsyncCompletionHandler<ByteBuf> handler = new FutureAsyncCompletionHandler<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.event.ConnectionPoolReadyEvent;
import com.mongodb.event.ConnectionReadyEvent;
import com.mongodb.internal.time.TimePoint;
import com.mongodb.internal.time.Timeout;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.SdamServerDescriptionManager.SdamIssue;
Expand All @@ -56,6 +54,8 @@
import com.mongodb.internal.logging.StructuredLogger;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.internal.thread.DaemonThreadFactory;
import com.mongodb.internal.time.TimePoint;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.NonNull;
import com.mongodb.lang.Nullable;
import org.bson.ByteBuf;
Expand Down Expand Up @@ -777,12 +777,6 @@ public <T> T receive(final Decoder<T> decoder, final SessionContext sessionConte
return wrapped.receive(decoder, sessionContext);
}

@Override
public boolean supportsAdditionalTimeout() {
isTrue("open", !isClosed.get());
return wrapped.supportsAdditionalTimeout();
}

@Override
public <T> T receive(final Decoder<T> decoder, final SessionContext sessionContext, final int additionalTimeout) {
isTrue("open", !isClosed.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
}

private boolean shouldStreamResponses(final ServerDescription currentServerDescription) {
return currentServerDescription.getTopologyVersion() != null && connection.supportsAdditionalTimeout();
return currentServerDescription.getTopologyVersion() != null;
}

private CommandMessage createCommandMessage(final BsonDocument command, final InternalConnection connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ <T> T sendAndReceive(CommandMessage message, Decoder<T> decoder, SessionContext
<T> T receive(Decoder<T> decoder, SessionContext sessionContext);


default boolean supportsAdditionalTimeout() {
return false;
}

default <T> T receive(Decoder<T> decoder, SessionContext sessionContext, int additionalTimeout) {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,6 @@ public <T> T receive(final Decoder<T> decoder, final SessionContext sessionConte
return receiveCommandMessageResponse(decoder, new NoOpCommandEventSender(), sessionContext, 0);
}

@Override
public boolean supportsAdditionalTimeout() {
return stream.supportsAdditionalTimeout();
}

@Override
public <T> T receive(final Decoder<T> decoder, final SessionContext sessionContext, final int additionalTimeout) {
isTrue("Response is expected", hasMoreToCome);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,6 @@ public ByteBuf read(final int numBytes) throws IOException {
}
}

@Override
public boolean supportsAdditionalTimeout() {
return true;
}

@Override
public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOException {
int curTimeout = socket.getSoTimeout();
Expand Down
25 changes: 1 addition & 24 deletions driver-core/src/main/com/mongodb/internal/connection/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,39 +60,16 @@ public interface Stream extends BufferProvider {
*/
ByteBuf read(int numBytes) throws IOException;

/**
* Gets whether this implementation supports specifying an additional timeout for read operations
* <p>
* The default is to not support specifying an additional timeout
* </p>
*
* @return true if this implementation supports specifying an additional timeouts for reads operations
* @see #read(int, int)
*/
default boolean supportsAdditionalTimeout() {
return false;
}

/**
* Read from the stream, blocking until the requested number of bytes have been read. If supported by the implementation,
* adds the given additional timeout to the configured timeout for the stream.
* <p>
* This method should not be called unless {@link #supportsAdditionalTimeout()} returns true.
* </p>
* <p>
* The default behavior is to throw an {@link UnsupportedOperationException}
* </p>
*
* @param numBytes The number of bytes to read into the returned byte buffer
* @param additionalTimeout additional timeout in milliseconds to add to the configured timeout
* @return a byte buffer filled with number of bytes requested
* @throws IOException if there are problems reading from the stream
* @throws UnsupportedOperationException if this implementation does not support additional timeouts
* @see #supportsAdditionalTimeout()
*/
default ByteBuf read(int numBytes, int additionalTimeout) throws IOException {
throw new UnsupportedOperationException();
}
ByteBuf read(int numBytes, int additionalTimeout) throws IOException;

/**
* Write each buffer in the list to the stream in order, asynchronously. This method should return immediately, and invoke the given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,6 @@ private static class TlsChannelStream extends AsynchronousChannelStream {
this.selectorMonitor = selectorMonitor;
}

@Override
public boolean supportsAdditionalTimeout() {
return true;
}

@Override
public void openAsync(final AsyncCompletionHandler<Void> handler) {
isTrue("unopened", getChannel() == null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,6 @@ public <T> T receive(final Decoder<T> decoder, final SessionContext sessionConte
return result;
}

@Override
public boolean supportsAdditionalTimeout() {
return wrapped.supportsAdditionalTimeout();
}

@Override
public <T> T receive(final Decoder<T> decoder, final SessionContext sessionContext, final int additionalTimeout) {
T result = wrapped.receive(decoder, sessionContext, additionalTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,6 @@ public ByteBuf read(final int numBytes) throws IOException {
return read(numBytes, 0);
}

@Override
public boolean supportsAdditionalTimeout() {
return true;
}

@Override
public ByteBuf read(final int numBytes, final int additionalTimeoutMillis) throws IOException {
isTrueArgument("additionalTimeoutMillis must not be negative", additionalTimeoutMillis >= 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ class DefaultServerMonitorSpecification extends Specification {
initialServerDescription
}

supportsAdditionalTimeout() >> true

send(_, _, _) >> { }

receive(_, _) >> {
Expand Down Expand Up @@ -238,8 +236,6 @@ class DefaultServerMonitorSpecification extends Specification {
initialServerDescription
}

supportsAdditionalTimeout() >> true

send(_, _, _) >> { }

receive(_, _) >> {
Expand Down