Skip to content

Commit 5f2bb9b

Browse files
committed
ack notification events
1 parent 6e974de commit 5f2bb9b

File tree

1 file changed

+67
-57
lines changed

1 file changed

+67
-57
lines changed

java/com/engflow/notificationqueue/Client.java

Lines changed: 67 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.grpc.netty.GrpcSslContexts;
2323
import io.grpc.netty.NegotiationType;
2424
import io.grpc.netty.NettyChannelBuilder;
25+
import io.grpc.stub.ClientCallStreamObserver;
26+
import io.grpc.stub.ClientResponseObserver;
2527
import io.grpc.stub.MetadataUtils;
2628
import io.grpc.stub.StreamObserver;
2729
import io.netty.handler.ssl.SslContextBuilder;
@@ -121,71 +123,79 @@ private static void pull(
121123
asyncStub = asyncStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
122124
final CountDownLatch finishLatch = new CountDownLatch(1);
123125
System.out.println("Listening for build events...");
124-
StreamObserver<PullNotificationRequest> requestObserver =
125-
asyncStub.pull(
126-
new StreamObserver<PullNotificationResponse>() {
127-
@Override
128-
public void onNext(PullNotificationResponse response) {
129-
Notification streamedNotification = response.getNotification().getNotification();
130-
System.out.println("Notification: " + streamedNotification.toString());
131-
try {
132-
/** Forward notification data to external server */
133-
forwardToBESStub(
134-
forwardChannel,
135-
streamedNotification.getId().toString(),
136-
streamedNotification.getPayload().toString());
137-
} catch (Exception e) {
138-
System.err.println("Could not forward notification to external sever...");
139-
}
140-
Any notificationContent = streamedNotification.getPayload();
126+
var observer =
127+
new ClientResponseObserver<PullNotificationRequest, PullNotificationResponse>() {
128+
private ClientCallStreamObserver<PullNotificationRequest> requestStream;
129+
130+
@Override
131+
public void beforeStart(ClientCallStreamObserver<PullNotificationRequest> requestStream) {
132+
this.requestStream = requestStream;
133+
}
134+
135+
@Override
136+
public void onNext(PullNotificationResponse response) {
137+
if (!response.hasNotification()) {
138+
return;
139+
}
140+
Notification streamedNotification = response.getNotification().getNotification();
141+
System.out.println("Notification: " + streamedNotification.toString());
142+
try {
143+
/** Forward notification data to external server */
144+
forwardToBESStub(
145+
forwardChannel,
146+
streamedNotification.getId().toString(),
147+
streamedNotification.getPayload().toString());
148+
} catch (Exception e) {
149+
System.err.println("Could not forward notification to external sever...");
150+
}
151+
Any notificationContent = streamedNotification.getPayload();
152+
try {
153+
BuildLifecycleEventNotification lifeCycleEvent =
154+
notificationContent.unpack(BuildLifecycleEventNotification.class);
155+
/**
156+
* Check if this is an invocation started event. Options are INVOCATION_STARTED and
157+
* INVOCATION_FINISHED
158+
*/
159+
if (lifeCycleEvent.getKindCase().name().equals("INVOCATION_STARTED")) {
160+
String invocation = lifeCycleEvent.getInvocationStarted().getInvocationId();
141161
try {
142-
BuildLifecycleEventNotification lifeCycleEvent =
143-
notificationContent.unpack(BuildLifecycleEventNotification.class);
144162
/**
145-
* Check if this is an invocation started event. Options are INVOCATION_STARTED
146-
* and INVOCATION_FINISHED
163+
* Fetch the invocation using the grpc {@link EventStoreGrpc} stub using the
164+
* acquired invocation id
147165
*/
148-
if (lifeCycleEvent.getKindCase().name().equals("INVOCATION_STARTED")) {
149-
String invocation = lifeCycleEvent.getInvocationStarted().getInvocationId();
150-
try {
151-
/**
152-
* Fetch the invocation using the grpc {@link EventStoreGrpc} stub using the
153-
* acquired invocation id
154-
*/
155-
getInvocations(channel, invocation, header, forwardChannel);
156-
} catch (InterruptedException e) {
157-
System.err.println("Could not get invocation with uuid " + invocation);
158-
}
159-
}
160-
161-
} catch (InvalidProtocolBufferException e) {
162-
throw new RuntimeException(e);
166+
getInvocations(channel, invocation, header, forwardChannel);
167+
} catch (InterruptedException e) {
168+
System.err.println("Could not get invocation with uuid " + invocation);
163169
}
164170
}
165171

166-
@Override
167-
public void onError(Throwable t) {
168-
System.err.println("Error on request: " + t.getMessage());
169-
finishLatch.countDown();
170-
}
172+
} catch (InvalidProtocolBufferException e) {
173+
throw new RuntimeException(e);
174+
}
175+
requestStream.onNext(
176+
PullNotificationRequest.newBuilder()
177+
.addAcknowledgementTokens(response.getNotification().getToken())
178+
.build());
179+
}
171180

172-
@Override
173-
public void onCompleted() {
174-
System.out.println("Finished pulling notifications");
175-
finishLatch.countDown();
176-
}
177-
});
181+
@Override
182+
public void onError(Throwable t) {
183+
System.err.println("Error on request: " + t.getMessage());
184+
finishLatch.countDown();
185+
}
178186

179-
try {
180-
requestObserver.onNext(
181-
PullNotificationRequest.newBuilder()
182-
.setQueue(QueueId.newBuilder().setName(queueName).build())
183-
.build());
184-
} catch (RuntimeException e) {
185-
// Cancel RPC
186-
requestObserver.onError(e);
187-
throw e;
188-
}
187+
@Override
188+
public void onCompleted() {
189+
System.out.println("Finished pulling notifications");
190+
finishLatch.countDown();
191+
}
192+
};
193+
asyncStub.pull(observer);
194+
195+
observer.requestStream.onNext(
196+
PullNotificationRequest.newBuilder()
197+
.setQueue(QueueId.newBuilder().setName(queueName).build())
198+
.build());
189199

190200
finishLatch.await();
191201
}

0 commit comments

Comments
 (0)