callback
+ )
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 7af834942bdf..cdef522c4982 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -50,6 +50,7 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SegmentWranglerModule;
@@ -194,7 +195,8 @@ protected List extends Module> getModules()
.in(LazySingleton.class);
},
new LookupModule(),
- new SqlModule()
+ new SqlModule(),
+ new NoopSegmentMetadataCacheModule()
);
}
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
index 433d9ced54ef..ce3f9f6bd024 100644
--- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java
+++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
@@ -40,6 +40,7 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SegmentWranglerModule;
@@ -146,7 +147,8 @@ protected List extends Module> getModules()
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("historical"))
.in(LazySingleton.class);
},
- new LookupModule()
+ new LookupModule(),
+ new NoopSegmentMetadataCacheModule()
);
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 46e243e71bec..c473fcc76a4e 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -46,6 +46,7 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.QueryablePeonModule;
@@ -243,7 +244,8 @@ public DataNodeService getDataNodeService(DruidServerConfig serverConfig)
new InputSourceModule(),
new QueryablePeonModule(),
new CliIndexerServerModule(properties),
- new LookupModule()
+ new LookupModule(),
+ new NoopSegmentMetadataCacheModule()
);
}
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index 0b2a8d02ad8d..657020c26ed0 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -46,6 +46,7 @@
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.MiddleManagerServiceModule;
+import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.RetryPolicyFactory;
@@ -250,7 +251,8 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig)
new IndexingServiceTaskLogsModule(),
new IndexingServiceTuningConfigModule(),
new InputSourceModule(),
- new LookupSerdeModule()
+ new LookupSerdeModule(),
+ new NoopSegmentMetadataCacheModule()
);
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 065a6430563f..73a51f9896ae 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -57,6 +57,7 @@
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ManageLifecycleServer;
+import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
import org.apache.druid.guice.PeonProcessingModule;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.QueryRunnerFactoryModule;
@@ -383,7 +384,8 @@ public LocalTmpStorageConfig getLocalTmpStorage()
new IndexingServiceTuningConfigModule(),
new InputSourceModule(),
new ChatHandlerServerModule(properties),
- new LookupModule()
+ new LookupModule(),
+ new NoopSegmentMetadataCacheModule()
);
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliRouter.java b/services/src/main/java/org/apache/druid/cli/CliRouter.java
index a3344aa4e7fe..66e81e3e789a 100644
--- a/services/src/main/java/org/apache/druid/cli/CliRouter.java
+++ b/services/src/main/java/org/apache/druid/cli/CliRouter.java
@@ -33,6 +33,7 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.RouterProcessingModule;
@@ -132,7 +133,8 @@ protected List extends Module> getModules()
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("router"))
.in(LazySingleton.class);
},
- new LookupSerdeModule()
+ new LookupSerdeModule(),
+ new NoopSegmentMetadataCacheModule()
);
}
}
diff --git a/services/src/main/java/org/apache/druid/guice/NoopSegmentMetadataCacheModule.java b/services/src/main/java/org/apache/druid/guice/NoopSegmentMetadataCacheModule.java
new file mode 100644
index 000000000000..d9e1e2bf1548
--- /dev/null
+++ b/services/src/main/java/org/apache/druid/guice/NoopSegmentMetadataCacheModule.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import org.apache.druid.metadata.segment.cache.NoopSegmentMetadataCache;
+import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
+import org.apache.druid.segment.metadata.SegmentSchemaCache;
+
+/**
+ * Module used by services other than Overlord and Coordinator to bind
+ * {@link SegmentMetadataCache} and {@link SegmentSchemaCache} to noop instances.
+ *
+ * Classes using these caches like {@code SqlSegmentMetadataTransactionFactory}
+ * and {@code SqlSegmentsMetadataManagerProvider} are currently bound for all
+ * services in {@code SQLMetadataStorageDruidModule}. Ideally, this module should
+ * be installed only for Coordinator/Overlord since other services do not access
+ * the metadata store directly.
+ */
+public class NoopSegmentMetadataCacheModule implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bind(SegmentMetadataCache.class).to(NoopSegmentMetadataCache.class).in(LazySingleton.class);
+ binder.bind(SegmentSchemaCache.class).to(NoopSegmentSchemaCache.class).in(LazySingleton.class);
+ }
+}
From 23bbd9af8af54f060a12599c308d4763c1691581 Mon Sep 17 00:00:00 2001
From: Kashif Faraz
Date: Sat, 17 May 2025 01:09:37 +0530
Subject: [PATCH 09/18] Fix up guice bindings for real
---
.../guice/SQLMetadataStorageDruidModule.java | 48 --------
.../java/org/apache/druid/cli/CliBroker.java | 4 +-
.../org/apache/druid/cli/CliCoordinator.java | 22 +---
.../org/apache/druid/cli/CliHistorical.java | 4 +-
.../java/org/apache/druid/cli/CliIndexer.java | 4 +-
.../apache/druid/cli/CliMiddleManager.java | 4 +-
.../org/apache/druid/cli/CliOverlord.java | 20 +---
.../java/org/apache/druid/cli/CliPeon.java | 11 +-
.../java/org/apache/druid/cli/CliRouter.java | 4 +-
.../druid/guice/MetadataManagerModule.java | 108 ++++++++++++++++++
.../guice/NoopSegmentMetadataCacheModule.java | 47 --------
.../druid/guice/SegmentSchemaCacheModule.java | 2 -
12 files changed, 120 insertions(+), 158 deletions(-)
create mode 100644 services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java
delete mode 100644 services/src/main/java/org/apache/druid/guice/NoopSegmentMetadataCacheModule.java
diff --git a/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java b/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java
index 92c0118547bf..8ef0f36a07fa 100644
--- a/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java
+++ b/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java
@@ -23,23 +23,10 @@
import com.google.inject.Key;
import com.google.inject.Module;
import org.apache.druid.audit.AuditManager;
-import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
-import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
-import org.apache.druid.metadata.MetadataRuleManager;
-import org.apache.druid.metadata.MetadataRuleManagerProvider;
import org.apache.druid.metadata.MetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageProvider;
-import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SQLMetadataConnector;
-import org.apache.druid.metadata.SQLMetadataRuleManager;
-import org.apache.druid.metadata.SQLMetadataRuleManagerProvider;
-import org.apache.druid.metadata.SQLMetadataSupervisorManager;
-import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
-import org.apache.druid.metadata.SqlSegmentsMetadataManagerProvider;
-import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
-import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
import org.apache.druid.server.audit.AuditManagerConfig;
import org.apache.druid.server.audit.AuditSerdeHelper;
import org.apache.druid.server.audit.SQLAuditManager;
@@ -67,14 +54,7 @@ public void createBindingChoices(Binder binder, String defaultValue)
PolyBind.createChoiceWithDefault(binder, prop, Key.get(MetadataStorageProvider.class), defaultValue);
PolyBind.createChoiceWithDefault(binder, prop, Key.get(SQLMetadataConnector.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop, Key.get(SegmentsMetadataManager.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop, Key.get(SegmentsMetadataManagerProvider.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop, Key.get(MetadataRuleManager.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop, Key.get(MetadataRuleManagerProvider.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop, Key.get(SegmentMetadataTransactionFactory.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop, Key.get(IndexerMetadataStorageCoordinator.class), defaultValue);
PolyBind.createChoiceWithDefault(binder, prop, Key.get(MetadataStorageActionHandlerFactory.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop, Key.get(MetadataSupervisorManager.class), defaultValue);
configureAuditManager(binder);
}
@@ -82,35 +62,7 @@ public void createBindingChoices(Binder binder, String defaultValue)
@Override
public void configure(Binder binder)
{
- PolyBind.optionBinder(binder, Key.get(SegmentsMetadataManagerProvider.class))
- .addBinding(type)
- .to(SqlSegmentsMetadataManagerProvider.class)
- .in(LazySingleton.class);
- PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
- .addBinding(type)
- .to(SQLMetadataRuleManager.class)
- .in(LazySingleton.class);
-
- PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
- .addBinding(type)
- .to(SQLMetadataRuleManagerProvider.class)
- .in(LazySingleton.class);
-
- PolyBind.optionBinder(binder, Key.get(SegmentMetadataTransactionFactory.class))
- .addBinding(type)
- .to(SqlSegmentMetadataTransactionFactory.class)
- .in(LazySingleton.class);
-
- PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
- .addBinding(type)
- .to(IndexerSQLMetadataStorageCoordinator.class)
- .in(ManageLifecycle.class);
-
- PolyBind.optionBinder(binder, Key.get(MetadataSupervisorManager.class))
- .addBinding(type)
- .to(SQLMetadataSupervisorManager.class)
- .in(LazySingleton.class);
}
private void configureAuditManager(Binder binder)
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index cdef522c4982..7af834942bdf 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -50,7 +50,6 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SegmentWranglerModule;
@@ -195,8 +194,7 @@ protected List extends Module> getModules()
.in(LazySingleton.class);
},
new LookupModule(),
- new SqlModule(),
- new NoopSegmentMetadataCacheModule()
+ new SqlModule()
);
}
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 46fa00f67bad..ffe06dda70a4 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -47,6 +47,7 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.MetadataManagerModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SegmentSchemaCacheModule;
import org.apache.druid.guice.SupervisorCleanupModule;
@@ -61,14 +62,8 @@
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.metadata.MetadataRuleManager;
-import org.apache.druid.metadata.MetadataRuleManagerProvider;
import org.apache.druid.metadata.MetadataStorage;
import org.apache.druid.metadata.MetadataStorageProvider;
-import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
-import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
-import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.query.lookup.LookupSerdeModule;
import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
import org.apache.druid.segment.metadata.SegmentMetadataCacheConfig;
@@ -76,7 +71,6 @@
import org.apache.druid.server.coordinator.CloneStatusManager;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.DruidCoordinator;
-import org.apache.druid.server.coordinator.MetadataManager;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs;
import org.apache.druid.server.coordinator.config.CoordinatorPeriodConfig;
@@ -169,6 +163,7 @@ protected List extends Module> getModules()
List modules = new ArrayList<>();
modules.add(JettyHttpClientModule.global());
+ modules.add(new MetadataManagerModule());
if (isSegmentSchemaCacheEnabled) {
validateCentralizedDatasourceSchemaConfig(properties);
@@ -215,22 +210,9 @@ public void configure(Binder binder)
binder.bind(DirectDruidClientFactory.class).toProvider(Providers.of(null));
}
- binder.bind(SegmentsMetadataManager.class)
- .toProvider(SegmentsMetadataManagerProvider.class)
- .in(ManageLifecycle.class);
- binder.bind(SegmentMetadataCache.class)
- .to(HeapMemorySegmentMetadataCache.class)
- .in(LazySingleton.class);
-
- binder.bind(MetadataRuleManager.class)
- .toProvider(MetadataRuleManagerProvider.class)
- .in(ManageLifecycle.class);
-
binder.bind(LookupCoordinatorManager.class).in(LazySingleton.class);
binder.bind(CloneStatusManager.class).in(LazySingleton.class);
- binder.bind(CoordinatorConfigManager.class);
- binder.bind(MetadataManager.class);
binder.bind(DruidCoordinator.class);
binder.bind(CompactionStatusTracker.class).in(LazySingleton.class);
diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
index ce3f9f6bd024..433d9ced54ef 100644
--- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java
+++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
@@ -40,7 +40,6 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SegmentWranglerModule;
@@ -147,8 +146,7 @@ protected List extends Module> getModules()
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("historical"))
.in(LazySingleton.class);
},
- new LookupModule(),
- new NoopSegmentMetadataCacheModule()
+ new LookupModule()
);
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index c473fcc76a4e..46e243e71bec 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -46,7 +46,6 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.QueryablePeonModule;
@@ -244,8 +243,7 @@ public DataNodeService getDataNodeService(DruidServerConfig serverConfig)
new InputSourceModule(),
new QueryablePeonModule(),
new CliIndexerServerModule(properties),
- new LookupModule(),
- new NoopSegmentMetadataCacheModule()
+ new LookupModule()
);
}
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index 657020c26ed0..0b2a8d02ad8d 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -46,7 +46,6 @@
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.MiddleManagerServiceModule;
-import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.RetryPolicyFactory;
@@ -251,8 +250,7 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig)
new IndexingServiceTaskLogsModule(),
new IndexingServiceTuningConfigModule(),
new InputSourceModule(),
- new LookupSerdeModule(),
- new NoopSegmentMetadataCacheModule()
+ new LookupSerdeModule()
);
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 71dabdde6df1..95fa654674d9 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -50,6 +50,7 @@
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ListProvider;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.MetadataManagerModule;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.SupervisorModule;
import org.apache.druid.guice.annotations.Json;
@@ -108,21 +109,14 @@
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
import org.apache.druid.metadata.input.InputSourceModule;
-import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
-import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.query.lookup.LookupSerdeModule;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
-import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
-import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
import org.apache.druid.server.compaction.CompactionStatusTracker;
-import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.http.RedirectFilter;
@@ -200,6 +194,7 @@ public void configure(Properties properties)
protected List extends Module> getModules(final boolean standalone)
{
return ImmutableList.of(
+ standalone ? new MetadataManagerModule() : binder -> {},
new Module()
{
@Override
@@ -215,17 +210,6 @@ public void configure(Binder binder)
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8290);
binder.bind(CompactionStatusTracker.class).in(LazySingleton.class);
- binder.bind(SegmentsMetadataManager.class)
- .toProvider(SegmentsMetadataManagerProvider.class)
- .in(ManageLifecycle.class);
- binder.bind(CoordinatorConfigManager.class).in(LazySingleton.class);
-
- binder.bind(SegmentMetadataCache.class)
- .to(HeapMemorySegmentMetadataCache.class)
- .in(LazySingleton.class);
- binder.bind(SegmentSchemaCache.class)
- .to(NoopSegmentSchemaCache.class)
- .in(LazySingleton.class);
}
JsonConfigProvider.bind(binder, "druid.coordinator.asOverlord", CoordinatorOverlordServiceConfig.class);
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 73a51f9896ae..26edb68c7ee4 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -57,7 +57,7 @@
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ManageLifecycleServer;
-import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
+import org.apache.druid.guice.MetadataManagerModule;
import org.apache.druid.guice.PeonProcessingModule;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.QueryRunnerFactoryModule;
@@ -88,7 +88,6 @@
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProviderImpl;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
-import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskStorage;
@@ -100,7 +99,6 @@
import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QuerySegmentWalker;
@@ -222,6 +220,7 @@ public void configure(Properties properties)
protected List extends Module> getModules()
{
return ImmutableList.of(
+ new MetadataManagerModule(), // needed here only to support druid.peon.mode=local
new PeonProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
@@ -384,8 +383,7 @@ public LocalTmpStorageConfig getLocalTmpStorage()
new IndexingServiceTuningConfigModule(),
new InputSourceModule(),
new ChatHandlerServerModule(properties),
- new LookupModule(),
- new NoopSegmentMetadataCacheModule()
+ new LookupModule()
);
}
@@ -503,9 +501,6 @@ private static void configureTaskActionClient(Binder binder)
JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
- binder.bind(IndexerMetadataStorageCoordinator.class)
- .to(IndexerSQLMetadataStorageCoordinator.class)
- .in(LazySingleton.class);
taskActionBinder
.addBinding("remote")
.to(RemoteTaskActionClientFactory.class)
diff --git a/services/src/main/java/org/apache/druid/cli/CliRouter.java b/services/src/main/java/org/apache/druid/cli/CliRouter.java
index 66e81e3e789a..a3344aa4e7fe 100644
--- a/services/src/main/java/org/apache/druid/cli/CliRouter.java
+++ b/services/src/main/java/org/apache/druid/cli/CliRouter.java
@@ -33,7 +33,6 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.NoopSegmentMetadataCacheModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.RouterProcessingModule;
@@ -133,8 +132,7 @@ protected List extends Module> getModules()
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("router"))
.in(LazySingleton.class);
},
- new LookupSerdeModule(),
- new NoopSegmentMetadataCacheModule()
+ new LookupSerdeModule()
);
}
}
diff --git a/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java b/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java
new file mode 100644
index 000000000000..1cc168318cc2
--- /dev/null
+++ b/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import com.google.inject.Module;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.metadata.MetadataRuleManagerProvider;
+import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.metadata.SQLMetadataRuleManagerProvider;
+import org.apache.druid.metadata.SQLMetadataSupervisorManager;
+import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
+import org.apache.druid.metadata.SqlSegmentsMetadataManagerProvider;
+import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
+import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
+import org.apache.druid.segment.metadata.SegmentSchemaCache;
+import org.apache.druid.server.coordinator.CoordinatorConfigManager;
+import org.apache.druid.server.coordinator.MetadataManager;
+
+import java.util.Set;
+
+/**
+ * Module used by Overlord and Coordinator to bind various metadata managers.
+ */
+public class MetadataManagerModule implements Module
+{
+ private Set nodeRoles;
+
+ @Inject
+ public void configure(
+ @Self Set nodeRoles
+ )
+ {
+ this.nodeRoles = nodeRoles;
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ if (nodeRoles.contains(NodeRole.COORDINATOR)) {
+ binder.bind(MetadataRuleManagerProvider.class)
+ .to(SQLMetadataRuleManagerProvider.class)
+ .in(LazySingleton.class);
+ binder.bind(MetadataRuleManager.class)
+ .toProvider(MetadataRuleManagerProvider.class)
+ .in(ManageLifecycle.class);
+
+ binder.bind(MetadataManager.class).in(LazySingleton.class);
+ }
+
+ binder.bind(CoordinatorConfigManager.class).in(LazySingleton.class);
+
+ binder.bind(MetadataSupervisorManager.class)
+ .to(SQLMetadataSupervisorManager.class)
+ .in(LazySingleton.class);
+
+ binder.bind(SegmentsMetadataManagerProvider.class)
+ .to(SqlSegmentsMetadataManagerProvider.class)
+ .in(LazySingleton.class);
+ binder.bind(SegmentsMetadataManager.class)
+ .toProvider(SegmentsMetadataManagerProvider.class)
+ .in(ManageLifecycle.class);
+
+ binder.bind(SegmentMetadataTransactionFactory.class)
+ .to(SqlSegmentMetadataTransactionFactory.class)
+ .in(LazySingleton.class);
+ binder.bind(IndexerMetadataStorageCoordinator.class)
+ .to(IndexerSQLMetadataStorageCoordinator.class)
+ .in(ManageLifecycle.class);
+ binder.bind(SegmentMetadataCache.class)
+ .to(HeapMemorySegmentMetadataCache.class)
+ .in(LazySingleton.class);
+
+ if (nodeRoles.contains(NodeRole.COORDINATOR)) {
+ binder.bind(SegmentSchemaCache.class).in(LazySingleton.class);
+ } else {
+ binder.bind(SegmentSchemaCache.class)
+ .to(NoopSegmentSchemaCache.class)
+ .in(LazySingleton.class);
+ }
+ }
+}
diff --git a/services/src/main/java/org/apache/druid/guice/NoopSegmentMetadataCacheModule.java b/services/src/main/java/org/apache/druid/guice/NoopSegmentMetadataCacheModule.java
deleted file mode 100644
index d9e1e2bf1548..000000000000
--- a/services/src/main/java/org/apache/druid/guice/NoopSegmentMetadataCacheModule.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.guice;
-
-import com.google.inject.Binder;
-import com.google.inject.Module;
-import org.apache.druid.metadata.segment.cache.NoopSegmentMetadataCache;
-import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
-import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
-import org.apache.druid.segment.metadata.SegmentSchemaCache;
-
-/**
- * Module used by services other than Overlord and Coordinator to bind
- * {@link SegmentMetadataCache} and {@link SegmentSchemaCache} to noop instances.
- *
- * Classes using these caches like {@code SqlSegmentMetadataTransactionFactory}
- * and {@code SqlSegmentsMetadataManagerProvider} are currently bound for all
- * services in {@code SQLMetadataStorageDruidModule}. Ideally, this module should
- * be installed only for Coordinator/Overlord since other services do not access
- * the metadata store directly.
- */
-public class NoopSegmentMetadataCacheModule implements Module
-{
- @Override
- public void configure(Binder binder)
- {
- binder.bind(SegmentMetadataCache.class).to(NoopSegmentMetadataCache.class).in(LazySingleton.class);
- binder.bind(SegmentSchemaCache.class).to(NoopSegmentSchemaCache.class).in(LazySingleton.class);
- }
-}
diff --git a/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java b/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java
index 859abc76c8b7..744d56a89e02 100644
--- a/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java
+++ b/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java
@@ -44,7 +44,6 @@
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
import org.apache.druid.segment.metadata.SegmentMetadataQuerySegmentWalker;
-import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QuerySchedulerProvider;
@@ -84,7 +83,6 @@ public void configure(Binder binder)
.in(LazySingleton.class);
binder.bind(QuerySchedulerProvider.class).in(LazySingleton.class);
binder.bind(QuerySegmentWalker.class).to(SegmentMetadataQuerySegmentWalker.class).in(LazySingleton.class);
- binder.bind(SegmentSchemaCache.class).in(LazySingleton.class);
LifecycleModule.register(binder, CoordinatorSegmentMetadataCache.class);
}
From 6ce338b6f1cdc7bfdcbba1ac9c670071e7245154 Mon Sep 17 00:00:00 2001
From: Kashif Faraz
Date: Sat, 17 May 2025 02:20:40 +0530
Subject: [PATCH 10/18] Simplify some dependencies
---
.../MaterializedViewSupervisorTest.java | 2 -
.../DatasourceOptimizerTest.java | 2 -
.../common/actions/TaskActionTestKit.java | 5 -
.../common/task/IngestionTestBase.java | 3 -
.../overlord/TaskLockBoxConcurrencyTest.java | 3 -
.../indexing/overlord/TaskLockboxTest.java | 3 -
.../indexing/overlord/TaskQueueScaleTest.java | 3 -
.../SeekableStreamIndexTaskTestBase.java | 2 -
.../druid/guice/MetadataConfigModule.java | 28 +++--
.../SegmentMetadataTransactionFactory.java | 25 ----
...entMetadataReadOnlyTransactionFactory.java | 114 ++++++++++++++++++
.../SqlSegmentMetadataTransactionFactory.java | 62 +---------
.../cache/HeapMemorySegmentMetadataCache.java | 3 +-
.../segment/metadata/SegmentSchemaCache.java | 2 +-
...etadataStorageCoordinatorMarkUsedTest.java | 2 -
...etadataStorageCoordinatorReadOnlyTest.java | 28 +++--
...exerSQLMetadataStorageCoordinatorTest.java | 3 -
...orageCoordinatorSchemaPersistenceTest.java | 2 -
.../SqlSegmentsMetadataManagerV2Test.java | 4 +-
.../HeapMemorySegmentMetadataCacheTest.java | 2 -
.../org/apache/druid/cli/CliCoordinator.java | 3 +-
.../java/org/apache/druid/cli/CliPeon.java | 30 +----
.../org/apache/druid/cli/ServerRunnable.java | 14 +--
.../druid/guice/MetadataManagerModule.java | 59 ++++++---
24 files changed, 214 insertions(+), 190 deletions(-)
create mode 100644 server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataReadOnlyTransactionFactory.java
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index 62b3f4af3d7f..01964ce28260 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -28,7 +28,6 @@
import junit.framework.AssertionFailedError;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.EntryAlreadyExists;
import org.apache.druid.indexer.HadoopIOConfig;
import org.apache.druid.indexer.HadoopIngestionSpec;
@@ -113,7 +112,6 @@ public void setUp()
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
index 755b0efddace..1ec98359db79 100644
--- a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
+++ b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
@@ -37,7 +37,6 @@
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.curator.CuratorTestBase;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexing.materializedview.DerivativeDataSourceMetadata;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
@@ -123,7 +122,6 @@ public void setUp() throws Exception
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
index a5fb2cb04a84..8efcfa58b1e8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
@@ -21,7 +21,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
@@ -50,8 +49,6 @@
import org.joda.time.Period;
import org.junit.rules.ExternalResource;
-import java.util.Set;
-
public class TaskActionTestKit extends ExternalResource
{
private final MetadataStorageTablesConfig metadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase("druid");
@@ -178,7 +175,6 @@ private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMappe
objectMapper,
Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
Suppliers.ofInstance(metadataStorageTablesConfig),
- Suppliers.ofInstance(CentralizedDatasourceSchemaConfig.create()),
new SegmentSchemaCache(),
testDerbyConnector,
(poolSize, name) -> new WrappingScheduledExecutorService(name, metadataCachePollExec, false),
@@ -193,7 +189,6 @@ private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMappe
metadataStorageTablesConfig,
testDerbyConnector,
leaderSelector,
- Set.of(NodeRole.OVERLORD),
segmentMetadataCache,
NoopServiceEmitter.instance()
)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 5f6402ce299b..a56d7acc5097 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -34,7 +34,6 @@
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.RegexParseSpec;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
@@ -325,7 +324,6 @@ private SqlSegmentMetadataTransactionFactory createTransactionFactory()
objectMapper,
Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), cacheMode)),
derbyConnectorRule.metadataTablesConfigSupplier(),
- Suppliers.ofInstance(CentralizedDatasourceSchemaConfig.create()),
new SegmentSchemaCache(),
derbyConnectorRule.getConnector(),
ScheduledExecutors::fixed,
@@ -340,7 +338,6 @@ private SqlSegmentMetadataTransactionFactory createTransactionFactory()
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnectorRule.getConnector(),
leaderSelector,
- Set.of(NodeRole.OVERLORD),
segmentMetadataCache,
NoopServiceEmitter.instance()
);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
index a6eb19f47830..a55117aad12d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.guava.SettableSupplier;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
@@ -50,7 +49,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -91,7 +89,6 @@ public void setup()
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 8c2c78611acb..48c5b5c4e94a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -29,7 +29,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLock;
@@ -138,7 +137,6 @@ public void setup()
tablesConfig,
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
@@ -481,7 +479,6 @@ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded()
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
index 1900752e8be2..f54ea7fe5860 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
@@ -25,7 +25,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -70,7 +69,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -113,7 +111,6 @@ public void setUp()
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnectorRule.getConnector(),
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 485b6699d1b8..e0332708c3fe 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -47,7 +47,6 @@
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
@@ -599,7 +598,6 @@ protected void makeToolboxFactory(TestUtils testUtils, ServiceEmitter emitter, b
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git a/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java b/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java
index 1ca4578e5603..7a429049db86 100644
--- a/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java
+++ b/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java
@@ -21,29 +21,43 @@
import com.google.inject.Binder;
import com.google.inject.Module;
-import org.apache.druid.metadata.MetadataRuleManagerConfig;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
-import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import java.util.Properties;
+
+/**
+ * Binds the following metadata configs for all services:
+ *
+ * - {@link MetadataStorageTablesConfig}
+ * - {@link MetadataStorageConnectorConfig}
+ * - {@link CentralizedDatasourceSchemaConfig}
+ *
+ * Ideally, the storage configs should be bound only on Coordinator and Overlord,
+ * but they are needed for other services too since metadata storage extensions
+ * are currently loaded on all services.
+ */
public class MetadataConfigModule implements Module
{
+ public static final String CENTRALIZED_DATASOURCE_SCHEMA_ENABLED =
+ CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX + ".enabled";
+
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, MetadataStorageTablesConfig.PROPERTY_BASE, MetadataStorageTablesConfig.class);
JsonConfigProvider.bind(binder, MetadataStorageConnectorConfig.PROPERTY_BASE, MetadataStorageConnectorConfig.class);
- JsonConfigProvider.bind(binder, "druid.manager.segments", SegmentsMetadataManagerConfig.class);
- JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class);
-
- // SegmentSchemaCacheConfig needs to be bound on all services since
- // it is a dependency of SqlSegmentsMetadataManager (both legacy and V2)
JsonConfigProvider.bind(
binder,
CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX,
CentralizedDatasourceSchemaConfig.class
);
}
+
+ public static boolean isSegmentSchemaCacheEnabled(Properties properties)
+ {
+ return Boolean.parseBoolean(properties.getProperty(CENTRALIZED_DATASOURCE_SCHEMA_ENABLED));
+ }
}
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataTransactionFactory.java b/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataTransactionFactory.java
index 6bfbf564c964..e048e10970c6 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataTransactionFactory.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataTransactionFactory.java
@@ -41,29 +41,4 @@ T inReadWriteDatasourceTransaction(
String dataSource,
SegmentMetadataTransaction.Callback callback
);
-
- /**
- * No-op instance of {@link SegmentMetadataTransactionFactory} which does not
- * support any operation.
- */
- SegmentMetadataTransactionFactory NOOP = new SegmentMetadataTransactionFactory()
- {
- @Override
- public T inReadOnlyDatasourceTransaction(
- String dataSource,
- SegmentMetadataReadTransaction.Callback callback
- )
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public T inReadWriteDatasourceTransaction(
- String dataSource,
- SegmentMetadataTransaction.Callback callback
- )
- {
- throw new UnsupportedOperationException();
- }
- };
}
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataReadOnlyTransactionFactory.java b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataReadOnlyTransactionFactory.java
new file mode 100644
index 000000000000..0d0d1e42a1e2
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataReadOnlyTransactionFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.segment;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.TransactionStatus;
+
+/**
+ * Factory for read-only {@link SegmentMetadataTransaction}s that always read
+ * directly from the metadata store and never from the {@code SegmentMetadataCache}.
+ *
+ * @see SqlSegmentMetadataTransactionFactory
+ */
+public class SqlSegmentMetadataReadOnlyTransactionFactory implements SegmentMetadataTransactionFactory
+{
+ private static final int QUIET_RETRIES = 2;
+ private static final int MAX_RETRIES = 3;
+
+ private final ObjectMapper jsonMapper;
+ private final MetadataStorageTablesConfig tablesConfig;
+ private final SQLMetadataConnector connector;
+
+ @Inject
+ public SqlSegmentMetadataReadOnlyTransactionFactory(
+ ObjectMapper jsonMapper,
+ MetadataStorageTablesConfig tablesConfig,
+ SQLMetadataConnector connector
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.tablesConfig = tablesConfig;
+ this.connector = connector;
+ }
+
+ public int getMaxRetries()
+ {
+ return MAX_RETRIES;
+ }
+
+ public int getQuietRetries()
+ {
+ return QUIET_RETRIES;
+ }
+
+ @Override
+ public T inReadOnlyDatasourceTransaction(
+ String dataSource,
+ SegmentMetadataReadTransaction.Callback callback
+ )
+ {
+ return connector.retryReadOnlyTransaction(
+ (handle, status) -> {
+ final SegmentMetadataTransaction sqlTransaction
+ = createSqlTransaction(dataSource, handle, status);
+ return executeReadAndClose(sqlTransaction, callback);
+ },
+ QUIET_RETRIES,
+ getMaxRetries()
+ );
+ }
+
+ @Override
+ public T inReadWriteDatasourceTransaction(
+ String dataSource,
+ SegmentMetadataTransaction.Callback callback
+ )
+ {
+ throw DruidException.defensive("Only Overlord can perform write transactions on segment metadata.");
+ }
+
+ protected SegmentMetadataTransaction createSqlTransaction(
+ String dataSource,
+ Handle handle,
+ TransactionStatus transactionStatus
+ )
+ {
+ return new SqlSegmentMetadataTransaction(
+ dataSource,
+ handle, transactionStatus, connector, tablesConfig, jsonMapper
+ );
+ }
+
+ protected T executeReadAndClose(
+ SegmentMetadataReadTransaction transaction,
+ SegmentMetadataReadTransaction.Callback callback
+ ) throws Exception
+ {
+ try (transaction) {
+ return callback.inTransaction(transaction);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
index 2942ed1a0861..26083e44b678 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
@@ -23,9 +23,6 @@
import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidLeaderSelector;
-import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@@ -34,10 +31,6 @@
import org.apache.druid.metadata.segment.cache.Metric;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.query.DruidMetrics;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.TransactionStatus;
-
-import java.util.Set;
/**
* Factory for {@link SegmentMetadataTransaction}s. If the
@@ -56,46 +49,30 @@
* now, it continues to read directly from the metadata store for consistency
* with older Druid versions.
*/
-public class SqlSegmentMetadataTransactionFactory implements SegmentMetadataTransactionFactory
+public class SqlSegmentMetadataTransactionFactory extends SqlSegmentMetadataReadOnlyTransactionFactory
{
private static final Logger log = new Logger(SqlSegmentMetadataTransactionFactory.class);
- private static final int QUIET_RETRIES = 2;
- private static final int MAX_RETRIES = 3;
-
- private final ObjectMapper jsonMapper;
- private final MetadataStorageTablesConfig tablesConfig;
private final SQLMetadataConnector connector;
private final DruidLeaderSelector leaderSelector;
private final SegmentMetadataCache segmentMetadataCache;
private final ServiceEmitter emitter;
- private final boolean isNotOverlord;
-
@Inject
public SqlSegmentMetadataTransactionFactory(
ObjectMapper jsonMapper,
MetadataStorageTablesConfig tablesConfig,
SQLMetadataConnector connector,
@IndexingService DruidLeaderSelector leaderSelector,
- @Self Set nodeRoles,
SegmentMetadataCache segmentMetadataCache,
ServiceEmitter emitter
)
{
- this.jsonMapper = jsonMapper;
- this.tablesConfig = tablesConfig;
+ super(jsonMapper, tablesConfig, connector);
this.connector = connector;
this.leaderSelector = leaderSelector;
this.segmentMetadataCache = segmentMetadataCache;
this.emitter = emitter;
-
- this.isNotOverlord = !nodeRoles.contains(NodeRole.OVERLORD);
- }
-
- public int getMaxRetries()
- {
- return MAX_RETRIES;
}
@Override
@@ -109,10 +86,7 @@ public T inReadOnlyDatasourceTransaction(
final SegmentMetadataTransaction sqlTransaction
= createSqlTransaction(dataSource, handle, status);
- if (isNotOverlord) {
- // Read directly from the metadata store if not Overlord
- return executeReadAndClose(sqlTransaction, callback);
- } else if (segmentMetadataCache.isSyncedForRead()) {
+ if (segmentMetadataCache.isSyncedForRead()) {
// Use cache as it is already synced with the metadata store
emitTransactionCount(Metric.READ_ONLY_TRANSACTIONS, dataSource);
return segmentMetadataCache.readCacheForDataSource(dataSource, dataSourceCache -> {
@@ -124,7 +98,7 @@ public T inReadOnlyDatasourceTransaction(
return executeReadAndClose(sqlTransaction, callback);
}
},
- QUIET_RETRIES,
+ getQuietRetries(),
getMaxRetries()
);
}
@@ -135,10 +109,6 @@ public T inReadWriteDatasourceTransaction(
SegmentMetadataTransaction.Callback callback
)
{
- if (isNotOverlord) {
- throw DruidException.defensive("Only Overlord can perform write transactions on segment metadata.");
- }
-
return connector.retryTransaction(
(handle, status) -> {
final SegmentMetadataTransaction sqlTransaction
@@ -167,23 +137,11 @@ public T inReadWriteDatasourceTransaction(
return executeWriteAndClose(sqlTransaction, callback);
}
},
- QUIET_RETRIES,
+ getQuietRetries(),
getMaxRetries()
);
}
- private SegmentMetadataTransaction createSqlTransaction(
- String dataSource,
- Handle handle,
- TransactionStatus transactionStatus
- )
- {
- return new SqlSegmentMetadataTransaction(
- dataSource,
- handle, transactionStatus, connector, tablesConfig, jsonMapper
- );
- }
-
private T executeWriteAndClose(
SegmentMetadataTransaction transaction,
SegmentMetadataTransaction.Callback callback
@@ -201,16 +159,6 @@ private T executeWriteAndClose(
}
}
- private T executeReadAndClose(
- SegmentMetadataReadTransaction transaction,
- SegmentMetadataReadTransaction.Callback callback
- ) throws Exception
- {
- try (transaction) {
- return callback.inTransaction(transaction);
- }
- }
-
private void emitTransactionCount(String metricName, String datasource)
{
emitter.emit(
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
index 03a10b93fac3..7cb3b3780273 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
@@ -162,7 +162,6 @@ public HeapMemorySegmentMetadataCache(
ObjectMapper jsonMapper,
Supplier config,
Supplier tablesConfig,
- Supplier schemaConfig,
SegmentSchemaCache segmentSchemaCache,
SQLMetadataConnector connector,
ScheduledExecutorFactory executorFactory,
@@ -173,7 +172,7 @@ public HeapMemorySegmentMetadataCache(
this.cacheMode = config.get().getCacheUsageMode();
this.pollDuration = config.get().getPollDuration().toStandardDuration();
this.tablesConfig = tablesConfig.get();
- this.useSchemaCache = schemaConfig.get().isEnabled() && segmentSchemaCache.isEnabled();
+ this.useSchemaCache = segmentSchemaCache.isEnabled();
this.segmentSchemaCache = segmentSchemaCache;
this.connector = connector;
this.pollExecutor = isEnabled()
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
index 2d46e601afd5..237b3944a176 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
@@ -95,7 +95,7 @@ public class SegmentSchemaCache
private final AtomicInteger cacheMissCount = new AtomicInteger(0);
/**
- * @return true if this cache is enabled.
+ * @return true if schema caching is enabled.
*/
public boolean isEnabled()
{
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java
index 7007607a2b0a..2406af6f12b6 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.metadata;
import com.google.common.collect.ImmutableList;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -79,7 +78,6 @@ public void setup()
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
)
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
index ada0ed1c887b..b8b72ff72e68 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
@@ -29,6 +29,7 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.SqlSegmentMetadataReadOnlyTransactionFactory;
import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
@@ -102,7 +103,6 @@ public void setup()
mapper,
() -> new SegmentsMetadataManagerConfig(null, cacheMode),
derbyConnectorRule.metadataTablesConfigSupplier(),
- () -> CentralizedDatasourceSchemaConfig.enabled(false),
new SegmentSchemaCache(),
derbyConnector,
(corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(
@@ -154,15 +154,23 @@ private IndexerSQLMetadataStorageCoordinator createStorageCoordinator(
NodeRole nodeRole
)
{
- final SegmentMetadataTransactionFactory transactionFactory = new SqlSegmentMetadataTransactionFactory(
- mapper,
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- derbyConnector,
- leaderSelector,
- Set.of(nodeRole),
- segmentMetadataCache,
- emitter
- );
+ final SegmentMetadataTransactionFactory transactionFactory;
+ if (nodeRole.equals(NodeRole.COORDINATOR)) {
+ transactionFactory = new SqlSegmentMetadataReadOnlyTransactionFactory(
+ mapper,
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ derbyConnector
+ );
+ } else {
+ transactionFactory = new SqlSegmentMetadataTransactionFactory(
+ mapper,
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ derbyConnector,
+ leaderSelector,
+ segmentMetadataCache,
+ emitter
+ );
+ }
return new IndexerSQLMetadataStorageCoordinator(
transactionFactory,
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 96d27bdda325..2562b3733510 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -24,7 +24,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.StringTuple;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.ExceptionMatcher;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
@@ -154,7 +153,6 @@ public void setUp()
mapper,
() -> new SegmentsMetadataManagerConfig(null, cacheMode),
derbyConnectorRule.metadataTablesConfigSupplier(),
- CentralizedDatasourceSchemaConfig::create,
new SegmentSchemaCache(),
derbyConnector,
(corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(
@@ -180,7 +178,6 @@ public void setUp()
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
leaderSelector,
- Set.of(NodeRole.OVERLORD),
segmentMetadataCache,
emitter
)
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
index 6fd536266d04..d98e54c3ea51 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
@@ -24,7 +24,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.DateTimes;
@@ -100,7 +99,6 @@ public void setUp()
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
);
diff --git a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
index 6b02e63498f1..e5554b931eef 100644
--- a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
+++ b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
@@ -33,6 +33,7 @@
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
@@ -89,8 +90,7 @@ private void initManager(
jsonMapper,
Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
Suppliers.ofInstance(storageConfig),
- Suppliers.ofInstance(CentralizedDatasourceSchemaConfig.enabled(useSchemaCache)),
- new SegmentSchemaCache(),
+ useSchemaCache ? new SegmentSchemaCache() : new NoopSegmentSchemaCache(),
connector,
(poolSize, name) -> new WrappingScheduledExecutorService(name, segmentMetadataCacheExec, false),
emitter
diff --git a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
index f1a5bea26338..bdd02ba12d8d 100644
--- a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
@@ -34,7 +34,6 @@
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.coordinator.CreateDataSegments;
@@ -106,7 +105,6 @@ private void setupTargetWithCaching(SegmentMetadataCache.UsageMode cacheMode)
TestHelper.JSON_MAPPER,
() -> metadataManagerConfig,
derbyConnectorRule.metadataTablesConfigSupplier(),
- CentralizedDatasourceSchemaConfig::create,
new SegmentSchemaCache(),
derbyConnector,
executorFactory,
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index ffe06dda70a4..5a94d73a3e8b 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -47,6 +47,7 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.MetadataConfigModule;
import org.apache.druid.guice.MetadataManagerModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SegmentSchemaCacheModule;
@@ -141,7 +142,7 @@ public void configure(Properties properties)
{
this.properties = properties;
beOverlord = isOverlord(properties);
- isSegmentSchemaCacheEnabled = isSegmentSchemaCacheEnabled(properties);
+ isSegmentSchemaCacheEnabled = MetadataConfigModule.isSegmentSchemaCacheEnabled(properties);
if (beOverlord) {
log.info("Coordinator is configured to act as Overlord as well (%s = true).", AS_OVERLORD_PROPERTY);
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 26edb68c7ee4..e9cbab108d39 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -57,7 +57,6 @@
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ManageLifecycleServer;
-import org.apache.druid.guice.MetadataManagerModule;
import org.apache.druid.guice.PeonProcessingModule;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.QueryRunnerFactoryModule;
@@ -74,12 +73,9 @@
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.TaskToolboxFactory;
-import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
-import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.DeepStorageShuffleClient;
@@ -87,10 +83,8 @@
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProviderImpl;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
import org.apache.druid.indexing.overlord.TaskRunner;
-import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycle;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig;
@@ -220,7 +214,6 @@ public void configure(Properties properties)
protected List extends Module> getModules()
{
return ImmutableList.of(
- new MetadataManagerModule(), // needed here only to support druid.peon.mode=local
new PeonProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
@@ -485,26 +478,9 @@ static void bindPeonDataSegmentHandlers(Binder binder)
private static void configureTaskActionClient(Binder binder)
{
- PolyBind.createChoice(
- binder,
- "druid.peon.mode",
- Key.get(TaskActionClientFactory.class),
- Key.get(RemoteTaskActionClientFactory.class)
- );
- final MapBinder taskActionBinder =
- PolyBind.optionBinder(binder, Key.get(TaskActionClientFactory.class));
- taskActionBinder
- .addBinding("local")
- .to(LocalTaskActionClientFactory.class)
- .in(LazySingleton.class);
- // all of these bindings are so that we can run the peon in local mode
- JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
- binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
- binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
- taskActionBinder
- .addBinding("remote")
- .to(RemoteTaskActionClientFactory.class)
- .in(LazySingleton.class);
+ binder.bind(TaskActionClientFactory.class)
+ .to(RemoteTaskActionClientFactory.class)
+ .in(LazySingleton.class);
binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.PEON);
}
diff --git a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
index 5faf6e134c60..3dc1711a50e6 100644
--- a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
+++ b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
@@ -33,13 +33,13 @@
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.MetadataConfigModule;
import org.apache.druid.guice.ServerViewModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.server.DruidNode;
import java.lang.annotation.Annotation;
@@ -53,9 +53,6 @@
*/
public abstract class ServerRunnable extends GuiceRunnable
{
- private static final String CENTRALIZED_DATASOURCE_SCHEMA_ENABLED =
- CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX + ".enabled";
-
private static final EmittingLogger log = new EmittingLogger(ServerRunnable.class);
public ServerRunnable(Logger log)
@@ -207,7 +204,7 @@ public void stop()
protected static void validateCentralizedDatasourceSchemaConfig(Properties properties)
{
- if (isSegmentSchemaCacheEnabled(properties)) {
+ if (MetadataConfigModule.isSegmentSchemaCacheEnabled(properties)) {
String serverViewType = properties.getProperty(ServerViewModule.SERVERVIEW_TYPE_PROPERTY);
if (serverViewType != null && !serverViewType.equals(ServerViewModule.SERVERVIEW_TYPE_HTTP)) {
throw DruidException
@@ -221,15 +218,10 @@ protected static void validateCentralizedDatasourceSchemaConfig(Properties prope
ServerViewModule.SERVERVIEW_TYPE_PROPERTY,
serverViewType,
ServerViewModule.SERVERVIEW_TYPE_HTTP,
- CENTRALIZED_DATASOURCE_SCHEMA_ENABLED
+ MetadataConfigModule.CENTRALIZED_DATASOURCE_SCHEMA_ENABLED
)
);
}
}
}
-
- protected static boolean isSegmentSchemaCacheEnabled(Properties properties)
- {
- return Boolean.parseBoolean(properties.getProperty(CENTRALIZED_DATASOURCE_SCHEMA_ENABLED));
- }
}
diff --git a/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java b/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java
index 1cc168318cc2..6d501f9f768c 100644
--- a/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java
+++ b/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java
@@ -27,14 +27,17 @@
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.metadata.MetadataRuleManagerConfig;
import org.apache.druid.metadata.MetadataRuleManagerProvider;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SQLMetadataRuleManagerProvider;
import org.apache.druid.metadata.SQLMetadataSupervisorManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
import org.apache.druid.metadata.SqlSegmentsMetadataManagerProvider;
import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.SqlSegmentMetadataReadOnlyTransactionFactory;
import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
@@ -43,43 +46,48 @@
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.MetadataManager;
+import java.util.Properties;
import java.util.Set;
/**
- * Module used by Overlord and Coordinator to bind various metadata managers.
+ * Module used by Overlord and Coordinator to bind the following metadata managers:
+ *
+ * - {@link MetadataManager} - Coordinator only
+ * - {@link MetadataRuleManager} - Coordinator only
+ * - {@link MetadataSupervisorManager}
+ * - {@link SegmentsMetadataManager}
+ * - {@link IndexerMetadataStorageCoordinator}
+ * - {@link CoordinatorConfigManager}
+ * - {@link SegmentMetadataCache}
+ * - {@link SegmentSchemaCache} - Coordinator only
+ *
*/
public class MetadataManagerModule implements Module
{
private Set nodeRoles;
+ private boolean isSchemaCacheEnabled;
@Inject
public void configure(
+ Properties properties,
@Self Set nodeRoles
)
{
this.nodeRoles = nodeRoles;
+ this.isSchemaCacheEnabled = MetadataConfigModule.isSegmentSchemaCacheEnabled(properties);
}
@Override
public void configure(Binder binder)
{
- if (nodeRoles.contains(NodeRole.COORDINATOR)) {
- binder.bind(MetadataRuleManagerProvider.class)
- .to(SQLMetadataRuleManagerProvider.class)
- .in(LazySingleton.class);
- binder.bind(MetadataRuleManager.class)
- .toProvider(MetadataRuleManagerProvider.class)
- .in(ManageLifecycle.class);
-
- binder.bind(MetadataManager.class).in(LazySingleton.class);
- }
-
+ // Common dependencies
binder.bind(CoordinatorConfigManager.class).in(LazySingleton.class);
binder.bind(MetadataSupervisorManager.class)
.to(SQLMetadataSupervisorManager.class)
.in(LazySingleton.class);
+ JsonConfigProvider.bind(binder, "druid.manager.segments", SegmentsMetadataManagerConfig.class);
binder.bind(SegmentsMetadataManagerProvider.class)
.to(SqlSegmentsMetadataManagerProvider.class)
.in(LazySingleton.class);
@@ -87,9 +95,6 @@ public void configure(Binder binder)
.toProvider(SegmentsMetadataManagerProvider.class)
.in(ManageLifecycle.class);
- binder.bind(SegmentMetadataTransactionFactory.class)
- .to(SqlSegmentMetadataTransactionFactory.class)
- .in(LazySingleton.class);
binder.bind(IndexerMetadataStorageCoordinator.class)
.to(IndexerSQLMetadataStorageCoordinator.class)
.in(ManageLifecycle.class);
@@ -97,12 +102,36 @@ public void configure(Binder binder)
.to(HeapMemorySegmentMetadataCache.class)
.in(LazySingleton.class);
+ // Coordinator-only dependencies
if (nodeRoles.contains(NodeRole.COORDINATOR)) {
+ JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class);
+ binder.bind(MetadataRuleManagerProvider.class)
+ .to(SQLMetadataRuleManagerProvider.class)
+ .in(LazySingleton.class);
+ binder.bind(MetadataRuleManager.class)
+ .toProvider(MetadataRuleManagerProvider.class)
+ .in(ManageLifecycle.class);
+
+ binder.bind(MetadataManager.class).in(LazySingleton.class);
+ }
+
+ if (nodeRoles.contains(NodeRole.COORDINATOR) && isSchemaCacheEnabled) {
binder.bind(SegmentSchemaCache.class).in(LazySingleton.class);
} else {
binder.bind(SegmentSchemaCache.class)
.to(NoopSegmentSchemaCache.class)
.in(LazySingleton.class);
}
+
+ // Overlord-only dependencies
+ if (nodeRoles.contains(NodeRole.OVERLORD)) {
+ binder.bind(SegmentMetadataTransactionFactory.class)
+ .to(SqlSegmentMetadataTransactionFactory.class)
+ .in(LazySingleton.class);
+ } else {
+ binder.bind(SegmentMetadataTransactionFactory.class)
+ .to(SqlSegmentMetadataReadOnlyTransactionFactory.class)
+ .in(LazySingleton.class);
+ }
}
}
From 52db331e9107f7c765c13d7ccf90557d5786de71 Mon Sep 17 00:00:00 2001
From: Kashif Faraz
Date: Sat, 17 May 2025 10:38:06 +0530
Subject: [PATCH 11/18] Fix up tests
---
.../druid/indexing/common/actions/TaskActionTestKit.java | 4 ++--
.../apache/druid/indexing/common/task/IngestionTestBase.java | 3 ++-
.../apache/druid/segment/metadata/NoopSegmentSchemaCache.java | 3 ++-
.../org/apache/druid/segment/metadata/SegmentSchemaCache.java | 1 +
.../IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java | 4 ++--
.../metadata/IndexerSQLMetadataStorageCoordinatorTest.java | 4 ++--
.../segment/cache/HeapMemorySegmentMetadataCacheTest.java | 4 ++--
7 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
index 8efcfa58b1e8..3a63495928aa 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
@@ -39,7 +39,7 @@
import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
-import org.apache.druid.segment.metadata.SegmentSchemaCache;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
@@ -175,7 +175,7 @@ private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMappe
objectMapper,
Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
Suppliers.ofInstance(metadataStorageTablesConfig),
- new SegmentSchemaCache(),
+ new NoopSegmentSchemaCache(),
testDerbyConnector,
(poolSize, name) -> new WrappingScheduledExecutorService(name, metadataCachePollExec, false),
NoopServiceEmitter.instance()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index a56d7acc5097..f468f2f04421 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -86,6 +86,7 @@
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
@@ -324,7 +325,7 @@ private SqlSegmentMetadataTransactionFactory createTransactionFactory()
objectMapper,
Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), cacheMode)),
derbyConnectorRule.metadataTablesConfigSupplier(),
- new SegmentSchemaCache(),
+ new NoopSegmentSchemaCache(),
derbyConnectorRule.getConnector(),
ScheduledExecutors::fixed,
NoopServiceEmitter.instance()
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/NoopSegmentSchemaCache.java b/server/src/main/java/org/apache/druid/segment/metadata/NoopSegmentSchemaCache.java
index 56dc14e1b91a..7385cbefce0c 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/NoopSegmentSchemaCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/NoopSegmentSchemaCache.java
@@ -27,7 +27,8 @@
import java.util.Map;
/**
- * Noop implementation of {@link SegmentSchemaCache}.
+ * No-op implementation of {@link SegmentSchemaCache} that always returns false
+ * for {@link #isEnabled()} and {@link #isInitialized()}.
*/
public class NoopSegmentSchemaCache extends SegmentSchemaCache
{
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
index 237b3944a176..c77bdecad022 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
@@ -99,6 +99,7 @@ public class SegmentSchemaCache
*/
public boolean isEnabled()
{
+ // Always return true since this implementation is bound only when caching is enabled
return true;
}
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
index b8b72ff72e68..48656aa93e96 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
@@ -36,7 +36,7 @@
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
-import org.apache.druid.segment.metadata.SegmentSchemaCache;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
@@ -103,7 +103,7 @@ public void setup()
mapper,
() -> new SegmentsMetadataManagerConfig(null, cacheMode),
derbyConnectorRule.metadataTablesConfigSupplier(),
- new SegmentSchemaCache(),
+ new NoopSegmentSchemaCache(),
derbyConnector,
(corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(
nameFormat,
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 2562b3733510..3042f53124e5 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -44,7 +44,7 @@
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.FingerprintGenerator;
-import org.apache.druid.segment.metadata.SegmentSchemaCache;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.metadata.SegmentSchemaTestUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@@ -153,7 +153,7 @@ public void setUp()
mapper,
() -> new SegmentsMetadataManagerConfig(null, cacheMode),
derbyConnectorRule.metadataTablesConfigSupplier(),
- new SegmentSchemaCache(),
+ new NoopSegmentSchemaCache(),
derbyConnector,
(corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(
nameFormat,
diff --git a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
index bdd02ba12d8d..4a79610b5abb 100644
--- a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
@@ -34,7 +34,7 @@
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.metadata.SegmentSchemaCache;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
@@ -105,7 +105,7 @@ private void setupTargetWithCaching(SegmentMetadataCache.UsageMode cacheMode)
TestHelper.JSON_MAPPER,
() -> metadataManagerConfig,
derbyConnectorRule.metadataTablesConfigSupplier(),
- new SegmentSchemaCache(),
+ new NoopSegmentSchemaCache(),
derbyConnector,
executorFactory,
serviceEmitter
From 9877493a25ee3add70a5cb56b425ea0cd55f6a53 Mon Sep 17 00:00:00 2001
From: Kashif Faraz
Date: Sat, 17 May 2025 14:11:45 +0530
Subject: [PATCH 12/18] Fix logic for delta sync of segment schemas
---
.../metadata/SqlSegmentsMetadataQuery.java | 90 +++++++
.../HeapMemoryDatasourceSegmentCache.java | 20 +-
.../cache/HeapMemorySegmentMetadataCache.java | 233 +++++++++---------
.../segment/cache/SegmentSchemaRecord.java | 47 ++++
.../segment/cache/SegmentSyncResult.java | 8 +-
.../HeapMemorySegmentMetadataCacheTest.java | 2 +-
6 files changed, 276 insertions(+), 124 deletions(-)
create mode 100644 server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentSchemaRecord.java
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index b1d7d16a1453..899b98f9562a 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -39,6 +39,9 @@
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.segment.cache.SegmentSchemaRecord;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
@@ -607,6 +610,71 @@ private CloseableIterator retrieveSegmentBatchById(
return CloseableIterators.wrap(resultIterator, resultIterator);
}
+ /**
+ * Retrieves all used segment schemas present in the metadata store irrespective
+ * of their last updated time.
+ */
+ public List retrieveAllUsedSegmentSchemas()
+ {
+ return retrieveValidSchemaRecordsWithSql(
+ StringUtils.format(
+ "SELECT fingerprint, payload FROM %s"
+ + " WHERE version = %s AND used = true",
+ dbTables.getSegmentSchemasTable(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+ )
+ );
+ }
+
+ /**
+ * Retrieves segment schemas from the metadata store for the given fingerprints.
+ */
+ public List retrieveUsedSegmentSchemasForFingerprints(
+ Set schemaFingerprints
+ )
+ {
+ final List> fingerprintBatches = Lists.partition(
+ List.copyOf(schemaFingerprints),
+ MAX_INTERVALS_PER_BATCH
+ );
+
+ final List records = new ArrayList<>();
+ for (List fingerprintBatch : fingerprintBatches) {
+ records.addAll(
+ retrieveBatchOfSegmentSchemas(fingerprintBatch)
+ );
+ }
+
+ return records;
+ }
+
+ /**
+ * Retrieves a batch of segment schema records for the given fingerprints.
+ */
+ private List retrieveBatchOfSegmentSchemas(List schemaFingerprint)
+ {
+ return retrieveValidSchemaRecordsWithSql(
+ StringUtils.format(
+ "SELECT fingerprint, payload FROM %s"
+ + " WHERE version = %s AND used = true"
+ + " %s",
+ dbTables.getSegmentSchemasTable(),
+ CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
+ getParameterizedInConditionForColumn("fingerprint", schemaFingerprint)
+ )
+ );
+ }
+
+ private List retrieveValidSchemaRecordsWithSql(String sql)
+ {
+ return handle.createQuery(sql)
+ .setFetchSize(connector.getStreamingFetchSize())
+ .map((index, r, ctx) -> mapToSchemaRecord(r))
+ .list()
+ .stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
/**
* Marks the given segment IDs as used.
*
@@ -1467,6 +1535,28 @@ private Interval mapToInterval(ResultSet resultSet, String dataSource)
}
}
+ /**
+ * Tries to parse the fields of the result set into a {@link SegmentSchemaRecord}.
+ *
+ * @return null if an error occurred while parsing the result
+ */
+ @Nullable
+ private SegmentSchemaRecord mapToSchemaRecord(ResultSet resultSet)
+ {
+ String fingerprint = null;
+ try {
+ fingerprint = resultSet.getString("fingerprint");
+ return new SegmentSchemaRecord(
+ fingerprint,
+ jsonMapper.readValue(resultSet.getBytes("payload"), SchemaPayload.class)
+ );
+ }
+ catch (Throwable t) {
+ log.error(t, "Could not read segment schema with fingerprint[%s]", fingerprint);
+ return null;
+ }
+ }
+
private ResultIterator getDataSegmentResultIterator(Query