Skip to content

stub: stabilize StreamObserver APIs #7938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package io.grpc.stub;

import io.grpc.ExperimentalApi;

import javax.annotation.Nullable;

/**
Expand All @@ -30,7 +28,6 @@
* <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
* "real" RPCs suitable for testing and make a fake for the server-side.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1788")
public abstract class ClientCallStreamObserver<V> extends CallStreamObserver<V> {
/**
* Prevent any further processing for this {@code ClientCallStreamObserver}. No further messages
Expand All @@ -57,10 +54,60 @@ public abstract class ClientCallStreamObserver<V> extends CallStreamObserver<V>
*
* <p>This method may only be called during {@link ClientResponseObserver#beforeStart
* ClientResponseObserver.beforeStart()}.
*
* <p>This API is still a work in-progress and may change in the future.
*/
public void disableAutoRequestWithInitial(int request) {
throw new UnsupportedOperationException();
}

/**
* If {@code true}, indicates that the observer is capable of sending additional messages
* without requiring excessive buffering internally. This value is just a suggestion and the
* application is free to ignore it, however doing so may result in excessive buffering within the
* observer.
*
* <p>If {@code false}, the runnable passed to {@link #setOnReadyHandler} will be called after
* {@code isReady()} transitions to {@code true}.
*/
@Override
public abstract boolean isReady();

/**
* Set a {@link Runnable} that will be executed every time the stream {@link #isReady()} state
* changes from {@code false} to {@code true}. While it is not guaranteed that the same
* thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
* are serialized with calls to the 'inbound' {@link StreamObserver}.
*
* <p>On client-side this method may only be called during {@link
* ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
* call to the application, before the service returns its {@code StreamObserver}.
*
* <p>Because there is a processing delay to deliver this notification, it is possible for
* concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
* notifications by checking {@code isReady()}'s current value instead of assuming it is now
* {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be
* <em>another</em> {@code onReadyHandler} callback.
*
* @param onReadyHandler to call when peer is ready to receive more messages.
*/
@Override
public abstract void setOnReadyHandler(Runnable onReadyHandler);

/**
* Requests the peer to produce {@code count} more messages to be delivered to the 'inbound'
* {@link StreamObserver}.
*
* <p>This method is safe to call from multiple threads without external synchronization.
*
* @param count more messages
*/
@Override
public abstract void request(int count);

/**
* Sets message compression for subsequent calls to {@link #onNext}.
*
* @param enable whether to enable compression.
*/
@Override
public abstract void setMessageCompression(boolean enable);
}
3 changes: 0 additions & 3 deletions stub/src/main/java/io/grpc/stub/ClientResponseObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@

package io.grpc.stub;

import io.grpc.ExperimentalApi;

/**
* Specialization of {@link StreamObserver} implemented by clients in order to interact with the
* advanced features of a call such as flow-control.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4693")
public interface ClientResponseObserver<ReqT, RespT> extends StreamObserver<RespT> {
/**
* Called by the runtime priot to the start of a call to provide a reference to the
Expand Down
58 changes: 53 additions & 5 deletions stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package io.grpc.stub;

import io.grpc.ExperimentalApi;

/**
* A refinement of {@link CallStreamObserver} to allows for interaction with call
* cancellation events on the server side.
Expand All @@ -28,7 +26,6 @@
* <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
* "real" RPCs suitable for testing and interact with the server using a normal client stub.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1788")
public abstract class ServerCallStreamObserver<V> extends CallStreamObserver<V> {

/**
Expand Down Expand Up @@ -89,10 +86,61 @@ public abstract class ServerCallStreamObserver<V> extends CallStreamObserver<V>
* <li>{@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.</li>
* </ul>
* </p>
*
* <p>This API is still a work in-progress and may change in the future.
*/
public void disableAutoRequest() {
throw new UnsupportedOperationException();
}


/**
* If {@code true}, indicates that the observer is capable of sending additional messages
* without requiring excessive buffering internally. This value is just a suggestion and the
* application is free to ignore it, however doing so may result in excessive buffering within the
* observer.
*
* <p>If {@code false}, the runnable passed to {@link #setOnReadyHandler} will be called after
* {@code isReady()} transitions to {@code true}.
*/
@Override
public abstract boolean isReady();

/**
* Set a {@link Runnable} that will be executed every time the stream {@link #isReady()} state
* changes from {@code false} to {@code true}. While it is not guaranteed that the same
* thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
* are serialized with calls to the 'inbound' {@link StreamObserver}.
*
* <p>On client-side this method may only be called during {@link
* ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
* call to the application, before the service returns its {@code StreamObserver}.
*
* <p>Because there is a processing delay to deliver this notification, it is possible for
* concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
* notifications by checking {@code isReady()}'s current value instead of assuming it is now
* {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be
* <em>another</em> {@code onReadyHandler} callback.
*
* @param onReadyHandler to call when peer is ready to receive more messages.
*/
@Override
public abstract void setOnReadyHandler(Runnable onReadyHandler);

/**
* Requests the peer to produce {@code count} more messages to be delivered to the 'inbound'
* {@link StreamObserver}.
*
* <p>This method is safe to call from multiple threads without external synchronization.
*
* @param count more messages
*/
@Override
public abstract void request(int count);

/**
* Sets message compression for subsequent calls to {@link #onNext}.
*
* @param enable whether to enable compression.
*/
@Override
public abstract void setMessageCompression(boolean enable);
}
2 changes: 1 addition & 1 deletion stub/src/test/java/io/grpc/stub/ClientCallsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ public void request(int numMessages) {
ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver<Integer, String>() {
@Override
public void beforeStart(ClientCallStreamObserver<Integer> requestStream) {
requestStream.disableAutoInboundFlowControl();
requestStream.disableAutoRequestWithInitial(1);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion stub/src/test/java/io/grpc/stub/ServerCallsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public void disablingInboundAutoFlowControlSuppressesRequestsForMoreMessages() t
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.disableAutoInboundFlowControl();
serverCallObserver.disableAutoRequest();
return new ServerCalls.NoopStreamObserver<>();
}
});
Expand Down