Enable querying entirely cold datasources (#16676)

Add ability to query entirely cold datasources.
Rishabh Singh 2024-07-15 15:02:59 +05:30 committed by GitHub
parent 209f8a9546
commit 64104533ac
10 changed files with 853 additions and 42 deletions
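
In effect, a datasource whose segments are all cold (required replica count 0 on Historicals) now has its schema built on the Coordinator and served to Brokers, so it becomes visible to SQL metadata. Below is a minimal sketch, not part of this commit, of checking that from the outside; it assumes CentralizedDatasourceSchema is enabled and uses a placeholder Broker URL and datasource name.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch only: queries the Broker's SQL metadata for a datasource named "cold" whose
// segments are not loaded on any Historical; with this change its columns are expected
// to show up in INFORMATION_SCHEMA.
public class ColdSchemaVisibilityCheck
{
  public static void main(String[] args) throws Exception
  {
    String body = "{\"query\": \"SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS"
                  + " WHERE TABLE_NAME = 'cold'\"}";
    HttpRequest request = HttpRequest.newBuilder()
                                     .uri(URI.create("http://localhost:8082/druid/v2/sql")) // placeholder Broker URL
                                     .header("Content-Type", "application/json")
                                     .POST(HttpRequest.BodyPublishers.ofString(body))
                                     .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());
  }
}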

View File

@ -69,4 +69,9 @@ public interface CoordinatorClient
* Returns a new instance backed by a ServiceClient which follows the provided retryPolicy
*/
CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy);
/**
* Retrieves the set of datasources that have used segments.
*/
ListenableFuture<Set<String>> fetchDataSourcesWithUsedSegments();
}

View File

@ -188,4 +188,17 @@ public class CoordinatorClientImpl implements CoordinatorClient
{
return new CoordinatorClientImpl(client.withRetryPolicy(retryPolicy), jsonMapper);
}
@Override
public ListenableFuture<Set<String>> fetchDataSourcesWithUsedSegments()
{
final String path = "/druid/coordinator/v1/metadata/datasources";
return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.GET, path),
new BytesFullResponseHandler()
),
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<Set<String>>() {})
);
}
}
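
For reference, a minimal sketch of how a caller might consume the new API; this mirrors the Broker-side usage further down in this commit, and the helper name is illustrative only.

// Sketch only: resolve the future and fall back to an empty set if the poll fails,
// which is how the Broker-side cache below treats a failed Coordinator poll.
private Set<String> pollDataSourcesWithUsedSegments(CoordinatorClient coordinatorClient)
{
  try {
    Set<String> polled = FutureUtils.getUnchecked(coordinatorClient.fetchDataSourcesWithUsedSegments(), true);
    return polled != null ? polled : Collections.emptySet();
  }
  catch (Exception e) {
    return Collections.emptySet();
  }
}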

View File

@ -200,7 +200,7 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
* Map of datasource name to a generic object extending DataSourceInformation.
* This structure can be accessed by {@link #cacheExec} and {@link #callbackExec} threads.
*/
protected final ConcurrentMap<String, T> tables = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, T> tables = new ConcurrentHashMap<>();
/**
* This lock coordinates the access from multiple threads to those variables guarded by this lock.
@ -269,9 +269,10 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
final boolean wasRecentFailure = DateTimes.utc(lastFailure)
.plus(config.getMetadataRefreshPeriod())
.isAfterNow();
if (isServerViewInitialized &&
!wasRecentFailure &&
(!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
shouldRefresh() &&
(refreshImmediately || nextRefresh < System.currentTimeMillis())) {
// We need to do a refresh. Break out of the waiting loop.
break;
@ -334,6 +335,7 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
}
}
/**
* Lifecycle start method.
*/
@ -361,6 +363,15 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
// noop
}
/**
* Refresh is executed only when there are segments or datasources needing refresh.
*/
@SuppressWarnings("GuardedBy")
protected boolean shouldRefresh()
{
return (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty());
}
public void awaitInitialization() throws InterruptedException
{
initialized.await();
@ -373,6 +384,7 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
*
* @return schema information for the given datasource
*/
@Nullable
public T getDatasource(String name)
{
return tables.get(name);

View File

@ -20,19 +20,27 @@
package org.apache.druid.segment.metadata;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.segment.SchemaPayloadPlus;
@ -41,21 +49,30 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -71,17 +88,36 @@ import java.util.concurrent.atomic.AtomicBoolean;
* <ul><li>Metadata query is executed only for those non-realtime segments for which the schema is not cached.</li>
* <li>Datasources marked for refresh are then rebuilt.</li></ul>
* </li>
* <p>
* Note that the datasource schema returned by {@link #getDatasource} and {@link #getDataSourceInformationMap()}
* also includes columns from cold segments.
* Cold segments are processed in a separate thread, and the schema built from cold segments is stored separately.
* </p>
*/
@ManageLifecycle
public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache<DataSourceInformation>
{
private static final EmittingLogger log = new EmittingLogger(CoordinatorSegmentMetadataCache.class);
private static final Long COLD_SCHEMA_PERIOD_MULTIPLIER = 3L;
private static final Long COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS = TimeUnit.SECONDS.toMillis(50);
private final SegmentMetadataCacheConfig config;
private final ColumnTypeMergePolicy columnTypeMergePolicy;
private final SegmentSchemaCache segmentSchemaCache;
private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private volatile SegmentReplicationStatus segmentReplicationStatus = null;
// Datasource schema built from only cold segments.
private final ConcurrentHashMap<String, DataSourceInformation> coldSchemaTable = new ConcurrentHashMap<>();
// Period of the cold schema processing thread, a multiple of the segment polling period.
// Cold schema processing runs less frequently than the segment poll to reduce the cost of scanning all segments.
// The trade-off is a delay before columns from cold segments are reflected in the datasource schema.
private final long coldSchemaExecPeriodMillis;
private final ScheduledExecutorService coldSchemaExec;
private @Nullable Future<?> cacheExecFuture = null;
private @Nullable Future<?> coldSchemaExecFuture = null;
@Inject
public CoordinatorSegmentMetadataCache(
@ -92,7 +128,9 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
InternalQueryConfig internalQueryConfig,
ServiceEmitter emitter,
SegmentSchemaCache segmentSchemaCache,
SegmentSchemaBackFillQueue segmentSchemaBackfillQueue
SegmentSchemaBackFillQueue segmentSchemaBackfillQueue,
SqlSegmentsMetadataManager sqlSegmentsMetadataManager,
Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier
)
{
super(queryLifecycleFactory, config, escalator, internalQueryConfig, emitter);
@ -100,6 +138,15 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
this.columnTypeMergePolicy = config.getMetadataColumnTypeMergePolicy();
this.segmentSchemaCache = segmentSchemaCache;
this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;
this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
this.coldSchemaExecPeriodMillis =
segmentsMetadataManagerConfigSupplier.get().getPollDuration().getMillis() * COLD_SCHEMA_PERIOD_MULTIPLIER;
coldSchemaExec = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("DruidColdSchema-ScheduledExecutor-%d")
.setDaemon(false)
.build()
);
initServerViewTimelineCallback(serverView);
}
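
The cold-schema period is derived from the metadata store poll period rather than being configured directly. A rough illustration of the arithmetic, assuming the default pollDuration of PT1M; the literal values below are illustrative only.

// Sketch only: with pollDuration = PT1M and COLD_SCHEMA_PERIOD_MULTIPLIER = 3,
// the cold schema thread runs roughly every three minutes.
long pollMillis = Period.parse("PT1M").toStandardDuration().getMillis(); // 60000
long coldSchemaPeriodMillis = pollMillis * 3;                            // 180000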
@ -168,11 +215,15 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
{
callbackExec.shutdownNow();
cacheExec.shutdownNow();
coldSchemaExec.shutdownNow();
segmentSchemaCache.onLeaderStop();
segmentSchemaBackfillQueue.onLeaderStop();
if (cacheExecFuture != null) {
cacheExecFuture.cancel(true);
}
if (coldSchemaExecFuture != null) {
coldSchemaExecFuture.cancel(true);
}
}
public void onLeaderStart()
@ -181,6 +232,12 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
try {
segmentSchemaBackfillQueue.onLeaderStart();
cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
coldSchemaExecFuture = coldSchemaExec.schedule(
this::coldDatasourceSchemaExec,
coldSchemaExecPeriodMillis,
TimeUnit.MILLISECONDS
);
if (config.isAwaitInitializationOnStart()) {
awaitInitialization();
}
@ -196,6 +253,9 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
if (cacheExecFuture != null) {
cacheExecFuture.cancel(true);
}
if (coldSchemaExecFuture != null) {
coldSchemaExecFuture.cancel(true);
}
segmentSchemaCache.onLeaderStop();
segmentSchemaBackfillQueue.onLeaderStop();
}
@ -209,6 +269,11 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
segmentSchemaCache.awaitInitialization();
}
public void updateSegmentReplicationStatus(SegmentReplicationStatus segmentReplicationStatus)
{
this.segmentReplicationStatus = segmentReplicationStatus;
}
@Override
protected void unmarkSegmentAsMutable(SegmentId segmentId)
{
@ -336,6 +401,62 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
return availableSegmentMetadata;
}
@Override
public DataSourceInformation getDatasource(String name)
{
return getMergedDatasourceInformation(tables.get(name), coldSchemaTable.get(name)).orElse(null);
}
@Override
public Map<String, DataSourceInformation> getDataSourceInformationMap()
{
Map<String, DataSourceInformation> hot = new HashMap<>(tables);
Map<String, DataSourceInformation> cold = new HashMap<>(coldSchemaTable);
Set<String> combinedDatasources = new HashSet<>(hot.keySet());
combinedDatasources.addAll(cold.keySet());
ImmutableMap.Builder<String, DataSourceInformation> combined = ImmutableMap.builder();
for (String dataSource : combinedDatasources) {
getMergedDatasourceInformation(hot.get(dataSource), cold.get(dataSource))
.ifPresent(merged -> combined.put(
dataSource,
merged
));
}
return combined.build();
}
private Optional<DataSourceInformation> getMergedDatasourceInformation(
final DataSourceInformation hot,
final DataSourceInformation cold
)
{
if (hot == null && cold == null) {
return Optional.empty();
} else if (hot != null && cold == null) {
return Optional.of(hot);
} else if (hot == null && cold != null) {
return Optional.of(cold);
} else {
final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
List<RowSignature> signatures = new ArrayList<>();
// hot datasource schema takes precedence
signatures.add(hot.getRowSignature());
signatures.add(cold.getRowSignature());
for (RowSignature signature : signatures) {
mergeRowSignature(columnTypes, signature);
}
final RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);
return Optional.of(new DataSourceInformation(hot.getDataSource(), builder.build()));
}
}
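
Because the hot signature is merged first, cold-only columns always end up after the hot columns, which is what the new tests assert ("cold columns should be present at the end"). A standalone sketch of that ordering, using a simplified first-wins merge as a stand-in for the configured ColumnTypeMergePolicy:

// Sketch only: merge order determines column order; LinkedHashMap keeps first insertion.
RowSignature hotSignature = RowSignature.builder()
                                        .add("__time", ColumnType.LONG)
                                        .add("dim1", ColumnType.STRING)
                                        .build();
RowSignature coldSignature = RowSignature.builder()
                                         .add("dim1", ColumnType.STRING)
                                         .add("c2", ColumnType.LONG)
                                         .build();
Map<String, ColumnType> mergedTypes = new LinkedHashMap<>();
for (RowSignature signature : Arrays.asList(hotSignature, coldSignature)) {
  for (String column : signature.getColumnNames()) {
    // Simplified stand-in for columnTypeMergePolicy.merge(existing, incoming).
    mergedTypes.putIfAbsent(column, signature.getColumnType(column).orElse(null));
  }
}
// Resulting column order: __time, dim1, c2 — hot columns first, cold-only columns appended.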
/**
* Executes SegmentMetadataQuery to fetch schema information for each segment in the refresh list.
* The schema information for individual segments is combined to construct a table schema, which is then cached.
@ -382,6 +503,7 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
// Rebuild the datasources.
for (String dataSource : dataSourcesToRebuild) {
final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
if (rowSignature == null) {
log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", dataSource);
tables.remove(dataSource);
@ -419,6 +541,94 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
return cachedSegments;
}
@Nullable
private Integer getReplicationFactor(SegmentId segmentId)
{
if (segmentReplicationStatus == null) {
return null;
}
SegmentReplicaCount replicaCountsInCluster = segmentReplicationStatus.getReplicaCountsInCluster(segmentId);
return replicaCountsInCluster == null ? null : replicaCountsInCluster.required();
}
@VisibleForTesting
protected void coldDatasourceSchemaExec()
{
Stopwatch stopwatch = Stopwatch.createStarted();
Set<String> dataSourceWithColdSegmentSet = new HashSet<>();
int datasources = 0;
int segments = 0;
int dataSourceWithColdSegments = 0;
Collection<ImmutableDruidDataSource> immutableDataSources =
sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments();
for (ImmutableDruidDataSource dataSource : immutableDataSources) {
datasources++;
Collection<DataSegment> dataSegments = dataSource.getSegments();
final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
for (DataSegment segment : dataSegments) {
Integer replicationFactor = getReplicationFactor(segment.getId());
if (replicationFactor != null && replicationFactor != 0) {
continue;
}
Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segment.getId());
if (optionalSchema.isPresent()) {
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
mergeRowSignature(columnTypes, rowSignature);
}
segments++;
}
if (columnTypes.isEmpty()) {
// this datasource doesn't have any cold segment
continue;
}
final RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);
RowSignature coldSignature = builder.build();
String dataSourceName = dataSource.getName();
dataSourceWithColdSegmentSet.add(dataSourceName);
dataSourceWithColdSegments++;
log.debug("[%s] signature from cold segments is [%s]", dataSourceName, coldSignature);
coldSchemaTable.put(dataSourceName, new DataSourceInformation(dataSourceName, coldSignature));
}
// remove any stale datasource from the map
coldSchemaTable.keySet().retainAll(dataSourceWithColdSegmentSet);
String executionStatsLog = StringUtils.format(
"Cold schema processing took [%d] millis. "
+ "Processed total [%d] datasources, [%d] segments. Found [%d] datasources with cold segments.",
stopwatch.millisElapsed(), datasources, segments, dataSourceWithColdSegments
);
if (stopwatch.millisElapsed() > COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS) {
log.info(executionStatsLog);
} else {
log.debug(executionStatsLog);
}
}
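
The filtering above is easy to misread because of the inverted condition: a segment is skipped when it is known to have replicas, so it is treated as cold when the required replica count is 0 or when no replication status is available for it yet. A hedged restatement of that predicate (the helper name is illustrative, not part of this commit):

// Sketch only: mirrors the check in the loop above.
private boolean isColdSegment(SegmentId segmentId)
{
  Integer replicationFactor = getReplicationFactor(segmentId);
  return replicationFactor == null || replicationFactor == 0;
}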
private void mergeRowSignature(final Map<String, ColumnType> columnTypes, final RowSignature signature)
{
for (String column : signature.getColumnNames()) {
final ColumnType columnType =
signature.getColumnType(column)
.orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType));
}
}
@VisibleForTesting
@Nullable
@Override
@ -434,13 +644,7 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segmentId);
if (optionalSchema.isPresent()) {
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
for (String column : rowSignature.getColumnNames()) {
final ColumnType columnType =
rowSignature.getColumnType(column)
.orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType));
}
mergeRowSignature(columnTypes, rowSignature);
} else {
// mark it for refresh; however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);

View File

@ -816,6 +816,9 @@ public class DruidCoordinator
{
broadcastSegments = params.getBroadcastSegments();
segmentReplicationStatus = params.getSegmentReplicationStatus();
if (coordinatorSegmentMetadataCache != null) {
coordinatorSegmentMetadataCache.updateSegmentReplicationStatus(segmentReplicationStatus);
}
// Collect stats for unavailable and under-replicated segments
final CoordinatorRunStats stats = params.getCoordinatorStats();

View File

@ -75,4 +75,10 @@ public class NoopCoordinatorClient implements CoordinatorClient
// Ignore retryPolicy for the test client.
return this;
}
@Override
public ListenableFuture<Set<String>> fetchDataSourcesWithUsedSegments()
{
throw new UnsupportedOperationException();
}
}

View File

@ -20,6 +20,8 @@
package org.apache.druid.segment.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.druid.client.BrokerServerView;
@ -39,6 +41,8 @@ import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.TableDataSource;
@ -61,16 +65,19 @@ import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -101,6 +108,8 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
private TestSegmentMetadataQueryWalker walker;
private SegmentSchemaCache segmentSchemaCache;
private SegmentSchemaBackFillQueue backFillQueue;
private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;
private final ObjectMapper mapper = TestHelper.makeJsonMapper();
@Before
@ -190,6 +199,12 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
}
);
sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class);
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
inventoryView.init();
initLatch.await();
exec = Execs.multiThreaded(4, "DruidSchemaConcurrencyTest-%d");
@ -227,7 +242,9 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -341,7 +358,9 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override

View File

@ -22,11 +22,14 @@ package org.apache.druid.segment.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.Intervals;
@ -37,6 +40,8 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QueryContexts;
@ -66,6 +71,8 @@ import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AllowAllAuthenticator;
@ -74,18 +81,23 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.skife.jdbi.v2.StatementContext;
import java.io.File;
import java.io.IOException;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@ -106,12 +118,19 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
private CoordinatorSegmentMetadataCache runningSchema;
private CountDownLatch buildTableLatch = new CountDownLatch(1);
private CountDownLatch markDataSourceLatch = new CountDownLatch(1);
private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;
@Before
@Override
public void setUp() throws Exception
{
super.setUp();
sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class);
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
}
@After
@ -132,6 +151,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
public CoordinatorSegmentMetadataCache buildSchemaMarkAndTableLatch(SegmentMetadataCacheConfig config) throws InterruptedException
{
Preconditions.checkState(runningSchema == null);
runningSchema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
@ -140,7 +160,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -178,7 +200,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
public void testGetTableMapFoo() throws InterruptedException
{
CoordinatorSegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
verifyFooDSSchema(schema);
verifyFooDSSchema(schema, 6);
}
@Test
@ -312,7 +334,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -523,7 +547,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -558,6 +584,11 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(2);
SqlSegmentsMetadataManager sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class);
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
@ -566,7 +597,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -605,6 +638,11 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);
SqlSegmentsMetadataManager sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class);
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
@ -613,7 +651,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -649,6 +689,11 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);
SqlSegmentsMetadataManager sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class);
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
@ -657,7 +702,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -698,7 +745,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -756,7 +805,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -817,7 +868,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -852,7 +905,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -900,7 +955,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -972,7 +1029,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
internalQueryConfig,
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
);
Map<String, Object> queryContext = ImmutableMap.of(
@ -1141,7 +1200,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
emitter,
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
@ -1306,7 +1367,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
) {
@Override
void updateSchemaForRealtimeSegments(SegmentSchemas segmentSchemas)
@ -1385,7 +1448,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
) {
@Override
public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
@ -1565,7 +1630,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
) {
@Override
public Set<SegmentId> refreshSegmentsForDataSource(String dataSource, Set<SegmentId> segments)
@ -1594,7 +1661,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
Assert.assertEquals(0, refreshCount.get());
// verify that datasource schema is built
verifyFooDSSchema(schema);
verifyFooDSSchema(schema, 6);
serverView.addSegment(segment3, ServerType.HISTORICAL);
@ -1721,12 +1788,384 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
Assert.assertEquals(existingMetadata.getNumReplicas(), currentMetadata.getNumReplicas());
}
private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema)
private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()
{
// foo has both hot and cold segments
DataSegment coldSegment =
DataSegment.builder()
.dataSource(DATASOURCE1)
.interval(Intervals.of("1998/P2Y"))
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();
// cold has only cold segments
DataSegment singleColdSegment =
DataSegment.builder()
.dataSource("cold")
.interval(Intervals.of("2000/P2Y"))
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();
ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new ImmutableMap.Builder<>();
segmentStatsMap.put(coldSegment.getId(), new SegmentMetadata(20L, "foo-fingerprint"));
segmentStatsMap.put(singleColdSegment.getId(), new SegmentMetadata(20L, "cold-fingerprint"));
ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new ImmutableMap.Builder<>();
schemaPayloadMap.put(
"foo-fingerprint",
new SchemaPayload(RowSignature.builder()
.add("dim1", ColumnType.STRING)
.add("c1", ColumnType.STRING)
.add("c2", ColumnType.LONG)
.build())
);
schemaPayloadMap.put(
"cold-fingerprint",
new SchemaPayload(
RowSignature.builder()
.add("f1", ColumnType.STRING)
.add("f2", ColumnType.DOUBLE)
.build()
)
);
segmentSchemaCache.updateFinalizedSegmentSchema(
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
);
List<ImmutableDruidDataSource> druidDataSources = new ArrayList<>();
Map<SegmentId, DataSegment> segmentMap = new HashMap<>();
segmentMap.put(coldSegment.getId(), coldSegment);
segmentMap.put(segment1.getId(), segment1);
segmentMap.put(segment2.getId(), segment2);
druidDataSources.add(new ImmutableDruidDataSource(
coldSegment.getDataSource(),
Collections.emptyMap(),
segmentMap
));
druidDataSources.add(new ImmutableDruidDataSource(
singleColdSegment.getDataSource(),
Collections.emptyMap(),
Collections.singletonMap(singleColdSegment.getId(), singleColdSegment)
));
Mockito.when(
sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
.thenReturn(druidDataSources);
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
);
SegmentReplicaCount zeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class);
SegmentReplicaCount nonZeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class);
Mockito.when(zeroSegmentReplicaCount.required()).thenReturn(0);
Mockito.when(nonZeroSegmentReplicaCount.required()).thenReturn(1);
SegmentReplicationStatus segmentReplicationStatus = Mockito.mock(SegmentReplicationStatus.class);
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegment.getId())))
.thenReturn(zeroSegmentReplicaCount);
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(singleColdSegment.getId())))
.thenReturn(zeroSegmentReplicaCount);
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(segment1.getId())))
.thenReturn(nonZeroSegmentReplicaCount);
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(segment2.getId())))
.thenReturn(nonZeroSegmentReplicaCount);
schema.updateSegmentReplicationStatus(segmentReplicationStatus);
return schema;
}
@Test
public void testColdDatasourceSchema_refreshAfterColdSchemaExec() throws IOException
{
CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest();
schema.coldDatasourceSchemaExec();
Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet());
// verify that cold schema for both foo and cold is present
RowSignature fooSignature = schema.getDatasource("foo").getRowSignature();
List<String> columnNames = fooSignature.getColumnNames();
// verify that foo schema doesn't contain columns from hot segments
Assert.assertEquals(3, columnNames.size());
Assert.assertEquals("dim1", columnNames.get(0));
Assert.assertEquals(ColumnType.STRING, fooSignature.getColumnType(columnNames.get(0)).get());
Assert.assertEquals("c1", columnNames.get(1));
Assert.assertEquals(ColumnType.STRING, fooSignature.getColumnType(columnNames.get(1)).get());
Assert.assertEquals("c2", columnNames.get(2));
Assert.assertEquals(ColumnType.LONG, fooSignature.getColumnType(columnNames.get(2)).get());
RowSignature coldSignature = schema.getDatasource("cold").getRowSignature();
columnNames = coldSignature.getColumnNames();
Assert.assertEquals("f1", columnNames.get(0));
Assert.assertEquals(ColumnType.STRING, coldSignature.getColumnType(columnNames.get(0)).get());
Assert.assertEquals("f2", columnNames.get(1));
Assert.assertEquals(ColumnType.DOUBLE, coldSignature.getColumnType(columnNames.get(1)).get());
Set<SegmentId> segmentIds = new HashSet<>();
segmentIds.add(segment1.getId());
segmentIds.add(segment2.getId());
schema.refresh(segmentIds, new HashSet<>());
Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet());
coldSignature = schema.getDatasource("cold").getRowSignature();
columnNames = coldSignature.getColumnNames();
Assert.assertEquals("f1", columnNames.get(0));
Assert.assertEquals(ColumnType.STRING, coldSignature.getColumnType(columnNames.get(0)).get());
Assert.assertEquals("f2", columnNames.get(1));
Assert.assertEquals(ColumnType.DOUBLE, coldSignature.getColumnType(columnNames.get(1)).get());
// foo now contains schema from both hot and cold segments
verifyFooDSSchema(schema, 8);
RowSignature rowSignature = schema.getDatasource("foo").getRowSignature();
// cold columns should be present at the end
columnNames = rowSignature.getColumnNames();
Assert.assertEquals("c1", columnNames.get(6));
Assert.assertEquals(ColumnType.STRING, rowSignature.getColumnType(columnNames.get(6)).get());
Assert.assertEquals("c2", columnNames.get(7));
Assert.assertEquals(ColumnType.LONG, rowSignature.getColumnType(columnNames.get(7)).get());
}
@Test
public void testColdDatasourceSchema_coldSchemaExecAfterRefresh() throws IOException
{
CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest();
Set<SegmentId> segmentIds = new HashSet<>();
segmentIds.add(segment1.getId());
segmentIds.add(segment2.getId());
schema.refresh(segmentIds, new HashSet<>());
// cold datasource shouldn't be present
Assert.assertEquals(Collections.singleton("foo"), schema.getDataSourceInformationMap().keySet());
// cold columns shouldn't be present
verifyFooDSSchema(schema, 6);
Assert.assertNull(schema.getDatasource("cold"));
schema.coldDatasourceSchemaExec();
// cold datasource should be present now
Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet());
RowSignature coldSignature = schema.getDatasource("cold").getRowSignature();
List<String> columnNames = coldSignature.getColumnNames();
Assert.assertEquals("f1", columnNames.get(0));
Assert.assertEquals(ColumnType.STRING, coldSignature.getColumnType(columnNames.get(0)).get());
Assert.assertEquals("f2", columnNames.get(1));
Assert.assertEquals(ColumnType.DOUBLE, coldSignature.getColumnType(columnNames.get(1)).get());
// foo schema should now include columns from its cold segments
verifyFooDSSchema(schema, 8);
RowSignature rowSignature = schema.getDatasource("foo").getRowSignature();
columnNames = rowSignature.getColumnNames();
Assert.assertEquals("c1", columnNames.get(6));
Assert.assertEquals(ColumnType.STRING, rowSignature.getColumnType(columnNames.get(6)).get());
Assert.assertEquals("c2", columnNames.get(7));
Assert.assertEquals(ColumnType.LONG, rowSignature.getColumnType(columnNames.get(7)).get());
}
@Test
public void testColdDatasourceSchema_verifyStaleDatasourceRemoved()
{
DataSegment coldSegmentAlpha =
DataSegment.builder()
.dataSource("alpha")
.interval(Intervals.of("2000/P2Y"))
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();
DataSegment coldSegmentBeta =
DataSegment.builder()
.dataSource("beta")
.interval(Intervals.of("2000/P2Y"))
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();
DataSegment coldSegmentGamma =
DataSegment.builder()
.dataSource("gamma")
.interval(Intervals.of("2000/P2Y"))
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();
DataSegment hotSegmentGamma =
DataSegment.builder()
.dataSource("gamma")
.interval(Intervals.of("2001/P2Y"))
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();
ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new ImmutableMap.Builder<>();
segmentStatsMap.put(coldSegmentAlpha.getId(), new SegmentMetadata(20L, "cold"));
segmentStatsMap.put(coldSegmentBeta.getId(), new SegmentMetadata(20L, "cold"));
segmentStatsMap.put(hotSegmentGamma.getId(), new SegmentMetadata(20L, "hot"));
segmentStatsMap.put(coldSegmentGamma.getId(), new SegmentMetadata(20L, "cold"));
ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new ImmutableMap.Builder<>();
schemaPayloadMap.put(
"cold",
new SchemaPayload(RowSignature.builder()
.add("dim1", ColumnType.STRING)
.add("c1", ColumnType.STRING)
.add("c2", ColumnType.LONG)
.build())
);
schemaPayloadMap.put(
"hot",
new SchemaPayload(RowSignature.builder()
.add("c3", ColumnType.STRING)
.add("c4", ColumnType.STRING)
.build())
);
segmentSchemaCache.updateFinalizedSegmentSchema(
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
);
List<ImmutableDruidDataSource> druidDataSources = new ArrayList<>();
druidDataSources.add(
new ImmutableDruidDataSource(
"alpha",
Collections.emptyMap(),
Collections.singletonMap(coldSegmentAlpha.getId(), coldSegmentAlpha)
)
);
Map<SegmentId, DataSegment> gammaSegments = new HashMap<>();
gammaSegments.put(hotSegmentGamma.getId(), hotSegmentGamma);
gammaSegments.put(coldSegmentGamma.getId(), coldSegmentGamma);
druidDataSources.add(
new ImmutableDruidDataSource(
"gamma",
Collections.emptyMap(),
gammaSegments
)
);
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
.thenReturn(druidDataSources);
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
);
SegmentReplicaCount zeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class);
SegmentReplicaCount nonZeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class);
Mockito.when(zeroSegmentReplicaCount.required()).thenReturn(0);
Mockito.when(nonZeroSegmentReplicaCount.required()).thenReturn(1);
SegmentReplicationStatus segmentReplicationStatus = Mockito.mock(SegmentReplicationStatus.class);
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentAlpha.getId())))
.thenReturn(zeroSegmentReplicaCount);
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentBeta.getId())))
.thenReturn(zeroSegmentReplicaCount);
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentGamma.getId())))
.thenReturn(zeroSegmentReplicaCount);
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(hotSegmentGamma.getId())))
.thenReturn(nonZeroSegmentReplicaCount);
schema.updateSegmentReplicationStatus(segmentReplicationStatus);
schema.coldDatasourceSchemaExec();
// alpha has only 1 cold segment
Assert.assertNotNull(schema.getDatasource("alpha"));
// gamma has both hot and cold segment
Assert.assertNotNull(schema.getDatasource("gamma"));
// assert that cold schema for gamma doesn't contain any columns from hot segment
RowSignature rowSignature = schema.getDatasource("gamma").getRowSignature();
Assert.assertTrue(rowSignature.contains("dim1"));
Assert.assertTrue(rowSignature.contains("c1"));
Assert.assertTrue(rowSignature.contains("c2"));
Assert.assertFalse(rowSignature.contains("c3"));
Assert.assertFalse(rowSignature.contains("c4"));
Assert.assertEquals(new HashSet<>(Arrays.asList("alpha", "gamma")), schema.getDataSourceInformationMap().keySet());
druidDataSources.clear();
druidDataSources.add(
new ImmutableDruidDataSource(
"beta",
Collections.emptyMap(),
Collections.singletonMap(coldSegmentBeta.getId(), coldSegmentBeta)
)
);
druidDataSources.add(
new ImmutableDruidDataSource(
"gamma",
Collections.emptyMap(),
Collections.singletonMap(hotSegmentGamma.getId(), hotSegmentGamma)
)
);
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
.thenReturn(druidDataSources);
schema.coldDatasourceSchemaExec();
Assert.assertNotNull(schema.getDatasource("beta"));
// alpha doesn't have any segments
Assert.assertNull(schema.getDatasource("alpha"));
// gamma just has 1 hot segment
Assert.assertNull(schema.getDatasource("gamma"));
Assert.assertNull(schema.getDatasource("doesnotexist"));
Assert.assertEquals(Collections.singleton("beta"), schema.getDataSourceInformationMap().keySet());
}
private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
{
final DataSourceInformation fooDs = schema.getDatasource("foo");
final RowSignature fooRowSignature = fooDs.getRowSignature();
List<String> columnNames = fooRowSignature.getColumnNames();
Assert.assertEquals(6, columnNames.size());
Assert.assertEquals(columns, columnNames.size());
Assert.assertEquals("__time", columnNames.get(0));
Assert.assertEquals(ColumnType.LONG, fooRowSignature.getColumnType(columnNames.get(0)).get());

View File

@ -173,6 +173,16 @@ public class BrokerSegmentMetadataCache extends AbstractSegmentMetadataCache<Phy
callbackExec.shutdownNow();
}
/**
* Refresh is executed on the Broker in every cycle if CentralizedDatasourceSchema is enabled,
* otherwise only when there are segments or datasources needing refresh.
*/
@Override
protected boolean shouldRefresh()
{
return centralizedDatasourceSchemaConfig.isEnabled() || super.shouldRefresh();
}
/**
* Refreshes the set of segments in two steps:
* <ul>
@ -196,6 +206,11 @@ public class BrokerSegmentMetadataCache extends AbstractSegmentMetadataCache<Phy
// segmentMetadataInfo keys should be a superset of all other sets including datasources to refresh
final Set<String> dataSourcesToQuery = new HashSet<>(segmentMetadataInfo.keySet());
// this is the complete set of datasources polled from the Coordinator
final Set<String> polledDatasources = queryDataSources();
dataSourcesToQuery.addAll(polledDatasources);
log.debug("Querying schema for [%s] datasources from Coordinator.", dataSourcesToQuery);
// Fetch datasource information from the Coordinator
@ -227,14 +242,7 @@ public class BrokerSegmentMetadataCache extends AbstractSegmentMetadataCache<Phy
// Remove those datasources for which we received schema from the Coordinator.
dataSourcesToRebuild.removeAll(polledDataSourceMetadata.keySet());
if (centralizedDatasourceSchemaConfig.isEnabled()) {
// this is a hacky way to ensure refresh is executed even if there are no new segments to refresh
// once, CentralizedDatasourceSchema feature is GA, brokers should simply poll schema for all datasources
dataSourcesNeedingRebuild.addAll(segmentMetadataInfo.keySet());
} else {
dataSourcesNeedingRebuild.clear();
}
log.debug("DatasourcesNeedingRebuild are [%s]", dataSourcesNeedingRebuild);
dataSourcesNeedingRebuild.clear();
}
// Rebuild the datasources.
@ -267,6 +275,23 @@ public class BrokerSegmentMetadataCache extends AbstractSegmentMetadataCache<Phy
// noop, no additional action needed when segment is removed.
}
private Set<String> queryDataSources()
{
Set<String> dataSources = new HashSet<>();
try {
Set<String> polled = FutureUtils.getUnchecked(coordinatorClient.fetchDataSourcesWithUsedSegments(), true);
if (polled != null) {
dataSources.addAll(polled);
}
}
catch (Exception e) {
log.debug(e, "Failed to query datasources from the Coordinator.");
}
return dataSources;
}
private Map<String, PhysicalDatasourceMetadata> queryDataSourceInformation(Set<String> dataSourcesToQuery)
{
Stopwatch stopwatch = Stopwatch.createStarted();

View File

@ -74,11 +74,13 @@ import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.TestTimelineServerView;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -332,6 +334,9 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheTe
ArgumentCaptor<Set<String>> argumentCaptor = ArgumentCaptor.forClass(Set.class);
CoordinatorClient coordinatorClient = Mockito.mock(CoordinatorClient.class);
Mockito.when(coordinatorClient.fetchDataSourceInformation(argumentCaptor.capture())).thenReturn(Futures.immediateFuture(null));
Set<String> dataSources = Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE, "xyz", "coldDS");
Mockito.when(coordinatorClient.fetchDataSourcesWithUsedSegments()).thenReturn(Futures.immediateFuture(dataSources));
BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
@ -347,7 +352,7 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheTe
schema.start();
schema.awaitInitialization();
Assert.assertEquals(Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE), argumentCaptor.getValue());
Assert.assertEquals(dataSources, argumentCaptor.getValue());
refreshLatch = new CountDownLatch(1);
serverView.addSegment(newSegment("xyz", 0), ServerType.HISTORICAL);
@ -355,7 +360,87 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheTe
refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
// verify that previously refreshed datasources are included in the last coordinator poll
Assert.assertEquals(Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE, "xyz"), argumentCaptor.getValue());
Assert.assertEquals(dataSources, argumentCaptor.getValue());
}
@Test
public void testRefreshOnEachCycleCentralizedDatasourceSchemaEnabled() throws InterruptedException
{
CentralizedDatasourceSchemaConfig config = CentralizedDatasourceSchemaConfig.create();
config.setEnabled(true);
serverView = new TestTimelineServerView(walker.getSegments(), Collections.emptyList());
druidServers = serverView.getDruidServers();
BrokerSegmentMetadataCacheConfig metadataCacheConfig = BrokerSegmentMetadataCacheConfig.create("PT1S");
metadataCacheConfig.setMetadataRefreshPeriod(Period.parse("PT0.001S"));
BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
config
) {
@Override
public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
throws IOException
{
super.refresh(segmentsToRefresh, dataSourcesToRebuild);
refreshLatch.countDown();
}
};
// refresh should be executed more than once; with the feature disabled it would be executed only once
refreshLatch = new CountDownLatch(3);
schema.start();
schema.awaitInitialization();
refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
Assert.assertEquals(0, refreshLatch.getCount());
}
@Test
public void testRefreshOnEachCycleCentralizedDatasourceSchemaDisabled() throws InterruptedException
{
BrokerSegmentMetadataCacheConfig metadataCacheConfig = BrokerSegmentMetadataCacheConfig.create("PT1S");
metadataCacheConfig.setMetadataRefreshPeriod(Period.parse("PT0.001S"));
serverView = new TestTimelineServerView(walker.getSegments(), Collections.emptyList());
druidServers = serverView.getDruidServers();
BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
) {
@Override
public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
throws IOException
{
super.refresh(segmentsToRefresh, dataSourcesToRebuild);
refreshLatch.countDown();
}
};
// refresh should be executed only once
refreshLatch = new CountDownLatch(3);
schema.start();
schema.awaitInitialization();
refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
Assert.assertEquals(2, refreshLatch.getCount());
}
@Test