Fix negative Kafka partition lag caused by inconsistent current/latest offsets #18750
base: master
Conversation
Force-pushed 95a388c to 335d1e9
Force-pushed 89b59bc to d1f5e4e
Our team has been suffering from these negative-lag alerts recently, and they’ve been repeatedly waking us up at night. It’s become difficult for us to get a good night’s sleep.
@wuguowei1994 , thanks for creating a PR in Apache Druid! One of the contributors will take a look at the changes soon.
@wuguowei1994 could you sync the latest master to your branch? I think this patch might fix the test failures.
Force-pushed 7b8667f to 14a115a
@clintropolis Thanks for rerunning the jobs. Before the rerun there were two failed tasks, and now there’s only one left. The error still looks like an environment issue… really frustrating. This whole code submission experience also shows how inactive the project has become. A lot of people report issues in the Druid Slack, but almost no one responds anymore. It’s no surprise so many companies are migrating from Druid to ClickHouse.
Yea, we are in the middle of a bit of a migration/overhaul of our integration test framework and processes; hopefully this will be better behaved in the future, since part of the reason for this change is to address flakiness as well as make it much easier to write and debug these tests.
Apologies for the perception - while I can assure you that there are quite a lot of active and interesting projects happening and that Druid is still very much being actively developed, that doesn't fix your experience here or the optics in Slack. The unfortunate matter is that there are always a lot more things to do than people to do them, and while we try our best, sometimes things are a bit slow to get a committer's attention. All that said, thanks again for the contribution and for trying to make things better, and I do hear you - I'll see if maybe I can nudge some of the other committers to be a bit more responsive and try to do the same myself.
Force-pushed ebf41e0 to 3e773a2
Force-pushed 3e773a2 to 74c3ac4
@clintropolis Could you take a look?
@wuguowei1994 , thanks for your patience!
cecemei
left a comment
Thanks for making the PR, and the description really explains the problem well. Kafka ingest lag is an important metric and we also monitor it closely. I always feel a bit intimidated by this part of the code base, but your PR has helped me understand it much better.
protected final AtomicReference<OffsetSnapshot<PartitionIdType, SequenceOffsetType>> offsetSnapshotRef =
Is there any specific reason why you decided to put offsetSnapshotRef in SeekableStreamSupervisor instead of KafkaSupervisor? Maybe it's more suitable there for the sake of encapsulation.
SeekableStreamSupervisor is the parent class, and it has three subclasses: KafkaSupervisor, KinesisSupervisor, and RabbitStreamSupervisor.
The negative-lag issue mentioned in my PR doesn’t only exist in Kafka — it should also be present in the other two subclasses.
When writing the code, I had the same concern. If we put offsetSnapshotRef directly into the KafkaSupervisor subclass, then in the future, if someone wants to fix the same issue in KinesisSupervisor or RabbitStreamSupervisor, they would need to redefine offsetSnapshotRef in each subclass.
The ideal approach would be:
- Define `offsetSnapshotRef` in the parent class.
- Fix the negative-lag issue in all three subclasses.
If needed, I can update the changes in this way. What do you think?
Let's move this field to KafkaSupervisor and use it only for Kafka for the time being.
If it works fine there, we will adopt it later for the other supervisors too.
return ImmutableMap.of();
}
return input.entrySet().stream()
nit: looks like you just want to remove null values from the map, in which case Maps.filterValues could be cleaner.
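As a sketch of that suggestion (assuming Guava, which Druid already uses, and reusing the `toImmutableOffsetMap` name from this patch; the exact body here is illustrative):

```java
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;

public class OffsetMaps
{
  // Illustrative helper: treat a null input map as empty and drop entries
  // whose value is null, returning an immutable copy.
  public static <K, V> Map<K, V> toImmutableOffsetMap(@Nullable Map<K, V> input)
  {
    if (input == null) {
      return ImmutableMap.of();
    }
    return ImmutableMap.copyOf(Maps.filterValues(input, Objects::nonNull));
  }
}
```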
@Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
)
{
  this.currentOffsets = toImmutableOffsetMap(currentOffsets);
It’s not clear to me under what circumstances the inputs could be null. Could you document the scenarios where this might occur?
Map<KafkaTopicPartition, Long> currentOffsets = offsetSnapshot.getCurrentOffsets();
Map<KafkaTopicPartition, Long> endOffsets = offsetSnapshot.getEndOffsets();
if (endOffsets == null) {
Can offsetSnapshot.getEndOffsets() be null? I thought you had already replaced null with an empty map?
return latestSequenceFromStream
return endOffsets
Can e.getValue() be null? I thought you had removed all null values from the map. Also, we could use currentOffsets.getOrDefault instead of the nullable handling.
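For illustration, a hypothetical per-partition lag computation along those lines (partition keys simplified to Integer here instead of KafkaTopicPartition, and a missing current offset is assumed to default to 0):

```java
import java.util.Map;
import java.util.stream.Collectors;

public class LagExample
{
  // Hypothetical sketch: lag per partition is the end offset minus the highest
  // ingested offset, with getOrDefault instead of explicit null handling.
  public static Map<Integer, Long> partitionLag(
      Map<Integer, Long> endOffsets,
      Map<Integer, Long> currentOffsets
  )
  {
    return endOffsets.entrySet().stream().collect(Collectors.toMap(
        Map.Entry::getKey,
        e -> e.getValue() - currentOffsets.getOrDefault(e.getKey(), 0L)
    ));
  }

  public static void main(String[] args)
  {
    // Partition 1 has no reported current offset yet, so it defaults to 0.
    System.out.println(partitionLag(Map.of(0, 30_000L, 1, 10_000L), Map.of(0, 20_000L)));
  }
}
```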
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000,
includeOffsets ? latestSequenceFromStream : null,
includeOffsets ? offsetSnapshotRef.get().getEndOffsets() : null,
We're replacing latestSequenceFromStream with offsetSnapshotRef, right? Maybe remove latestSequenceFromStream as a class-level variable?
+1
Map<KafkaTopicPartition, Long> endOffsets = offsetSnapshot.getEndOffsets();
if (latestSequenceFromStream == null) {
if (endOffsets == null) {
same here, can this be null?
if (latestSequenceFromStream == null || currentOffsets == null) {
Map<KafkaTopicPartition, Long> endOffsets = offsetSnapshotRef.get().getEndOffsets();
if (endOffsets == null || currentOffsets == null) {
same here, can this be null?
// Internal data structures
// --------------------------------------------------------
protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
Worth adding some javadoc explaining the decision of putting current_offset and end_offset in one class, how it might affect the lag metrics etc...
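For instance, the javadoc could read roughly as follows (a sketch only; the field names here follow the rename suggested elsewhere in this review, and the class shape may not match the patch exactly):

```java
import java.util.Map;

/**
 * Immutable snapshot pairing the highest offsets ingested by the tasks with the
 * latest offsets fetched from the stream.
 *
 * Both maps are captured and published together so that lag is always computed
 * from a mutually consistent pair. If the two values were updated independently,
 * a reader could observe fresh ingested offsets next to stale stream offsets and
 * report a negative lag.
 */
public class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
{
  private final Map<PartitionIdType, SequenceOffsetType> highestIngestedOffsets;
  private final Map<PartitionIdType, SequenceOffsetType> latestOffsetsFromStream;

  public OffsetSnapshot(
      Map<PartitionIdType, SequenceOffsetType> highestIngestedOffsets,
      Map<PartitionIdType, SequenceOffsetType> latestOffsetsFromStream
  )
  {
    this.highestIngestedOffsets = Map.copyOf(highestIngestedOffsets);
    this.latestOffsetsFromStream = Map.copyOf(latestOffsetsFromStream);
  }

  public Map<PartitionIdType, SequenceOffsetType> getHighestIngestedOffsets()
  {
    return highestIngestedOffsets;
  }

  public Map<PartitionIdType, SequenceOffsetType> getLatestOffsetsFromStream()
  {
    return latestOffsetsFromStream;
  }
}
```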
+1 to the javadocs
Have you considered adding some more complicated test cases to KafkaSupervisorTest? I'm not sure if it's feasible; thinking maybe we could use recordSupplier to mock the end_offset, indexerMetadataStorageCoordinator to mock current_offset, and test the lags the system emits. I'm not very familiar with this part of the code base, so I could be wrong.
@cecemei
kfaraz
left a comment
Thanks for the changes, @wuguowei1994 ! I have left some suggestions.
While the changes in this PR make sense by reporting the lag more consistently (updating the two offsets in lockstep), I was wondering if it wouldn't be simpler to just report zero lag in case the lag turns out to be negative.
A negative record lag does mean that the task has already caught up to the last offsets that we had fetched from the topic and ingested some more records beyond that. And the lag metric is really just meant to indicate if the tasks are keeping up.
For other purposes, we have the message gap and the time lag metrics.
In fact, the negative lag could even be a feature to identify if some tasks are particularly slow in returning their offsets 😛, and we could probably have alerts set up if the negative lag goes below a specific threshold.
@cecemei , I think you also raised a concern regarding the possibility that the lag reported might now be higher than the actual lag, since we always fetch the stream offsets only after we have received updates from all the tasks.
I think the current code is also susceptible to reporting stale lag (higher or lower).
Say if a task were slow to return its latest ingested offsets, we would be delayed in fetching the latest offsets from the stream.
So, in that period, we would be reporting stale lag (which could have been higher or lower than the actual lag, a special case of which would be negative lag) and then as soon as we fetched the latest offsets from the stream, the reported lag would fix itself.
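To make the alternative concrete, a minimal sketch of the clamp-to-zero idea (a hypothetical helper, not code from this PR):

```java
public class LagClampExample
{
  // Hypothetical helper: if the tasks have already ingested past the last
  // offsets fetched from the stream, report zero instead of a negative lag.
  public static long nonNegativeLag(long latestOffsetFromStream, long highestIngestedOffset)
  {
    return Math.max(0L, latestOffsetFromStream - highestIngestedOffset);
  }

  public static void main(String[] args)
  {
    System.out.println(nonNegativeLag(10_000L, 20_000L)); // prints 0 rather than -10000
  }
}
```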
latestSequenceFromStream =
    partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, recordSupplier::getPosition));
OffsetSnapshot<KafkaTopicPartition, Long> snapshot = new OffsetSnapshot<>(
+1
After the latestSequenceFromStream has been fetched, another thread could have updated the metadata store with the latest offsets which are returned from the getHighestCurrentOffsets().
If you just make the change as @cecemei suggests, I think we could ensure positive lag in this case as well.
// Internal data structures
// --------------------------------------------------------
protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+1 to the javadocs
// Internal data structures
// --------------------------------------------------------
protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
Please put this class in a separate file of its own since the supervisor class is already too bloated.
protected final AtomicReference<OffsetSnapshot<PartitionIdType, SequenceOffsetType>> offsetSnapshotRef =
Let's move this field to KafkaSupervisor and use it only for Kafka for the time being.
If it works fine there, we will adopt it later for the other supervisors too.
public OffsetSnapshot(
    @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
    @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
Please use the field names latestOffsetsFromStream, highestIngestedOffsets to avoid confusion. currentOffsets and endOffsets are too similar.
OffsetSnapshot<KafkaTopicPartition, Long> snapshot = new OffsetSnapshot<>(
    highestCurrentOffsets,
    latestSequenceFromStream
);
offsetSnapshotRef.set(snapshot);
Nit: can be in a single statement
Suggested change:
offsetSnapshotRef.set(
    OffsetSnapshot.of(highestCurrentOffsets, latestSequenceFromStream)
);
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000,
includeOffsets ? latestSequenceFromStream : null,
includeOffsets ? offsetSnapshotRef.get().getEndOffsets() : null,
+1
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000,
includeOffsets ? latestSequenceFromStream : null,
includeOffsets ? offsetSnapshotRef.get().getEndOffsets() : null,
re-use method instead
Suggested change:
includeOffsets ? getLatestSequencesFromStream() : null,
protected final AtomicReference<OffsetSnapshot<PartitionIdType, SequenceOffsetType>> offsetSnapshotRef =
    new AtomicReference<>(new OffsetSnapshot<>(Collections.emptyMap(), Collections.emptyMap()));
Nit: for brevity, might be nice to have a static of method in OffsetSnapshot.
for example:
Suggested change:
new AtomicReference<>(OffsetSnapshot.of(Map.of(), Map.of()));
Yes, I suspect we might be seeing a slightly higher lag after this change (compared with what might have been reported before). The accuracy of the lag would not be affected by when the offsets are fetched. For future reference, we could maybe calculate the lag on a per-partition basis.
@kfaraz We’ve talked about this internally, and if it only happens occasionally (for example, under a minute), adjusting the alert thresholds would absolutely work for us. But when it lasts longer, it tends to indicate something worth investigating. We’ve also seen a few situations where negative lag actually pointed to issues in the upstream Kafka cluster, so that’s part of why we’re a bit cautious here. If we keep the current Druid behavior and treat negative lag as normal consumption, there’s a chance we might overlook real problems. So overall, having clear and reliable metrics to signal the health of the cluster would be really helpful for us.
Motivation
We operate a Druid deployment with more than 500 nodes.
In real-time ingestion scenarios, a monitoring process queries the cluster every minute to retrieve the `ingest/kafka/partitionLag` metric. If the lag remains unhealthy for more than five minutes, alerts are triggered.

In our production environment, this metric periodically becomes negative, even when the cluster is fully healthy. These false alerts create unnecessary operational load and frequently wake the on-call team during off-hours. At the same time, we cannot suppress negative-lag alerts entirely, since in some situations negative lag can indicate real ingestion problems.
For a large-scale, 24×7 real-time ingestion pipeline, accurate and consistent lag metrics are essential to avoid unnecessary nighttime wake-ups while still ensuring that real issues are detected promptly.
Problem Description
In the current implementation, the Druid supervisor maintains two volatile data structures:

- `end_offset` for each partition
- `current_offset` for each partition

The supervisor periodically updates these values (every 30 seconds):

- Step 1: collect each task's latest `current_offset` over HTTP. This step waits for all HTTP requests to complete, and each request has a timeout of two minutes.
- Step 2: query Kafka for the latest `end_offset`.

On the other hand, a separate periodic task (every minute) computes:

`lag = end_offset - current_offset`

Because the two updates are not atomic, intermediate inconsistent states may occur.
Intermediate State Leading to Negative Lag
If one task becomes heavily loaded or experiences other delays during Step 1, it may take significantly longer to return its offset. In this situation, the supervisor continues waiting for that slow task while the other tasks have already responded.
During this waiting period:

- `current_offset` values have already been updated to new values.
- `end_offset` values remain stale because Step 2 has not executed yet.

If a monitoring request arrives in this intermediate window, the supervisor computes lag using:

- the already-updated `current_offset`
- the stale `end_offset`

This produces negative lag values.
This issue repeats as long as at least one task remains slow. Large clusters with many partitions and many Kafka-indexing tasks are more likely to experience this scenario.
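A simplified model of this pattern, for illustration only (not the actual supervisor code): two independently updated fields read without coordination.

```java
import java.util.Map;

// Illustrative model of the current behavior: the 30-second refresh updates the
// two fields at different times, while the metrics task reads them independently.
class CurrentPattern
{
  private volatile Map<Integer, Long> currentOffsets = Map.of();
  private volatile Map<Integer, Long> endOffsets = Map.of();

  void refresh(Map<Integer, Long> fromTasks, Map<Integer, Long> fromKafka)
  {
    currentOffsets = fromTasks;   // Step 1: updated as task responses arrive
    // ... Step 1 may keep waiting here (up to two minutes) for a slow task ...
    endOffsets = fromKafka;       // Step 2: runs only after Step 1 completes
  }

  long lagFor(int partition)
  {
    // A read between Step 1 and Step 2 sees fresh currentOffsets but stale
    // endOffsets, which is what produces the negative lag.
    return endOffsets.getOrDefault(partition, 0L) - currentOffsets.getOrDefault(partition, 0L);
  }
}
```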
Example Scenario
- Initial state: `end_offset = 10000`, `current_offset = 0`.
- After consumption: the latest Kafka `end_offset = 30000`, and all tasks have consumed up to `20000`.
- During Step 1, 49 tasks respond quickly, and their `current_offset` is updated to `20000`. One task is slow, causing Step 1 to remain in the awaiting state.
- The in-memory `end_offset` stays at the old value `10000`.
- If a metric query occurs at this point, the supervisor calculates `lag = end_offset - current_offset = 10000 - 20000 = -10000`.
Because the periodic update logic repeats, this situation can persist across multiple cycles.
Proposed Changes
Replace the two volatile structures storing `current_offset` and `end_offset` with `AtomicReference` containers that hold both values as a single immutable state object. The supervisor will update these references as atomic units, ensuring that lag computation always observes a consistent snapshot.

This eliminates inconsistent intermediate states and prevents negative lag due to partial updates.
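A minimal sketch of the proposed pattern (illustrative; the actual patch introduces an OffsetSnapshot holder inside the supervisor):

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch of the fix: both maps are published together as one
// immutable snapshot, so a reader never observes a half-updated pair.
class SnapshotPattern
{
  static final class Snapshot
  {
    final Map<Integer, Long> currentOffsets;
    final Map<Integer, Long> endOffsets;

    Snapshot(Map<Integer, Long> currentOffsets, Map<Integer, Long> endOffsets)
    {
      this.currentOffsets = Map.copyOf(currentOffsets);
      this.endOffsets = Map.copyOf(endOffsets);
    }
  }

  private final AtomicReference<Snapshot> snapshotRef =
      new AtomicReference<>(new Snapshot(Map.of(), Map.of()));

  // Steps 1 and 2 publish their results in a single atomic swap.
  void refresh(Map<Integer, Long> fromTasks, Map<Integer, Long> fromKafka)
  {
    snapshotRef.set(new Snapshot(fromTasks, fromKafka));
  }

  // The metrics task always computes lag from a mutually consistent pair.
  long lagFor(int partition)
  {
    Snapshot snapshot = snapshotRef.get();
    return snapshot.endOffsets.getOrDefault(partition, 0L)
           - snapshot.currentOffsets.getOrDefault(partition, 0L);
  }
}
```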
Rationale
Operational Impact
Test Plan
This PR has: