Skip to content

Commit d6129cf

Browse files
committed
ack notification events
1 parent 6e974de commit d6129cf

File tree

1 file changed

+64
-57
lines changed

1 file changed

+64
-57
lines changed

java/com/engflow/notificationqueue/Client.java

Lines changed: 64 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.grpc.netty.GrpcSslContexts;
2323
import io.grpc.netty.NegotiationType;
2424
import io.grpc.netty.NettyChannelBuilder;
25+
import io.grpc.stub.ClientCallStreamObserver;
26+
import io.grpc.stub.ClientResponseObserver;
2527
import io.grpc.stub.MetadataUtils;
2628
import io.grpc.stub.StreamObserver;
2729
import io.netty.handler.ssl.SslContextBuilder;
@@ -121,71 +123,76 @@ private static void pull(
121123
asyncStub = asyncStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
122124
final CountDownLatch finishLatch = new CountDownLatch(1);
123125
System.out.println("Listening for build events...");
124-
StreamObserver<PullNotificationRequest> requestObserver =
125-
asyncStub.pull(
126-
new StreamObserver<PullNotificationResponse>() {
127-
@Override
128-
public void onNext(PullNotificationResponse response) {
129-
Notification streamedNotification = response.getNotification().getNotification();
130-
System.out.println("Notification: " + streamedNotification.toString());
131-
try {
132-
/** Forward notification data to external server */
133-
forwardToBESStub(
134-
forwardChannel,
135-
streamedNotification.getId().toString(),
136-
streamedNotification.getPayload().toString());
137-
} catch (Exception e) {
138-
System.err.println("Could not forward notification to external sever...");
139-
}
140-
Any notificationContent = streamedNotification.getPayload();
126+
var observer =
127+
new ClientResponseObserver<PullNotificationRequest, PullNotificationResponse>() {
128+
private ClientCallStreamObserver<PullNotificationRequest> requestStream;
129+
130+
@Override
131+
public void beforeStart(ClientCallStreamObserver<PullNotificationRequest> requestStream) {
132+
this.requestStream = requestStream;
133+
}
134+
135+
@Override
136+
public void onNext(PullNotificationResponse response) {
137+
Notification streamedNotification = response.getNotification().getNotification();
138+
System.out.println("Notification: " + streamedNotification.toString());
139+
try {
140+
/** Forward notification data to external server */
141+
forwardToBESStub(
142+
forwardChannel,
143+
streamedNotification.getId().toString(),
144+
streamedNotification.getPayload().toString());
145+
} catch (Exception e) {
146+
System.err.println("Could not forward notification to external sever...");
147+
}
148+
Any notificationContent = streamedNotification.getPayload();
149+
try {
150+
BuildLifecycleEventNotification lifeCycleEvent =
151+
notificationContent.unpack(BuildLifecycleEventNotification.class);
152+
/**
153+
* Check if this is an invocation started event. Options are INVOCATION_STARTED and
154+
* INVOCATION_FINISHED
155+
*/
156+
if (lifeCycleEvent.getKindCase().name().equals("INVOCATION_STARTED")) {
157+
String invocation = lifeCycleEvent.getInvocationStarted().getInvocationId();
141158
try {
142-
BuildLifecycleEventNotification lifeCycleEvent =
143-
notificationContent.unpack(BuildLifecycleEventNotification.class);
144159
/**
145-
* Check if this is an invocation started event. Options are INVOCATION_STARTED
146-
* and INVOCATION_FINISHED
160+
* Fetch the invocation using the grpc {@link EventStoreGrpc} stub using the
161+
* acquired invocation id
147162
*/
148-
if (lifeCycleEvent.getKindCase().name().equals("INVOCATION_STARTED")) {
149-
String invocation = lifeCycleEvent.getInvocationStarted().getInvocationId();
150-
try {
151-
/**
152-
* Fetch the invocation using the grpc {@link EventStoreGrpc} stub using the
153-
* acquired invocation id
154-
*/
155-
getInvocations(channel, invocation, header, forwardChannel);
156-
} catch (InterruptedException e) {
157-
System.err.println("Could not get invocation with uuid " + invocation);
158-
}
159-
}
160-
161-
} catch (InvalidProtocolBufferException e) {
162-
throw new RuntimeException(e);
163+
getInvocations(channel, invocation, header, forwardChannel);
164+
} catch (InterruptedException e) {
165+
System.err.println("Could not get invocation with uuid " + invocation);
163166
}
164167
}
165168

166-
@Override
167-
public void onError(Throwable t) {
168-
System.err.println("Error on request: " + t.getMessage());
169-
finishLatch.countDown();
170-
}
169+
} catch (InvalidProtocolBufferException e) {
170+
throw new RuntimeException(e);
171+
}
172+
requestStream.onNext(
173+
PullNotificationRequest.newBuilder()
174+
.addAcknowledgementTokens(response.getNotification().getToken())
175+
.build());
176+
}
171177

172-
@Override
173-
public void onCompleted() {
174-
System.out.println("Finished pulling notifications");
175-
finishLatch.countDown();
176-
}
177-
});
178+
@Override
179+
public void onError(Throwable t) {
180+
System.err.println("Error on request: " + t.getMessage());
181+
finishLatch.countDown();
182+
}
178183

179-
try {
180-
requestObserver.onNext(
181-
PullNotificationRequest.newBuilder()
182-
.setQueue(QueueId.newBuilder().setName(queueName).build())
183-
.build());
184-
} catch (RuntimeException e) {
185-
// Cancel RPC
186-
requestObserver.onError(e);
187-
throw e;
188-
}
184+
@Override
185+
public void onCompleted() {
186+
System.out.println("Finished pulling notifications");
187+
finishLatch.countDown();
188+
}
189+
};
190+
asyncStub.pull(observer);
191+
192+
observer.requestStream.onNext(
193+
PullNotificationRequest.newBuilder()
194+
.setQueue(QueueId.newBuilder().setName(queueName).build())
195+
.build());
189196

190197
finishLatch.await();
191198
}

0 commit comments

Comments
 (0)