diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java index c9b46d678e5..049b78f0328 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java @@ -37,7 +37,7 @@ import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.Escalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; -import org.apache.druid.sql.calcite.schema.DruidSchema; +import org.apache.druid.sql.calcite.schema.SegmentMetadataCache; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; @@ -67,12 +67,11 @@ import java.util.concurrent.TimeUnit; @Measurement(iterations = 10) public class DruidSchemaInternRowSignatureBenchmark { + private SegmentMetadataCacheForBenchmark cache; - private DruidSchemaForBenchmark druidSchema; - - private static class DruidSchemaForBenchmark extends DruidSchema + private static class SegmentMetadataCacheForBenchmark extends SegmentMetadataCache { - public DruidSchemaForBenchmark( + public SegmentMetadataCacheForBenchmark( final QueryLifecycleFactory queryLifecycleFactory, final TimelineServerView serverView, final SegmentManager segmentManager, @@ -89,8 +88,7 @@ public class DruidSchemaInternRowSignatureBenchmark joinableFactory, config, escalator, - brokerInternalQueryConfig, - null + brokerInternalQueryConfig ); } @@ -101,7 +99,6 @@ public class DruidSchemaInternRowSignatureBenchmark return super.refreshSegments(segments); } - @Override public void addSegment(final DruidServerMetadata server, final DataSegment segment) { @@ -173,8 +170,7 @@ public class DruidSchemaInternRowSignatureBenchmark @Setup public void setup() { - - druidSchema = new DruidSchemaForBenchmark( + cache = new SegmentMetadataCacheForBenchmark( EasyMock.mock(QueryLifecycleFactory.class), EasyMock.mock(TimelineServerView.class), null, @@ -202,7 +198,7 @@ public class DruidSchemaInternRowSignatureBenchmark for (int i = 0; i < 10000; ++i) { DataSegment dataSegment = builder.interval(Intervals.of(i + "/" + (i + 1))) .build(); - druidSchema.addSegment(serverMetadata, dataSegment); + cache.addSegment(serverMetadata, dataSegment); } } @@ -211,6 +207,6 @@ public class DruidSchemaInternRowSignatureBenchmark @OutputTimeUnit(TimeUnit.MICROSECONDS) public void addSegments(MyState state, Blackhole blackhole) throws IOException { - blackhole.consume(druidSchema.refreshSegments(state.segmentIds)); + blackhole.consume(cache.refreshSegments(state.segmentIds)); } } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java index a3f24b9551e..7f88fc7bf62 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java @@ -26,7 +26,6 @@ import org.junit.Test; public class HttpInputSourceConfigTest { - @Test public void testEquals() { diff --git a/services/src/main/java/org/apache/druid/cli/CliRouter.java b/services/src/main/java/org/apache/druid/cli/CliRouter.java index d3fcc3e4223..af0ca7c6eb2 100644 
--- a/services/src/main/java/org/apache/druid/cli/CliRouter.java +++ b/services/src/main/java/org/apache/druid/cli/CliRouter.java @@ -66,8 +66,8 @@ import java.util.Set; */ @Command( name = "router", - description = "Experimental! Understands tiers and routes things to different brokers, " - + "see https://druid.apache.org/docs/latest/development/router.html for a description" + description = "Understands tiers and routes requests to Druid nodes. " + + "See https://druid.apache.org/docs/latest/design/router.html" ) public class CliRouter extends ServerRunnable { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java index 57c84ef73ab..23c0b1d9c40 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.table.ExternalTable; import java.util.List; import java.util.Optional; @@ -73,12 +74,10 @@ public class ExternalTableMacro implements TableMacro + "Please change the column name to something other than __time"); } - return new DruidTable( - new ExternalDataSource(inputSource, inputFormat, signature), - signature, - jsonMapper, - false, - false + return new ExternalTable( + new ExternalDataSource(inputSource, inputFormat, signature), + signature, + jsonMapper ); } catch (JsonProcessingException e) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableScan.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableScan.java index d01d5784e0e..70b3ac270dd 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableScan.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableScan.java @@ -27,22 +27,24 @@ import org.apache.calcite.rel.AbstractRelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.type.RelDataType; import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.table.ExternalTable; /** - * Represents a scan of an external table. Generated by {@link DruidTable} when its datasource is an - * {@link ExternalDataSource}. + * Represents a scan of an external table. Generated by {@link ExternalTable}, + * which represents an {@link ExternalDataSource}. * - * This class is exercised in CalciteInsertDmlTest but is not currently exposed to end users. + * This class is exercised in CalciteInsertDmlTest but is not currently + * exposed to end users. 
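The `ExternalTableMacro` change above is part of a broader pattern in this patch: the boolean capability flags formerly passed to the general-purpose `DruidTable` constructor (joinable, broadcast) move into purpose-built subclasses such as `ExternalTable`, `InlineTable`, and `LookupTable`. A minimal sketch of what such a subclass might look like, assuming the pre-patch five-argument `DruidTable` constructor visible at the old call sites (the actual base-class shape in this patch may differ):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.table.DruidTable;

// Sketch only: the five-argument super() call below assumes the pre-patch
// DruidTable constructor; the real base class in this patch may differ.
public class ExternalTable extends DruidTable
{
  public ExternalTable(
      final ExternalDataSource dataSource,
      final RowSignature rowSignature,
      final ObjectMapper jsonMapper
  )
  {
    // External tables are neither joinable (false) nor broadcast (false),
    // so the subclass hard-codes those flags instead of every caller
    // passing them.
    super(dataSource, rowSignature, jsonMapper, false, false);
  }
}
```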
*/ public class ExternalTableScan extends AbstractRelNode { private final ObjectMapper jsonMapper; - private final DruidTable druidTable; + private final ExternalTable druidTable; public ExternalTableScan( final RelOptCluster cluster, final ObjectMapper jsonMapper, - final DruidTable druidTable + final ExternalTable druidTable ) { super(cluster, cluster.traitSetOf(Convention.NONE)); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java index 373e3d4abff..3e5c72a71fd 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java @@ -37,6 +37,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.table.DruidTable; import javax.annotation.Nullable; + import java.util.Set; /** diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidLogicalValuesRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidLogicalValuesRule.java index aae2148bb4c..a4660ba853c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidLogicalValuesRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidLogicalValuesRule.java @@ -32,9 +32,11 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.planner.UnsupportedSQLQueryException; import org.apache.druid.sql.calcite.rel.DruidQueryRel; import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.table.InlineTable; import org.apache.druid.sql.calcite.table.RowSignatures; import javax.annotation.Nullable; + import java.util.List; import java.util.stream.Collectors; @@ -76,12 +78,9 @@ public class DruidLogicalValuesRule extends RelOptRule values.getRowType().getFieldNames(), values.getRowType() ); - final DruidTable druidTable = new DruidTable( - InlineDataSource.fromIterable(objectTuples, rowSignature), - rowSignature, - null, - true, - false + final DruidTable druidTable = new InlineTable( + InlineDataSource.fromIterable(objectTuples, rowSignature), + rowSignature ); call.transformTo( DruidQueryRel.scanValues(values, druidTable, plannerContext) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/AbstractTableSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AbstractTableSchema.java new file mode 100644 index 00000000000..cd4e5a2a00c --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AbstractTableSchema.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.schema; + +import com.google.common.collect.ImmutableSet; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; + +/** + * Calcite provides two "schema" abstractions. {@link Schema} is an interface, + * while {@link org.apache.calcite.schema.impl.AbstractSchema} is a base class + * that "locks down" the {@link #getTable} method to obtain the table from a + * map. This forces the extensions of that class to materialize all tables in + * the schema, so that Calcite can pick the one it wants. This class, by + * contrast, provides the same defaults as {@link + * org.apache.calcite.schema.impl.AbstractSchema AbstractSchema}, but assumes + * its subclasses will implement {@code getTable()} to directly look up that + * one table, ignoring all others. Doing so lowers the cost of table resolution, + * especially when the system has to fetch catalog information for the table: + * we only fetch the information we need, not information for all tables. + */ +public abstract class AbstractTableSchema implements Schema +{ + protected static final Set<String> EMPTY_NAMES = ImmutableSet.of(); + + @Override + public RelProtoDataType getType(String name) + { + return null; + } + + @Override + public Set<String> getTypeNames() + { + return EMPTY_NAMES; + } + + @Override + public Collection<Function> getFunctions(String name) + { + return Collections.emptyList(); + } + + @Override + public Set<String> getFunctionNames() + { + return EMPTY_NAMES; + } + + @Override + public Schema getSubSchema(String name) + { + return null; + } + + @Override + public Set<String> getSubSchemaNames() + { + return EMPTY_NAMES; + } + + @Override + public Expression getExpression(SchemaPlus parentSchema, String name) + { + return null; + } + + @Override + public boolean isMutable() + { + return false; + } + + @Override + public Schema snapshot(SchemaVersion version) + { + return this; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModule.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModule.java index 0a8de292271..cc1371315ea 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModule.java @@ -49,8 +49,9 @@ public class DruidCalciteSchemaModule implements Module .toProvider(RootSchemaProvider.class) .in(Scopes.SINGLETON); - // DruidSchema needs to listen to changes for incoming segments - LifecycleModule.register(binder, DruidSchema.class); + // SegmentMetadataCache needs to listen to changes for incoming segments + LifecycleModule.register(binder, SegmentMetadataCache.class); + binder.bind(DruidSchema.class).in(Scopes.SINGLETON); binder.bind(SystemSchema.class).in(Scopes.SINGLETON); binder.bind(InformationSchema.class).in(Scopes.SINGLETON); binder.bind(LookupSchema.class).in(Scopes.SINGLETON); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 898489cce39..ec0b9cb2c11 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -19,952 +19,54 @@ package org.apache.druid.sql.calcite.schema; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.errorprone.annotations.concurrent.GuardedBy; -import com.google.inject.Inject; import org.apache.calcite.schema.Table; -import org.apache.calcite.schema.impl.AbstractSchema; -import org.apache.druid.client.BrokerInternalQueryConfig; -import org.apache.druid.client.ServerView; -import org.apache.druid.client.TimelineServerView; -import org.apache.druid.guice.ManageLifecycle; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.guava.Sequence; -import org.apache.druid.java.util.common.guava.Yielder; -import org.apache.druid.java.util.common.guava.Yielders; -import org.apache.druid.java.util.common.lifecycle.LifecycleStart; -import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.query.GlobalTableDataSource; -import org.apache.druid.query.TableDataSource; -import org.apache.druid.query.metadata.metadata.AllColumnIncluderator; -import org.apache.druid.query.metadata.metadata.ColumnAnalysis; -import org.apache.druid.query.metadata.metadata.SegmentAnalysis; -import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; -import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; -import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.segment.join.JoinableFactory; -import org.apache.druid.server.QueryLifecycleFactory; -import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.coordination.DruidServerMetadata; -import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.security.Access; -import org.apache.druid.server.security.Escalator; -import org.apache.druid.sql.calcite.planner.PlannerConfig; -import org.apache.druid.sql.calcite.table.DruidTable; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; +import org.apache.druid.sql.calcite.table.DatasourceTable; + +import javax.inject.Inject; -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.Comparator; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; -@ManageLifecycle -public class DruidSchema extends AbstractSchema +public class DruidSchema extends 
AbstractTableSchema { - // Newest segments first, so they override older ones. - private static final Comparator SEGMENT_ORDER = Comparator - .comparing((SegmentId segmentId) -> segmentId.getInterval().getStart()) - .reversed() - .thenComparing(Function.identity()); - - private static final EmittingLogger log = new EmittingLogger(DruidSchema.class); - private static final int MAX_SEGMENTS_PER_QUERY = 15000; - private static final long DEFAULT_NUM_ROWS = 0; - - private final QueryLifecycleFactory queryLifecycleFactory; - private final PlannerConfig config; - // Escalator, so we can attach an authentication result to queries we generate. - private final Escalator escalator; - private final SegmentManager segmentManager; - private final JoinableFactory joinableFactory; - private final ExecutorService cacheExec; - private final ExecutorService callbackExec; + private final SegmentMetadataCache segmentCache; private final DruidSchemaManager druidSchemaManager; - /** - * Map of DataSource -> DruidTable. - * This map can be accessed by {@link #cacheExec} and {@link #callbackExec} threads. - */ - private final ConcurrentMap tables = new ConcurrentHashMap<>(); - - private static final Interner ROW_SIGNATURE_INTERNER = Interners.newWeakInterner(); - - /** - * DataSource -> Segment -> AvailableSegmentMetadata(contains RowSignature) for that segment. - * Use SortedMap for segments so they are merged in deterministic order, from older to newer. - * - * This map is updated by these two threads. - * - * - {@link #callbackExec} can update it in {@link #addSegment}, {@link #removeServerSegment}, - * and {@link #removeSegment}. - * - {@link #cacheExec} can update it in {@link #refreshSegmentsForDataSource}. - * - * While it is being updated, this map is read by these two types of thread. - * - * - {@link #cacheExec} can iterate all {@link AvailableSegmentMetadata}s per datasource. - * See {@link #buildDruidTable}. - * - Query threads can create a snapshot of the entire map for processing queries on the system table. - * See {@link #getSegmentMetadataSnapshot()}. - * - * As the access pattern of this map is read-intensive, we should minimize the contention between writers and readers. - * Since there are two threads that can update this map at the same time, those writers should lock the inner map - * first and then lock the entry before it updates segment metadata. This can be done using - * {@link ConcurrentMap#compute} as below. Note that, if you need to update the variables guarded by {@link #lock} - * inside of compute(), you should get the lock before calling compute() to keep the function executed in compute() - * not expensive. - * - *
-   *   segmentMedataInfo.compute(
-   *     datasourceParam,
-   *     (datasource, segmentsMap) -> {
-   *       if (segmentsMap == null) return null;
-   *       else {
-   *         segmentsMap.compute(
-   *           segmentIdParam,
-   *           (segmentId, segmentMetadata) -> {
-   *             // update segmentMetadata
-   *           }
-   *         );
-   *         return segmentsMap;
-   *       }
-   *     }
-   *   );
-   * 
- * - * Readers can simply delegate the locking to the concurrent map and iterate map entries. - */ - private final ConcurrentHashMap> segmentMetadataInfo - = new ConcurrentHashMap<>(); - - // For awaitInitialization. - private final CountDownLatch initialized = new CountDownLatch(1); - - /** - * This lock coordinates the access from multiple threads to those variables guarded by this lock. - * Currently, there are 2 threads that can access these variables. - * - * - {@link #callbackExec} executes the timeline callbacks whenever BrokerServerView changes. - * - {@link #cacheExec} periodically refreshes segment metadata and {@link DruidTable} if necessary - * based on the information collected via timeline callbacks. - */ - private final Object lock = new Object(); - - // All mutable segments. - @GuardedBy("lock") - private final TreeSet mutableSegments = new TreeSet<>(SEGMENT_ORDER); - - // All dataSources that need tables regenerated. - @GuardedBy("lock") - private final Set dataSourcesNeedingRebuild = new HashSet<>(); - - // All segments that need to be refreshed. - @GuardedBy("lock") - private final TreeSet segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER); - - // Configured context to attach to internally generated queries. - private final BrokerInternalQueryConfig brokerInternalQueryConfig; - - @GuardedBy("lock") - private boolean refreshImmediately = false; - - @GuardedBy("lock") - private boolean isServerViewInitialized = false; - - /** - * Counts the total number of known segments. This variable is used only for the segments table in the system schema - * to initialize a map with a more proper size when it creates a snapshot. As a result, it doesn't have to be exact, - * and thus there is no concurrency control for this variable. - */ - private int totalSegments = 0; - @Inject public DruidSchema( - final QueryLifecycleFactory queryLifecycleFactory, - final TimelineServerView serverView, - final SegmentManager segmentManager, - final JoinableFactory joinableFactory, - final PlannerConfig config, - final Escalator escalator, - final BrokerInternalQueryConfig brokerInternalQueryConfig, - final DruidSchemaManager druidSchemaManager - ) + SegmentMetadataCache segmentCache, + final DruidSchemaManager druidSchemaManager) { - this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); - Preconditions.checkNotNull(serverView, "serverView"); - this.segmentManager = segmentManager; - this.joinableFactory = joinableFactory; - this.config = Preconditions.checkNotNull(config, "config"); - this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); - this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d"); - this.escalator = escalator; - this.brokerInternalQueryConfig = brokerInternalQueryConfig; - this.druidSchemaManager = druidSchemaManager; - - initServerViewTimelineCallback(serverView); - } - - private void initServerViewTimelineCallback(final TimelineServerView serverView) - { - serverView.registerTimelineCallback( - callbackExec, - new TimelineServerView.TimelineCallback() - { - @Override - public ServerView.CallbackAction timelineInitialized() - { - synchronized (lock) { - isServerViewInitialized = true; - lock.notifyAll(); - } - - return ServerView.CallbackAction.CONTINUE; - } - - @Override - public ServerView.CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment) - { - addSegment(server, segment); - return ServerView.CallbackAction.CONTINUE; - } - - @Override - public 
ServerView.CallbackAction segmentRemoved(final DataSegment segment) - { - removeSegment(segment); - return ServerView.CallbackAction.CONTINUE; - } - - @Override - public ServerView.CallbackAction serverSegmentRemoved( - final DruidServerMetadata server, - final DataSegment segment - ) - { - removeServerSegment(server, segment); - return ServerView.CallbackAction.CONTINUE; - } - } - ); - } - - private void startCacheExec() - { - cacheExec.submit( - () -> { - long lastRefresh = 0L; - long lastFailure = 0L; - - try { - while (!Thread.currentThread().isInterrupted()) { - final Set segmentsToRefresh = new TreeSet<>(); - final Set dataSourcesToRebuild = new TreeSet<>(); - - try { - synchronized (lock) { - final long nextRefreshNoFuzz = DateTimes - .utc(lastRefresh) - .plus(config.getMetadataRefreshPeriod()) - .getMillis(); - - // Fuzz a bit to spread load out when we have multiple brokers. - final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10); - - while (true) { - // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure). - final boolean wasRecentFailure = DateTimes.utc(lastFailure) - .plus(config.getMetadataRefreshPeriod()) - .isAfterNow(); - - if (isServerViewInitialized && - !wasRecentFailure && - (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) && - (refreshImmediately || nextRefresh < System.currentTimeMillis())) { - // We need to do a refresh. Break out of the waiting loop. - break; - } - - // lastFailure != 0L means exceptions happened before and there're some refresh work was not completed. - // so that even ServerView is initialized, we can't let broker complete initialization. - if (isServerViewInitialized && lastFailure == 0L) { - // Server view is initialized, but we don't need to do a refresh. Could happen if there are - // no segments in the system yet. Just mark us as initialized, then. - initialized.countDown(); - } - - // Wait some more, we'll wake up when it might be time to do another refresh. - lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis())); - } - - segmentsToRefresh.addAll(segmentsNeedingRefresh); - segmentsNeedingRefresh.clear(); - - // Mutable segments need a refresh every period, since new columns could be added dynamically. - segmentsNeedingRefresh.addAll(mutableSegments); - - lastFailure = 0L; - lastRefresh = System.currentTimeMillis(); - refreshImmediately = false; - } - - refresh(segmentsToRefresh, dataSourcesToRebuild); - - initialized.countDown(); - } - catch (InterruptedException e) { - // Fall through. - throw e; - } - catch (Exception e) { - log.warn(e, "Metadata refresh failed, trying again soon."); - - synchronized (lock) { - // Add our segments and dataSources back to their refresh and rebuild lists. - segmentsNeedingRefresh.addAll(segmentsToRefresh); - dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild); - lastFailure = System.currentTimeMillis(); - } - } - } - } - catch (InterruptedException e) { - // Just exit. - } - catch (Throwable e) { - // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like - // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata. 
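Since the refresh loop above moves essentially verbatim into `SegmentMetadataCache`, a worked sketch of its wake-up arithmetic may help review: the next refresh time is the last refresh plus the configured period, plus a 10% fuzz so that multiple brokers do not hit the metadata servers in lockstep. Values below are illustrative; the real period comes from `PlannerConfig.getMetadataRefreshPeriod()`:

```java
public class RefreshScheduleExample
{
  public static void main(String[] args)
  {
    long lastRefresh = 1_000_000L;  // ms: time of the last successful refresh
    long periodMillis = 60_000L;    // ms: a one-minute metadata refresh period

    long nextRefreshNoFuzz = lastRefresh + periodMillis;          // 1_060_000
    // Fuzz by 10% of the interval so brokers spread their refreshes out.
    long nextRefresh = nextRefreshNoFuzz
                       + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);

    System.out.println(nextRefresh);  // 1_066_000
    // The loop then sleeps until nextRefresh (or until notified of work):
    // lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
  }
}
```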
- log.makeAlert(e, "Metadata refresh failed permanently").emit(); - throw e; - } - finally { - log.info("Metadata refresh stopped."); - } - } - ); - } - - @LifecycleStart - public void start() throws InterruptedException - { - startCacheExec(); - - if (config.isAwaitInitializationOnStart()) { - final long startNanos = System.nanoTime(); - log.debug("%s waiting for initialization.", getClass().getSimpleName()); - awaitInitialization(); - log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), (System.nanoTime() - startNanos) / 1000000); + this.segmentCache = segmentCache; + if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) { + this.druidSchemaManager = druidSchemaManager; + } else { + this.druidSchemaManager = null; } } - @VisibleForTesting - void refresh(final Set segmentsToRefresh, final Set dataSourcesToRebuild) throws IOException + protected SegmentMetadataCache cache() { - // Refresh the segments. - final Set refreshed = refreshSegments(segmentsToRefresh); - - synchronized (lock) { - // Add missing segments back to the refresh list. - segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed)); - - // Compute the list of dataSources to rebuild tables for. - dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild); - refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource())); - dataSourcesNeedingRebuild.clear(); - } - - // Rebuild the dataSources. - for (String dataSource : dataSourcesToRebuild) { - final DruidTable druidTable = buildDruidTable(dataSource); - if (druidTable == null) { - log.info("dataSource[%s] no longer exists, all metadata removed.", dataSource); - tables.remove(dataSource); - continue; - } - final DruidTable oldTable = tables.put(dataSource, druidTable); - final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource"; - if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) { - log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.getRowSignature()); - } else { - log.debug("%s [%s] signature is unchanged.", description, dataSource); - } - } - } - - @LifecycleStop - public void stop() - { - cacheExec.shutdownNow(); - callbackExec.shutdownNow(); - } - - public void awaitInitialization() throws InterruptedException - { - initialized.await(); + return segmentCache; } @Override - protected Map getTableMap() + public Table getTable(String name) { - if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) { - return ImmutableMap.copyOf(druidSchemaManager.getTables()); + if (druidSchemaManager != null) { + return druidSchemaManager.getTable(name); } else { - return ImmutableMap.copyOf(tables); + DatasourceTable.PhysicalDatasourceMetadata dsMetadata = segmentCache.getDatasource(name); + return dsMetadata == null ? null : new DatasourceTable(dsMetadata); } } - @VisibleForTesting - protected void addSegment(final DruidServerMetadata server, final DataSegment segment) + @Override + public Set getTableNames() { - // Get lock first so that we won't wait in ConcurrentMap.compute(). 
- synchronized (lock) { - // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking - // broker served segments in the timeline, to ensure that removeSegment the event is triggered accurately - if (server.getType().equals(ServerType.BROKER)) { - // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the - // historical, however mark the datasource for refresh because it needs to be globalized - markDataSourceAsNeedRebuild(segment.getDataSource()); - } else { - segmentMetadataInfo.compute( - segment.getDataSource(), - (datasource, segmentsMap) -> { - if (segmentsMap == null) { - segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER); - } - segmentsMap.compute( - segment.getId(), - (segmentId, segmentMetadata) -> { - if (segmentMetadata == null) { - // Unknown segment. - totalSegments++; - // segmentReplicatable is used to determine if segments are served by historical or realtime servers - long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1; - segmentMetadata = AvailableSegmentMetadata - .builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh - .build(); - markSegmentAsNeedRefresh(segment.getId()); - if (!server.isSegmentReplicationTarget()) { - log.debug("Added new mutable segment[%s].", segment.getId()); - markSegmentAsMutable(segment.getId()); - } else { - log.debug("Added new immutable segment[%s].", segment.getId()); - } - } else { - // We know this segment. - final Set segmentServers = segmentMetadata.getReplicas(); - final ImmutableSet servers = new ImmutableSet.Builder() - .addAll(segmentServers) - .add(server) - .build(); - segmentMetadata = AvailableSegmentMetadata - .from(segmentMetadata) - .withReplicas(servers) - .withRealtime(recomputeIsRealtime(servers)) - .build(); - if (server.isSegmentReplicationTarget()) { - // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable, - // even if it's also available on non-replicatable (realtime) servers. - unmarkSegmentAsMutable(segment.getId()); - log.debug("Segment[%s] has become immutable.", segment.getId()); - } - } - assert segmentMetadata != null; - return segmentMetadata; - } - ); - - return segmentsMap; - } - ); - } - if (!tables.containsKey(segment.getDataSource())) { - refreshImmediately = true; - } - - lock.notifyAll(); - } - } - - @VisibleForTesting - void removeSegment(final DataSegment segment) - { - // Get lock first so that we won't wait in ConcurrentMap.compute(). - synchronized (lock) { - log.debug("Segment[%s] is gone.", segment.getId()); - - segmentsNeedingRefresh.remove(segment.getId()); - unmarkSegmentAsMutable(segment.getId()); - - segmentMetadataInfo.compute( - segment.getDataSource(), - (dataSource, segmentsMap) -> { - if (segmentsMap == null) { - log.warn("Unknown segment[%s] was removed from the cluster. Ignoring this event.", segment.getId()); - return null; - } else { - if (segmentsMap.remove(segment.getId()) == null) { - log.warn("Unknown segment[%s] was removed from the cluster. 
Ignoring this event.", segment.getId()); - } else { - totalSegments--; - } - if (segmentsMap.isEmpty()) { - tables.remove(segment.getDataSource()); - log.info("dataSource[%s] no longer exists, all metadata removed.", segment.getDataSource()); - return null; - } else { - markDataSourceAsNeedRebuild(segment.getDataSource()); - return segmentsMap; - } - } - } - ); - - lock.notifyAll(); - } - } - - @VisibleForTesting - void removeServerSegment(final DruidServerMetadata server, final DataSegment segment) - { - // Get lock first so that we won't wait in ConcurrentMap.compute(). - synchronized (lock) { - log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName()); - segmentMetadataInfo.compute( - segment.getDataSource(), - (datasource, knownSegments) -> { - if (knownSegments == null) { - log.warn( - "Unknown segment[%s] is removed from server[%s]. Ignoring this event", - segment.getId(), - server.getHost() - ); - return null; - } - - if (server.getType().equals(ServerType.BROKER)) { - // for brokers, if the segment drops from all historicals before the broker this could be null. - if (!knownSegments.isEmpty()) { - // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the - // historical, however mark the datasource for refresh because it might no longer be broadcast or something - markDataSourceAsNeedRebuild(segment.getDataSource()); - } - } else { - knownSegments.compute( - segment.getId(), - (segmentId, segmentMetadata) -> { - if (segmentMetadata == null) { - log.warn( - "Unknown segment[%s] is removed from server[%s]. Ignoring this event", - segment.getId(), - server.getHost() - ); - return null; - } else { - final Set segmentServers = segmentMetadata.getReplicas(); - final ImmutableSet servers = FluentIterable - .from(segmentServers) - .filter(Predicates.not(Predicates.equalTo(server))) - .toSet(); - return AvailableSegmentMetadata - .from(segmentMetadata) - .withReplicas(servers) - .withRealtime(recomputeIsRealtime(servers)) - .build(); - } - } - ); - } - if (knownSegments.isEmpty()) { - return null; - } else { - return knownSegments; - } - } - ); - - lock.notifyAll(); - } - } - - private void markSegmentAsNeedRefresh(SegmentId segmentId) - { - synchronized (lock) { - segmentsNeedingRefresh.add(segmentId); - } - } - - private void markSegmentAsMutable(SegmentId segmentId) - { - synchronized (lock) { - mutableSegments.add(segmentId); - } - } - - private void unmarkSegmentAsMutable(SegmentId segmentId) - { - synchronized (lock) { - mutableSegments.remove(segmentId); - } - } - - @VisibleForTesting - void markDataSourceAsNeedRebuild(String datasource) - { - synchronized (lock) { - dataSourcesNeedingRebuild.add(datasource); - } - } - - /** - * Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed, - * which may be a subset of the asked-for set. - */ - @VisibleForTesting - protected Set refreshSegments(final Set segments) throws IOException - { - final Set retVal = new HashSet<>(); - - // Organize segments by dataSource. 
- final Map> segmentMap = new TreeMap<>(); - - for (SegmentId segmentId : segments) { - segmentMap.computeIfAbsent(segmentId.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER)) - .add(segmentId); - } - - for (Map.Entry> entry : segmentMap.entrySet()) { - final String dataSource = entry.getKey(); - retVal.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue())); - } - - return retVal; - } - - private long recomputeIsRealtime(ImmutableSet servers) - { - if (servers.isEmpty()) { - return 0; - } - final Optional historicalServer = servers - .stream() - // Ideally, this filter should have checked whether it's a broadcast segment loaded in brokers. - // However, we don't current track of the broadcast segments loaded in brokers, so this filter is still valid. - // See addSegment(), removeServerSegment(), and removeSegment() - .filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL)) - .findAny(); - - // if there is any historical server in the replicas, isRealtime flag should be unset - return historicalServer.isPresent() ? 0 : 1; - } - - /** - * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of - * segments actually refreshed, which may be a subset of the asked-for set. - */ - private Set refreshSegmentsForDataSource(final String dataSource, final Set segments) - throws IOException - { - if (!segments.stream().allMatch(segmentId -> segmentId.getDataSource().equals(dataSource))) { - // Sanity check. We definitely expect this to pass. - throw new ISE("'segments' must all match 'dataSource'!"); - } - - log.debug("Refreshing metadata for dataSource[%s].", dataSource); - - final long startTime = System.currentTimeMillis(); - - // Segment id string -> SegmentId object. - final Map segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString); - - final Set retVal = new HashSet<>(); - final Sequence sequence = runSegmentMetadataQuery( - Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY) - ); - - Yielder yielder = Yielders.each(sequence); - - try { - while (!yielder.isDone()) { - final SegmentAnalysis analysis = yielder.get(); - final SegmentId segmentId = segmentIdMap.get(analysis.getId()); - - if (segmentId == null) { - log.warn("Got analysis for segment[%s] we didn't ask for, ignoring.", analysis.getId()); - } else { - final RowSignature rowSignature = analysisToRowSignature(analysis); - log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature); - segmentMetadataInfo.compute( - dataSource, - (datasourceKey, dataSourceSegments) -> { - if (dataSourceSegments == null) { - // Datasource may have been removed or become unavailable while this refresh was ongoing. 
- log.warn( - "No segment map found with datasource[%s], skipping refresh of segment[%s]", - datasourceKey, - segmentId - ); - return null; - } else { - dataSourceSegments.compute( - segmentId, - (segmentIdKey, segmentMetadata) -> { - if (segmentMetadata == null) { - log.warn("No segment[%s] found, skipping refresh", segmentId); - return null; - } else { - final AvailableSegmentMetadata updatedSegmentMetadata = AvailableSegmentMetadata - .from(segmentMetadata) - .withRowSignature(rowSignature) - .withNumRows(analysis.getNumRows()) - .build(); - retVal.add(segmentId); - return updatedSegmentMetadata; - } - } - ); - - if (dataSourceSegments.isEmpty()) { - return null; - } else { - return dataSourceSegments; - } - } - } - ); - } - - yielder = yielder.next(null); - } - } - finally { - yielder.close(); - } - - log.debug( - "Refreshed metadata for dataSource[%s] in %,d ms (%d segments queried, %d segments left).", - dataSource, - System.currentTimeMillis() - startTime, - retVal.size(), - segments.size() - retVal.size() - ); - - return retVal; - } - - @VisibleForTesting - @Nullable - DruidTable buildDruidTable(final String dataSource) - { - ConcurrentSkipListMap segmentsMap = segmentMetadataInfo.get(dataSource); - - // Preserve order. - final Map columnTypes = new LinkedHashMap<>(); - - if (segmentsMap != null && !segmentsMap.isEmpty()) { - for (AvailableSegmentMetadata availableSegmentMetadata : segmentsMap.values()) { - final RowSignature rowSignature = availableSegmentMetadata.getRowSignature(); - if (rowSignature != null) { - for (String column : rowSignature.getColumnNames()) { - // Newer column types should override older ones. - final ColumnType columnType = - rowSignature.getColumnType(column) - .orElseThrow(() -> new ISE("Encountered null type for column[%s]", column)); - - columnTypes.putIfAbsent(column, columnType); - } - } - } + if (druidSchemaManager != null) { + return druidSchemaManager.getTableNames(); } else { - // table has no segments - return null; - } - - final RowSignature.Builder builder = RowSignature.builder(); - columnTypes.forEach(builder::add); - - final TableDataSource tableDataSource; - - // to be a GlobalTableDataSource instead of a TableDataSource, it must appear on all servers (inferred by existing - // in the segment cache, which in this case belongs to the broker meaning only broadcast segments live here) - // to be joinable, it must be possibly joinable according to the factory. we only consider broadcast datasources - // at this time, and isGlobal is currently strongly coupled with joinable, so only make a global table datasource - // if also joinable - final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource); - final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal); - final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource); - if (isBroadcast && isJoinable) { - tableDataSource = maybeGlobal; - } else { - tableDataSource = new TableDataSource(dataSource); - } - return new DruidTable(tableDataSource, builder.build(), null, isJoinable, isBroadcast); - } - - @VisibleForTesting - Map getSegmentMetadataSnapshot() - { - final Map segmentMetadata = Maps.newHashMapWithExpectedSize(totalSegments); - for (ConcurrentSkipListMap val : segmentMetadataInfo.values()) { - segmentMetadata.putAll(val); - } - return segmentMetadata; - } - - /** - * Returns total number of segments. This method doesn't use the lock intentionally to avoid expensive contention. 
- * As a result, the returned value might be inexact. - */ - int getTotalSegments() - { - return totalSegments; - } - - @VisibleForTesting - TreeSet getSegmentsNeedingRefresh() - { - synchronized (lock) { - return segmentsNeedingRefresh; - } - } - - @VisibleForTesting - TreeSet getMutableSegments() - { - synchronized (lock) { - return mutableSegments; - } - } - - @VisibleForTesting - Set getDataSourcesNeedingRebuild() - { - synchronized (lock) { - return dataSourcesNeedingRebuild; - } - } - - /** - * Execute a SegmentMetadata query and return a {@link Sequence} of {@link SegmentAnalysis}. - * - * @param segments Iterable of {@link SegmentId} objects that are subject of the SegmentMetadata query. - * @return {@link Sequence} of {@link SegmentAnalysis} objects - */ - @VisibleForTesting - protected Sequence runSegmentMetadataQuery( - final Iterable segments - ) - { - // Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource. - final String dataSource = Iterables.getOnlyElement( - StreamSupport.stream(segments.spliterator(), false) - .map(SegmentId::getDataSource).collect(Collectors.toSet()) - ); - - final MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec( - StreamSupport.stream(segments.spliterator(), false) - .map(SegmentId::toDescriptor).collect(Collectors.toList()) - ); - - final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery( - new TableDataSource(dataSource), - querySegmentSpec, - new AllColumnIncluderator(), - false, - brokerInternalQueryConfig.getContext(), - EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), - false, - false - ); - - return queryLifecycleFactory - .factorize() - .runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK); - } - - @VisibleForTesting - static RowSignature analysisToRowSignature(final SegmentAnalysis analysis) - { - final RowSignature.Builder rowSignatureBuilder = RowSignature.builder(); - for (Map.Entry entry : analysis.getColumns().entrySet()) { - if (entry.getValue().isError()) { - // Skip columns with analysis errors. - continue; - } - - ColumnType valueType = entry.getValue().getTypeSignature(); - - // this shouldn't happen, but if it does, first try to fall back to legacy type information field in case - // standard upgrade order was not followed for 0.22 to 0.23+, and if that also fails, then assume types are some - // flavor of COMPLEX. - if (valueType == null) { - // at some point in the future this can be simplified to the contents of the catch clause here, once the - // likelyhood of upgrading from some version lower than 0.23 is low - try { - valueType = ColumnType.fromString(entry.getValue().getType()); - } - catch (IllegalArgumentException ignored) { - valueType = ColumnType.UNKNOWN_COMPLEX; - } - } - - rowSignatureBuilder.add(entry.getKey(), valueType); - } - return ROW_SIGNATURE_INTERNER.intern(rowSignatureBuilder.build()); - } - - /** - * This method is not thread-safe and must be used only in unit tests. 
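One subtlety in `analysisToRowSignature` above, worth a concrete illustration since the method moves unchanged into `SegmentMetadataCache`: when a response carries no `typeSignature` (possible mid-upgrade from a pre-0.23 node), the code parses the legacy type string and falls back to an unknown-complex type for anything unparseable. A condensed sketch with hypothetical legacy strings; which names parse cleanly depends on `ColumnType.fromString`:

```java
import org.apache.druid.segment.column.ColumnType;

public class LegacyTypeFallbackExample
{
  public static void main(String[] args)
  {
    // Hypothetical legacy type strings, as a pre-0.23 node might report them.
    String[] legacyTypes = {"STRING", "LONG", "someCustomComplexType"};

    for (String legacy : legacyTypes) {
      ColumnType resolved;
      try {
        resolved = ColumnType.fromString(legacy);
      }
      catch (IllegalArgumentException ignored) {
        // Anything unparseable is assumed to be some flavor of COMPLEX.
        resolved = ColumnType.UNKNOWN_COMPLEX;
      }
      System.out.println(legacy + " -> " + resolved);
    }
  }
}
```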
- */ - @VisibleForTesting - void setAvailableSegmentMetadata(final SegmentId segmentId, final AvailableSegmentMetadata availableSegmentMetadata) - { - final ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo - .computeIfAbsent( - segmentId.getDataSource(), - k -> new ConcurrentSkipListMap<>(SEGMENT_ORDER) - ); - if (dataSourceSegments.put(segmentId, availableSegmentMetadata) == null) { - totalSegments++; - } - } - - /** - * This is a helper method for unit tests to emulate heavy work done with {@link #lock}. - * It must be used only in unit tests. - */ - @VisibleForTesting - void doInLock(Runnable runnable) - { - synchronized (lock) { - runnable.run(); - } - } -} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java index dd3e7380312..c203ab18825 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java @@ -23,16 +23,41 @@ import org.apache.druid.guice.annotations.ExtensionPoint; import org.apache.druid.guice.annotations.UnstableApi; import org.apache.druid.sql.calcite.table.DruidTable; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; +import java.util.Set; /** - * This interface provides a map of datasource names to {@link DruidTable} objects, used by the {@link DruidSchema} - * class as the SQL planner's view of Druid datasource schemas. If a non-default implementation is provided, - * the segment metadata polling-based view of the Druid tables will not be built in DruidSchema. + * This interface provides a map of datasource names to {@link DruidTable} + * objects, used by the {@link DruidSchema} class as the SQL planner's + * view of Druid datasource schemas. If a non-default implementation is + * provided, the segment metadata polling-based view of the Druid tables + * will not be built in DruidSchema. */ @ExtensionPoint @UnstableApi public interface DruidSchemaManager { + /** + * Return all tables known to this schema manager. Deprecated because getting + * all tables is never actually needed in the current code. Calcite asks for + * the information for a single table (via {@link #getTable(String)}), or + * the list of all table names (via {@link #getTableNames()}). This + * method was originally used to allow Calcite's {@link + * org.apache.calcite.schema.impl.AbstractSchema AbstractSchema} class to do + * the lookup. The current code implements the operations directly. For + * backward compatibility, the default methods emulate the old behavior. + * Newer implementations should omit this method and implement the other two.
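To make the new contract concrete, here is a hypothetical manager (class name and backing map are illustrative, not part of this patch) that implements the two new lookups directly while keeping the deprecated bulk method for compatibility:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.druid.sql.calcite.schema.DruidSchemaManager;
import org.apache.druid.sql.calcite.table.DruidTable;

// Hypothetical example; not part of this patch.
public class StaticDruidSchemaManager implements DruidSchemaManager
{
  private final Map<String, DruidTable> tables = new ConcurrentHashMap<>();

  @Deprecated
  @Override
  public Map<String, DruidTable> getTables()
  {
    // Kept only for backward compatibility with the old contract.
    return tables;
  }

  @Override
  public DruidTable getTable(String name)
  {
    // Direct single-table lookup: no need to materialize every table.
    return tables.get(name);
  }

  @Override
  public Set<String> getTableNames()
  {
    return tables.keySet();
  }
}
```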
+ */ + @Deprecated + Map getTables(); + + default DruidTable getTable(String name) + { + return getTables().get(name); + } + + default Set getTableNames() + { + return getTables().keySet(); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java index e0f86f0afde..7f16d0ae9fa 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java @@ -59,6 +59,7 @@ import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.RowSignatures; import javax.annotation.Nullable; + import java.util.Collection; import java.util.Collections; import java.util.Map; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java index 1fea9116ff5..b8453ba551c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java @@ -28,7 +28,7 @@ import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.lookup.LookupColumnSelectorFactory; -import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.table.LookupTable; import java.util.Map; @@ -57,11 +57,12 @@ public class LookupSchema extends AbstractSchema final ImmutableMap.Builder tableMapBuilder = ImmutableMap.builder(); for (final String lookupName : lookupProvider.getAllLookupNames()) { - // all lookups should be also joinable through lookup joinable factory, and lookups are effectively broadcast - // (if we ignore lookup tiers...) tableMapBuilder.put( lookupName, - new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE, null, true, true) + new LookupTable( + new LookupDataSource(lookupName), + ROW_SIGNATURE + ) ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java index 4d62dfec6fe..3ea16e61ac9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java @@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.schema; import org.apache.druid.sql.calcite.table.DruidTable; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,7 +32,7 @@ public class NoopDruidSchemaManager implements DruidSchemaManager private static ConcurrentMap MAP = new ConcurrentHashMap<>(); @Override - public ConcurrentMap getTables() + public Map getTables() { return MAP; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java new file mode 100644 index 00000000000..427108d9d83 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java @@ -0,0 +1,971 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.schema; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.inject.Inject; +import org.apache.druid.client.BrokerInternalQueryConfig; +import org.apache.druid.client.ServerView; +import org.apache.druid.client.TimelineServerView; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.query.GlobalTableDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.metadata.metadata.AllColumnIncluderator; +import org.apache.druid.query.metadata.metadata.ColumnAnalysis; +import org.apache.druid.query.metadata.metadata.SegmentAnalysis; +import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.Escalator; +import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.table.DatasourceTable; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import 
java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * Broker-side cache of segment metadata which combines segments to identify + * datasources which become "tables" in Calcite. This cache provides the "physical" + * metadata about a datasource which is blended with catalog "logical" metadata + * to provide the final user-view of each datasource. + */ +@ManageLifecycle +public class SegmentMetadataCache +{ + // Newest segments first, so they override older ones. + private static final Comparator SEGMENT_ORDER = Comparator + .comparing((SegmentId segmentId) -> segmentId.getInterval().getStart()) + .reversed() + .thenComparing(Function.identity()); + + private static final EmittingLogger log = new EmittingLogger(SegmentMetadataCache.class); + private static final int MAX_SEGMENTS_PER_QUERY = 15000; + private static final long DEFAULT_NUM_ROWS = 0; + + private final QueryLifecycleFactory queryLifecycleFactory; + private final PlannerConfig config; + // Escalator, so we can attach an authentication result to queries we generate. + private final Escalator escalator; + private final SegmentManager segmentManager; + private final JoinableFactory joinableFactory; + private final ExecutorService cacheExec; + private final ExecutorService callbackExec; + + /** + * Map of DataSource -> DruidTable. + * This map can be accessed by {@link #cacheExec} and {@link #callbackExec} threads. + */ + private final ConcurrentMap tables = new ConcurrentHashMap<>(); + + private static final Interner ROW_SIGNATURE_INTERNER = Interners.newWeakInterner(); + + /** + * DataSource -> Segment -> AvailableSegmentMetadata(contains RowSignature) for that segment. + * Use SortedMap for segments so they are merged in deterministic order, from older to newer. + * + * This map is updated by these two threads. + * + * - {@link #callbackExec} can update it in {@link #addSegment}, {@link #removeServerSegment}, + * and {@link #removeSegment}. + * - {@link #cacheExec} can update it in {@link #refreshSegmentsForDataSource}. + * + * While it is being updated, this map is read by these two types of thread. + * + * - {@link #cacheExec} can iterate all {@link AvailableSegmentMetadata}s per datasource. + * See {@link #buildDruidTable}. + * - Query threads can create a snapshot of the entire map for processing queries on the system table. + * See {@link #getSegmentMetadataSnapshot()}. + * + * As the access pattern of this map is read-intensive, we should minimize the contention between writers and readers. + * Since there are two threads that can update this map at the same time, those writers should lock the inner map + * first and then lock the entry before it updates segment metadata. This can be done using + * {@link ConcurrentMap#compute} as below. Note that, if you need to update the variables guarded by {@link #lock} + * inside of compute(), you should get the lock before calling compute() to keep the function executed in compute() + * not expensive. + * + *
+   *   segmentMetadataInfo.compute(
+   *     datasourceParam,
+   *     (datasource, segmentsMap) -> {
+   *       if (segmentsMap == null) return null;
+   *       else {
+   *         segmentsMap.compute(
+   *           segmentIdParam,
+   *           (segmentId, segmentMetadata) -> {
+   *             // update segmentMetadata
+   *           }
+   *         );
+   *         return segmentsMap;
+   *       }
+   *     }
+   *   );
+   * 
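+   *
+   * For example, a writer that must also update variables guarded by
+   * {@link #lock} takes the lock around the whole update (a sketch only;
+   * {@link #addSegment} follows this pattern for real):
+   *
+   *   synchronized (lock) {
+   *     segmentMetadataInfo.compute(datasourceParam, ...); // as above
+   *     segmentsNeedingRefresh.add(segmentIdParam);        // guarded by lock
+   *   }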
+ * + * Readers can simply delegate the locking to the concurrent map and iterate map entries. + */ + private final ConcurrentHashMap> segmentMetadataInfo + = new ConcurrentHashMap<>(); + + // For awaitInitialization. + private final CountDownLatch initialized = new CountDownLatch(1); + + /** + * This lock coordinates the access from multiple threads to those variables guarded by this lock. + * Currently, there are 2 threads that can access these variables. + * + * - {@link #callbackExec} executes the timeline callbacks whenever BrokerServerView changes. + * - {@link #cacheExec} periodically refreshes segment metadata and {@link DatasourceTable} if necessary + * based on the information collected via timeline callbacks. + */ + private final Object lock = new Object(); + + // All mutable segments. + @GuardedBy("lock") + private final TreeSet mutableSegments = new TreeSet<>(SEGMENT_ORDER); + + // All dataSources that need tables regenerated. + @GuardedBy("lock") + private final Set dataSourcesNeedingRebuild = new HashSet<>(); + + // All segments that need to be refreshed. + @GuardedBy("lock") + private final TreeSet segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER); + + // Configured context to attach to internally generated queries. + private final BrokerInternalQueryConfig brokerInternalQueryConfig; + + @GuardedBy("lock") + private boolean refreshImmediately = false; + + @GuardedBy("lock") + private boolean isServerViewInitialized = false; + + /** + * Counts the total number of known segments. This variable is used only for the segments table in the system schema + * to initialize a map with a more proper size when it creates a snapshot. As a result, it doesn't have to be exact, + * and thus there is no concurrency control for this variable. + */ + private int totalSegments = 0; + + @Inject + public SegmentMetadataCache( + final QueryLifecycleFactory queryLifecycleFactory, + final TimelineServerView serverView, + final SegmentManager segmentManager, + final JoinableFactory joinableFactory, + final PlannerConfig config, + final Escalator escalator, + final BrokerInternalQueryConfig brokerInternalQueryConfig + ) + { + this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); + Preconditions.checkNotNull(serverView, "serverView"); + this.segmentManager = segmentManager; + this.joinableFactory = joinableFactory; + this.config = Preconditions.checkNotNull(config, "config"); + this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); + this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d"); + this.escalator = escalator; + this.brokerInternalQueryConfig = brokerInternalQueryConfig; + + initServerViewTimelineCallback(serverView); + } + + private void initServerViewTimelineCallback(final TimelineServerView serverView) + { + serverView.registerTimelineCallback( + callbackExec, + new TimelineServerView.TimelineCallback() + { + @Override + public ServerView.CallbackAction timelineInitialized() + { + synchronized (lock) { + isServerViewInitialized = true; + lock.notifyAll(); + } + + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment) + { + addSegment(server, segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentRemoved(final DataSegment segment) + { + removeSegment(segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public 
ServerView.CallbackAction serverSegmentRemoved(
+            final DruidServerMetadata server,
+            final DataSegment segment
+        )
+        {
+          removeServerSegment(server, segment);
+          return ServerView.CallbackAction.CONTINUE;
+        }
+      }
+    );
+  }
+
+  private void startCacheExec()
+  {
+    cacheExec.submit(
+        () -> {
+          long lastRefresh = 0L;
+          long lastFailure = 0L;
+
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+              final Set<String> dataSourcesToRebuild = new TreeSet<>();
+
+              try {
+                synchronized (lock) {
+                  final long nextRefreshNoFuzz = DateTimes
+                      .utc(lastRefresh)
+                      .plus(config.getMetadataRefreshPeriod())
+                      .getMillis();
+
+                  // Fuzz a bit to spread load out when we have multiple brokers.
+                  final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+                  while (true) {
+                    // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
+                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+                                                              .plus(config.getMetadataRefreshPeriod())
+                                                              .isAfterNow();
+
+                    if (isServerViewInitialized &&
+                        !wasRecentFailure &&
+                        (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
+                        (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
+                      // We need to do a refresh. Break out of the waiting loop.
+                      break;
+                    }
+
+                    // lastFailure != 0L means a previous refresh failed, leaving some refresh work incomplete,
+                    // so even though the ServerView is initialized, the broker cannot complete its own
+                    // initialization yet.
+                    if (isServerViewInitialized && lastFailure == 0L) {
+                      // Server view is initialized, but we don't need to do a refresh. Could happen if there are
+                      // no segments in the system yet. Just mark us as initialized, then.
+                      initialized.countDown();
+                    }
+
+                    // Wait some more; we'll wake up when it might be time to do another refresh.
+                    lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
+                  }
+
+                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
+                  segmentsNeedingRefresh.clear();
+
+                  // Mutable segments need a refresh every period, since new columns could be added dynamically.
+                  segmentsNeedingRefresh.addAll(mutableSegments);
+
+                  lastFailure = 0L;
+                  lastRefresh = System.currentTimeMillis();
+                  refreshImmediately = false;
+                }
+
+                refresh(segmentsToRefresh, dataSourcesToRebuild);
+
+                initialized.countDown();
+              }
+              catch (InterruptedException e) {
+                // Fall through.
+                throw e;
+              }
+              catch (Exception e) {
+                log.warn(e, "Metadata refresh failed, trying again soon.");
+
+                synchronized (lock) {
+                  // Add our segments and dataSources back to their refresh and rebuild lists.
+                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
+                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
+                  lastFailure = System.currentTimeMillis();
+                }
+              }
+            }
+          }
+          catch (InterruptedException e) {
+            // Just exit.
+          }
+          catch (Throwable e) {
+            // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
+            // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
+ log.makeAlert(e, "Metadata refresh failed permanently").emit(); + throw e; + } + finally { + log.info("Metadata refresh stopped."); + } + } + ); + } + + @LifecycleStart + public void start() throws InterruptedException + { + startCacheExec(); + + if (config.isAwaitInitializationOnStart()) { + final long startNanos = System.nanoTime(); + log.debug("%s waiting for initialization.", getClass().getSimpleName()); + awaitInitialization(); + log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), (System.nanoTime() - startNanos) / 1000000); + } + } + + @VisibleForTesting + void refresh(final Set segmentsToRefresh, final Set dataSourcesToRebuild) throws IOException + { + // Refresh the segments. + final Set refreshed = refreshSegments(segmentsToRefresh); + + synchronized (lock) { + // Add missing segments back to the refresh list. + segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed)); + + // Compute the list of dataSources to rebuild tables for. + dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild); + refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource())); + dataSourcesNeedingRebuild.clear(); + } + + // Rebuild the dataSources. + for (String dataSource : dataSourcesToRebuild) { + final DatasourceTable.PhysicalDatasourceMetadata druidTable = buildDruidTable(dataSource); + if (druidTable == null) { + log.info("dataSource [%s] no longer exists, all metadata removed.", dataSource); + tables.remove(dataSource); + continue; + } + final DatasourceTable.PhysicalDatasourceMetadata oldTable = tables.put(dataSource, druidTable); + final String description = druidTable.dataSource().isGlobal() ? "global dataSource" : "dataSource"; + if (oldTable == null || !oldTable.rowSignature().equals(druidTable.rowSignature())) { + log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.rowSignature()); + } else { + log.debug("%s [%s] signature is unchanged.", description, dataSource); + } + } + } + + @LifecycleStop + public void stop() + { + cacheExec.shutdownNow(); + callbackExec.shutdownNow(); + } + + public void awaitInitialization() throws InterruptedException + { + initialized.await(); + } + + protected DatasourceTable.PhysicalDatasourceMetadata getDatasource(String name) + { + return tables.get(name); + } + + protected Set getDatasourceNames() + { + return tables.keySet(); + } + + @VisibleForTesting + protected void addSegment(final DruidServerMetadata server, final DataSegment segment) + { + // Get lock first so that we won't wait in ConcurrentMap.compute(). + synchronized (lock) { + // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking + // broker served segments in the timeline, to ensure that removeSegment the event is triggered accurately + if (server.getType().equals(ServerType.BROKER)) { + // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the + // historical, however mark the datasource for refresh because it needs to be globalized + markDataSourceAsNeedRebuild(segment.getDataSource()); + } else { + segmentMetadataInfo.compute( + segment.getDataSource(), + (datasource, segmentsMap) -> { + if (segmentsMap == null) { + segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER); + } + segmentsMap.compute( + segment.getId(), + (segmentId, segmentMetadata) -> { + if (segmentMetadata == null) { + // Unknown segment. 
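+                      // First sighting of this segment: count it, register placeholder metadata
+                      // (null row signature, zero rows), and queue it for a metadata refresh.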
+ totalSegments++; + // segmentReplicatable is used to determine if segments are served by historical or realtime servers + long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1; + segmentMetadata = AvailableSegmentMetadata + .builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh + .build(); + markSegmentAsNeedRefresh(segment.getId()); + if (!server.isSegmentReplicationTarget()) { + log.debug("Added new mutable segment [%s].", segment.getId()); + markSegmentAsMutable(segment.getId()); + } else { + log.debug("Added new immutable segment [%s].", segment.getId()); + } + } else { + // We know this segment. + final Set segmentServers = segmentMetadata.getReplicas(); + final ImmutableSet servers = new ImmutableSet.Builder() + .addAll(segmentServers) + .add(server) + .build(); + segmentMetadata = AvailableSegmentMetadata + .from(segmentMetadata) + .withReplicas(servers) + .withRealtime(recomputeIsRealtime(servers)) + .build(); + if (server.isSegmentReplicationTarget()) { + // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable, + // even if it's also available on non-replicatable (realtime) servers. + unmarkSegmentAsMutable(segment.getId()); + log.debug("Segment[%s] has become immutable.", segment.getId()); + } + } + assert segmentMetadata != null; + return segmentMetadata; + } + ); + + return segmentsMap; + } + ); + } + if (!tables.containsKey(segment.getDataSource())) { + refreshImmediately = true; + } + + lock.notifyAll(); + } + } + + @VisibleForTesting + void removeSegment(final DataSegment segment) + { + // Get lock first so that we won't wait in ConcurrentMap.compute(). + synchronized (lock) { + log.debug("Segment [%s] is gone.", segment.getId()); + + segmentsNeedingRefresh.remove(segment.getId()); + unmarkSegmentAsMutable(segment.getId()); + + segmentMetadataInfo.compute( + segment.getDataSource(), + (dataSource, segmentsMap) -> { + if (segmentsMap == null) { + log.warn("Unknown segment [%s] was removed from the cluster. Ignoring this event.", segment.getId()); + return null; + } else { + if (segmentsMap.remove(segment.getId()) == null) { + log.warn("Unknown segment [%s] was removed from the cluster. Ignoring this event.", segment.getId()); + } else { + totalSegments--; + } + if (segmentsMap.isEmpty()) { + tables.remove(segment.getDataSource()); + log.info("dataSource [%s] no longer exists, all metadata removed.", segment.getDataSource()); + return null; + } else { + markDataSourceAsNeedRebuild(segment.getDataSource()); + return segmentsMap; + } + } + } + ); + + lock.notifyAll(); + } + } + + @VisibleForTesting + void removeServerSegment(final DruidServerMetadata server, final DataSegment segment) + { + // Get lock first so that we won't wait in ConcurrentMap.compute(). + synchronized (lock) { + log.debug("Segment [%s] is gone from server [%s]", segment.getId(), server.getName()); + segmentMetadataInfo.compute( + segment.getDataSource(), + (datasource, knownSegments) -> { + if (knownSegments == null) { + log.warn( + "Unknown segment [%s] is removed from server [%s]. Ignoring this event", + segment.getId(), + server.getHost() + ); + return null; + } + + if (server.getType().equals(ServerType.BROKER)) { + // for brokers, if the segment drops from all historicals before the broker this could be null. 
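+              // Rebuild only while segments remain; an empty map means the
+              // datasource is already being torn down.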
+ if (!knownSegments.isEmpty()) { + // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the + // historical, however mark the datasource for refresh because it might no longer be broadcast or something + markDataSourceAsNeedRebuild(segment.getDataSource()); + } + } else { + knownSegments.compute( + segment.getId(), + (segmentId, segmentMetadata) -> { + if (segmentMetadata == null) { + log.warn( + "Unknown segment [%s] is removed from server [%s]. Ignoring this event", + segment.getId(), + server.getHost() + ); + return null; + } else { + final Set segmentServers = segmentMetadata.getReplicas(); + final ImmutableSet servers = FluentIterable + .from(segmentServers) + .filter(Predicates.not(Predicates.equalTo(server))) + .toSet(); + return AvailableSegmentMetadata + .from(segmentMetadata) + .withReplicas(servers) + .withRealtime(recomputeIsRealtime(servers)) + .build(); + } + } + ); + } + if (knownSegments.isEmpty()) { + return null; + } else { + return knownSegments; + } + } + ); + + lock.notifyAll(); + } + } + + private void markSegmentAsNeedRefresh(SegmentId segmentId) + { + synchronized (lock) { + segmentsNeedingRefresh.add(segmentId); + } + } + + private void markSegmentAsMutable(SegmentId segmentId) + { + synchronized (lock) { + mutableSegments.add(segmentId); + } + } + + private void unmarkSegmentAsMutable(SegmentId segmentId) + { + synchronized (lock) { + mutableSegments.remove(segmentId); + } + } + + @VisibleForTesting + void markDataSourceAsNeedRebuild(String datasource) + { + synchronized (lock) { + dataSourcesNeedingRebuild.add(datasource); + } + } + + /** + * Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed, + * which may be a subset of the asked-for set. + */ + @VisibleForTesting + protected Set refreshSegments(final Set segments) throws IOException + { + final Set retVal = new HashSet<>(); + + // Organize segments by dataSource. + final Map> segmentMap = new TreeMap<>(); + + for (SegmentId segmentId : segments) { + segmentMap.computeIfAbsent(segmentId.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER)) + .add(segmentId); + } + + for (Map.Entry> entry : segmentMap.entrySet()) { + final String dataSource = entry.getKey(); + retVal.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue())); + } + + return retVal; + } + + private long recomputeIsRealtime(ImmutableSet servers) + { + if (servers.isEmpty()) { + return 0; + } + final Optional historicalServer = servers + .stream() + // Ideally, this filter should have checked whether it's a broadcast segment loaded in brokers. + // However, we don't current track of the broadcast segments loaded in brokers, so this filter is still valid. + // See addSegment(), removeServerSegment(), and removeSegment() + .filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL)) + .findAny(); + + // if there is any historical server in the replicas, isRealtime flag should be unset + return historicalServer.isPresent() ? 0 : 1; + } + + /** + * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of + * segments actually refreshed, which may be a subset of the asked-for set. + */ + private Set refreshSegmentsForDataSource(final String dataSource, final Set segments) + throws IOException + { + if (!segments.stream().allMatch(segmentId -> segmentId.getDataSource().equals(dataSource))) { + // Sanity check. We definitely expect this to pass. 
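+      // (refreshSegments() groups segment ids by datasource before calling here,
+      // so a mismatch would indicate a bug in the caller.)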
+ throw new ISE("'segments' must all match 'dataSource'!"); + } + + log.debug("Refreshing metadata for dataSource[%s].", dataSource); + + final long startTime = System.currentTimeMillis(); + + // Segment id string -> SegmentId object. + final Map segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString); + + final Set retVal = new HashSet<>(); + final Sequence sequence = runSegmentMetadataQuery( + Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY) + ); + + Yielder yielder = Yielders.each(sequence); + + try { + while (!yielder.isDone()) { + final SegmentAnalysis analysis = yielder.get(); + final SegmentId segmentId = segmentIdMap.get(analysis.getId()); + + if (segmentId == null) { + log.warn("Got analysis for segment [%s] we didn't ask for, ignoring.", analysis.getId()); + } else { + final RowSignature rowSignature = analysisToRowSignature(analysis); + log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature); + segmentMetadataInfo.compute( + dataSource, + (datasourceKey, dataSourceSegments) -> { + if (dataSourceSegments == null) { + // Datasource may have been removed or become unavailable while this refresh was ongoing. + log.warn( + "No segment map found with datasource [%s], skipping refresh of segment [%s]", + datasourceKey, + segmentId + ); + return null; + } else { + dataSourceSegments.compute( + segmentId, + (segmentIdKey, segmentMetadata) -> { + if (segmentMetadata == null) { + log.warn("No segment [%s] found, skipping refresh", segmentId); + return null; + } else { + final AvailableSegmentMetadata updatedSegmentMetadata = AvailableSegmentMetadata + .from(segmentMetadata) + .withRowSignature(rowSignature) + .withNumRows(analysis.getNumRows()) + .build(); + retVal.add(segmentId); + return updatedSegmentMetadata; + } + } + ); + + if (dataSourceSegments.isEmpty()) { + return null; + } else { + return dataSourceSegments; + } + } + } + ); + } + + yielder = yielder.next(null); + } + } + finally { + yielder.close(); + } + + log.debug( + "Refreshed metadata for dataSource [%s] in %,d ms (%d segments queried, %d segments left).", + dataSource, + System.currentTimeMillis() - startTime, + retVal.size(), + segments.size() - retVal.size() + ); + + return retVal; + } + + @VisibleForTesting + @Nullable + DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(final String dataSource) + { + ConcurrentSkipListMap segmentsMap = segmentMetadataInfo.get(dataSource); + + // Preserve order. + final Map columnTypes = new LinkedHashMap<>(); + + if (segmentsMap != null && !segmentsMap.isEmpty()) { + for (AvailableSegmentMetadata availableSegmentMetadata : segmentsMap.values()) { + final RowSignature rowSignature = availableSegmentMetadata.getRowSignature(); + if (rowSignature != null) { + for (String column : rowSignature.getColumnNames()) { + // Newer column types should override older ones. 
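+            // (Segments iterate newest-first per SEGMENT_ORDER, so putIfAbsent()
+            // below keeps the type seen in the newest segment.)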
+ final ColumnType columnType = + rowSignature.getColumnType(column) + .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column)); + + columnTypes.putIfAbsent(column, columnType); + } + } + } + } else { + // table has no segments + return null; + } + + final RowSignature.Builder builder = RowSignature.builder(); + columnTypes.forEach(builder::add); + + final TableDataSource tableDataSource; + + // to be a GlobalTableDataSource instead of a TableDataSource, it must appear on all servers (inferred by existing + // in the segment cache, which in this case belongs to the broker meaning only broadcast segments live here) + // to be joinable, it must be possibly joinable according to the factory. we only consider broadcast datasources + // at this time, and isGlobal is currently strongly coupled with joinable, so only make a global table datasource + // if also joinable + final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource); + final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal); + final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource); + if (isBroadcast && isJoinable) { + tableDataSource = maybeGlobal; + } else { + tableDataSource = new TableDataSource(dataSource); + } + return new DatasourceTable.PhysicalDatasourceMetadata(tableDataSource, builder.build(), isJoinable, isBroadcast); + } + + @VisibleForTesting + Map getSegmentMetadataSnapshot() + { + final Map segmentMetadata = Maps.newHashMapWithExpectedSize(totalSegments); + for (ConcurrentSkipListMap val : segmentMetadataInfo.values()) { + segmentMetadata.putAll(val); + } + return segmentMetadata; + } + + /** + * Returns total number of segments. This method doesn't use the lock intentionally to avoid expensive contention. + * As a result, the returned value might be inexact. + */ + int getTotalSegments() + { + return totalSegments; + } + + @VisibleForTesting + TreeSet getSegmentsNeedingRefresh() + { + synchronized (lock) { + return segmentsNeedingRefresh; + } + } + + @VisibleForTesting + TreeSet getMutableSegments() + { + synchronized (lock) { + return mutableSegments; + } + } + + @VisibleForTesting + Set getDataSourcesNeedingRebuild() + { + synchronized (lock) { + return dataSourcesNeedingRebuild; + } + } + + /** + * Execute a SegmentMetadata query and return a {@link Sequence} of {@link SegmentAnalysis}. + * + * @param segments Iterable of {@link SegmentId} objects that are subject of the SegmentMetadata query. + * @return {@link Sequence} of {@link SegmentAnalysis} objects + */ + @VisibleForTesting + protected Sequence runSegmentMetadataQuery( + final Iterable segments + ) + { + // Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource. 
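+    // Iterables.getOnlyElement() throws IllegalArgumentException when the set
+    // contains more than one dataSource, failing the query early.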
+ final String dataSource = Iterables.getOnlyElement( + StreamSupport.stream(segments.spliterator(), false) + .map(SegmentId::getDataSource).collect(Collectors.toSet()) + ); + + final MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec( + StreamSupport.stream(segments.spliterator(), false) + .map(SegmentId::toDescriptor).collect(Collectors.toList()) + ); + + final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery( + new TableDataSource(dataSource), + querySegmentSpec, + new AllColumnIncluderator(), + false, + brokerInternalQueryConfig.getContext(), + EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), + false, + false + ); + + return queryLifecycleFactory + .factorize() + .runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK); + } + + @VisibleForTesting + static RowSignature analysisToRowSignature(final SegmentAnalysis analysis) + { + final RowSignature.Builder rowSignatureBuilder = RowSignature.builder(); + for (Map.Entry entry : analysis.getColumns().entrySet()) { + if (entry.getValue().isError()) { + // Skip columns with analysis errors. + continue; + } + + ColumnType valueType = entry.getValue().getTypeSignature(); + + // this shouldn't happen, but if it does, first try to fall back to legacy type information field in case + // standard upgrade order was not followed for 0.22 to 0.23+, and if that also fails, then assume types are some + // flavor of COMPLEX. + if (valueType == null) { + // at some point in the future this can be simplified to the contents of the catch clause here, once the + // likelyhood of upgrading from some version lower than 0.23 is low + try { + valueType = ColumnType.fromString(entry.getValue().getType()); + } + catch (IllegalArgumentException ignored) { + valueType = ColumnType.UNKNOWN_COMPLEX; + } + } + + rowSignatureBuilder.add(entry.getKey(), valueType); + } + return ROW_SIGNATURE_INTERNER.intern(rowSignatureBuilder.build()); + } + + /** + * This method is not thread-safe and must be used only in unit tests. + */ + @VisibleForTesting + void setAvailableSegmentMetadata(final SegmentId segmentId, final AvailableSegmentMetadata availableSegmentMetadata) + { + final ConcurrentSkipListMap dataSourceSegments = segmentMetadataInfo + .computeIfAbsent( + segmentId.getDataSource(), + k -> new ConcurrentSkipListMap<>(SEGMENT_ORDER) + ); + if (dataSourceSegments.put(segmentId, availableSegmentMetadata) == null) { + totalSegments++; + } + } + + /** + * This is a helper method for unit tests to emulate heavy work done with {@link #lock}. + * It must be used only in unit tests. 
+ */ + @VisibleForTesting + void doInLock(Runnable runnable) + { + synchronized (lock) { + runnable.run(); + } + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 83068e2c1f7..4b1b62b044b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -273,13 +273,13 @@ public class SystemSchema extends AbstractSchema { //get available segments from druidSchema final Map availableSegmentMetadata = - druidSchema.getSegmentMetadataSnapshot(); + druidSchema.cache().getSegmentMetadataSnapshot(); final Iterator> availableSegmentEntries = availableSegmentMetadata.entrySet().iterator(); // in memory map to store segment data from available segments final Map partialSegmentDataMap = - Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments()); + Maps.newHashMapWithExpectedSize(druidSchema.cache().getTotalSegments()); for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) { PartialSegmentData partialSegmentData = new PartialSegmentData(IS_AVAILABLE_TRUE, h.isRealtime(), h.getNumReplicas(), h.getNumRows()); @@ -290,7 +290,7 @@ public class SystemSchema extends AbstractSchema // Coordinator. final Iterator metadataStoreSegments = metadataView.getPublishedSegments(); - final Set segmentsAlreadySeen = Sets.newHashSetWithExpectedSize(druidSchema.getTotalSegments()); + final Set segmentsAlreadySeen = Sets.newHashSetWithExpectedSize(druidSchema.cache().getTotalSegments()); final FluentIterable publishedSegments = FluentIterable .from(() -> getAuthorizedPublishedSegments(metadataStoreSegments, root)) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java new file mode 100644 index 00000000000..d4c7073cae3 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.table; + +import com.google.common.base.Preconditions; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.segment.column.RowSignature; + +import java.util.Objects; + +/** + * Represents a SQL table that models a Druid datasource. + *
+ * Once the catalog code is merged, this class will combine physical information + * from the segment cache with logical information from the catalog to produce + * the SQL-user's view of the table. The resulting merged view is used to plan + * queries and is the source of table information in the {@code INFORMATION_SCHEMA}. + */ +public class DatasourceTable extends DruidTable +{ + /** + * The physical metadata for a datasource, derived from the list of segments + * published in the Coordinator. Used only for datasources, since only + * datasources are computed from segments. + */ + public static class PhysicalDatasourceMetadata + { + private final TableDataSource dataSource; + private final RowSignature rowSignature; + private final boolean joinable; + private final boolean broadcast; + + public PhysicalDatasourceMetadata( + final TableDataSource dataSource, + final RowSignature rowSignature, + final boolean isJoinable, + final boolean isBroadcast + ) + { + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature"); + this.joinable = isJoinable; + this.broadcast = isBroadcast; + } + + public TableDataSource dataSource() + { + return dataSource; + } + + public RowSignature rowSignature() + { + return rowSignature; + } + + public boolean isJoinable() + { + return joinable; + } + + public boolean isBroadcast() + { + return broadcast; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + PhysicalDatasourceMetadata that = (PhysicalDatasourceMetadata) o; + + if (!Objects.equals(dataSource, that.dataSource)) { + return false; + } + return Objects.equals(rowSignature, that.rowSignature); + } + + @Override + public int hashCode() + { + int result = dataSource != null ? dataSource.hashCode() : 0; + result = 31 * result + (rowSignature != null ? rowSignature.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "DatasourceMetadata{" + + "dataSource=" + dataSource + + ", rowSignature=" + rowSignature + + '}'; + } + } + + private final PhysicalDatasourceMetadata physicalMetadata; + + public DatasourceTable( + final PhysicalDatasourceMetadata physicalMetadata + ) + { + super(physicalMetadata.rowSignature()); + this.physicalMetadata = physicalMetadata; + } + + @Override + public DataSource getDataSource() + { + return physicalMetadata.dataSource(); + } + + @Override + public boolean isJoinable() + { + return physicalMetadata.isJoinable(); + } + + @Override + public boolean isBroadcast() + { + return physicalMetadata.isBroadcast(); + } + + @Override + public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable table) + { + return LogicalTableScan.create(context.getCluster(), table); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DatasourceTable that = (DatasourceTable) o; + + if (!Objects.equals(physicalMetadata, that.physicalMetadata)) { + return false; + } + return Objects.equals(getRowSignature(), that.getRowSignature()); + } + + @Override + public int hashCode() + { + return physicalMetadata.hashCode(); + } + + @Override + public String toString() + { + // Don't include the row signature: it is the same as in + // physicalMetadata. 
+ return "DruidTable{" + + physicalMetadata + + '}'; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java index 4a49001244c..90d3cd653ff 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java @@ -19,12 +19,8 @@ package org.apache.druid.sql.calcite.table; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import org.apache.calcite.config.CalciteConnectionConfig; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.Schema; @@ -35,45 +31,19 @@ import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlNode; import org.apache.druid.query.DataSource; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.external.ExternalDataSource; -import org.apache.druid.sql.calcite.external.ExternalTableScan; -import javax.annotation.Nullable; -import java.util.Objects; - -public class DruidTable implements TranslatableTable +/** + * Abstract base class for the various kinds of tables which Druid supports. + */ +public abstract class DruidTable implements TranslatableTable { - private final DataSource dataSource; private final RowSignature rowSignature; - @Nullable - private final ObjectMapper objectMapper; - private final boolean joinable; - private final boolean broadcast; - public DruidTable( - final DataSource dataSource, - final RowSignature rowSignature, - @Nullable final ObjectMapper objectMapper, - final boolean isJoinable, - final boolean isBroadcast + final RowSignature rowSignature ) { - this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature"); - this.objectMapper = objectMapper; - this.joinable = isJoinable; - this.broadcast = isBroadcast; - - if (dataSource instanceof ExternalDataSource && objectMapper == null) { - // objectMapper is used by ExternalTableScan to generate its digest. - throw new NullPointerException("ObjectMapper is required for external datasources"); - } - } - - public DataSource getDataSource() - { - return dataSource; } public RowSignature getRowSignature() @@ -81,15 +51,11 @@ public class DruidTable implements TranslatableTable return rowSignature; } - public boolean isJoinable() - { - return joinable; - } + public abstract DataSource getDataSource(); - public boolean isBroadcast() - { - return broadcast; - } + public abstract boolean isJoinable(); + + public abstract boolean isBroadcast(); @Override public Schema.TableType getJdbcTableType() @@ -106,12 +72,7 @@ public class DruidTable implements TranslatableTable @Override public RelDataType getRowType(final RelDataTypeFactory typeFactory) { - // For external datasources, the row type should be determined by whatever the row signature has been explicitly - // passed in. Typecasting directly to SqlTypeName.TIMESTAMP will lead to inconsistencies with the Calcite functions - // For example, TIME_PARSE(__time) where __time is specified to be a string field in the external datasource - // would lead to an exception because __time would be interpreted as timestamp if we typecast it. 
- boolean typecastTimeColumn = !(dataSource instanceof ExternalDataSource); - return RowSignatures.toRelDataType(rowSignature, typeFactory, typecastTimeColumn); + return RowSignatures.toRelDataType(getRowSignature(), typeFactory); } @Override @@ -130,51 +91,4 @@ public class DruidTable implements TranslatableTable { return true; } - - @Override - public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable table) - { - if (dataSource instanceof ExternalDataSource) { - // Cannot use LogicalTableScan here, because its digest is solely based on the name of the table macro. - // Must use our own class that computes its own digest. - return new ExternalTableScan(context.getCluster(), objectMapper, this); - } else { - return LogicalTableScan.create(context.getCluster(), table); - } - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - DruidTable that = (DruidTable) o; - - if (!Objects.equals(dataSource, that.dataSource)) { - return false; - } - return Objects.equals(rowSignature, that.rowSignature); - } - - @Override - public int hashCode() - { - int result = dataSource != null ? dataSource.hashCode() : 0; - result = 31 * result + (rowSignature != null ? rowSignature.hashCode() : 0); - return result; - } - - @Override - public String toString() - { - return "DruidTable{" + - "dataSource=" + dataSource + - ", rowSignature=" + rowSignature + - '}'; - } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java new file mode 100644 index 00000000000..cc3ee7c3f6a --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.table; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptTable.ToRelContext; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.druid.query.DataSource; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.external.ExternalTableScan; + +/** + * Represents an source of data external to Druid: a CSV file, an HTTP request, etc. + * Each such table represents one of Druid's {@link DataSource} types. 
Since SQL + * requires knowledge of the schema of that input source, the user must provide + * that information in SQL (via the `EXTERN` or up-coming `STAGED` function) or + * from the upcoming Druid Catalog. + */ +public class ExternalTable extends DruidTable +{ + private final DataSource dataSource; + private final ObjectMapper objectMapper; + + public ExternalTable( + final DataSource dataSource, + final RowSignature rowSignature, + final ObjectMapper objectMapper + ) + { + super(rowSignature); + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + this.objectMapper = objectMapper; + } + + @Override + public DataSource getDataSource() + { + return dataSource; + } + + @Override + public boolean isJoinable() + { + return false; + } + + @Override + public boolean isBroadcast() + { + return false; + } + + @Override + public RelDataType getRowType(final RelDataTypeFactory typeFactory) + { + // For external datasources, the row type should be determined by whatever the row signature has been explicitly + // passed in. Typecasting directly to SqlTypeName.TIMESTAMP will lead to inconsistencies with the Calcite functions + // For example, TIME_PARSE(__time) where __time is specified to be a string field in the external datasource + // would lead to an exception because __time would be interpreted as timestamp if we typecast it. + return RowSignatures.toRelDataType(getRowSignature(), typeFactory, true); + } + + @Override + public RelNode toRel(ToRelContext context, RelOptTable relOptTable) + { + return new ExternalTableScan(context.getCluster(), objectMapper, this); + } + + @Override + public String toString() + { + return "ExternalTable{" + + "dataSource=" + dataSource + + ", rowSignature=" + getRowSignature() + + '}'; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/InlineTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/InlineTable.java new file mode 100644 index 00000000000..e8a7f8e2987 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/InlineTable.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.table; + +import com.google.common.base.Preconditions; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptTable.ToRelContext; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.segment.column.RowSignature; + +/** + * Represents a specialized table used within Druid's Calcite-based planner. 
+ * Used in {@link org.apache.druid.sql.calcite.rule.DruidLogicalValuesRule DruidLogicalValuesRule}
+ * to implement trivial {@code SELECT 1} style queries that return constant values represented
+ * by this inline table.
+ */
+public class InlineTable extends DruidTable
+{
+  private final DataSource dataSource;
+
+  public InlineTable(
+      final InlineDataSource dataSource,
+      final RowSignature rowSignature
+  )
+  {
+    super(rowSignature);
+    this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+  }
+
+  @Override
+  public DataSource getDataSource()
+  {
+    return dataSource;
+  }
+
+  @Override
+  public boolean isJoinable()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean isBroadcast()
+  {
+    return false;
+  }
+
+  @Override
+  public RelNode toRel(ToRelContext context, RelOptTable table)
+  {
+    return LogicalTableScan.create(context.getCluster(), table);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "InlineTable{" +
+           "dataSource=" + dataSource +
+           ", rowSignature=" + getRowSignature() +
+           '}';
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/LookupTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/LookupTable.java
new file mode 100644
index 00000000000..eac0e44443c
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/LookupTable.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.table;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptTable.ToRelContext;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.LookupDataSource;
+import org.apache.druid.segment.column.RowSignature;
+
+/**
+ * Represents a Druid lookup as a Calcite table, allowing queries to read from a lookup.
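+ * Lookup tables report {@code isJoinable()} and {@code isBroadcast()} as
+ * {@code true}, since Druid replicates lookups to every server.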
+ */ +public class LookupTable extends DruidTable +{ + private final DataSource dataSource; + + public LookupTable( + final LookupDataSource dataSource, + final RowSignature rowSignature + ) + { + super(rowSignature); + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + } + + @Override + public DataSource getDataSource() + { + return dataSource; + } + + @Override + public boolean isJoinable() + { + return true; + } + + @Override + public boolean isBroadcast() + { + return true; + } + + @Override + public RelNode toRel(ToRelContext context, RelOptTable table) + { + return LogicalTableScan.create(context.getCluster(), table); + } + + @Override + public String toString() + { + return "LookupTable{" + + "dataSource=" + dataSource + + ", rowSignature=" + getRowSignature() + + '}'; + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java b/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java index 3117610ec8d..ce255fd4147 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java @@ -63,6 +63,7 @@ import org.joda.time.DateTimeZone; import org.junit.Assert; import javax.annotation.Nullable; + import java.math.BigDecimal; import java.util.Arrays; import java.util.Collections; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java index 5a07f59d679..c30b2c0262e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java @@ -44,7 +44,6 @@ public class ExternalTableScanRuleTest @Test public void testMatchesWhenExternalScanUnsupported() throws ValidationException { - final PlannerContext plannerContext = PlannerContext.create( "DUMMY", // The actual query isn't important for this test CalciteTests.createOperatorTable(), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java index c1e24ada343..199d259d92f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java @@ -32,6 +32,7 @@ import org.junit.Assert; import org.junit.Test; import javax.annotation.Nullable; + import java.util.List; import java.util.function.Consumer; @@ -328,7 +329,6 @@ public class DruidRelsTest EasyMock.expect(mockPartialQuery.getWhereFilter()).andReturn(whereFilter).anyTimes(); final RelOptTable mockRelOptTable = EasyMock.mock(RelOptTable.class); - EasyMock.expect(mockRelOptTable.unwrap(DruidTable.class)).andReturn(druidTable).anyTimes(); final T mockRel = EasyMock.mock(clazz); EasyMock.expect(mockRel.getPartialDruidQuery()).andReturn(mockPartialQuery).anyTimes(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRuleTest.java index 8707b476631..9d6bad13bd5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRuleTest.java @@ -32,6 +32,8 @@ import org.apache.druid.sql.calcite.rel.DruidRel; import 
org.apache.druid.sql.calcite.rel.DruidRelsTest; import org.apache.druid.sql.calcite.rel.DruidUnionDataSourceRel; import org.apache.druid.sql.calcite.rel.PartialDruidQuery; +import org.apache.druid.sql.calcite.table.DatasourceTable; +import org.apache.druid.sql.calcite.table.DatasourceTable.PhysicalDatasourceMetadata; import org.apache.druid.sql.calcite.table.DruidTable; import org.easymock.EasyMock; import org.junit.Assert; @@ -43,16 +45,17 @@ import java.util.Optional; public class DruidUnionDataSourceRuleTest { - private final DruidTable fooDruidTable = new DruidTable( - new TableDataSource("foo"), - RowSignature.builder() - .addTimeColumn() - .add("col1", ColumnType.STRING) - .add("col2", ColumnType.LONG) - .build(), - null, - false, - false + private final DruidTable fooDruidTable = new DatasourceTable( + new PhysicalDatasourceMetadata( + new TableDataSource("foo"), + RowSignature.builder() + .addTimeColumn() + .add("col1", ColumnType.STRING) + .add("col2", ColumnType.LONG) + .build(), + false, + false + ) ); @Test diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java index 3f2c36f7b82..6c9226ebb38 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java @@ -49,7 +49,7 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase { try (final Closer closer = Closer.create()) { final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer); - final DruidSchema druidSchema = new DruidSchema( + final SegmentMetadataCache cache = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory( new SpecificSegmentsQuerySegmentWalker(conglomerate), conglomerate @@ -59,14 +59,14 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ); - druidSchema.start(); - druidSchema.awaitInitialization(); + cache.start(); + cache.awaitInitialization(); + final DruidSchema druidSchema = new DruidSchema(cache, null); - Assert.assertEquals(ImmutableMap.of(), druidSchema.getTableMap()); + Assert.assertEquals(ImmutableSet.of(), druidSchema.getTableNames()); } } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest.java similarity index 87% rename from sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java rename to sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest.java index 6beda7d0bd0..683be3b9495 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest.java @@ -57,7 +57,7 @@ import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.NoopEscalator; -import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.table.DatasourceTable; import 
org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.timeline.DataSegment; @@ -71,6 +71,7 @@ import org.junit.Before; import org.junit.Test; import javax.annotation.Nullable; + import java.io.File; import java.util.ArrayList; import java.util.Collection; @@ -88,7 +89,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon +public class SegmentDataCacheConcurrencyTest extends SegmentMetadataCacheCommon { private static final String DATASOURCE = "datasource"; @@ -96,7 +97,7 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon private SpecificSegmentsQuerySegmentWalker walker; private TestServerInventoryView inventoryView; private BrokerServerView serverView; - private DruidSchema schema; + private SegmentMetadataCache schema; private ExecutorService exec; @Before @@ -119,30 +120,33 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon } /** - * This tests the contention between 3 components, DruidSchema, InventoryView, and BrokerServerView. - * It first triggers refreshing DruidSchema. To mimic some heavy work done with {@link DruidSchema#lock}, - * {@link DruidSchema#buildDruidTable} is overriden to sleep before doing real work. While refreshing DruidSchema, - * more new segments are added to InventoryView, which triggers updates of BrokerServerView. Finally, while - * BrokerServerView is updated, {@link BrokerServerView#getTimeline} is continuously called to mimic user query + * This tests the contention between three components, {@link SegmentMetadataCache}, + * {@code InventoryView}, and {@link BrokerServerView}. It first triggers + * refreshing {@code SegmentMetadataCache}. To mimic some heavy work done with + * {@link SegmentMetadataCache#lock}, {@link SegmentMetadataCache#buildDruidTable} + * is overridden to sleep before doing real work. While refreshing + * {@code SegmentMetadataCache}, more new segments are added to + * {@code InventoryView}, which triggers updates of {@code BrokerServerView}. + * Finally, while {@code BrokerServerView} is updated, + * {@link BrokerServerView#getTimeline} is continuously called to mimic user query * processing. All these calls must return without heavy contention. */ @Test(timeout = 30000L) - public void testDruidSchemaRefreshAndInventoryViewAddSegmentAndBrokerServerViewGetTimeline() + public void testSegmentMetadataRefreshAndInventoryViewAddSegmentAndBrokerServerViewGetTimeline() throws InterruptedException, ExecutionException, TimeoutException { - schema = new DruidSchema( + schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override - DruidTable buildDruidTable(final String dataSource) + DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(final String dataSource) { doInLock(() -> { try { @@ -194,8 +198,9 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon // Wait for all segments to be loaded in BrokerServerView Assert.assertTrue(segmentLoadLatch.await(5, TimeUnit.SECONDS)); - // Trigger refresh of DruidSchema. 
This will internally run the heavy work mimicked by the overriden buildDruidTable - Future refreshFuture = exec.submit(() -> { + // Trigger refresh of DruidSchema. This will internally run the heavy work + // mimicked by the overridden buildDruidTable + Future refreshFuture = exec.submit(() -> { schema.refresh( walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()), Sets.newHashSet(DATASOURCE) @@ -225,33 +230,36 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon } /** - * This tests the contention between 2 methods of DruidSchema, {@link DruidSchema#refresh} and - * {@link DruidSchema#getSegmentMetadataSnapshot()}. It first triggers refreshing DruidSchema. - * To mimic some heavy work done with {@link DruidSchema#lock}, {@link DruidSchema#buildDruidTable} is overriden - * to sleep before doing real work. While refreshing DruidSchema, getSegmentMetadataSnapshot() is continuously - * called to mimic reading the segments table of SystemSchema. All these calls must return without heavy contention. + * This tests the contention between two methods of {@link SegmentMetadataCache}: + * {@link SegmentMetadataCache#refresh} and + * {@link SegmentMetadataCache#getSegmentMetadataSnapshot()}. It first triggers + * refreshing {@code SegmentMetadataCache}. To mimic some heavy work done with + * {@link SegmentMetadataCache#lock}, {@link SegmentMetadataCache#buildDruidTable} + * is overridden to sleep before doing real work. While refreshing + * {@code SegmentMetadataCache}, {@code getSegmentMetadataSnapshot()} is continuously + * called to mimic reading the segments table of SystemSchema. All these calls + * must return without heavy contention. */ @Test(timeout = 30000L) - public void testDruidSchemaRefreshAndDruidSchemaGetSegmentMetadata() + public void testSegmentMetadataRefreshAndDruidSchemaGetSegmentMetadata() throws InterruptedException, ExecutionException, TimeoutException { - schema = new DruidSchema( + schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override - DruidTable buildDruidTable(final String dataSource) + DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(final String dataSource) { doInLock(() -> { try { - // Mimic some heavy work done in lock in DruidSchema + // Mimic some heavy work done in lock in SegmentMetadataCache Thread.sleep(5000); } catch (InterruptedException e) { @@ -299,8 +307,9 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon // Wait for all segments to be loaded in BrokerServerView Assert.assertTrue(segmentLoadLatch.await(5, TimeUnit.SECONDS)); - // Trigger refresh of DruidSchema. This will internally run the heavy work mimicked by the overriden buildDruidTable - Future refreshFuture = exec.submit(() -> { + // Trigger refresh of SegmentMetadataCache. 
This will internally run the heavy work mimicked + // by the overridden buildDruidTable + Future refreshFuture = exec.submit(() -> { schema.refresh( walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()), Sets.newHashSet(DATASOURCE) @@ -438,6 +447,7 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon segmentCallbacks.forEach(pair -> pair.rhs.execute(() -> pair.lhs.segmentRemoved(server.getMetadata(), segment))); } + @SuppressWarnings("unused") private void removeServer(DruidServer server) { serverMap.remove(server.getName()); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTestCommon.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheCommon.java similarity index 98% rename from sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTestCommon.java rename to sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheCommon.java index 2511d972101..edfa22a1e8a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTestCommon.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheCommon.java @@ -50,7 +50,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; -public abstract class DruidSchemaTestCommon extends CalciteTestBase +public abstract class SegmentMetadataCacheCommon extends CalciteTestBase { static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java similarity index 91% rename from sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java rename to sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java index 36e575e2bdb..39029fa9a48 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java @@ -27,7 +27,6 @@ import com.google.common.collect.Sets; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.schema.Table; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.ImmutableDruidServer; @@ -59,6 +58,7 @@ import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AllowAllAuthenticator; import org.apache.druid.server.security.NoopEscalator; +import org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; @@ -86,13 +86,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -public class DruidSchemaTest extends DruidSchemaTestCommon +public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon { private SpecificSegmentsQuerySegmentWalker walker; private TestServerInventoryView serverView; private List druidServers; - private DruidSchema schema; - private DruidSchema schema2; + private SegmentMetadataCache schema; + private SegmentMetadataCache schema2; private 
CountDownLatch buildTableLatch = new CountDownLatch(1); private CountDownLatch markDataSourceLatch = new CountDownLatch(1); private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper(); @@ -173,7 +173,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments); druidServers = serverView.getDruidServers(); - schema = new DruidSchema( + schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, @@ -183,14 +183,13 @@ public class DruidSchemaTest extends DruidSchemaTestCommon ), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override - protected DruidTable buildDruidTable(String dataSource) + protected DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(String dataSource) { - DruidTable table = super.buildDruidTable(dataSource); + DatasourceTable.PhysicalDatasourceMetadata table = super.buildDruidTable(dataSource); buildTableLatch.countDown(); return table; } @@ -203,7 +202,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon } }; - schema2 = new DruidSchema( + schema2 = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, @@ -213,17 +212,15 @@ public class DruidSchemaTest extends DruidSchemaTestCommon ), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { - boolean throwException = true; @Override - protected DruidTable buildDruidTable(String dataSource) + protected DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(String dataSource) { - DruidTable table = super.buildDruidTable(dataSource); + DatasourceTable.PhysicalDatasourceMetadata table = super.buildDruidTable(dataSource); buildTableLatch.countDown(); return table; } @@ -261,10 +258,10 @@ public class DruidSchemaTest extends DruidSchemaTestCommon @Test public void testGetTableMap() { - Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema.getTableNames()); + Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema.getDatasourceNames()); - final Map tableMap = schema.getTableMap(); - Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableMap.keySet()); + final Set tableNames = schema.getDatasourceNames(); + Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableNames); } @Test @@ -272,18 +269,15 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { schema2.start(); schema2.awaitInitialization(); - Map tableMap = schema2.getTableMap(); - Assert.assertEquals(2, tableMap.size()); - Assert.assertTrue(tableMap.containsKey("foo")); - Assert.assertTrue(tableMap.containsKey("foo2")); + Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema2.getDatasourceNames()); schema2.stop(); } - @Test public void testGetTableMapFoo() { - final DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); + final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo"); + final DruidTable fooTable = new DatasourceTable(fooDs); final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl()); final List fields = rowType.getFieldList(); @@ -311,7 +305,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon @Test public void testGetTableMapFoo2() { - final DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo2"); + final DatasourceTable.PhysicalDatasourceMetadata fooDs = 
schema.getDatasource("foo2"); + final DruidTable fooTable = new DatasourceTable(fooDs); final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl()); final List fields = rowType.getFieldList(); @@ -329,7 +324,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon /** * This tests that {@link AvailableSegmentMetadata#getNumRows()} is correct in case - * of multiple replicas i.e. when {@link DruidSchema#addSegment(DruidServerMetadata, DataSegment)} + * of multiple replicas i.e. when {@link SegmentMetadataCache#addSegment(DruidServerMetadata, DataSegment)} * is called more than once for same segment */ @Test @@ -366,7 +361,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon final ImmutableDruidServer server = pair.lhs; Assert.assertNotNull(server); final DruidServerMetadata druidServerMetadata = server.getMetadata(); - // invoke DruidSchema#addSegment on existingSegment + // invoke SegmentMetadataCache#addSegment on existingSegment schema.addSegment(druidServerMetadata, existingSegment); segmentsMetadata = schema.getSegmentMetadataSnapshot(); // get the only segment with datasource "foo2" @@ -399,7 +394,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Assert.assertNotNull(segmentToRemove); schema.removeSegment(segmentToRemove); - // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource + // The following line can cause NPE without segmentMetadata null check in + // SegmentMetadataCache#refreshSegmentsForDataSource schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size()); } @@ -421,7 +417,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Assert.assertNotNull(segmentToRemove); schema.removeSegment(segmentToRemove); - // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource + // The following line can cause NPE without segmentMetadata null check in + // SegmentMetadataCache#refreshSegmentsForDataSource schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size()); } @@ -485,15 +482,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { String datasource = "newSegmentAddTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -528,15 +524,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { String datasource = "newSegmentAddTest"; CountDownLatch addSegmentLatch = new CountDownLatch(2); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -575,15 +570,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { String datasource = "newSegmentAddTest"; 
CountDownLatch addSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -619,15 +613,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { String datasource = "newSegmentAddTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -660,15 +653,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon String datasource = "segmentRemoveTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); CountDownLatch removeSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -709,7 +701,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segment.getId())); Assert.assertFalse(schema.getMutableSegments().contains(segment.getId())); Assert.assertFalse(schema.getDataSourcesNeedingRebuild().contains(datasource)); - Assert.assertFalse(schema.getTableNames().contains(datasource)); + Assert.assertFalse(schema.getDatasourceNames().contains(datasource)); } @Test @@ -718,15 +710,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon String datasource = "segmentRemoveTest"; CountDownLatch addSegmentLatch = new CountDownLatch(2); CountDownLatch removeSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -771,7 +762,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId())); Assert.assertFalse(schema.getMutableSegments().contains(segments.get(0).getId())); Assert.assertTrue(schema.getDataSourcesNeedingRebuild().contains(datasource)); - Assert.assertTrue(schema.getTableNames().contains(datasource)); + Assert.assertTrue(schema.getDatasourceNames().contains(datasource)); } @Test @@ -779,15 +770,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { String datasource = "serverSegmentRemoveTest"; CountDownLatch removeServerSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( 
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -814,15 +804,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon String datasource = "serverSegmentRemoveTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); CountDownLatch removeServerSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -862,15 +851,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon String datasource = "serverSegmentRemoveTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); CountDownLatch removeServerSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -917,10 +905,10 @@ public class DruidSchemaTest extends DruidSchemaTestCommon @Test public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws InterruptedException { - DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); + DatasourceTable.PhysicalDatasourceMetadata fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); - Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertFalse(fooTable.isJoinable()); Assert.assertFalse(fooTable.isBroadcast()); @@ -949,10 +937,10 @@ public class DruidSchemaTest extends DruidSchemaTestCommon // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); - fooTable = (DruidTable) schema.getTableMap().get("foo"); + fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); - Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertTrue(fooTable.isJoinable()); Assert.assertTrue(fooTable.isBroadcast()); @@ -970,10 +958,10 @@ public class DruidSchemaTest extends DruidSchemaTestCommon // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); - fooTable = (DruidTable) schema.getTableMap().get("foo"); + fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - 
Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); - Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertFalse(fooTable.isJoinable()); Assert.assertFalse(fooTable.isBroadcast()); } @@ -981,10 +969,10 @@ @Test public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throws InterruptedException { - DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); + DatasourceTable.PhysicalDatasourceMetadata fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); - Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertFalse(fooTable.isJoinable()); Assert.assertFalse(fooTable.isBroadcast()); @@ -1013,12 +1001,12 @@ // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); - fooTable = (DruidTable) schema.getTableMap().get("foo"); + fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); // should not be a GlobalTableDataSource for now, because isGlobal is coupled with joinability. Ideally this will be // changed in the future and we should expect - Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertTrue(fooTable.isBroadcast()); Assert.assertFalse(fooTable.isJoinable()); @@ -1035,10 +1023,10 @@ // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); - fooTable = (DruidTable) schema.getTableMap().get("foo"); + fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); - Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertFalse(fooTable.isBroadcast()); Assert.assertFalse(fooTable.isJoinable()); } @@ -1082,7 +1070,7 @@ QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class); // Need to create schema for this test because the available schemas don't mock the QueryLifecycleFactory, which I need for this test.
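Stepping back from the individual hunks: the assertions above all migrate from pulling a cached DruidTable out of getTableMap() to asking the cache for physical metadata and wrapping it on demand. Pulled out of diff form as a sketch (the wrapper method is illustrative and assumes the test class's existing imports; each call inside it appears verbatim in the hunks above):

// The lookup-and-wrap pattern the hunks above migrate to, in isolation:
// the cache returns physical metadata only, and a Calcite table is built
// on demand. `cache` is the SegmentMetadataCache built in test setup.
private void assertFooIsPlainTable(SegmentMetadataCache cache)
{
  final DatasourceTable.PhysicalDatasourceMetadata fooDs = cache.getDatasource("foo");
  Assert.assertNotNull(fooDs);                          // null means: unknown datasource
  Assert.assertTrue(fooDs.dataSource() instanceof TableDataSource);
  Assert.assertFalse(fooDs.dataSource() instanceof GlobalTableDataSource);

  // A Calcite table is now constructed only when Calcite actually needs one.
  final DruidTable fooTable = new DatasourceTable(fooDs);
  final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl());
  Assert.assertFalse(rowType.getFieldList().isEmpty());
}

Keeping Calcite types out of the cache is what lets DruidSchema shrink to a thin wrapper, as the SystemSchemaTest and CalciteTests hunks below show.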
- DruidSchema mySchema = new DruidSchema( + SegmentMetadataCache mySchema = new SegmentMetadataCache( factoryMock, serverView, segmentManager, @@ -1092,8 +1080,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon ), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - brokerInternalQueryConfig, - null + brokerInternalQueryConfig ); EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once(); @@ -1129,7 +1116,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new ColumnAnalysis(ColumnType.DOUBLE, ColumnType.DOUBLE.asTypeString(), false, true, 1234, 26, null, null, null) ); - RowSignature signature = DruidSchema.analysisToRowSignature( + RowSignature signature = SegmentMetadataCache.analysisToRowSignature( new SegmentAnalysis( "id", ImmutableList.of(Intervals.utc(1L, 2L)), @@ -1157,7 +1144,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon @Test public void testSegmentMetadataFallbackType() { - RowSignature signature = DruidSchema.analysisToRowSignature( + RowSignature signature = SegmentMetadataCache.analysisToRowSignature( new SegmentAnalysis( "id", ImmutableList.of(Intervals.utc(1L, 2L)), @@ -1209,9 +1196,9 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Set segments = new HashSet<>(); Set datasources = new HashSet<>(); datasources.add("wat"); - Assert.assertNull(schema.getTable("wat")); + Assert.assertNull(schema.getDatasource("wat")); schema.refresh(segments, datasources); - Assert.assertNull(schema.getTable("wat")); + Assert.assertNull(schema.getDatasource("wat")); } private static DataSegment newSegment(String datasource, int partitionId) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 4b76fbf8e21..a9ce9d987fb 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -245,18 +245,18 @@ public class SystemSchemaTest extends CalciteTestBase .add(segment2, index2) .add(segment3, index3); - druidSchema = new DruidSchema( + SegmentMetadataCache cache = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments(), realtimeSegments), new SegmentManager(EasyMock.createMock(SegmentLoader.class)), new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ); - druidSchema.start(); - druidSchema.awaitInitialization(); + cache.start(); + cache.awaitInitialization(); + druidSchema = new DruidSchema(cache, null); metadataView = EasyMock.createMock(MetadataSegmentView.class); druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); serverInventoryView = EasyMock.createMock(FilteredServerInventoryView.class); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 29de594350a..b4bb127ffe8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -130,6 +130,7 @@ import org.apache.druid.sql.calcite.schema.NamedSchema; import org.apache.druid.sql.calcite.schema.NamedSystemSchema; import org.apache.druid.sql.calcite.schema.NamedViewSchema; import 
org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager; +import org.apache.druid.sql.calcite.schema.SegmentMetadataCache; import org.apache.druid.sql.calcite.schema.SystemSchema; import org.apache.druid.sql.calcite.schema.ViewSchema; import org.apache.druid.sql.calcite.view.DruidViewMacroFactory; @@ -1228,34 +1229,33 @@ public class CalciteTests final DruidSchemaManager druidSchemaManager ) { - final DruidSchema schema = new DruidSchema( - CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + final SegmentMetadataCache cache = new SegmentMetadataCache( + createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments()), new SegmentManager(EasyMock.createMock(SegmentLoader.class)) { @Override public Set<String> getDataSourceNames() { - return ImmutableSet.of(BROADCAST_DATASOURCE); + return ImmutableSet.of(CalciteTests.BROADCAST_DATASOURCE); } }, createDefaultJoinableFactory(), plannerConfig, - TEST_AUTHENTICATOR_ESCALATOR, - new BrokerInternalQueryConfig(), - druidSchemaManager + CalciteTests.TEST_AUTHENTICATOR_ESCALATOR, + new BrokerInternalQueryConfig() ); try { - schema.start(); - schema.awaitInitialization(); + cache.start(); + cache.awaitInitialization(); } catch (InterruptedException e) { throw new RuntimeException(e); } - schema.stop(); - return schema; + cache.stop(); + return new DruidSchema(cache, druidSchemaManager); }
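Taken together, the CalciteTests hunk above gives the refactor in miniature: build the cache, run its lifecycle, then wrap it. As a sketch with placeholder variables (queryLifecycleFactory, serverView, and so on stand for the dependencies each test already constructs; the calls themselves are the ones shown throughout this diff):

// The new shape in one place: the cache owns the refresh lifecycle, and
// DruidSchema is now a thin wrapper over it.
SegmentMetadataCache cache = new SegmentMetadataCache(
    queryLifecycleFactory,       // placeholder dependencies; see the hunks
    serverView,                  // above for the real arguments
    segmentManager,
    joinableFactory,
    plannerConfig,
    escalator,
    new BrokerInternalQueryConfig()
);
cache.start();                   // begin watching the server view
cache.awaitInitialization();     // block until the first refresh completes
DruidSchema schema = new DruidSchema(cache, druidSchemaManager);

Note that the DruidSchemaManager argument moves from the cache constructor to the DruidSchema wrapper, which is why every SegmentMetadataCache constructor call in this diff drops its trailing null.

/**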