
KAFKA-19425: Stop the server when fail to initialize to avoid local segment never got deleted. #20007


Open
jiafu1115 wants to merge 5 commits into trunk

Conversation


@jiafu1115 jiafu1115 commented Jun 21, 2025

We found that one broker's local segments on disk were never removed, no matter how long they had been stored, and its disk usage kept increasing.

(screenshot: disk usage)
Note: Partition 2's node is the affected node.

After troubleshooting, we found that if a broker is very slow to start up, TopicBasedRemoteLogMetadataManager#initializeResources can sometimes fail (which is expected, since the server is not ready that quickly). But this failure does not stop the server: the broker keeps running and only logs the exception instead of shutting down. Because nothing is uploaded to remote storage for that broker, its local segments are never deleted.

So I propose this change to shut down the broker in that case, to avoid a silent but critical error that causes the disk to keep growing forever.

@github-actions github-actions bot added triage PRs from the community storage Pull requests that target the storage module tiered-storage Related to the Tiered Storage feature small Small PRs labels Jun 21, 2025
@jiafu1115 jiafu1115 changed the title stop the server when fail to initialize to avoid local segment never … stop the server when fail to initialize to avoid local segment never got deleted. Jun 21, 2025
@jiafu1115 jiafu1115 changed the title stop the server when fail to initialize to avoid local segment never got deleted. KAFKA-19425: stop the server when fail to initialize to avoid local segment never got deleted. Jun 21, 2025
@jiafu1115 jiafu1115 changed the title KAFKA-19425: stop the server when fail to initialize to avoid local segment never got deleted. KAFKA-19425: Stop the server when fail to initialize to avoid local segment never got deleted. Jun 21, 2025
@FrankYang0529
Member

@jiafu1115 Could you share the exception you got? I would like to check whether the exception is retriable. If it is, we may not want to handle every case with a blanket Exception.

@jiafu1115
Author

@jiafu1115 Could you share the exception you got? I would like to check whether the exception is retriable. If it is, we may not want to handle every case with a blanket Exception.

[2025-06-03 20:44:27,356] ERROR [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error building remote log auxiliary state for MyTopicName (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.internals.FatalExitError
    at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed(TopicBasedRemoteLogMetadataManager.java:553)
    at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(TopicBasedRemoteLogMetadataManager.java:221)
    at kafka.log.remote.RemoteLogManager.fetchRemoteLogSegmentMetadata(RemoteLogManager.java:586)
    at kafka.server.TierStateMachine.buildRemoteLogAuxState(TierStateMachine.java:231)
    at kafka.server.TierStateMachine.start(TierStateMachine.java:113)
    at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:763)
    at kafka....

@jiafu1115
Copy link
Author

jiafu1115 commented Jun 24, 2025

@FrankYang0529 In fact, I checked the code: FatalExitError is thrown from ensureInitializedAndNotClosed, which is called from multiple methods.
The original intent was to shut down the server in this case, but none of those call sites can actually do so, because they run inside thread-pool tasks; the exception is swallowed, the server never shuts down, and initialization is never retried afterwards.
So I think we should shut the broker down when initialization fails at startup, so that the issue can be found ASAP.
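
To make the swallowing behavior concrete, here is a small standalone Java demo (plain JDK code, not Kafka code): an exception thrown from a task running on a scheduled thread pool is only captured in the task's Future, so it never reaches an uncaught-exception handler and cannot stop the JVM on its own.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Standalone demo: the error thrown below is captured by the task's Future,
// further executions are suppressed, and the JVM keeps running.
public class SwallowedExceptionDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        pool.scheduleWithFixedDelay(() -> {
            System.out.println("task running, about to throw");
            throw new Error("pretend this is FatalExitError");
        }, 0, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(500);
        System.out.println("JVM is still alive; the error was never rethrown anywhere");
        pool.shutdownNow();
    }
}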

How did we find it?
We kept expanding the disk again and again to keep up with the topic's growth, without knowing why so much capacity was needed. In the end we found the issue: one broker had failed to initialize and load the data, and never had a chance to recover.

BTW, for why it failed to complete initialization in the first place, see #20008 and the description of https://issues.apache.org/jira/browse/KAFKA-19371 for more information.

@FrankYang0529
Member

From the comment in TopicBasedRemoteLogMetadataManager#ensureInitializedAndNotClosed [0], the intent is to stop the broker when there is an initialization error. However, the error doesn't actually shut the broker down. The RemoteLogManager uses a thread pool to execute RLMTask and stores the result in a map like leaderCopyRLMTasks [1], but it never checks that result. The RemoteLogManager only cancels the task and logs the exception [2]. IMO, we either need to check for FatalExitError and stop the broker (see the sketch after reference [3] below), or stop using a separate thread to initialize resources in TopicBasedRemoteLogMetadataManager [3] and throw the exception immediately if there is an error.

cc @showuon @chia7712

[0]

if (initializationFailed) {
    // If initialization is failed, shutdown the broker.
    throw new FatalExitError();
}

[1]

// Only create copy task when remoteLogCopyDisable is disabled
if (!remoteLogCopyDisable) {
    leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> {
        RLMCopyTask task = new RLMCopyTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
        // set this upfront when it is getting initialized instead of doing it after scheduling.
        LOGGER.info("Created a new copy task: {} and getting scheduled", task);
        ScheduledFuture<?> future = rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, TimeUnit.MILLISECONDS);
        return new RLMTaskWithFuture(task, future);
    });
}

[2]

static class RLMTaskWithFuture {

    private final RLMTask rlmTask;
    private final Future<?> future;

    RLMTaskWithFuture(RLMTask rlmTask, Future<?> future) {
        this.rlmTask = rlmTask;
        this.future = future;
    }

    public void cancel() {
        rlmTask.cancel();
        try {
            future.cancel(true);
        } catch (Exception ex) {
            LOGGER.error("Error occurred while canceling the task: {}", rlmTask, ex);
        }
    }
}

[3]

// Scheduling the initialization producer/consumer managers in a separate thread. Required resources may
// not yet be available now. This thread makes sure that it is retried at regular intervals until it is
// successful.
initializationThread = KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources);
initializationThread.start();
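
For the first option, a rough sketch of what checking the result could look like (the helper below is hypothetical, not existing Kafka code; only FatalExitError and Exit are real Kafka classes):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Exit;

// Hypothetical helper: whoever holds an RLMTaskWithFuture (e.g. when iterating
// leaderCopyRLMTasks) could call this to stop the broker if the task died with
// a FatalExitError, instead of silently ignoring the failed future.
final class RlmTaskFailureCheck {

    static void exitIfTaskDiedFatally(Future<?> future) {
        if (!future.isDone() || future.isCancelled()) {
            return; // still running, or cancelled on purpose
        }
        try {
            future.get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof FatalExitError) {
                // The task asked for a broker shutdown, but nothing was listening.
                Exit.exit(1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}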

@jiafu1115
Author

"we don't use another thread to initialize resources in TopicBasedRemoteLogMetadataManager [3] and throw exception immediately if there is an error."
This is another good proposal, but I am worried it would make startup take longer and possibly fail, because initialization may loop while waiting for the topic to exist and for the connection to become ready.

That is why I propose stopping the broker when initialization fails, while still keeping initialization in its own thread (roughly as in the sketch below). Then we can delete this check from many methods:
if (initializationFailed) {
    // If initialization is failed, shutdown the broker.
    throw new FatalExitError();
}
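
Roughly what I have in mind, as a sketch only (the class and method names below are illustrative, and the exact shutdown call is my assumption, not the actual diff; Exit and KafkaThread are existing Kafka utility classes):

import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.KafkaThread;

// Illustrative sketch: keep initialization in its own non-daemon thread so broker
// startup is not blocked, but terminate the broker right here if initialization
// ultimately fails, instead of setting an initializationFailed flag for callers to check.
public class InitializeOrDie {

    private volatile boolean initialized = false;

    public void startInitialization() {
        KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources).start();
    }

    private void initializeResources() {
        try {
            doInitialize();       // create the topic, start producer/consumer managers, retry internally
            initialized = true;
        } catch (Exception e) {
            // Fail fast at the source of the failure so the operator sees a dead broker
            // instead of a silently growing disk.
            Exit.exit(1);
        }
    }

    private void doInitialize() throws Exception {
        // placeholder for the real initialization / retry logic
    }
}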

WDYT?

@jiafu1115
Author

jiafu1115 commented Jun 26, 2025

@FrankYang0529 Could you check the code again? Thanks.

I have made all the wanted changes. You can see that after this change we no longer rely on the callers (at least 5+ methods) to check whether the server should be stopped (which is not easy to do); we just stop it ASAP at the original position.

BTW, why do I think this is a critical issue?
If we keep the current code unchanged, we need some way to monitor whether a broker is running with this kind of error, otherwise the disk (AWS EBS) has to keep expanding forever.
When I encountered this issue, my solution was also to stop the broker, fix it by adding retries (#20008) to make sure loading succeeds, and start the broker again. However, EBS capacity cannot be shrunk by AWS design, so I had to mount another, smaller EBS disk and migrate the data onto it. You can see in the following screenshot how low the used ratio is after we fixed the issue.
(screenshot: disk used ratio after the fix)


A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.
