Conversation

kfaraz (Contributor) commented on May 12, 2025

Description

#17935 enables use of HeapMemorySegmentMetadataCache on the Coordinator,
but it cannot be used in conjunction with a centralized datasource schema (i.e. SegmentSchemaCache).
This patch allows both features to be used together on the Coordinator.

Main Changes

  • Make SegmentSchemaCache a dependency of HeapMemorySegmentMetadataCache
  • Bind SegmentMetadataCache and SegmentSchemaCache in MetadataManagerModule
  • Add NoopSegmentSchemaCache to be used on the Overlord
  • Poll schemas in HeapMemorySegmentMetadataCache and update SegmentSchemaCache (see the sketch after this list)
  • Update the used_status_last_updated column of a segment record when its schema fingerprint is updated
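
A minimal sketch of the relationship described in the last two bullets. This is purely illustrative: the interface, class, and method names below are simplified hypothetical stand-ins, not the actual Druid API.

import java.util.Map;

// Hypothetical, simplified stand-ins for the real Druid classes; only the
// dependency between the metadata cache and the schema cache is the point here.
interface SchemaCacheSketch
{
  void resetSchemas(Map<String, String> fingerprintToPayload);
}

class MetadataCacheSketch
{
  private final SchemaCacheSketch schemaCache;

  MetadataCacheSketch(SchemaCacheSketch schemaCache)
  {
    this.schemaCache = schemaCache;
  }

  void syncWithMetadataStore()
  {
    // 1. Poll segment and pending segment records (elided).
    // 2. Poll schema records and push them into the schema cache so that the
    //    Coordinator can serve the centralized datasource schema.
    final Map<String, String> fingerprintToPayload = pollSchemasFromMetadataStore();
    schemaCache.resetSchemas(fingerprintToPayload);
  }

  private Map<String, String> pollSchemasFromMetadataStore()
  {
    return Map.of(); // stand-in for the actual SQL poll
  }
}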

Fix a race condition

Add a sync buffer duration of 10 seconds to HeapMemorySegmentMetadataCache

  • Handles a race condition between sync and insert to cache (caught in CompactionTaskRunTest)
  • Prevents removal of entries from cache if they have a last updated time just before sync start
    and were added to the cache just after sync start
  • This means that non-leader nodes will continue to consider a segment as used if it was marked unused
    within 10 seconds of any other update done to it (created, marked used, schema info added)
  • A 10-second buffer is more than enough for this, and the cache is already performing as expected in
    several production clusters (a minimal sketch of this check follows the list)
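
A minimal sketch of the buffering rule described above; the constant and method names are hypothetical, and the real check in HeapMemorySegmentMetadataCache may be structured differently.

import org.joda.time.DateTime;
import org.joda.time.Duration;

class SyncBufferSketch
{
  // Entries updated within this window before the sync start are never removed,
  // so a cache insert that races with an ongoing sync cannot be wiped out.
  private static final Duration SYNC_BUFFER = Duration.standardSeconds(10);

  /**
   * Returns true only if a cache entry that was not seen in the latest poll of
   * the metadata store is old enough to be removed safely.
   */
  static boolean canRemoveFromCache(DateTime entryLastUpdatedTime, DateTime syncStartTime)
  {
    return entryLastUpdatedTime.isBefore(syncStartTime.minus(SYNC_BUFFER));
  }
}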

Guice changes

  • Add MetadataManagerModule, used only by the Coordinator and Overlord, to bind the metadata managers
  • Restrict SQLMetadataStorageDruidModule to bind only the SQL-connector-related classes

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

updateUsedSegmentPayloadsInCache(datasourceToSummary);
retrieveAllPendingSegments(datasourceToSummary);
updatePendingSegmentsInCache(datasourceToSummary, syncStartTime);
retrieveAllSegmentSchemas(datasourceToSummary);
Contributor

Should add something to the method-level javadoc of "stuff that happens every sync".

Contributor Author

Do you mean the javadoc of each of these methods should say whether they are invoked in every sync or only the first sync? Or some other additional info too?

Contributor

I meant that in the javadoc for this method itself, there's a section titled "The following actions are performed in every sync". It doesn't currently mention the schema syncing.

Contributor Author

Ah, right, will do 👍🏻

final String sql = StringUtils.format(
"SELECT fingerprint, payload FROM %s WHERE version = %s",
tablesConfig.getSegmentSchemasTable(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
);
Contributor

I think this code is fetching the entire set of segment schemas on every call to syncWithMetadataStore. Is this going to be OK, performance-wise? It seems expensive.

Contributor Author

The current implementation in SqlSegmentsMetadataManager polls all the schemas in every sync too.
But since the schemas table already has a used_status_last_updated as well as a created_time column,
we can try to do delta syncs in a fashion similar to the segments table.

Thanks for the suggestion, I will update the PR accordingly.
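
For illustration, a delta query could mirror the full query shown above and filter on used_status_last_updated. This is only a sketch; the exact column names, parameter binding, and quoting in the final PR may differ.

// Sketch only: mirrors the snippet above; the WHERE clause on
// used_status_last_updated and the :minUpdateTime binding are illustrative.
final String deltaSql = StringUtils.format(
    "SELECT fingerprint, payload FROM %s"
    + " WHERE version = %s AND used_status_last_updated > :minUpdateTime",
    tablesConfig.getSegmentSchemasTable(),
    CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
);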

if (syncFinishTime.get() == null) {
retrieveUsedSegmentSchemasUpdatedAfter(DateTimes.COMPARE_DATE_AS_STRING_MIN, datasourceToSummary);
} else {
retrieveUsedSegmentSchemasUpdatedAfter(syncStartTime, datasourceToSummary);
Contributor

There should be some slack in this, to allow for the fact that clocks may not be perfectly synced across servers, and various factors (such as retries on insert) can cause records with timestamps in the past to appear. An hour should be more than enough.

Contributor Author

Makes sense.

There is another bug here anyway: I should have been using the start time of the previous sync rather than the current sync.
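
A small sketch combining the two points above (the hour of slack and the previous sync's start time); the names here are hypothetical.

import org.joda.time.DateTime;
import org.joda.time.Period;

class DeltaSyncCutoffSketch
{
  // One hour of slack, as suggested above, to tolerate clock skew between
  // servers and records that land with slightly older timestamps.
  private static final Period SYNC_SLACK = Period.hours(1);

  /**
   * Cutoff for a delta sync: schemas updated after this instant are re-fetched.
   * Uses the start time of the previous sync, not the current one, so records
   * written while the previous sync was in progress are not missed.
   */
  static DateTime deltaSyncCutoff(DateTime previousSyncStartTime)
  {
    return previousSyncStartTime.minus(SYNC_SLACK);
  }
}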

Contributor Author

(such as retries on insert)

Minor clarification on this point:
For the most part, in IndexerSQLMetadataStorageCoordinator, I have tried to ensure that each transaction retry uses a fresh timestamp, since retries can go on for a while.
But as you point out, there can still be cases where past records appear.

P.S.: I think we should be able to do something similar for fetching used segment IDs too.
Currently, we fetch all of them, but we could potentially fetch only the recently updated ones,
thus improving the delta sync time even further.
We would just need an index on used + used_status_last_updated in the druid_segments table
(the same index that the schemas table already has).
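
For illustration only, such an index might look like the following; the column names follow the discussion above, while the index name and exact DDL syntax are assumptions, not part of the PR.

// Hypothetical DDL, shown here as a plain SQL string.
final String createIndexSql =
    "CREATE INDEX idx_druid_segments_used_status_last_updated"
    + " ON druid_segments(used, used_status_last_updated)";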

this.cacheMode = config.get().getCacheUsageMode();
this.pollDuration = config.get().getPollDuration().toStandardDuration();
this.tablesConfig = tablesConfig.get();
this.useSchemaCache = schemaConfig.get().isEnabled() && nodeRoles.contains(NodeRole.COORDINATOR);
Contributor

Small point, but I just wanted to comment. This sort of logic is an anti-pattern in Guice usage. In an ideal world, decisions about which features to enable on which servers should live in the Guice modules rather than in the main code. Separating things that way makes the main code more composable and testable.

Contributor Author

Thanks for calling it out! Felt really hacky to me too.

Let me see how I can clean it up.
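
A hypothetical sketch of the pattern the reviewer is describing: the node-role / feature-flag decision lives in the module that gets installed, while the cache classes themselves stay unconditional. The types below are simplified stand-ins, not the actual Druid bindings.

import com.google.inject.AbstractModule;

// Simplified stand-ins: only the binding pattern matters here.
interface SchemaCacheBinding {}
class RealSchemaCacheImpl implements SchemaCacheBinding {}
class NoopSchemaCacheImpl implements SchemaCacheBinding {}

class SchemaCacheModuleSketch extends AbstractModule
{
  private final boolean coordinatorWithCentralizedSchema;

  SchemaCacheModuleSketch(boolean coordinatorWithCentralizedSchema)
  {
    this.coordinatorWithCentralizedSchema = coordinatorWithCentralizedSchema;
  }

  @Override
  protected void configure()
  {
    // Decide once, at module-installation time, which implementation is bound;
    // the main code then simply injects SchemaCacheBinding.
    if (coordinatorWithCentralizedSchema) {
      bind(SchemaCacheBinding.class).to(RealSchemaCacheImpl.class);
    } else {
      bind(SchemaCacheBinding.class).to(NoopSchemaCacheImpl.class);
    }
  }
}

The main code never asks "am I the Coordinator?"; it only depends on the bound interface, which keeps it composable and easy to test with either implementation.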

gianm (Contributor) left a comment

LGTM with the delta sync changes. Up to you if you want to adjust the Guice stuff.

kfaraz (Contributor Author) commented on May 16, 2025

Thanks for the review, @gianm .
I have updated the Guice bindings too. Waiting for CI to finish.

kfaraz (Contributor Author) commented on May 18, 2025

@gianm , I have had to fix up the delta sync logic since there were some bugs with the previous approach.

The sync for schemas now resembles the logic employed for used segments. It works as follows:

  • Full sync
    • Fetch all schemas marked as "used" in the metadata store
  • Delta sync
    • Fetch only the fingerprints (and not payloads) for all schemas marked as "used" in the metadata store
    • Remove entries from the cache if they are no longer present in the metadata store
    • Fetch payloads for the fingerprints that are not already present in the cache (see the sketch after this list)
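
A simplified sketch of that flow, using hypothetical types; the real code lives in HeapMemorySegmentMetadataCache and differs in detail.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SchemaSyncSketch
{
  private final Map<String, String> cachedFingerprintToPayload = new HashMap<>();

  /** Full sync: fetch every used schema, payloads included. */
  void fullSync(Map<String, String> usedSchemasInStore)
  {
    cachedFingerprintToPayload.clear();
    cachedFingerprintToPayload.putAll(usedSchemasInStore);
  }

  /** Delta sync: fetch only fingerprints, then payloads just for the new ones. */
  void deltaSync(Set<String> usedFingerprintsInStore)
  {
    // Drop cache entries whose fingerprints are no longer used in the store.
    cachedFingerprintToPayload.keySet().retainAll(usedFingerprintsInStore);

    // Fetch payloads only for fingerprints not already present in the cache.
    final Set<String> missing = new HashSet<>(usedFingerprintsInStore);
    missing.removeAll(cachedFingerprintToPayload.keySet());
    cachedFingerprintToPayload.putAll(fetchPayloads(missing));
  }

  private Map<String, String> fetchPayloads(Set<String> fingerprints)
  {
    return Map.of(); // stand-in for the actual SQL query
  }
}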

kfaraz merged commit 8be787a into apache:master on May 19, 2025 (74 checks passed).
kfaraz deleted the poll_schemas_in_metadata_cache branch on June 8, 2025.
capistrant added this to the 34.0.0 milestone on Jul 22, 2025.