Skip to content

Shutdown executor and provide new endpoint constructor #388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ public String getProtocol() {
}

public abstract Consumer<String> createConsumer(SubscriptionSession session);

public void shutdown() {
// do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ default void onStop(SubscriptionSession session, OperationMessage message) {
default void onTerminate(SubscriptionSession session, OperationMessage message) {
// do nothing
}
}

default void shutdown() {
// do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,8 @@ void abort(SubscriptionSession session) {
future.cancel(true);
}
}

void shutdown() {
this.executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class ApolloSubscriptionProtocolFactory extends SubscriptionProtocolFacto
public static final int KEEP_ALIVE_INTERVAL = 15;
@Getter private final GraphQLObjectMapper objectMapper;
private final ApolloCommandProvider commandProvider;
private KeepAliveSubscriptionConnectionListener keepAlive;

public ApolloSubscriptionProtocolFactory(
GraphQLObjectMapper objectMapper,
Expand Down Expand Up @@ -67,7 +68,8 @@ public ApolloSubscriptionProtocolFactory(
if (keepAliveInterval != null
&& listeners.stream()
.noneMatch(KeepAliveSubscriptionConnectionListener.class::isInstance)) {
listeners.add(new KeepAliveSubscriptionConnectionListener(keepAliveInterval));
keepAlive = new KeepAliveSubscriptionConnectionListener(keepAliveInterval);
listeners.add(keepAlive);
}
commandProvider =
new ApolloCommandProvider(
Expand All @@ -81,4 +83,11 @@ public ApolloSubscriptionProtocolFactory(
public Consumer<String> createConsumer(SubscriptionSession session) {
return new ApolloSubscriptionConsumer(session, objectMapper, commandProvider);
}

@Override
public void shutdown() {
if (keepAlive != null) {
keepAlive.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
public class KeepAliveSubscriptionConnectionListener
implements ApolloSubscriptionConnectionListener {

private final ApolloSubscriptionKeepAliveRunner keepAliveRunner;
protected final ApolloSubscriptionKeepAliveRunner keepAliveRunner;

public KeepAliveSubscriptionConnectionListener() {
this(Duration.ofSeconds(15));
Expand Down Expand Up @@ -35,4 +35,10 @@ public void onStop(SubscriptionSession session, OperationMessage message) {
public void onTerminate(SubscriptionSession session, OperationMessage message) {
keepAliveRunner.abort(session);
}

@Override
public void shutdown() {
keepAliveRunner.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@ public GraphQLWebsocketServlet(
.collect(toList());
}

public GraphQLWebsocketServlet(
GraphQLInvoker graphQLInvoker,
GraphQLSubscriptionInvocationInputFactory invocationInputFactory,
GraphQLObjectMapper graphQLObjectMapper,
List<SubscriptionProtocolFactory> subscriptionProtocolFactory,
SubscriptionProtocolFactory fallbackSubscriptionProtocolFactory) {

this.subscriptionProtocolFactories = subscriptionProtocolFactory;
this.fallbackSubscriptionProtocolFactory = fallbackSubscriptionProtocolFactory;

allSubscriptionProtocols =
Stream.concat(
subscriptionProtocolFactories.stream(),
Stream.of(fallbackSubscriptionProtocolFactory))
.map(SubscriptionProtocolFactory::getProtocol)
.collect(toList());
}

@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
final WebSocketSubscriptionProtocolFactory subscriptionProtocolFactory =
Expand Down Expand Up @@ -234,6 +252,12 @@ public void beginShutDown() {
log.error("GraphQLWebsocketServlet did not shut down cleanly!");
sessionSubscriptionCache.clear();
}

for (SubscriptionProtocolFactory protocolFactory : subscriptionProtocolFactories) {
protocolFactory.shutdown();
}

fallbackSubscriptionProtocolFactory.shutdown();
}

isShutDown.set(true);
Expand Down