Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ public class FanOutKinesisShardSubscription {

// Store the current starting position for this subscription. Will be updated each time new
// batch of records is consumed
private StartingPosition startingPosition;
private FanOutShardSubscriber shardSubscriber;
// Cross-thread access: written by Netty thread in Subscriber#onNext, read by Flink main thread in kinesis#subscribeToShard.
private volatile StartingPosition startingPosition;
private volatile FanOutShardSubscriber shardSubscriber;

public FanOutKinesisShardSubscription(
AsyncStreamProxy kinesis,
Expand Down Expand Up @@ -137,20 +138,11 @@ public void activateSubscription() {
kinesis.subscribeToShard(consumerArn, shardId, startingPosition, responseHandler)
.exceptionally(
throwable -> {
// If consumer exists and is still activating, we want to countdown.
if (ExceptionUtils.findThrowable(
throwable, ResourceInUseException.class)
.isPresent()) {
waitForSubscriptionLatch.countDown();
return null;
}
LOG.error(
"Error subscribing to shard {} with starting position {} for consumer {}.",
shardId,
startingPosition,
consumerArn,
String.format("Error subscribing to shard %s with starting position %s for consumer %s.", shardId, startingPosition, consumerArn),
throwable);
terminateSubscription(throwable);
waitForSubscriptionLatch.countDown();
return null;
});

Expand All @@ -161,14 +153,16 @@ public void activateSubscription() {
try {
if (waitForSubscriptionLatch.await(
subscriptionTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
LOG.info(
"Successfully subscribed to shard {} with starting position {} for consumer {}.",
shardId,
startingPosition,
consumerArn);
subscriptionActive.set(true);
// Request first batch of records.
shardSubscriber.requestRecords();
Throwable error = subscriptionException.get();
if (error == null) {
LOG.info(
"Successfully subscribed to shard {} with starting position {} for consumer {}.",
shardId,
startingPosition,
consumerArn);
} else {
LOG.debug(String.format("Initialization finished with error: %s", error.getMessage()), error);
}
} else {
String errorMessage =
"Timeout when subscribing to shard "
Expand Down Expand Up @@ -209,10 +203,6 @@ private void terminateSubscription(Throwable t) {
public SubscribeToShardEvent nextEvent() {
Throwable throwable = subscriptionException.getAndSet(null);
if (throwable != null) {
// If consumer is still activating, we want to wait.
if (ExceptionUtils.findThrowable(throwable, ResourceInUseException.class).isPresent()) {
return null;
}
// We don't want to wrap ResourceNotFoundExceptions because it is handled via a
// try-catch loop
if (throwable instanceof ResourceNotFoundException) {
Expand Down Expand Up @@ -262,6 +252,10 @@ private FanOutShardSubscriber(CountDownLatch subscriptionLatch) {
}

public void requestRecords() {
if (subscription == null) {
LOG.warn("requestRecords() called before subscription is set; ignoring.");
return;
}
subscription.request(1);
}

Expand All @@ -284,7 +278,9 @@ public void onSubscribe(Subscription subscription) {
startingPosition,
consumerArn);
this.subscription = subscription;
subscriptionActive.set(true);
subscriptionLatch.countDown();
requestRecords();
}

@Override
Expand Down