@@ -40,6 +40,7 @@
import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
@@ -172,10 +173,13 @@ private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMappe
= useSegmentMetadataCache
? SegmentMetadataCache.UsageMode.ALWAYS
: SegmentMetadataCache.UsageMode.NEVER;

segmentMetadataCache = new HeapMemorySegmentMetadataCache(
objectMapper,
Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
Suppliers.ofInstance(metadataStorageTablesConfig),
Suppliers.ofInstance(CentralizedDatasourceSchemaConfig.create()),
new SegmentSchemaCache(),
testDerbyConnector,
(poolSize, name) -> new WrappingScheduledExecutorService(name, metadataCachePollExec, false),
NoopServiceEmitter.instance()
@@ -183,7 +183,7 @@ public void setUpIngestionTestBase() throws IOException
derbyConnectorRule.getConnector(),
() -> new SegmentsMetadataManagerConfig(null, null),
derbyConnectorRule.metadataTablesConfigSupplier(),
CentralizedDatasourceSchemaConfig.create(),
CentralizedDatasourceSchemaConfig::create,
NoopServiceEmitter.instance(),
objectMapper
);
@@ -325,6 +325,8 @@ private SqlSegmentMetadataTransactionFactory createTransactionFactory()
objectMapper,
Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), cacheMode)),
derbyConnectorRule.metadataTablesConfigSupplier(),
Suppliers.ofInstance(CentralizedDatasourceSchemaConfig.create()),
new SegmentSchemaCache(),
derbyConnectorRule.getConnector(),
ScheduledExecutors::fixed,
NoopServiceEmitter.instance()
@@ -40,7 +40,7 @@ services:
- druid_centralizedDatasourceSchema_backFillPeriod=15000
- druid_coordinator_segmentMetadata_metadataRefreshPeriod=PT15S
- druid_coordinator_segmentMetadata_disableSegmentMetadataQueries=true
- druid_manager_segments_useIncrementalCache=never
- druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-overlord
- druid-metadata-storage
@@ -53,7 +53,7 @@
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- druid_centralizedDatasourceSchema_enabled=true
- druid_manager_segments_useIncrementalCache=never
- druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
@@ -40,7 +40,7 @@ services:
- druid_centralizedDatasourceSchema_backFillEnabled=true
- druid_centralizedDatasourceSchema_backFillPeriod=15000
- druid_coordinator_segmentMetadata_metadataRefreshPeriod=PT15S
- druid_manager_segments_useIncrementalCache=never
- druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-overlord
- druid-metadata-storage
@@ -54,7 +54,7 @@
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- druid_centralizedDatasourceSchema_enabled=true
- druid_centralizedDatasourceSchema_taskSchemaPublishDisabled=true
- druid_manager_segments_useIncrementalCache=never
- druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
@@ -39,7 +39,7 @@ services:
- druid_centralizedDatasourceSchema_backFillEnabled=true
- druid_centralizedDatasourceSchema_backFillPeriod=15000
- druid_coordinator_segmentMetadata_metadataRefreshPeriod=PT15S
- druid_manager_segments_useIncrementalCache=never
- druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-overlord
- druid-metadata-storage
@@ -52,7 +52,7 @@
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- druid_centralizedDatasourceSchema_enabled=true
- druid_manager_segments_useIncrementalCache=never
- druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
@@ -40,7 +40,6 @@
import org.apache.druid.metadata.SqlSegmentsMetadataManagerProvider;
import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.server.audit.AuditManagerConfig;
import org.apache.druid.server.audit.AuditSerdeHelper;
@@ -77,6 +76,7 @@ public void createBindingChoices(Binder binder, String defaultValue)
PolyBind.createChoiceWithDefault(binder, prop, Key.get(IndexerMetadataStorageCoordinator.class), defaultValue);
PolyBind.createChoiceWithDefault(binder, prop, Key.get(MetadataStorageActionHandlerFactory.class), defaultValue);
PolyBind.createChoiceWithDefault(binder, prop, Key.get(MetadataSupervisorManager.class), defaultValue);
PolyBind.createChoiceWithDefault(binder, prop, Key.get(SegmentMetadataCache.class), defaultValue);

configureAuditManager(binder);
}
@@ -99,12 +99,6 @@ public void configure(Binder binder)
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);

// SegmentMetadataCache is bound for all services but is used only by the Overlord and Coordinator
// similar to some other classes bound here, such as IndexerSQLMetadataStorageCoordinator
binder.bind(SegmentMetadataCache.class)
.to(HeapMemorySegmentMetadataCache.class)
.in(LazySingleton.class);

PolyBind.optionBinder(binder, Key.get(SegmentMetadataTransactionFactory.class))
.addBinding(type)
.to(SqlSegmentMetadataTransactionFactory.class)
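
For reference, the eager binding deleted above is not simply dropped: together with the new createBindingChoices entry, SegmentMetadataCache now follows the same PolyBind pattern as the other metadata-store-dependent classes in this module, so a concrete store implementation is selected per metadata-storage type. Below is a minimal sketch of what such an option binding looks like, using only the PolyBind calls already shown in this file; the module name and store type are illustrative and not taken from this patch.

import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;

// Hypothetical module fragment for one concrete metadata store (e.g. "derby"):
// when the PolyBind choice created by createBindingChoices resolves to this store,
// the heap-based cache implementation is used.
public class ExampleSqlMetadataStoreModule implements Module
{
  private static final String STORE_TYPE = "derby"; // assumed store name, for illustration only

  @Override
  public void configure(Binder binder)
  {
    PolyBind.optionBinder(binder, Key.get(SegmentMetadataCache.class))
            .addBinding(STORE_TYPE)
            .to(HeapMemorySegmentMetadataCache.class)
            .in(LazySingleton.class);
  }
}
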
@@ -1631,7 +1631,8 @@ private boolean shouldPersistSchema(SegmentSchemaMapping segmentSchemaMapping)
private void persistSchema(
final SegmentMetadataTransaction transaction,
final Set<DataSegment> segments,
final SegmentSchemaMapping segmentSchemaMapping
final SegmentSchemaMapping segmentSchemaMapping,
final DateTime updateTime
) throws JsonProcessingException
{
if (segmentSchemaMapping.getSchemaVersion() != CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) {
@@ -1650,7 +1651,8 @@ private void persistSchema(
transaction.getHandle(),
dataSource,
segmentSchemaMapping.getSchemaVersion(),
segmentSchemaMapping.getSchemaFingerprintToPayloadMap()
segmentSchemaMapping.getSchemaFingerprintToPayloadMap(),
updateTime
);
}

@@ -1662,10 +1664,11 @@ protected Set<DataSegment> insertSegments(
{
final Set<DataSegment> toInsertSegments = new HashSet<>();
try {
final DateTime createdTime = DateTimes.nowUtc();
boolean shouldPersistSchema = shouldPersistSchema(segmentSchemaMapping);

if (shouldPersistSchema) {
persistSchema(transaction, segments, segmentSchemaMapping);
persistSchema(transaction, segments, segmentSchemaMapping, createdTime);
}

final Set<SegmentId> segmentIds = segments.stream().map(DataSegment::getId).collect(Collectors.toSet());
@@ -1678,7 +1681,6 @@ protected Set<DataSegment> insertSegments(
}
}

final DateTime createdTime = DateTimes.nowUtc();
final Set<DataSegment> usedSegments = findNonOvershadowedSegments(segments);

final Set<DataSegmentPlus> segmentPlusToInsert = toInsertSegments.stream().map(segment -> {
@@ -1881,8 +1883,9 @@ private Set<DataSegment> insertSegments(
Map<String, String> upgradedFromSegmentIdMap
) throws Exception
{
final DateTime createdTime = DateTimes.nowUtc();
if (shouldPersistSchema(segmentSchemaMapping)) {
persistSchema(transaction, segments, segmentSchemaMapping);
persistSchema(transaction, segments, segmentSchemaMapping, createdTime);
}

// Do not insert segment IDs which already exist
@@ -1892,7 +1895,6 @@ private Set<DataSegment> insertSegments(
s -> !existingSegmentIds.contains(s.getId().toString())
).collect(Collectors.toSet());

final DateTime createdTime = DateTimes.nowUtc();
final Set<DataSegmentPlus> segmentPlusToInsert = segmentsToInsert.stream().map(segment -> {
SegmentMetadata segmentMetadata = getSegmentMetadataFromSchemaMappingOrUpgradeMetadata(
segment.getId(),
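
The reordering in the hunks above serves one purpose: the DateTime that stamps the segment rows is now captured before persistSchema runs and passed into it, so the schema records and segment records written in a single transaction share one timestamp instead of two slightly different clock reads. A rough sketch of the resulting flow under simplified signatures; insertSegmentRows is a placeholder name, not a method from this patch.

// Sketch only: read the clock once per transaction and reuse the value for both writes.
final DateTime createdTime = DateTimes.nowUtc();

if (shouldPersistSchema(segmentSchemaMapping)) {
  // schema fingerprint/payload rows carry the same created/updated time...
  persistSchema(transaction, segments, segmentSchemaMapping, createdTime);
}

// ...as the segment rows that are built further down from the same createdTime.
insertSegmentRows(transaction, segments, createdTime); // placeholder helper
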
@@ -39,7 +39,7 @@ public class SqlSegmentsMetadataManagerProvider implements SegmentsMetadataManag
private final ServiceEmitter serviceEmitter;
private final SegmentSchemaCache segmentSchemaCache;
private final SegmentMetadataCache segmentMetadataCache;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
private final Supplier<CentralizedDatasourceSchemaConfig> centralizedDatasourceSchemaConfig;

@Inject
public SqlSegmentsMetadataManagerProvider(
@@ -50,7 +50,7 @@ public SqlSegmentsMetadataManagerProvider(
SQLMetadataConnector connector,
Lifecycle lifecycle,
SegmentSchemaCache segmentSchemaCache,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
Supplier<CentralizedDatasourceSchemaConfig> centralizedDatasourceSchemaConfig,
ServiceEmitter serviceEmitter
)
{
@@ -63,36 +63,25 @@ public class SqlSegmentsMetadataManagerV2 implements SegmentsMetadataManager
private final SegmentsMetadataManager delegate;
private final SegmentMetadataCache segmentMetadataCache;
private final SegmentsMetadataManagerConfig managerConfig;
private final CentralizedDatasourceSchemaConfig schemaConfig;

public SqlSegmentsMetadataManagerV2(
SegmentMetadataCache segmentMetadataCache,
SegmentSchemaCache segmentSchemaCache,
SQLMetadataConnector connector,
Supplier<SegmentsMetadataManagerConfig> managerConfig,
Supplier<MetadataStorageTablesConfig> tablesConfig,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
Supplier<CentralizedDatasourceSchemaConfig> centralizedDatasourceSchemaConfig,
ServiceEmitter serviceEmitter,
ObjectMapper jsonMapper
)
{
this.delegate = new SqlSegmentsMetadataManager(
jsonMapper,
managerConfig, tablesConfig, connector, segmentSchemaCache,
centralizedDatasourceSchemaConfig, serviceEmitter
centralizedDatasourceSchemaConfig.get(), serviceEmitter
);
this.managerConfig = managerConfig.get();
this.segmentMetadataCache = segmentMetadataCache;
this.schemaConfig = centralizedDatasourceSchemaConfig;

// Segment metadata cache currently cannot handle schema updates
if (segmentMetadataCache.isEnabled() && schemaConfig.isEnabled()) {
throw new IllegalArgumentException(
"Segment metadata incremental cache['druid.manager.segments.useIncrementalCache']"
+ " and segment schema cache['druid.centralizedDatasourceSchema.enabled']"
+ " must not be enabled together."
);
}
}

/**
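
With the Supplier-based constructor parameters shown above, callers that build SqlSegmentsMetadataManagerV2 outside Guice (such as the tests earlier in this diff) can pass either Suppliers.ofInstance(...) or a method reference. Below is a construction sketch assuming test-style dependencies; every variable in it (segmentMetadataCache, derbyConnector, metadataStorageTablesConfig, cacheMode, objectMapper) is a stand-in mirroring the test fixtures, not new API.

// Illustrative wiring only, following the test setup shown in this PR.
SegmentsMetadataManager manager = new SqlSegmentsMetadataManagerV2(
    segmentMetadataCache,                      // e.g. a HeapMemorySegmentMetadataCache
    new SegmentSchemaCache(),
    derbyConnector,                            // test SQLMetadataConnector
    Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
    Suppliers.ofInstance(metadataStorageTablesConfig),
    CentralizedDatasourceSchemaConfig::create, // satisfies Supplier<CentralizedDatasourceSchemaConfig>
    NoopServiceEmitter.instance(),
    objectMapper
);
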