Skip to content

Commit a3c416c

Browse files
Gracefully shutdown worker on WorkerTerminate request
1 parent 8845d98 commit a3c416c

File tree

7 files changed

+77
-6
lines changed

7 files changed

+77
-6
lines changed

src/main/java/com/microsoft/azure/functions/worker/Application.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.logging.*;
44
import javax.annotation.*;
55

6+
import com.microsoft.azure.functions.worker.handler.GracefulTerminationException;
67
import org.apache.commons.cli.*;
78
import org.apache.commons.lang3.exception.ExceptionUtils;
89

src/main/java/com/microsoft/azure/functions/worker/JavaWorkerClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ private void addHandlers() {
4242
this.handlerSuppliers.put(StreamingMessage.ContentCase.FUNCTION_LOAD_REQUEST, () -> new FunctionLoadRequestHandler(broker));
4343
this.handlerSuppliers.put(StreamingMessage.ContentCase.INVOCATION_REQUEST, () -> new InvocationRequestHandler(broker));
4444
this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_STATUS_REQUEST, WorkerStatusRequestHandler::new);
45+
this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_TERMINATE, () -> new WorkerTerminateRequestHandler(this));
4546
}
4647

4748
public Future<Void> listen(String workerId, String requestId) {

src/main/java/com/microsoft/azure/functions/worker/WorkerLogManager.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ public class WorkerLogManager {
1313
public static Logger getInvocationLogger(String invocationId) { return INSTANCE.getInvocationLoggerImpl(invocationId); }
1414

1515
static void initialize(JavaWorkerClient client, boolean logToConsole) { INSTANCE.initializeImpl(client, logToConsole); }
16-
static void deinitialize() { INSTANCE.deinitializempl(); }
16+
static void deinitialize() { INSTANCE.deinitializImpl(); }
17+
public static void flushLogs() { INSTANCE.flushLogsImpl(); }
1718

1819
private WorkerLogManager() {
1920
this.client = null;
@@ -28,20 +29,33 @@ private void initializeImpl(JavaWorkerClient client, boolean logToConsole) {
2829
assert this.client == null && client != null;
2930
this.client = client;
3031
this.logToConsole = logToConsole;
31-
addHostClientHandlers(this.hostLogger, null);
32+
addHandlers(this.hostLogger, null);
3233
}
3334

34-
private void deinitializempl() {
35+
private void deinitializImpl() {
3536
assert this.client != null;
3637
clearHandlers(this.hostLogger);
3738
this.logToConsole = false;
3839
this.client = null;
3940
}
4041

42+
private void flushLogsImpl() {
43+
flush(emptyLogger);
44+
flush(systemLogger);
45+
flush(hostLogger);
46+
}
47+
48+
private static void flush(Logger logger) {
49+
Handler[] handlers = logger.getHandlers();
50+
for (Handler handler : handlers) {
51+
handler.flush();
52+
}
53+
}
54+
4155
private Logger getInvocationLoggerImpl(String invocationId) {
4256
Logger logger = Logger.getAnonymousLogger();
4357
logger.setLevel(Level.ALL);
44-
addHostClientHandlers(logger, invocationId);
58+
addHandlers(logger, invocationId);
4559
return logger;
4660
}
4761

@@ -58,7 +72,7 @@ private static void addSystemHandler(Logger logger) {
5872
logger.addHandler(new SystemLoggerListener());
5973
}
6074

61-
private void addHostClientHandlers(Logger logger, String invocationId) {
75+
private void addHandlers(Logger logger, String invocationId) {
6276
assert this.client != null;
6377
clearHandlers(logger);
6478
logger.addHandler(new HostLoggerListener(this.client, invocationId));
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.microsoft.azure.functions.worker.handler;
2+
3+
public class GracefulTerminationException extends RuntimeException {
4+
5+
public GracefulTerminationException() {
6+
}
7+
8+
public GracefulTerminationException(String message) {
9+
super(message);
10+
}
11+
12+
public GracefulTerminationException(String message, Throwable cause) {
13+
super(message, cause);
14+
}
15+
16+
public GracefulTerminationException(Throwable cause) {
17+
super(cause);
18+
}
19+
20+
public GracefulTerminationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
21+
super(message, cause, enableSuppression, writableStackTrace);
22+
}
23+
}

src/main/java/com/microsoft/azure/functions/worker/handler/MessageHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.microsoft.azure.functions.worker.handler;
22

3-
import java.util.concurrent.*;
43
import java.util.function.*;
54
import java.util.logging.*;
65

src/main/java/com/microsoft/azure/functions/worker/handler/WorkerInitRequestHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ String execute(WorkerInitRequest request, WorkerInitResponse.Builder response) {
2424
response.putCapabilities("WorkerStatus", "WorkerStatus");
2525
response.putCapabilities("RpcHttpBodyOnly", "RpcHttpBodyOnly");
2626
response.putCapabilities("RpcHttpTriggerMetadataRemoved", "RpcHttpTriggerMetadataRemoved");
27+
response.putCapabilities("HandlesWorkerTerminateMessage", "HandlesWorkerTerminateMessage");
2728
response.setWorkerMetadata(composeWorkerMetadata());
2829
return "Worker initialized";
2930
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.microsoft.azure.functions.worker.handler;
2+
3+
import com.google.protobuf.Message;
4+
import com.microsoft.azure.functions.rpc.messages.StreamingMessage;
5+
import com.microsoft.azure.functions.rpc.messages.WorkerTerminate;
6+
import com.microsoft.azure.functions.worker.JavaWorkerClient;
7+
import com.microsoft.azure.functions.worker.WorkerLogManager;
8+
import org.apache.commons.lang3.exception.ExceptionUtils;
9+
10+
import java.util.logging.Level;
11+
12+
public class WorkerTerminateRequestHandler extends MessageHandler<WorkerTerminate, Message.Builder> {
13+
14+
public WorkerTerminateRequestHandler(JavaWorkerClient client) {
15+
super(StreamingMessage::getWorkerTerminate,
16+
() -> null,
17+
null,
18+
null);
19+
this.client = client;
20+
}
21+
22+
@Override
23+
String execute(WorkerTerminate workerTerminate, Message.Builder builder) {
24+
WorkerLogManager.getSystemLogger().log(Level.INFO, "Worker terminate request received. Gracefully shutting down the worker.");
25+
// Flushing the logs. Keeping the grpc client connection open here in case it is used by user's shutdown hooks
26+
WorkerLogManager.flushLogs();
27+
System.exit(0);
28+
return null;
29+
}
30+
31+
private final JavaWorkerClient client;
32+
}

0 commit comments

Comments
 (0)