From 8ad8582dc85721e920aa57dcada2803c65fd6294 Mon Sep 17 00:00:00 2001
From: Paul Rogers
Date: Tue, 9 Aug 2022 21:54:04 -0700
Subject: [PATCH] Refactor DruidSchema & DruidTable (#12835)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Refactors the DruidSchema and DruidTable abstractions to prepare for the
Druid Catalog. As we add the catalog, we’ll want to combine physical segment
metadata information with “hints” provided by the catalog. This is best done
if we tidy up the existing code to more clearly separate responsibilities.

This PR is purely a refactoring move: no functionality changed. There is no
difference to user functionality or external APIs. Functionality changes will
come later as we add the catalog itself.

DruidSchema

In the present code, DruidSchema does three tasks:

* Holds the segment metadata cache
* Interfaces with an external schema manager
* Acts as a schema to Calcite

This PR splits those responsibilities:

* DruidSchema holds the Calcite schema for the druid namespace, combining
  information from the segment metadata cache, from the external schema
  manager, and (later) from the catalog.
* SegmentMetadataCache holds the segment metadata cache formerly in
  DruidSchema.

DruidTable

The present DruidTable class is a bit of a kitchen sink: it holds all the
various kinds of tables which Druid supports, and uses if-statements to
handle behavior that differs between types. Yet, any given DruidTable will
handle only one such table type. To more clearly model the actual table
types, we split DruidTable into several classes:

* DruidTable becomes an abstract base class that holds Druid-specific
  methods.
* DatasourceTable represents a datasource.
* ExternalTable represents an external table, such as from EXTERN or (later)
  from the catalog.
* InlineTable represents the internal case in which we attach data directly
  to a table.
* LookupTable represents Druid’s lookup table mechanism.

The new subclasses are more focused: they can be selective about the data
they hold and the various predicates since they represent just one table
type. This will be important as the catalog information will differ depending
on table type, and the new structure makes adding that logic cleaner.

DatasourceMetadata

Previously, the DruidSchema segment cache would work with DruidTable objects.
With the catalog, we need a layer between the segment metadata and the table
as presented to Calcite. To fix this, the new SegmentMetadataCache class uses
a new DatasourceMetadata class as its cache entry, holding only the
“physical” segment metadata information: it is up to the DruidTable to
combine this with the catalog information in a later PR.

More Efficient Table Resolution

Calcite provides a convenient base class for schema objects: AbstractSchema.
However, this class is a bit too convenient: all we have to do is provide a
map of tables and Calcite does the rest. This means that, to resolve any
single datasource, say foo, we need to cache segment metadata, external
schema information, and catalog information for all tables, just so Calcite
can do a map lookup.

There is nothing special about AbstractSchema; we can handle table lookups
ourselves. The new AbstractTableSchema does this. In fact, all the rest of
Calcite wants is to resolve individual tables by name and, for commands we
don’t use, to provide a list of table names. DruidSchema now extends
AbstractTableSchema. SegmentMetadataCache resolves individual tables (and
provides table names).
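To make the lookup-on-demand idea concrete, here is a minimal, self-contained
sketch of the pattern. It is an illustration only: the "Mini" names below are
simplified stand-ins invented for this sketch, not the Calcite or Druid APIs;
the real classes are AbstractTableSchema, SegmentMetadataCache, and
DruidSchema in the diff that follows.

    // Illustrative sketch only: simplified stand-ins for Calcite's Schema/Table
    // interfaces and for the new AbstractTableSchema / SegmentMetadataCache /
    // DruidSchema classes added later in this patch.
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    interface MiniTable          // stand-in for org.apache.calcite.schema.Table
    {
      String rowSignature();
    }

    // Old pattern (Calcite's AbstractSchema): subclasses must materialize a map
    // of every table just so the base class can do a single map lookup.
    abstract class MapBackedSchema
    {
      protected abstract Map<String, MiniTable> getTableMap();

      public MiniTable getTable(String name)
      {
        return getTableMap().get(name);  // pays the cost of building all tables
      }
    }

    // New pattern (AbstractTableSchema): resolve exactly one table by name and,
    // separately, list table names; nothing else is materialized.
    abstract class TableByNameSchema
    {
      public abstract MiniTable getTable(String name);

      public abstract Set<String> getTableNames();
    }

    // Stand-in for the segment metadata cache: holds only "physical"
    // per-datasource metadata (represented here as a plain string).
    class MiniSegmentCache
    {
      private final ConcurrentHashMap<String, String> signatures = new ConcurrentHashMap<>();

      void put(String datasource, String signature)
      {
        signatures.put(datasource, signature);
      }

      String getDatasource(String name)
      {
        return signatures.get(name);
      }

      Set<String> getDatasourceNames()
      {
        return signatures.keySet();
      }
    }

    // Stand-in for the new DruidSchema: builds a table object on demand from the
    // cached physical metadata instead of pre-building tables for everything.
    class MiniDruidSchema extends TableByNameSchema
    {
      private final MiniSegmentCache cache;

      MiniDruidSchema(MiniSegmentCache cache)
      {
        this.cache = cache;
      }

      @Override
      public MiniTable getTable(String name)
      {
        String physical = cache.getDatasource(name);
        return physical == null ? null : () -> physical;
      }

      @Override
      public Set<String> getTableNames()
      {
        return cache.getDatasourceNames();
      }
    }

The real DruidSchema additionally consults an optional DruidSchemaManager
first, and the real cache entry is a metadata object rather than a string;
see the new files later in this patch for the actual implementation.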
DruidSchemaManager

DruidSchemaManager provides a way to specify table schemas externally. In
this sense, it is similar to the catalog, but only for datasources. It
originally followed the AbstractSchema pattern: it implemented a single
getTables() method that provides a map of tables. This PR provides new
optional methods for the table lookup and table names operations. The default
implementations work the same way that AbstractSchema works: we get the
entire map and pick out the information we need. Extensions that use this API
should be revised to support the individual operations instead. Druid code no
longer calls the original getTables() method.

The PR has one breaking change: since the DruidSchemaManager map is read-only
to the rest of Druid, we should return a Map, not a ConcurrentMap.
---
 ...ruidSchemaInternRowSignatureBenchmark.java | 20 +-
 .../input/impl/HttpInputSourceConfigTest.java | 1 -
 .../java/org/apache/druid/cli/CliRouter.java | 4 +-
 .../calcite/external/ExternalTableMacro.java | 11 +-
 .../calcite/external/ExternalTableScan.java | 12 +-
 .../druid/sql/calcite/rel/DruidQueryRel.java | 1 +
 .../calcite/rule/DruidLogicalValuesRule.java | 11 +-
 .../calcite/schema/AbstractTableSchema.java | 104 ++
 .../schema/DruidCalciteSchemaModule.java | 5 +-
 .../druid/sql/calcite/schema/DruidSchema.java | 946 +----------------
 .../calcite/schema/DruidSchemaManager.java | 35 +-
 .../sql/calcite/schema/InformationSchema.java | 1 +
 .../sql/calcite/schema/LookupSchema.java | 9 +-
 .../schema/NoopDruidSchemaManager.java | 3 +-
 .../calcite/schema/SegmentMetadataCache.java | 971 ++++++++++++++++++
 .../sql/calcite/schema/SystemSchema.java | 6 +-
 .../sql/calcite/table/DatasourceTable.java | 189 ++++
 .../druid/sql/calcite/table/DruidTable.java | 106 +-
 .../sql/calcite/table/ExternalTable.java | 98 ++
 .../druid/sql/calcite/table/InlineTable.java | 82 ++
 .../druid/sql/calcite/table/LookupTable.java | 79 ++
 .../expression/ExpressionTestHelper.java | 1 +
 .../external/ExternalTableScanRuleTest.java | 1 -
 .../druid/sql/calcite/rel/DruidRelsTest.java | 2 +-
 .../rule/DruidUnionDataSourceRuleTest.java | 23 +-
 .../schema/DruidSchemaNoDataInitTest.java | 12 +-
 ...a => SegmentDataCacheConcurrencyTest.java} | 66 +-
 ...n.java => SegmentMetadataCacheCommon.java} | 2 +-
 ...est.java => SegmentMetadataCacheTest.java} | 153 ++-
 .../sql/calcite/schema/SystemSchemaTest.java | 10 +-
 .../druid/sql/calcite/util/CalciteTests.java | 20 +-
 31 files changed, 1774 insertions(+), 1210 deletions(-)
 create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/schema/AbstractTableSchema.java
 create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
 create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java
 create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java
 create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/table/InlineTable.java
 create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/table/LookupTable.java
 rename sql/src/test/java/org/apache/druid/sql/calcite/schema/{DruidSchemaConcurrencyTest.java => SegmentDataCacheConcurrencyTest.java} (87%)
 rename sql/src/test/java/org/apache/druid/sql/calcite/schema/{DruidSchemaTestCommon.java => SegmentMetadataCacheCommon.java} (98%)
 rename sql/src/test/java/org/apache/druid/sql/calcite/schema/{DruidSchemaTest.java => SegmentMetadataCacheTest.java} (91%)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java index c9b46d678e5..049b78f0328 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java @@ -37,7 +37,7 @@ import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.Escalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; -import org.apache.druid.sql.calcite.schema.DruidSchema; +import org.apache.druid.sql.calcite.schema.SegmentMetadataCache; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; @@ -67,12 +67,11 @@ import java.util.concurrent.TimeUnit; @Measurement(iterations = 10) public class DruidSchemaInternRowSignatureBenchmark { + private SegmentMetadataCacheForBenchmark cache; - private DruidSchemaForBenchmark druidSchema; - - private static class DruidSchemaForBenchmark extends DruidSchema + private static class SegmentMetadataCacheForBenchmark extends SegmentMetadataCache { - public DruidSchemaForBenchmark( + public SegmentMetadataCacheForBenchmark( final QueryLifecycleFactory queryLifecycleFactory, final TimelineServerView serverView, final SegmentManager segmentManager, @@ -89,8 +88,7 @@ public class DruidSchemaInternRowSignatureBenchmark joinableFactory, config, escalator, - brokerInternalQueryConfig, - null + brokerInternalQueryConfig ); } @@ -101,7 +99,6 @@ public class DruidSchemaInternRowSignatureBenchmark return super.refreshSegments(segments); } - @Override public void addSegment(final DruidServerMetadata server, final DataSegment segment) { @@ -173,8 +170,7 @@ public class DruidSchemaInternRowSignatureBenchmark @Setup public void setup() { - - druidSchema = new DruidSchemaForBenchmark( + cache = new SegmentMetadataCacheForBenchmark( EasyMock.mock(QueryLifecycleFactory.class), EasyMock.mock(TimelineServerView.class), null, @@ -202,7 +198,7 @@ public class DruidSchemaInternRowSignatureBenchmark for (int i = 0; i < 10000; ++i) { DataSegment dataSegment = builder.interval(Intervals.of(i + "/" + (i + 1))) .build(); - druidSchema.addSegment(serverMetadata, dataSegment); + cache.addSegment(serverMetadata, dataSegment); } } @@ -211,6 +207,6 @@ public class DruidSchemaInternRowSignatureBenchmark @OutputTimeUnit(TimeUnit.MICROSECONDS) public void addSegments(MyState state, Blackhole blackhole) throws IOException { - blackhole.consume(druidSchema.refreshSegments(state.segmentIds)); + blackhole.consume(cache.refreshSegments(state.segmentIds)); } } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java index a3f24b9551e..7f88fc7bf62 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java @@ -26,7 +26,6 @@ import org.junit.Test; public class HttpInputSourceConfigTest { - @Test public void testEquals() { diff --git a/services/src/main/java/org/apache/druid/cli/CliRouter.java b/services/src/main/java/org/apache/druid/cli/CliRouter.java index d3fcc3e4223..af0ca7c6eb2 100644 --- a/services/src/main/java/org/apache/druid/cli/CliRouter.java +++ 
b/services/src/main/java/org/apache/druid/cli/CliRouter.java @@ -66,8 +66,8 @@ import java.util.Set; */ @Command( name = "router", - description = "Experimental! Understands tiers and routes things to different brokers, " - + "see https://druid.apache.org/docs/latest/development/router.html for a description" + description = "Understands tiers and routes requests to Druid nodes. " + + "See https://druid.apache.org/docs/latest/design/router.html" ) public class CliRouter extends ServerRunnable { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java index 57c84ef73ab..23c0b1d9c40 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.table.ExternalTable; import java.util.List; import java.util.Optional; @@ -73,12 +74,10 @@ public class ExternalTableMacro implements TableMacro + "Please change the column name to something other than __time"); } - return new DruidTable( - new ExternalDataSource(inputSource, inputFormat, signature), - signature, - jsonMapper, - false, - false + return new ExternalTable( + new ExternalDataSource(inputSource, inputFormat, signature), + signature, + jsonMapper ); } catch (JsonProcessingException e) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableScan.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableScan.java index d01d5784e0e..70b3ac270dd 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableScan.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableScan.java @@ -27,22 +27,24 @@ import org.apache.calcite.rel.AbstractRelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.type.RelDataType; import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.table.ExternalTable; /** - * Represents a scan of an external table. Generated by {@link DruidTable} when its datasource is an - * {@link ExternalDataSource}. + * Represents a scan of an external table. Generated by {@link ExternalTable}, + * which represents an {@link ExternalDataSource}. * - * This class is exercised in CalciteInsertDmlTest but is not currently exposed to end users. + * This class is exercised in CalciteInsertDmlTest but is not currently + * exposed to end users. 
*/ public class ExternalTableScan extends AbstractRelNode { private final ObjectMapper jsonMapper; - private final DruidTable druidTable; + private final ExternalTable druidTable; public ExternalTableScan( final RelOptCluster cluster, final ObjectMapper jsonMapper, - final DruidTable druidTable + final ExternalTable druidTable ) { super(cluster, cluster.traitSetOf(Convention.NONE)); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java index 373e3d4abff..3e5c72a71fd 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java @@ -37,6 +37,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.table.DruidTable; import javax.annotation.Nullable; + import java.util.Set; /** diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidLogicalValuesRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidLogicalValuesRule.java index aae2148bb4c..a4660ba853c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidLogicalValuesRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidLogicalValuesRule.java @@ -32,9 +32,11 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.planner.UnsupportedSQLQueryException; import org.apache.druid.sql.calcite.rel.DruidQueryRel; import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.table.InlineTable; import org.apache.druid.sql.calcite.table.RowSignatures; import javax.annotation.Nullable; + import java.util.List; import java.util.stream.Collectors; @@ -76,12 +78,9 @@ public class DruidLogicalValuesRule extends RelOptRule values.getRowType().getFieldNames(), values.getRowType() ); - final DruidTable druidTable = new DruidTable( - InlineDataSource.fromIterable(objectTuples, rowSignature), - rowSignature, - null, - true, - false + final DruidTable druidTable = new InlineTable( + InlineDataSource.fromIterable(objectTuples, rowSignature), + rowSignature ); call.transformTo( DruidQueryRel.scanValues(values, druidTable, plannerContext) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/AbstractTableSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AbstractTableSchema.java new file mode 100644 index 00000000000..cd4e5a2a00c --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AbstractTableSchema.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.schema; + +import com.google.common.collect.ImmutableSet; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; + +/** + * Calcite provides two "schema" abstractions. {@link Schema} is an interface, + * while {@link org.apache.calcite.schema.impl.AbstractSchema} is a base class + * that "locks down" the {@link #getTable} method to obtain the table from a + * map. This forces the extensions of that class to materialize all tables in + * the schema, so that Calcite can pick the one it wants. This class, by + * contrast, provides the same defaults as {@link + * org.apache.calcite.schema.impl.AbstractSchema AbstractSchema}, but assumes + * its subslasses will implement {@code getTable()} to directly look up that + * one table, ignoring all others. Doing so lowers the cost of table resolution, + * especially when the system has to fetch catalog information for the table: + * we only fetch the information we need, not information for all tables. + */ +public abstract class AbstractTableSchema implements Schema +{ + protected static final Set EMPTY_NAMES = ImmutableSet.of(); + + @Override + public RelProtoDataType getType(String name) + { + return null; + } + + @Override + public Set getTypeNames() + { + return EMPTY_NAMES; + } + + @Override + public Collection getFunctions(String name) + { + return Collections.emptyList(); + } + + @Override + public Set getFunctionNames() + { + return EMPTY_NAMES; + } + + @Override + public Schema getSubSchema(String name) + { + return null; + } + + @Override + public Set getSubSchemaNames() + { + return EMPTY_NAMES; + } + + @Override + public Expression getExpression(SchemaPlus parentSchema, String name) + { + return null; + } + + @Override + public boolean isMutable() + { + return false; + } + + @Override + public Schema snapshot(SchemaVersion version) + { + return this; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModule.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModule.java index 0a8de292271..cc1371315ea 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModule.java @@ -49,8 +49,9 @@ public class DruidCalciteSchemaModule implements Module .toProvider(RootSchemaProvider.class) .in(Scopes.SINGLETON); - // DruidSchema needs to listen to changes for incoming segments - LifecycleModule.register(binder, DruidSchema.class); + // SegmentMetadataCache needs to listen to changes for incoming segments + LifecycleModule.register(binder, SegmentMetadataCache.class); + binder.bind(DruidSchema.class).in(Scopes.SINGLETON); binder.bind(SystemSchema.class).in(Scopes.SINGLETON); binder.bind(InformationSchema.class).in(Scopes.SINGLETON); binder.bind(LookupSchema.class).in(Scopes.SINGLETON); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 898489cce39..ec0b9cb2c11 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -19,952 +19,54 @@ package org.apache.druid.sql.calcite.schema; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.errorprone.annotations.concurrent.GuardedBy; -import com.google.inject.Inject; import org.apache.calcite.schema.Table; -import org.apache.calcite.schema.impl.AbstractSchema; -import org.apache.druid.client.BrokerInternalQueryConfig; -import org.apache.druid.client.ServerView; -import org.apache.druid.client.TimelineServerView; -import org.apache.druid.guice.ManageLifecycle; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.guava.Sequence; -import org.apache.druid.java.util.common.guava.Yielder; -import org.apache.druid.java.util.common.guava.Yielders; -import org.apache.druid.java.util.common.lifecycle.LifecycleStart; -import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.query.GlobalTableDataSource; -import org.apache.druid.query.TableDataSource; -import org.apache.druid.query.metadata.metadata.AllColumnIncluderator; -import org.apache.druid.query.metadata.metadata.ColumnAnalysis; -import org.apache.druid.query.metadata.metadata.SegmentAnalysis; -import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; -import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; -import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.segment.join.JoinableFactory; -import org.apache.druid.server.QueryLifecycleFactory; -import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.coordination.DruidServerMetadata; -import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.security.Access; -import org.apache.druid.server.security.Escalator; -import org.apache.druid.sql.calcite.planner.PlannerConfig; -import org.apache.druid.sql.calcite.table.DruidTable; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; +import org.apache.druid.sql.calcite.table.DatasourceTable; + +import javax.inject.Inject; -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.Comparator; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; -@ManageLifecycle -public class DruidSchema extends AbstractSchema +public class DruidSchema extends 
AbstractTableSchema { - // Newest segments first, so they override older ones. - private static final Comparator SEGMENT_ORDER = Comparator - .comparing((SegmentId segmentId) -> segmentId.getInterval().getStart()) - .reversed() - .thenComparing(Function.identity()); - - private static final EmittingLogger log = new EmittingLogger(DruidSchema.class); - private static final int MAX_SEGMENTS_PER_QUERY = 15000; - private static final long DEFAULT_NUM_ROWS = 0; - - private final QueryLifecycleFactory queryLifecycleFactory; - private final PlannerConfig config; - // Escalator, so we can attach an authentication result to queries we generate. - private final Escalator escalator; - private final SegmentManager segmentManager; - private final JoinableFactory joinableFactory; - private final ExecutorService cacheExec; - private final ExecutorService callbackExec; + private final SegmentMetadataCache segmentCache; private final DruidSchemaManager druidSchemaManager; - /** - * Map of DataSource -> DruidTable. - * This map can be accessed by {@link #cacheExec} and {@link #callbackExec} threads. - */ - private final ConcurrentMap tables = new ConcurrentHashMap<>(); - - private static final Interner ROW_SIGNATURE_INTERNER = Interners.newWeakInterner(); - - /** - * DataSource -> Segment -> AvailableSegmentMetadata(contains RowSignature) for that segment. - * Use SortedMap for segments so they are merged in deterministic order, from older to newer. - * - * This map is updated by these two threads. - * - * - {@link #callbackExec} can update it in {@link #addSegment}, {@link #removeServerSegment}, - * and {@link #removeSegment}. - * - {@link #cacheExec} can update it in {@link #refreshSegmentsForDataSource}. - * - * While it is being updated, this map is read by these two types of thread. - * - * - {@link #cacheExec} can iterate all {@link AvailableSegmentMetadata}s per datasource. - * See {@link #buildDruidTable}. - * - Query threads can create a snapshot of the entire map for processing queries on the system table. - * See {@link #getSegmentMetadataSnapshot()}. - * - * As the access pattern of this map is read-intensive, we should minimize the contention between writers and readers. - * Since there are two threads that can update this map at the same time, those writers should lock the inner map - * first and then lock the entry before it updates segment metadata. This can be done using - * {@link ConcurrentMap#compute} as below. Note that, if you need to update the variables guarded by {@link #lock} - * inside of compute(), you should get the lock before calling compute() to keep the function executed in compute() - * not expensive. - * - *
-   *   segmentMedataInfo.compute(
-   *     datasourceParam,
-   *     (datasource, segmentsMap) -> {
-   *       if (segmentsMap == null) return null;
-   *       else {
-   *         segmentsMap.compute(
-   *           segmentIdParam,
-   *           (segmentId, segmentMetadata) -> {
-   *             // update segmentMetadata
-   *           }
-   *         );
-   *         return segmentsMap;
-   *       }
-   *     }
-   *   );
-   * 
- * - * Readers can simply delegate the locking to the concurrent map and iterate map entries. - */ - private final ConcurrentHashMap> segmentMetadataInfo - = new ConcurrentHashMap<>(); - - // For awaitInitialization. - private final CountDownLatch initialized = new CountDownLatch(1); - - /** - * This lock coordinates the access from multiple threads to those variables guarded by this lock. - * Currently, there are 2 threads that can access these variables. - * - * - {@link #callbackExec} executes the timeline callbacks whenever BrokerServerView changes. - * - {@link #cacheExec} periodically refreshes segment metadata and {@link DruidTable} if necessary - * based on the information collected via timeline callbacks. - */ - private final Object lock = new Object(); - - // All mutable segments. - @GuardedBy("lock") - private final TreeSet mutableSegments = new TreeSet<>(SEGMENT_ORDER); - - // All dataSources that need tables regenerated. - @GuardedBy("lock") - private final Set dataSourcesNeedingRebuild = new HashSet<>(); - - // All segments that need to be refreshed. - @GuardedBy("lock") - private final TreeSet segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER); - - // Configured context to attach to internally generated queries. - private final BrokerInternalQueryConfig brokerInternalQueryConfig; - - @GuardedBy("lock") - private boolean refreshImmediately = false; - - @GuardedBy("lock") - private boolean isServerViewInitialized = false; - - /** - * Counts the total number of known segments. This variable is used only for the segments table in the system schema - * to initialize a map with a more proper size when it creates a snapshot. As a result, it doesn't have to be exact, - * and thus there is no concurrency control for this variable. - */ - private int totalSegments = 0; - @Inject public DruidSchema( - final QueryLifecycleFactory queryLifecycleFactory, - final TimelineServerView serverView, - final SegmentManager segmentManager, - final JoinableFactory joinableFactory, - final PlannerConfig config, - final Escalator escalator, - final BrokerInternalQueryConfig brokerInternalQueryConfig, - final DruidSchemaManager druidSchemaManager - ) + SegmentMetadataCache segmentCache, + final DruidSchemaManager druidSchemaManager) { - this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); - Preconditions.checkNotNull(serverView, "serverView"); - this.segmentManager = segmentManager; - this.joinableFactory = joinableFactory; - this.config = Preconditions.checkNotNull(config, "config"); - this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); - this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d"); - this.escalator = escalator; - this.brokerInternalQueryConfig = brokerInternalQueryConfig; - this.druidSchemaManager = druidSchemaManager; - - initServerViewTimelineCallback(serverView); - } - - private void initServerViewTimelineCallback(final TimelineServerView serverView) - { - serverView.registerTimelineCallback( - callbackExec, - new TimelineServerView.TimelineCallback() - { - @Override - public ServerView.CallbackAction timelineInitialized() - { - synchronized (lock) { - isServerViewInitialized = true; - lock.notifyAll(); - } - - return ServerView.CallbackAction.CONTINUE; - } - - @Override - public ServerView.CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment) - { - addSegment(server, segment); - return ServerView.CallbackAction.CONTINUE; - } - - @Override - public 
ServerView.CallbackAction segmentRemoved(final DataSegment segment) - { - removeSegment(segment); - return ServerView.CallbackAction.CONTINUE; - } - - @Override - public ServerView.CallbackAction serverSegmentRemoved( - final DruidServerMetadata server, - final DataSegment segment - ) - { - removeServerSegment(server, segment); - return ServerView.CallbackAction.CONTINUE; - } - } - ); - } - - private void startCacheExec() - { - cacheExec.submit( - () -> { - long lastRefresh = 0L; - long lastFailure = 0L; - - try { - while (!Thread.currentThread().isInterrupted()) { - final Set segmentsToRefresh = new TreeSet<>(); - final Set dataSourcesToRebuild = new TreeSet<>(); - - try { - synchronized (lock) { - final long nextRefreshNoFuzz = DateTimes - .utc(lastRefresh) - .plus(config.getMetadataRefreshPeriod()) - .getMillis(); - - // Fuzz a bit to spread load out when we have multiple brokers. - final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10); - - while (true) { - // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure). - final boolean wasRecentFailure = DateTimes.utc(lastFailure) - .plus(config.getMetadataRefreshPeriod()) - .isAfterNow(); - - if (isServerViewInitialized && - !wasRecentFailure && - (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) && - (refreshImmediately || nextRefresh < System.currentTimeMillis())) { - // We need to do a refresh. Break out of the waiting loop. - break; - } - - // lastFailure != 0L means exceptions happened before and there're some refresh work was not completed. - // so that even ServerView is initialized, we can't let broker complete initialization. - if (isServerViewInitialized && lastFailure == 0L) { - // Server view is initialized, but we don't need to do a refresh. Could happen if there are - // no segments in the system yet. Just mark us as initialized, then. - initialized.countDown(); - } - - // Wait some more, we'll wake up when it might be time to do another refresh. - lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis())); - } - - segmentsToRefresh.addAll(segmentsNeedingRefresh); - segmentsNeedingRefresh.clear(); - - // Mutable segments need a refresh every period, since new columns could be added dynamically. - segmentsNeedingRefresh.addAll(mutableSegments); - - lastFailure = 0L; - lastRefresh = System.currentTimeMillis(); - refreshImmediately = false; - } - - refresh(segmentsToRefresh, dataSourcesToRebuild); - - initialized.countDown(); - } - catch (InterruptedException e) { - // Fall through. - throw e; - } - catch (Exception e) { - log.warn(e, "Metadata refresh failed, trying again soon."); - - synchronized (lock) { - // Add our segments and dataSources back to their refresh and rebuild lists. - segmentsNeedingRefresh.addAll(segmentsToRefresh); - dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild); - lastFailure = System.currentTimeMillis(); - } - } - } - } - catch (InterruptedException e) { - // Just exit. - } - catch (Throwable e) { - // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like - // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata. 
- log.makeAlert(e, "Metadata refresh failed permanently").emit(); - throw e; - } - finally { - log.info("Metadata refresh stopped."); - } - } - ); - } - - @LifecycleStart - public void start() throws InterruptedException - { - startCacheExec(); - - if (config.isAwaitInitializationOnStart()) { - final long startNanos = System.nanoTime(); - log.debug("%s waiting for initialization.", getClass().getSimpleName()); - awaitInitialization(); - log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), (System.nanoTime() - startNanos) / 1000000); + this.segmentCache = segmentCache; + if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) { + this.druidSchemaManager = druidSchemaManager; + } else { + this.druidSchemaManager = null; } } - @VisibleForTesting - void refresh(final Set segmentsToRefresh, final Set dataSourcesToRebuild) throws IOException + protected SegmentMetadataCache cache() { - // Refresh the segments. - final Set refreshed = refreshSegments(segmentsToRefresh); - - synchronized (lock) { - // Add missing segments back to the refresh list. - segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed)); - - // Compute the list of dataSources to rebuild tables for. - dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild); - refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource())); - dataSourcesNeedingRebuild.clear(); - } - - // Rebuild the dataSources. - for (String dataSource : dataSourcesToRebuild) { - final DruidTable druidTable = buildDruidTable(dataSource); - if (druidTable == null) { - log.info("dataSource[%s] no longer exists, all metadata removed.", dataSource); - tables.remove(dataSource); - continue; - } - final DruidTable oldTable = tables.put(dataSource, druidTable); - final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource"; - if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) { - log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.getRowSignature()); - } else { - log.debug("%s [%s] signature is unchanged.", description, dataSource); - } - } - } - - @LifecycleStop - public void stop() - { - cacheExec.shutdownNow(); - callbackExec.shutdownNow(); - } - - public void awaitInitialization() throws InterruptedException - { - initialized.await(); + return segmentCache; } @Override - protected Map getTableMap() + public Table getTable(String name) { - if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) { - return ImmutableMap.copyOf(druidSchemaManager.getTables()); + if (druidSchemaManager != null) { + return druidSchemaManager.getTable(name); } else { - return ImmutableMap.copyOf(tables); + DatasourceTable.PhysicalDatasourceMetadata dsMetadata = segmentCache.getDatasource(name); + return dsMetadata == null ? null : new DatasourceTable(dsMetadata); } } - @VisibleForTesting - protected void addSegment(final DruidServerMetadata server, final DataSegment segment) + @Override + public Set getTableNames() { - // Get lock first so that we won't wait in ConcurrentMap.compute(). 
- synchronized (lock) { - // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking - // broker served segments in the timeline, to ensure that removeSegment the event is triggered accurately - if (server.getType().equals(ServerType.BROKER)) { - // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the - // historical, however mark the datasource for refresh because it needs to be globalized - markDataSourceAsNeedRebuild(segment.getDataSource()); - } else { - segmentMetadataInfo.compute( - segment.getDataSource(), - (datasource, segmentsMap) -> { - if (segmentsMap == null) { - segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER); - } - segmentsMap.compute( - segment.getId(), - (segmentId, segmentMetadata) -> { - if (segmentMetadata == null) { - // Unknown segment. - totalSegments++; - // segmentReplicatable is used to determine if segments are served by historical or realtime servers - long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1; - segmentMetadata = AvailableSegmentMetadata - .builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh - .build(); - markSegmentAsNeedRefresh(segment.getId()); - if (!server.isSegmentReplicationTarget()) { - log.debug("Added new mutable segment[%s].", segment.getId()); - markSegmentAsMutable(segment.getId()); - } else { - log.debug("Added new immutable segment[%s].", segment.getId()); - } - } else { - // We know this segment. - final Set segmentServers = segmentMetadata.getReplicas(); - final ImmutableSet servers = new ImmutableSet.Builder() - .addAll(segmentServers) - .add(server) - .build(); - segmentMetadata = AvailableSegmentMetadata - .from(segmentMetadata) - .withReplicas(servers) - .withRealtime(recomputeIsRealtime(servers)) - .build(); - if (server.isSegmentReplicationTarget()) { - // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable, - // even if it's also available on non-replicatable (realtime) servers. - unmarkSegmentAsMutable(segment.getId()); - log.debug("Segment[%s] has become immutable.", segment.getId()); - } - } - assert segmentMetadata != null; - return segmentMetadata; - } - ); - - return segmentsMap; - } - ); - } - if (!tables.containsKey(segment.getDataSource())) { - refreshImmediately = true; - } - - lock.notifyAll(); - } - } - - @VisibleForTesting - void removeSegment(final DataSegment segment) - { - // Get lock first so that we won't wait in ConcurrentMap.compute(). - synchronized (lock) { - log.debug("Segment[%s] is gone.", segment.getId()); - - segmentsNeedingRefresh.remove(segment.getId()); - unmarkSegmentAsMutable(segment.getId()); - - segmentMetadataInfo.compute( - segment.getDataSource(), - (dataSource, segmentsMap) -> { - if (segmentsMap == null) { - log.warn("Unknown segment[%s] was removed from the cluster. Ignoring this event.", segment.getId()); - return null; - } else { - if (segmentsMap.remove(segment.getId()) == null) { - log.warn("Unknown segment[%s] was removed from the cluster. 
Ignoring this event.", segment.getId()); - } else { - totalSegments--; - } - if (segmentsMap.isEmpty()) { - tables.remove(segment.getDataSource()); - log.info("dataSource[%s] no longer exists, all metadata removed.", segment.getDataSource()); - return null; - } else { - markDataSourceAsNeedRebuild(segment.getDataSource()); - return segmentsMap; - } - } - } - ); - - lock.notifyAll(); - } - } - - @VisibleForTesting - void removeServerSegment(final DruidServerMetadata server, final DataSegment segment) - { - // Get lock first so that we won't wait in ConcurrentMap.compute(). - synchronized (lock) { - log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName()); - segmentMetadataInfo.compute( - segment.getDataSource(), - (datasource, knownSegments) -> { - if (knownSegments == null) { - log.warn( - "Unknown segment[%s] is removed from server[%s]. Ignoring this event", - segment.getId(), - server.getHost() - ); - return null; - } - - if (server.getType().equals(ServerType.BROKER)) { - // for brokers, if the segment drops from all historicals before the broker this could be null. - if (!knownSegments.isEmpty()) { - // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the - // historical, however mark the datasource for refresh because it might no longer be broadcast or something - markDataSourceAsNeedRebuild(segment.getDataSource()); - } - } else { - knownSegments.compute( - segment.getId(), - (segmentId, segmentMetadata) -> { - if (segmentMetadata == null) { - log.warn( - "Unknown segment[%s] is removed from server[%s]. Ignoring this event", - segment.getId(), - server.getHost() - ); - return null; - } else { - final Set segmentServers = segmentMetadata.getReplicas(); - final ImmutableSet servers = FluentIterable - .from(segmentServers) - .filter(Predicates.not(Predicates.equalTo(server))) - .toSet(); - return AvailableSegmentMetadata - .from(segmentMetadata) - .withReplicas(servers) - .withRealtime(recomputeIsRealtime(servers)) - .build(); - } - } - ); - } - if (knownSegments.isEmpty()) { - return null; - } else { - return knownSegments; - } - } - ); - - lock.notifyAll(); - } - } - - private void markSegmentAsNeedRefresh(SegmentId segmentId) - { - synchronized (lock) { - segmentsNeedingRefresh.add(segmentId); - } - } - - private void markSegmentAsMutable(SegmentId segmentId) - { - synchronized (lock) { - mutableSegments.add(segmentId); - } - } - - private void unmarkSegmentAsMutable(SegmentId segmentId) - { - synchronized (lock) { - mutableSegments.remove(segmentId); - } - } - - @VisibleForTesting - void markDataSourceAsNeedRebuild(String datasource) - { - synchronized (lock) { - dataSourcesNeedingRebuild.add(datasource); - } - } - - /** - * Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed, - * which may be a subset of the asked-for set. - */ - @VisibleForTesting - protected Set refreshSegments(final Set segments) throws IOException - { - final Set retVal = new HashSet<>(); - - // Organize segments by dataSource. 
- final Map> segmentMap = new TreeMap<>(); - - for (SegmentId segmentId : segments) { - segmentMap.computeIfAbsent(segmentId.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER)) - .add(segmentId); - } - - for (Map.Entry> entry : segmentMap.entrySet()) { - final String dataSource = entry.getKey(); - retVal.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue())); - } - - return retVal; - } - - private long recomputeIsRealtime(ImmutableSet servers) - { - if (servers.isEmpty()) { - return 0; - } - final Optional historicalServer = servers - .stream() - // Ideally, this filter should have checked whether it's a broadcast segment loaded in brokers. - // However, we don't current track of the broadcast segments loaded in brokers, so this filter is still valid. - // See addSegment(), removeServerSegment(), and removeSegment() - .filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL)) - .findAny(); - - // if there is any historical server in the replicas, isRealtime flag should be unset - return historicalServer.isPresent() ? 0 : 1; - } - - /** - * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of - * segments actually refreshed, which may be a subset of the asked-for set. - */ - private Set refreshSegmentsForDataSource(final String dataSource, final Set segments) - throws IOException - { - if (!segments.stream().allMatch(segmentId -> segmentId.getDataSource().equals(dataSource))) { - // Sanity check. We definitely expect this to pass. - throw new ISE("'segments' must all match 'dataSource'!"); - } - - log.debug("Refreshing metadata for dataSource[%s].", dataSource); - - final long startTime = System.currentTimeMillis(); - - // Segment id string -> SegmentId object. - final Map segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString); - - final Set retVal = new HashSet<>(); - final Sequence sequence = runSegmentMetadataQuery( - Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY) - ); - - Yielder yielder = Yielders.each(sequence); - - try { - while (!yielder.isDone()) { - final SegmentAnalysis analysis = yielder.get(); - final SegmentId segmentId = segmentIdMap.get(analysis.getId()); - - if (segmentId == null) { - log.warn("Got analysis for segment[%s] we didn't ask for, ignoring.", analysis.getId()); - } else { - final RowSignature rowSignature = analysisToRowSignature(analysis); - log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature); - segmentMetadataInfo.compute( - dataSource, - (datasourceKey, dataSourceSegments) -> { - if (dataSourceSegments == null) { - // Datasource may have been removed or become unavailable while this refresh was ongoing. 
- log.warn( - "No segment map found with datasource[%s], skipping refresh of segment[%s]", - datasourceKey, - segmentId - ); - return null; - } else { - dataSourceSegments.compute( - segmentId, - (segmentIdKey, segmentMetadata) -> { - if (segmentMetadata == null) { - log.warn("No segment[%s] found, skipping refresh", segmentId); - return null; - } else { - final AvailableSegmentMetadata updatedSegmentMetadata = AvailableSegmentMetadata - .from(segmentMetadata) - .withRowSignature(rowSignature) - .withNumRows(analysis.getNumRows()) - .build(); - retVal.add(segmentId); - return updatedSegmentMetadata; - } - } - ); - - if (dataSourceSegments.isEmpty()) { - return null; - } else { - return dataSourceSegments; - } - } - } - ); - } - - yielder = yielder.next(null); - } - } - finally { - yielder.close(); - } - - log.debug( - "Refreshed metadata for dataSource[%s] in %,d ms (%d segments queried, %d segments left).", - dataSource, - System.currentTimeMillis() - startTime, - retVal.size(), - segments.size() - retVal.size() - ); - - return retVal; - } - - @VisibleForTesting - @Nullable - DruidTable buildDruidTable(final String dataSource) - { - ConcurrentSkipListMap segmentsMap = segmentMetadataInfo.get(dataSource); - - // Preserve order. - final Map columnTypes = new LinkedHashMap<>(); - - if (segmentsMap != null && !segmentsMap.isEmpty()) { - for (AvailableSegmentMetadata availableSegmentMetadata : segmentsMap.values()) { - final RowSignature rowSignature = availableSegmentMetadata.getRowSignature(); - if (rowSignature != null) { - for (String column : rowSignature.getColumnNames()) { - // Newer column types should override older ones. - final ColumnType columnType = - rowSignature.getColumnType(column) - .orElseThrow(() -> new ISE("Encountered null type for column[%s]", column)); - - columnTypes.putIfAbsent(column, columnType); - } - } - } + if (druidSchemaManager != null) { + return druidSchemaManager.getTableNames(); } else { - // table has no segments - return null; - } - - final RowSignature.Builder builder = RowSignature.builder(); - columnTypes.forEach(builder::add); - - final TableDataSource tableDataSource; - - // to be a GlobalTableDataSource instead of a TableDataSource, it must appear on all servers (inferred by existing - // in the segment cache, which in this case belongs to the broker meaning only broadcast segments live here) - // to be joinable, it must be possibly joinable according to the factory. we only consider broadcast datasources - // at this time, and isGlobal is currently strongly coupled with joinable, so only make a global table datasource - // if also joinable - final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource); - final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal); - final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource); - if (isBroadcast && isJoinable) { - tableDataSource = maybeGlobal; - } else { - tableDataSource = new TableDataSource(dataSource); - } - return new DruidTable(tableDataSource, builder.build(), null, isJoinable, isBroadcast); - } - - @VisibleForTesting - Map getSegmentMetadataSnapshot() - { - final Map segmentMetadata = Maps.newHashMapWithExpectedSize(totalSegments); - for (ConcurrentSkipListMap val : segmentMetadataInfo.values()) { - segmentMetadata.putAll(val); - } - return segmentMetadata; - } - - /** - * Returns total number of segments. This method doesn't use the lock intentionally to avoid expensive contention. 
- * As a result, the returned value might be inexact. - */ - int getTotalSegments() - { - return totalSegments; - } - - @VisibleForTesting - TreeSet getSegmentsNeedingRefresh() - { - synchronized (lock) { - return segmentsNeedingRefresh; - } - } - - @VisibleForTesting - TreeSet getMutableSegments() - { - synchronized (lock) { - return mutableSegments; - } - } - - @VisibleForTesting - Set getDataSourcesNeedingRebuild() - { - synchronized (lock) { - return dataSourcesNeedingRebuild; - } - } - - /** - * Execute a SegmentMetadata query and return a {@link Sequence} of {@link SegmentAnalysis}. - * - * @param segments Iterable of {@link SegmentId} objects that are subject of the SegmentMetadata query. - * @return {@link Sequence} of {@link SegmentAnalysis} objects - */ - @VisibleForTesting - protected Sequence runSegmentMetadataQuery( - final Iterable segments - ) - { - // Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource. - final String dataSource = Iterables.getOnlyElement( - StreamSupport.stream(segments.spliterator(), false) - .map(SegmentId::getDataSource).collect(Collectors.toSet()) - ); - - final MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec( - StreamSupport.stream(segments.spliterator(), false) - .map(SegmentId::toDescriptor).collect(Collectors.toList()) - ); - - final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery( - new TableDataSource(dataSource), - querySegmentSpec, - new AllColumnIncluderator(), - false, - brokerInternalQueryConfig.getContext(), - EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), - false, - false - ); - - return queryLifecycleFactory - .factorize() - .runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK); - } - - @VisibleForTesting - static RowSignature analysisToRowSignature(final SegmentAnalysis analysis) - { - final RowSignature.Builder rowSignatureBuilder = RowSignature.builder(); - for (Map.Entry entry : analysis.getColumns().entrySet()) { - if (entry.getValue().isError()) { - // Skip columns with analysis errors. - continue; - } - - ColumnType valueType = entry.getValue().getTypeSignature(); - - // this shouldn't happen, but if it does, first try to fall back to legacy type information field in case - // standard upgrade order was not followed for 0.22 to 0.23+, and if that also fails, then assume types are some - // flavor of COMPLEX. - if (valueType == null) { - // at some point in the future this can be simplified to the contents of the catch clause here, once the - // likelyhood of upgrading from some version lower than 0.23 is low - try { - valueType = ColumnType.fromString(entry.getValue().getType()); - } - catch (IllegalArgumentException ignored) { - valueType = ColumnType.UNKNOWN_COMPLEX; - } - } - - rowSignatureBuilder.add(entry.getKey(), valueType); - } - return ROW_SIGNATURE_INTERNER.intern(rowSignatureBuilder.build()); - } - - /** - * This method is not thread-safe and must be used only in unit tests. 
- */ - @VisibleForTesting - void setAvailableSegmentMetadata(final SegmentId segmentId, final AvailableSegmentMetadata availableSegmentMetadata) - { - final ConcurrentSkipListMap dataSourceSegments = segmentMetadataInfo - .computeIfAbsent( - segmentId.getDataSource(), - k -> new ConcurrentSkipListMap<>(SEGMENT_ORDER) - ); - if (dataSourceSegments.put(segmentId, availableSegmentMetadata) == null) { - totalSegments++; - } - } - - /** - * This is a helper method for unit tests to emulate heavy work done with {@link #lock}. - * It must be used only in unit tests. - */ - @VisibleForTesting - void doInLock(Runnable runnable) - { - synchronized (lock) { - runnable.run(); + return segmentCache.getDatasourceNames(); } } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java index dd3e7380312..c203ab18825 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java @@ -23,16 +23,41 @@ import org.apache.druid.guice.annotations.ExtensionPoint; import org.apache.druid.guice.annotations.UnstableApi; import org.apache.druid.sql.calcite.table.DruidTable; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; +import java.util.Set; /** - * This interface provides a map of datasource names to {@link DruidTable} objects, used by the {@link DruidSchema} - * class as the SQL planner's view of Druid datasource schemas. If a non-default implementation is provided, - * the segment metadata polling-based view of the Druid tables will not be built in DruidSchema. + * This interface provides a map of datasource names to {@link DruidTable} + * objects, used by the {@link DruidSchema} class as the SQL planner's + * view of Druid datasource schemas. If a non-default implementation is + * provided, the segment metadata polling-based view of the Druid tables + * will not be built in DruidSchema. */ @ExtensionPoint @UnstableApi public interface DruidSchemaManager { - ConcurrentMap getTables(); + /** + * Return all tables known to this schema manager. Deprecated because getting + * all tables is never actually needed in the current code. Calcite asks for + * the information for a single table (via {@link #getTable(String)}, or + * the list of all table names (via {@link #getTableNames()}. This + * method was originally used to allow Calcite's {@link + * org.apache.calcite.schema.impl.AbstractSchema AbstractSchema} class to do + * the lookup. The current code implements the operations directly. For + * backward compatibility, the default methods emulate the old behavior. + * Newer implementations should omit this method and implement the other two. 
+ */ + @Deprecated + Map getTables(); + + default DruidTable getTable(String name) + { + return getTables().get(name); + } + + default Set getTableNames() + { + return getTables().keySet(); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java index e0f86f0afde..7f16d0ae9fa 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java @@ -59,6 +59,7 @@ import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.RowSignatures; import javax.annotation.Nullable; + import java.util.Collection; import java.util.Collections; import java.util.Map; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java index 1fea9116ff5..b8453ba551c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java @@ -28,7 +28,7 @@ import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.lookup.LookupColumnSelectorFactory; -import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.table.LookupTable; import java.util.Map; @@ -57,11 +57,12 @@ public class LookupSchema extends AbstractSchema final ImmutableMap.Builder tableMapBuilder = ImmutableMap.builder(); for (final String lookupName : lookupProvider.getAllLookupNames()) { - // all lookups should be also joinable through lookup joinable factory, and lookups are effectively broadcast - // (if we ignore lookup tiers...) tableMapBuilder.put( lookupName, - new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE, null, true, true) + new LookupTable( + new LookupDataSource(lookupName), + ROW_SIGNATURE + ) ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java index 4d62dfec6fe..3ea16e61ac9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java @@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.schema; import org.apache.druid.sql.calcite.table.DruidTable; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,7 +32,7 @@ public class NoopDruidSchemaManager implements DruidSchemaManager private static ConcurrentMap MAP = new ConcurrentHashMap<>(); @Override - public ConcurrentMap getTables() + public Map getTables() { return MAP; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java new file mode 100644 index 00000000000..427108d9d83 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java @@ -0,0 +1,971 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.schema; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.inject.Inject; +import org.apache.druid.client.BrokerInternalQueryConfig; +import org.apache.druid.client.ServerView; +import org.apache.druid.client.TimelineServerView; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.query.GlobalTableDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.metadata.metadata.AllColumnIncluderator; +import org.apache.druid.query.metadata.metadata.ColumnAnalysis; +import org.apache.druid.query.metadata.metadata.SegmentAnalysis; +import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.Escalator; +import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.table.DatasourceTable; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import 
java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * Broker-side cache of segment metadata which combines segments to identify + * datasources which become "tables" in Calcite. This cache provides the "physical" + * metadata about a datasource which is blended with catalog "logical" metadata + * to provide the final user-view of each datasource. + */ +@ManageLifecycle +public class SegmentMetadataCache +{ + // Newest segments first, so they override older ones. + private static final Comparator SEGMENT_ORDER = Comparator + .comparing((SegmentId segmentId) -> segmentId.getInterval().getStart()) + .reversed() + .thenComparing(Function.identity()); + + private static final EmittingLogger log = new EmittingLogger(SegmentMetadataCache.class); + private static final int MAX_SEGMENTS_PER_QUERY = 15000; + private static final long DEFAULT_NUM_ROWS = 0; + + private final QueryLifecycleFactory queryLifecycleFactory; + private final PlannerConfig config; + // Escalator, so we can attach an authentication result to queries we generate. + private final Escalator escalator; + private final SegmentManager segmentManager; + private final JoinableFactory joinableFactory; + private final ExecutorService cacheExec; + private final ExecutorService callbackExec; + + /** + * Map of DataSource -> DruidTable. + * This map can be accessed by {@link #cacheExec} and {@link #callbackExec} threads. + */ + private final ConcurrentMap tables = new ConcurrentHashMap<>(); + + private static final Interner ROW_SIGNATURE_INTERNER = Interners.newWeakInterner(); + + /** + * DataSource -> Segment -> AvailableSegmentMetadata(contains RowSignature) for that segment. + * Use SortedMap for segments so they are merged in deterministic order, from older to newer. + * + * This map is updated by these two threads. + * + * - {@link #callbackExec} can update it in {@link #addSegment}, {@link #removeServerSegment}, + * and {@link #removeSegment}. + * - {@link #cacheExec} can update it in {@link #refreshSegmentsForDataSource}. + * + * While it is being updated, this map is read by these two types of thread. + * + * - {@link #cacheExec} can iterate all {@link AvailableSegmentMetadata}s per datasource. + * See {@link #buildDruidTable}. + * - Query threads can create a snapshot of the entire map for processing queries on the system table. + * See {@link #getSegmentMetadataSnapshot()}. + * + * As the access pattern of this map is read-intensive, we should minimize the contention between writers and readers. + * Since there are two threads that can update this map at the same time, those writers should lock the inner map + * first and then lock the entry before it updates segment metadata. This can be done using + * {@link ConcurrentMap#compute} as below. Note that, if you need to update the variables guarded by {@link #lock} + * inside of compute(), you should get the lock before calling compute() to keep the function executed in compute() + * not expensive. + * + *
+   *   segmentMetadataInfo.compute(
+   *     datasourceParam,
+   *     (datasource, segmentsMap) -> {
+   *       if (segmentsMap == null) return null;
+   *       else {
+   *         segmentsMap.compute(
+   *           segmentIdParam,
+   *           (segmentId, segmentMetadata) -> {
+   *             // update segmentMetadata
+   *           }
+   *         );
+   *         return segmentsMap;
+   *       }
+   *     }
+   *   );
+   * 
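+   *
+   * For example, writers that must also update lock-guarded state take {@link #lock}
+   * before calling compute(), as {@link #addSegment} does:
+   *
+   *   synchronized (lock) {
+   *     segmentMetadataInfo.compute(datasourceParam, ...);   // same compute() pattern as above
+   *     lock.notifyAll();
+   *   }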
+ * + * Readers can simply delegate the locking to the concurrent map and iterate map entries. + */ + private final ConcurrentHashMap> segmentMetadataInfo + = new ConcurrentHashMap<>(); + + // For awaitInitialization. + private final CountDownLatch initialized = new CountDownLatch(1); + + /** + * This lock coordinates the access from multiple threads to those variables guarded by this lock. + * Currently, there are 2 threads that can access these variables. + * + * - {@link #callbackExec} executes the timeline callbacks whenever BrokerServerView changes. + * - {@link #cacheExec} periodically refreshes segment metadata and {@link DatasourceTable} if necessary + * based on the information collected via timeline callbacks. + */ + private final Object lock = new Object(); + + // All mutable segments. + @GuardedBy("lock") + private final TreeSet mutableSegments = new TreeSet<>(SEGMENT_ORDER); + + // All dataSources that need tables regenerated. + @GuardedBy("lock") + private final Set dataSourcesNeedingRebuild = new HashSet<>(); + + // All segments that need to be refreshed. + @GuardedBy("lock") + private final TreeSet segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER); + + // Configured context to attach to internally generated queries. + private final BrokerInternalQueryConfig brokerInternalQueryConfig; + + @GuardedBy("lock") + private boolean refreshImmediately = false; + + @GuardedBy("lock") + private boolean isServerViewInitialized = false; + + /** + * Counts the total number of known segments. This variable is used only for the segments table in the system schema + * to initialize a map with a more proper size when it creates a snapshot. As a result, it doesn't have to be exact, + * and thus there is no concurrency control for this variable. + */ + private int totalSegments = 0; + + @Inject + public SegmentMetadataCache( + final QueryLifecycleFactory queryLifecycleFactory, + final TimelineServerView serverView, + final SegmentManager segmentManager, + final JoinableFactory joinableFactory, + final PlannerConfig config, + final Escalator escalator, + final BrokerInternalQueryConfig brokerInternalQueryConfig + ) + { + this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); + Preconditions.checkNotNull(serverView, "serverView"); + this.segmentManager = segmentManager; + this.joinableFactory = joinableFactory; + this.config = Preconditions.checkNotNull(config, "config"); + this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); + this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d"); + this.escalator = escalator; + this.brokerInternalQueryConfig = brokerInternalQueryConfig; + + initServerViewTimelineCallback(serverView); + } + + private void initServerViewTimelineCallback(final TimelineServerView serverView) + { + serverView.registerTimelineCallback( + callbackExec, + new TimelineServerView.TimelineCallback() + { + @Override + public ServerView.CallbackAction timelineInitialized() + { + synchronized (lock) { + isServerViewInitialized = true; + lock.notifyAll(); + } + + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment) + { + addSegment(server, segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentRemoved(final DataSegment segment) + { + removeSegment(segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public 
ServerView.CallbackAction serverSegmentRemoved( + final DruidServerMetadata server, + final DataSegment segment + ) + { + removeServerSegment(server, segment); + return ServerView.CallbackAction.CONTINUE; + } + } + ); + } + + private void startCacheExec() + { + cacheExec.submit( + () -> { + long lastRefresh = 0L; + long lastFailure = 0L; + + try { + while (!Thread.currentThread().isInterrupted()) { + final Set segmentsToRefresh = new TreeSet<>(); + final Set dataSourcesToRebuild = new TreeSet<>(); + + try { + synchronized (lock) { + final long nextRefreshNoFuzz = DateTimes + .utc(lastRefresh) + .plus(config.getMetadataRefreshPeriod()) + .getMillis(); + + // Fuzz a bit to spread load out when we have multiple brokers. + final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10); + + while (true) { + // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure). + final boolean wasRecentFailure = DateTimes.utc(lastFailure) + .plus(config.getMetadataRefreshPeriod()) + .isAfterNow(); + + if (isServerViewInitialized && + !wasRecentFailure && + (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) && + (refreshImmediately || nextRefresh < System.currentTimeMillis())) { + // We need to do a refresh. Break out of the waiting loop. + break; + } + + // lastFailure != 0L means exceptions happened before and there're some refresh work was not completed. + // so that even ServerView is initialized, we can't let broker complete initialization. + if (isServerViewInitialized && lastFailure == 0L) { + // Server view is initialized, but we don't need to do a refresh. Could happen if there are + // no segments in the system yet. Just mark us as initialized, then. + initialized.countDown(); + } + + // Wait some more, we'll wake up when it might be time to do another refresh. + lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis())); + } + + segmentsToRefresh.addAll(segmentsNeedingRefresh); + segmentsNeedingRefresh.clear(); + + // Mutable segments need a refresh every period, since new columns could be added dynamically. + segmentsNeedingRefresh.addAll(mutableSegments); + + lastFailure = 0L; + lastRefresh = System.currentTimeMillis(); + refreshImmediately = false; + } + + refresh(segmentsToRefresh, dataSourcesToRebuild); + + initialized.countDown(); + } + catch (InterruptedException e) { + // Fall through. + throw e; + } + catch (Exception e) { + log.warn(e, "Metadata refresh failed, trying again soon."); + + synchronized (lock) { + // Add our segments and dataSources back to their refresh and rebuild lists. + segmentsNeedingRefresh.addAll(segmentsToRefresh); + dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild); + lastFailure = System.currentTimeMillis(); + } + } + } + } + catch (InterruptedException e) { + // Just exit. + } + catch (Throwable e) { + // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like + // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata. 
+ log.makeAlert(e, "Metadata refresh failed permanently").emit(); + throw e; + } + finally { + log.info("Metadata refresh stopped."); + } + } + ); + } + + @LifecycleStart + public void start() throws InterruptedException + { + startCacheExec(); + + if (config.isAwaitInitializationOnStart()) { + final long startNanos = System.nanoTime(); + log.debug("%s waiting for initialization.", getClass().getSimpleName()); + awaitInitialization(); + log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), (System.nanoTime() - startNanos) / 1000000); + } + } + + @VisibleForTesting + void refresh(final Set segmentsToRefresh, final Set dataSourcesToRebuild) throws IOException + { + // Refresh the segments. + final Set refreshed = refreshSegments(segmentsToRefresh); + + synchronized (lock) { + // Add missing segments back to the refresh list. + segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed)); + + // Compute the list of dataSources to rebuild tables for. + dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild); + refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource())); + dataSourcesNeedingRebuild.clear(); + } + + // Rebuild the dataSources. + for (String dataSource : dataSourcesToRebuild) { + final DatasourceTable.PhysicalDatasourceMetadata druidTable = buildDruidTable(dataSource); + if (druidTable == null) { + log.info("dataSource [%s] no longer exists, all metadata removed.", dataSource); + tables.remove(dataSource); + continue; + } + final DatasourceTable.PhysicalDatasourceMetadata oldTable = tables.put(dataSource, druidTable); + final String description = druidTable.dataSource().isGlobal() ? "global dataSource" : "dataSource"; + if (oldTable == null || !oldTable.rowSignature().equals(druidTable.rowSignature())) { + log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.rowSignature()); + } else { + log.debug("%s [%s] signature is unchanged.", description, dataSource); + } + } + } + + @LifecycleStop + public void stop() + { + cacheExec.shutdownNow(); + callbackExec.shutdownNow(); + } + + public void awaitInitialization() throws InterruptedException + { + initialized.await(); + } + + protected DatasourceTable.PhysicalDatasourceMetadata getDatasource(String name) + { + return tables.get(name); + } + + protected Set getDatasourceNames() + { + return tables.keySet(); + } + + @VisibleForTesting + protected void addSegment(final DruidServerMetadata server, final DataSegment segment) + { + // Get lock first so that we won't wait in ConcurrentMap.compute(). + synchronized (lock) { + // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking + // broker served segments in the timeline, to ensure that removeSegment the event is triggered accurately + if (server.getType().equals(ServerType.BROKER)) { + // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the + // historical, however mark the datasource for refresh because it needs to be globalized + markDataSourceAsNeedRebuild(segment.getDataSource()); + } else { + segmentMetadataInfo.compute( + segment.getDataSource(), + (datasource, segmentsMap) -> { + if (segmentsMap == null) { + segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER); + } + segmentsMap.compute( + segment.getId(), + (segmentId, segmentMetadata) -> { + if (segmentMetadata == null) { + // Unknown segment. 
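+                        // First time we have seen this segment: register placeholder metadata (no row signature, zero rows) and schedule it for a metadata refresh.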
+ totalSegments++; + // segmentReplicatable is used to determine if segments are served by historical or realtime servers + long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1; + segmentMetadata = AvailableSegmentMetadata + .builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh + .build(); + markSegmentAsNeedRefresh(segment.getId()); + if (!server.isSegmentReplicationTarget()) { + log.debug("Added new mutable segment [%s].", segment.getId()); + markSegmentAsMutable(segment.getId()); + } else { + log.debug("Added new immutable segment [%s].", segment.getId()); + } + } else { + // We know this segment. + final Set segmentServers = segmentMetadata.getReplicas(); + final ImmutableSet servers = new ImmutableSet.Builder() + .addAll(segmentServers) + .add(server) + .build(); + segmentMetadata = AvailableSegmentMetadata + .from(segmentMetadata) + .withReplicas(servers) + .withRealtime(recomputeIsRealtime(servers)) + .build(); + if (server.isSegmentReplicationTarget()) { + // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable, + // even if it's also available on non-replicatable (realtime) servers. + unmarkSegmentAsMutable(segment.getId()); + log.debug("Segment[%s] has become immutable.", segment.getId()); + } + } + assert segmentMetadata != null; + return segmentMetadata; + } + ); + + return segmentsMap; + } + ); + } + if (!tables.containsKey(segment.getDataSource())) { + refreshImmediately = true; + } + + lock.notifyAll(); + } + } + + @VisibleForTesting + void removeSegment(final DataSegment segment) + { + // Get lock first so that we won't wait in ConcurrentMap.compute(). + synchronized (lock) { + log.debug("Segment [%s] is gone.", segment.getId()); + + segmentsNeedingRefresh.remove(segment.getId()); + unmarkSegmentAsMutable(segment.getId()); + + segmentMetadataInfo.compute( + segment.getDataSource(), + (dataSource, segmentsMap) -> { + if (segmentsMap == null) { + log.warn("Unknown segment [%s] was removed from the cluster. Ignoring this event.", segment.getId()); + return null; + } else { + if (segmentsMap.remove(segment.getId()) == null) { + log.warn("Unknown segment [%s] was removed from the cluster. Ignoring this event.", segment.getId()); + } else { + totalSegments--; + } + if (segmentsMap.isEmpty()) { + tables.remove(segment.getDataSource()); + log.info("dataSource [%s] no longer exists, all metadata removed.", segment.getDataSource()); + return null; + } else { + markDataSourceAsNeedRebuild(segment.getDataSource()); + return segmentsMap; + } + } + } + ); + + lock.notifyAll(); + } + } + + @VisibleForTesting + void removeServerSegment(final DruidServerMetadata server, final DataSegment segment) + { + // Get lock first so that we won't wait in ConcurrentMap.compute(). + synchronized (lock) { + log.debug("Segment [%s] is gone from server [%s]", segment.getId(), server.getName()); + segmentMetadataInfo.compute( + segment.getDataSource(), + (datasource, knownSegments) -> { + if (knownSegments == null) { + log.warn( + "Unknown segment [%s] is removed from server [%s]. Ignoring this event", + segment.getId(), + server.getHost() + ); + return null; + } + + if (server.getType().equals(ServerType.BROKER)) { + // for brokers, if the segment drops from all historicals before the broker this could be null. 
+ if (!knownSegments.isEmpty()) { + // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the + // historical, however mark the datasource for refresh because it might no longer be broadcast or something + markDataSourceAsNeedRebuild(segment.getDataSource()); + } + } else { + knownSegments.compute( + segment.getId(), + (segmentId, segmentMetadata) -> { + if (segmentMetadata == null) { + log.warn( + "Unknown segment [%s] is removed from server [%s]. Ignoring this event", + segment.getId(), + server.getHost() + ); + return null; + } else { + final Set segmentServers = segmentMetadata.getReplicas(); + final ImmutableSet servers = FluentIterable + .from(segmentServers) + .filter(Predicates.not(Predicates.equalTo(server))) + .toSet(); + return AvailableSegmentMetadata + .from(segmentMetadata) + .withReplicas(servers) + .withRealtime(recomputeIsRealtime(servers)) + .build(); + } + } + ); + } + if (knownSegments.isEmpty()) { + return null; + } else { + return knownSegments; + } + } + ); + + lock.notifyAll(); + } + } + + private void markSegmentAsNeedRefresh(SegmentId segmentId) + { + synchronized (lock) { + segmentsNeedingRefresh.add(segmentId); + } + } + + private void markSegmentAsMutable(SegmentId segmentId) + { + synchronized (lock) { + mutableSegments.add(segmentId); + } + } + + private void unmarkSegmentAsMutable(SegmentId segmentId) + { + synchronized (lock) { + mutableSegments.remove(segmentId); + } + } + + @VisibleForTesting + void markDataSourceAsNeedRebuild(String datasource) + { + synchronized (lock) { + dataSourcesNeedingRebuild.add(datasource); + } + } + + /** + * Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed, + * which may be a subset of the asked-for set. + */ + @VisibleForTesting + protected Set refreshSegments(final Set segments) throws IOException + { + final Set retVal = new HashSet<>(); + + // Organize segments by dataSource. + final Map> segmentMap = new TreeMap<>(); + + for (SegmentId segmentId : segments) { + segmentMap.computeIfAbsent(segmentId.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER)) + .add(segmentId); + } + + for (Map.Entry> entry : segmentMap.entrySet()) { + final String dataSource = entry.getKey(); + retVal.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue())); + } + + return retVal; + } + + private long recomputeIsRealtime(ImmutableSet servers) + { + if (servers.isEmpty()) { + return 0; + } + final Optional historicalServer = servers + .stream() + // Ideally, this filter should have checked whether it's a broadcast segment loaded in brokers. + // However, we don't current track of the broadcast segments loaded in brokers, so this filter is still valid. + // See addSegment(), removeServerSegment(), and removeSegment() + .filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL)) + .findAny(); + + // if there is any historical server in the replicas, isRealtime flag should be unset + return historicalServer.isPresent() ? 0 : 1; + } + + /** + * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of + * segments actually refreshed, which may be a subset of the asked-for set. + */ + private Set refreshSegmentsForDataSource(final String dataSource, final Set segments) + throws IOException + { + if (!segments.stream().allMatch(segmentId -> segmentId.getDataSource().equals(dataSource))) { + // Sanity check. We definitely expect this to pass. 
+ throw new ISE("'segments' must all match 'dataSource'!"); + } + + log.debug("Refreshing metadata for dataSource[%s].", dataSource); + + final long startTime = System.currentTimeMillis(); + + // Segment id string -> SegmentId object. + final Map segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString); + + final Set retVal = new HashSet<>(); + final Sequence sequence = runSegmentMetadataQuery( + Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY) + ); + + Yielder yielder = Yielders.each(sequence); + + try { + while (!yielder.isDone()) { + final SegmentAnalysis analysis = yielder.get(); + final SegmentId segmentId = segmentIdMap.get(analysis.getId()); + + if (segmentId == null) { + log.warn("Got analysis for segment [%s] we didn't ask for, ignoring.", analysis.getId()); + } else { + final RowSignature rowSignature = analysisToRowSignature(analysis); + log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature); + segmentMetadataInfo.compute( + dataSource, + (datasourceKey, dataSourceSegments) -> { + if (dataSourceSegments == null) { + // Datasource may have been removed or become unavailable while this refresh was ongoing. + log.warn( + "No segment map found with datasource [%s], skipping refresh of segment [%s]", + datasourceKey, + segmentId + ); + return null; + } else { + dataSourceSegments.compute( + segmentId, + (segmentIdKey, segmentMetadata) -> { + if (segmentMetadata == null) { + log.warn("No segment [%s] found, skipping refresh", segmentId); + return null; + } else { + final AvailableSegmentMetadata updatedSegmentMetadata = AvailableSegmentMetadata + .from(segmentMetadata) + .withRowSignature(rowSignature) + .withNumRows(analysis.getNumRows()) + .build(); + retVal.add(segmentId); + return updatedSegmentMetadata; + } + } + ); + + if (dataSourceSegments.isEmpty()) { + return null; + } else { + return dataSourceSegments; + } + } + } + ); + } + + yielder = yielder.next(null); + } + } + finally { + yielder.close(); + } + + log.debug( + "Refreshed metadata for dataSource [%s] in %,d ms (%d segments queried, %d segments left).", + dataSource, + System.currentTimeMillis() - startTime, + retVal.size(), + segments.size() - retVal.size() + ); + + return retVal; + } + + @VisibleForTesting + @Nullable + DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(final String dataSource) + { + ConcurrentSkipListMap segmentsMap = segmentMetadataInfo.get(dataSource); + + // Preserve order. + final Map columnTypes = new LinkedHashMap<>(); + + if (segmentsMap != null && !segmentsMap.isEmpty()) { + for (AvailableSegmentMetadata availableSegmentMetadata : segmentsMap.values()) { + final RowSignature rowSignature = availableSegmentMetadata.getRowSignature(); + if (rowSignature != null) { + for (String column : rowSignature.getColumnNames()) { + // Newer column types should override older ones. 
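+          // Segments are iterated newest-first (see SEGMENT_ORDER), so putIfAbsent() below keeps the type from the newest segment that has this column.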
+ final ColumnType columnType = + rowSignature.getColumnType(column) + .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column)); + + columnTypes.putIfAbsent(column, columnType); + } + } + } + } else { + // table has no segments + return null; + } + + final RowSignature.Builder builder = RowSignature.builder(); + columnTypes.forEach(builder::add); + + final TableDataSource tableDataSource; + + // to be a GlobalTableDataSource instead of a TableDataSource, it must appear on all servers (inferred by existing + // in the segment cache, which in this case belongs to the broker meaning only broadcast segments live here) + // to be joinable, it must be possibly joinable according to the factory. we only consider broadcast datasources + // at this time, and isGlobal is currently strongly coupled with joinable, so only make a global table datasource + // if also joinable + final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource); + final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal); + final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource); + if (isBroadcast && isJoinable) { + tableDataSource = maybeGlobal; + } else { + tableDataSource = new TableDataSource(dataSource); + } + return new DatasourceTable.PhysicalDatasourceMetadata(tableDataSource, builder.build(), isJoinable, isBroadcast); + } + + @VisibleForTesting + Map getSegmentMetadataSnapshot() + { + final Map segmentMetadata = Maps.newHashMapWithExpectedSize(totalSegments); + for (ConcurrentSkipListMap val : segmentMetadataInfo.values()) { + segmentMetadata.putAll(val); + } + return segmentMetadata; + } + + /** + * Returns total number of segments. This method doesn't use the lock intentionally to avoid expensive contention. + * As a result, the returned value might be inexact. + */ + int getTotalSegments() + { + return totalSegments; + } + + @VisibleForTesting + TreeSet getSegmentsNeedingRefresh() + { + synchronized (lock) { + return segmentsNeedingRefresh; + } + } + + @VisibleForTesting + TreeSet getMutableSegments() + { + synchronized (lock) { + return mutableSegments; + } + } + + @VisibleForTesting + Set getDataSourcesNeedingRebuild() + { + synchronized (lock) { + return dataSourcesNeedingRebuild; + } + } + + /** + * Execute a SegmentMetadata query and return a {@link Sequence} of {@link SegmentAnalysis}. + * + * @param segments Iterable of {@link SegmentId} objects that are subject of the SegmentMetadata query. + * @return {@link Sequence} of {@link SegmentAnalysis} objects + */ + @VisibleForTesting + protected Sequence runSegmentMetadataQuery( + final Iterable segments + ) + { + // Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource. 
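+    // Guava's getOnlyElement() throws IllegalArgumentException if the segments map to more than one dataSource.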
+ final String dataSource = Iterables.getOnlyElement( + StreamSupport.stream(segments.spliterator(), false) + .map(SegmentId::getDataSource).collect(Collectors.toSet()) + ); + + final MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec( + StreamSupport.stream(segments.spliterator(), false) + .map(SegmentId::toDescriptor).collect(Collectors.toList()) + ); + + final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery( + new TableDataSource(dataSource), + querySegmentSpec, + new AllColumnIncluderator(), + false, + brokerInternalQueryConfig.getContext(), + EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), + false, + false + ); + + return queryLifecycleFactory + .factorize() + .runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK); + } + + @VisibleForTesting + static RowSignature analysisToRowSignature(final SegmentAnalysis analysis) + { + final RowSignature.Builder rowSignatureBuilder = RowSignature.builder(); + for (Map.Entry entry : analysis.getColumns().entrySet()) { + if (entry.getValue().isError()) { + // Skip columns with analysis errors. + continue; + } + + ColumnType valueType = entry.getValue().getTypeSignature(); + + // this shouldn't happen, but if it does, first try to fall back to legacy type information field in case + // standard upgrade order was not followed for 0.22 to 0.23+, and if that also fails, then assume types are some + // flavor of COMPLEX. + if (valueType == null) { + // at some point in the future this can be simplified to the contents of the catch clause here, once the + // likelyhood of upgrading from some version lower than 0.23 is low + try { + valueType = ColumnType.fromString(entry.getValue().getType()); + } + catch (IllegalArgumentException ignored) { + valueType = ColumnType.UNKNOWN_COMPLEX; + } + } + + rowSignatureBuilder.add(entry.getKey(), valueType); + } + return ROW_SIGNATURE_INTERNER.intern(rowSignatureBuilder.build()); + } + + /** + * This method is not thread-safe and must be used only in unit tests. + */ + @VisibleForTesting + void setAvailableSegmentMetadata(final SegmentId segmentId, final AvailableSegmentMetadata availableSegmentMetadata) + { + final ConcurrentSkipListMap dataSourceSegments = segmentMetadataInfo + .computeIfAbsent( + segmentId.getDataSource(), + k -> new ConcurrentSkipListMap<>(SEGMENT_ORDER) + ); + if (dataSourceSegments.put(segmentId, availableSegmentMetadata) == null) { + totalSegments++; + } + } + + /** + * This is a helper method for unit tests to emulate heavy work done with {@link #lock}. + * It must be used only in unit tests. 
+ */ + @VisibleForTesting + void doInLock(Runnable runnable) + { + synchronized (lock) { + runnable.run(); + } + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 83068e2c1f7..4b1b62b044b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -273,13 +273,13 @@ public class SystemSchema extends AbstractSchema { //get available segments from druidSchema final Map availableSegmentMetadata = - druidSchema.getSegmentMetadataSnapshot(); + druidSchema.cache().getSegmentMetadataSnapshot(); final Iterator> availableSegmentEntries = availableSegmentMetadata.entrySet().iterator(); // in memory map to store segment data from available segments final Map partialSegmentDataMap = - Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments()); + Maps.newHashMapWithExpectedSize(druidSchema.cache().getTotalSegments()); for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) { PartialSegmentData partialSegmentData = new PartialSegmentData(IS_AVAILABLE_TRUE, h.isRealtime(), h.getNumReplicas(), h.getNumRows()); @@ -290,7 +290,7 @@ public class SystemSchema extends AbstractSchema // Coordinator. final Iterator metadataStoreSegments = metadataView.getPublishedSegments(); - final Set segmentsAlreadySeen = Sets.newHashSetWithExpectedSize(druidSchema.getTotalSegments()); + final Set segmentsAlreadySeen = Sets.newHashSetWithExpectedSize(druidSchema.cache().getTotalSegments()); final FluentIterable publishedSegments = FluentIterable .from(() -> getAuthorizedPublishedSegments(metadataStoreSegments, root)) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java new file mode 100644 index 00000000000..d4c7073cae3 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.table; + +import com.google.common.base.Preconditions; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.segment.column.RowSignature; + +import java.util.Objects; + +/** + * Represents a SQL table that models a Druid datasource. + *

+ * Once the catalog code is merged, this class will combine physical information + * from the segment cache with logical information from the catalog to produce + * the SQL-user's view of the table. The resulting merged view is used to plan + * queries and is the source of table information in the {@code INFORMATION_SCHEMA}. + */ +public class DatasourceTable extends DruidTable +{ + /** + * The physical metadata for a datasource, derived from the list of segments + * published in the Coordinator. Used only for datasources, since only + * datasources are computed from segments. + */ + public static class PhysicalDatasourceMetadata + { + private final TableDataSource dataSource; + private final RowSignature rowSignature; + private final boolean joinable; + private final boolean broadcast; + + public PhysicalDatasourceMetadata( + final TableDataSource dataSource, + final RowSignature rowSignature, + final boolean isJoinable, + final boolean isBroadcast + ) + { + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature"); + this.joinable = isJoinable; + this.broadcast = isBroadcast; + } + + public TableDataSource dataSource() + { + return dataSource; + } + + public RowSignature rowSignature() + { + return rowSignature; + } + + public boolean isJoinable() + { + return joinable; + } + + public boolean isBroadcast() + { + return broadcast; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + PhysicalDatasourceMetadata that = (PhysicalDatasourceMetadata) o; + + if (!Objects.equals(dataSource, that.dataSource)) { + return false; + } + return Objects.equals(rowSignature, that.rowSignature); + } + + @Override + public int hashCode() + { + int result = dataSource != null ? dataSource.hashCode() : 0; + result = 31 * result + (rowSignature != null ? rowSignature.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "DatasourceMetadata{" + + "dataSource=" + dataSource + + ", rowSignature=" + rowSignature + + '}'; + } + } + + private final PhysicalDatasourceMetadata physicalMetadata; + + public DatasourceTable( + final PhysicalDatasourceMetadata physicalMetadata + ) + { + super(physicalMetadata.rowSignature()); + this.physicalMetadata = physicalMetadata; + } + + @Override + public DataSource getDataSource() + { + return physicalMetadata.dataSource(); + } + + @Override + public boolean isJoinable() + { + return physicalMetadata.isJoinable(); + } + + @Override + public boolean isBroadcast() + { + return physicalMetadata.isBroadcast(); + } + + @Override + public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable table) + { + return LogicalTableScan.create(context.getCluster(), table); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DatasourceTable that = (DatasourceTable) o; + + if (!Objects.equals(physicalMetadata, that.physicalMetadata)) { + return false; + } + return Objects.equals(getRowSignature(), that.getRowSignature()); + } + + @Override + public int hashCode() + { + return physicalMetadata.hashCode(); + } + + @Override + public String toString() + { + // Don't include the row signature: it is the same as in + // physicalMetadata. 
+ return "DruidTable{" + + physicalMetadata + + '}'; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java index 4a49001244c..90d3cd653ff 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java @@ -19,12 +19,8 @@ package org.apache.druid.sql.calcite.table; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import org.apache.calcite.config.CalciteConnectionConfig; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.Schema; @@ -35,45 +31,19 @@ import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlNode; import org.apache.druid.query.DataSource; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.external.ExternalDataSource; -import org.apache.druid.sql.calcite.external.ExternalTableScan; -import javax.annotation.Nullable; -import java.util.Objects; - -public class DruidTable implements TranslatableTable +/** + * Abstract base class for the various kinds of tables which Druid supports. + */ +public abstract class DruidTable implements TranslatableTable { - private final DataSource dataSource; private final RowSignature rowSignature; - @Nullable - private final ObjectMapper objectMapper; - private final boolean joinable; - private final boolean broadcast; - public DruidTable( - final DataSource dataSource, - final RowSignature rowSignature, - @Nullable final ObjectMapper objectMapper, - final boolean isJoinable, - final boolean isBroadcast + final RowSignature rowSignature ) { - this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature"); - this.objectMapper = objectMapper; - this.joinable = isJoinable; - this.broadcast = isBroadcast; - - if (dataSource instanceof ExternalDataSource && objectMapper == null) { - // objectMapper is used by ExternalTableScan to generate its digest. - throw new NullPointerException("ObjectMapper is required for external datasources"); - } - } - - public DataSource getDataSource() - { - return dataSource; } public RowSignature getRowSignature() @@ -81,15 +51,11 @@ public class DruidTable implements TranslatableTable return rowSignature; } - public boolean isJoinable() - { - return joinable; - } + public abstract DataSource getDataSource(); - public boolean isBroadcast() - { - return broadcast; - } + public abstract boolean isJoinable(); + + public abstract boolean isBroadcast(); @Override public Schema.TableType getJdbcTableType() @@ -106,12 +72,7 @@ public class DruidTable implements TranslatableTable @Override public RelDataType getRowType(final RelDataTypeFactory typeFactory) { - // For external datasources, the row type should be determined by whatever the row signature has been explicitly - // passed in. Typecasting directly to SqlTypeName.TIMESTAMP will lead to inconsistencies with the Calcite functions - // For example, TIME_PARSE(__time) where __time is specified to be a string field in the external datasource - // would lead to an exception because __time would be interpreted as timestamp if we typecast it. 
- boolean typecastTimeColumn = !(dataSource instanceof ExternalDataSource); - return RowSignatures.toRelDataType(rowSignature, typeFactory, typecastTimeColumn); + return RowSignatures.toRelDataType(getRowSignature(), typeFactory); } @Override @@ -130,51 +91,4 @@ public class DruidTable implements TranslatableTable { return true; } - - @Override - public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable table) - { - if (dataSource instanceof ExternalDataSource) { - // Cannot use LogicalTableScan here, because its digest is solely based on the name of the table macro. - // Must use our own class that computes its own digest. - return new ExternalTableScan(context.getCluster(), objectMapper, this); - } else { - return LogicalTableScan.create(context.getCluster(), table); - } - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - DruidTable that = (DruidTable) o; - - if (!Objects.equals(dataSource, that.dataSource)) { - return false; - } - return Objects.equals(rowSignature, that.rowSignature); - } - - @Override - public int hashCode() - { - int result = dataSource != null ? dataSource.hashCode() : 0; - result = 31 * result + (rowSignature != null ? rowSignature.hashCode() : 0); - return result; - } - - @Override - public String toString() - { - return "DruidTable{" + - "dataSource=" + dataSource + - ", rowSignature=" + rowSignature + - '}'; - } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java new file mode 100644 index 00000000000..cc3ee7c3f6a --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.table; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptTable.ToRelContext; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.druid.query.DataSource; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.external.ExternalTableScan; + +/** + * Represents an source of data external to Druid: a CSV file, an HTTP request, etc. + * Each such table represents one of Druid's {@link DataSource} types. 
Since SQL + * requires knowledge of the schema of that input source, the user must provide + * that information in SQL (via the `EXTERN` or up-coming `STAGED` function) or + * from the upcoming Druid Catalog. + */ +public class ExternalTable extends DruidTable +{ + private final DataSource dataSource; + private final ObjectMapper objectMapper; + + public ExternalTable( + final DataSource dataSource, + final RowSignature rowSignature, + final ObjectMapper objectMapper + ) + { + super(rowSignature); + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + this.objectMapper = objectMapper; + } + + @Override + public DataSource getDataSource() + { + return dataSource; + } + + @Override + public boolean isJoinable() + { + return false; + } + + @Override + public boolean isBroadcast() + { + return false; + } + + @Override + public RelDataType getRowType(final RelDataTypeFactory typeFactory) + { + // For external datasources, the row type should be determined by whatever the row signature has been explicitly + // passed in. Typecasting directly to SqlTypeName.TIMESTAMP will lead to inconsistencies with the Calcite functions + // For example, TIME_PARSE(__time) where __time is specified to be a string field in the external datasource + // would lead to an exception because __time would be interpreted as timestamp if we typecast it. + return RowSignatures.toRelDataType(getRowSignature(), typeFactory, true); + } + + @Override + public RelNode toRel(ToRelContext context, RelOptTable relOptTable) + { + return new ExternalTableScan(context.getCluster(), objectMapper, this); + } + + @Override + public String toString() + { + return "ExternalTable{" + + "dataSource=" + dataSource + + ", rowSignature=" + getRowSignature() + + '}'; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/InlineTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/InlineTable.java new file mode 100644 index 00000000000..e8a7f8e2987 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/InlineTable.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.table; + +import com.google.common.base.Preconditions; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptTable.ToRelContext; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.segment.column.RowSignature; + +/** + * Represents a specialized table used within Druid's Calcite-based planner. 
+ * Used in {@link org.apache.druid.sql.calcite.rule.DruidLogicalValuesRule DruidLogicalValuesRule} + * to implement trivial {@code SELECT 1} style queries that return constant values represented + * by this inline table. + */ +public class InlineTable extends DruidTable +{ + private final DataSource dataSource; + + public InlineTable( + final InlineDataSource dataSource, + final RowSignature rowSignature + ) + { + super(rowSignature); + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + } + + @Override + public DataSource getDataSource() + { + return dataSource; + } + + @Override + public boolean isJoinable() + { + return false; + } + + @Override + public boolean isBroadcast() + { + return false; + } + + @Override + public RelNode toRel(ToRelContext context, RelOptTable table) + { + return LogicalTableScan.create(context.getCluster(), table); + } + + @Override + public String toString() + { + return "DatasourceMetadata{" + + "dataSource=" + dataSource + + ", rowSignature=" + getRowSignature() + + '}'; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/LookupTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/LookupTable.java new file mode 100644 index 00000000000..eac0e44443c --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/LookupTable.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.table; + +import com.google.common.base.Preconditions; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptTable.ToRelContext; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.LookupDataSource; +import org.apache.druid.segment.column.RowSignature; + +/** + * Represents a Druid lookup as a Calcite table, allowing queries to read from a lookup. 
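+ * Lookups are always joinable and are effectively broadcast to every server (ignoring lookup tiers), so {@link #isJoinable()} and {@link #isBroadcast()} both return {@code true}.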
+ */ +public class LookupTable extends DruidTable +{ + private final DataSource dataSource; + + public LookupTable( + final LookupDataSource dataSource, + final RowSignature rowSignature + ) + { + super(rowSignature); + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + } + + @Override + public DataSource getDataSource() + { + return dataSource; + } + + @Override + public boolean isJoinable() + { + return true; + } + + @Override + public boolean isBroadcast() + { + return true; + } + + @Override + public RelNode toRel(ToRelContext context, RelOptTable table) + { + return LogicalTableScan.create(context.getCluster(), table); + } + + @Override + public String toString() + { + return "LookupTable{" + + "dataSource=" + dataSource + + ", rowSignature=" + getRowSignature() + + '}'; + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java b/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java index 3117610ec8d..ce255fd4147 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java @@ -63,6 +63,7 @@ import org.joda.time.DateTimeZone; import org.junit.Assert; import javax.annotation.Nullable; + import java.math.BigDecimal; import java.util.Arrays; import java.util.Collections; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java index 5a07f59d679..c30b2c0262e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java @@ -44,7 +44,6 @@ public class ExternalTableScanRuleTest @Test public void testMatchesWhenExternalScanUnsupported() throws ValidationException { - final PlannerContext plannerContext = PlannerContext.create( "DUMMY", // The actual query isn't important for this test CalciteTests.createOperatorTable(), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java index c1e24ada343..199d259d92f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java @@ -32,6 +32,7 @@ import org.junit.Assert; import org.junit.Test; import javax.annotation.Nullable; + import java.util.List; import java.util.function.Consumer; @@ -328,7 +329,6 @@ public class DruidRelsTest EasyMock.expect(mockPartialQuery.getWhereFilter()).andReturn(whereFilter).anyTimes(); final RelOptTable mockRelOptTable = EasyMock.mock(RelOptTable.class); - EasyMock.expect(mockRelOptTable.unwrap(DruidTable.class)).andReturn(druidTable).anyTimes(); final T mockRel = EasyMock.mock(clazz); EasyMock.expect(mockRel.getPartialDruidQuery()).andReturn(mockPartialQuery).anyTimes(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRuleTest.java index 8707b476631..9d6bad13bd5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRuleTest.java @@ -32,6 +32,8 @@ import org.apache.druid.sql.calcite.rel.DruidRel; import 
org.apache.druid.sql.calcite.rel.DruidRelsTest; import org.apache.druid.sql.calcite.rel.DruidUnionDataSourceRel; import org.apache.druid.sql.calcite.rel.PartialDruidQuery; +import org.apache.druid.sql.calcite.table.DatasourceTable; +import org.apache.druid.sql.calcite.table.DatasourceTable.PhysicalDatasourceMetadata; import org.apache.druid.sql.calcite.table.DruidTable; import org.easymock.EasyMock; import org.junit.Assert; @@ -43,16 +45,17 @@ import java.util.Optional; public class DruidUnionDataSourceRuleTest { - private final DruidTable fooDruidTable = new DruidTable( - new TableDataSource("foo"), - RowSignature.builder() - .addTimeColumn() - .add("col1", ColumnType.STRING) - .add("col2", ColumnType.LONG) - .build(), - null, - false, - false + private final DruidTable fooDruidTable = new DatasourceTable( + new PhysicalDatasourceMetadata( + new TableDataSource("foo"), + RowSignature.builder() + .addTimeColumn() + .add("col1", ColumnType.STRING) + .add("col2", ColumnType.LONG) + .build(), + false, + false + ) ); @Test diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java index 3f2c36f7b82..6c9226ebb38 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java @@ -49,7 +49,7 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase { try (final Closer closer = Closer.create()) { final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer); - final DruidSchema druidSchema = new DruidSchema( + final SegmentMetadataCache cache = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory( new SpecificSegmentsQuerySegmentWalker(conglomerate), conglomerate @@ -59,14 +59,14 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ); - druidSchema.start(); - druidSchema.awaitInitialization(); + cache.start(); + cache.awaitInitialization(); + final DruidSchema druidSchema = new DruidSchema(cache, null); - Assert.assertEquals(ImmutableMap.of(), druidSchema.getTableMap()); + Assert.assertEquals(ImmutableSet.of(), druidSchema.getTableNames()); } } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest.java similarity index 87% rename from sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java rename to sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest.java index 6beda7d0bd0..683be3b9495 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest.java @@ -57,7 +57,7 @@ import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.NoopEscalator; -import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.table.DatasourceTable; import 
org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.timeline.DataSegment; @@ -71,6 +71,7 @@ import org.junit.Before; import org.junit.Test; import javax.annotation.Nullable; + import java.io.File; import java.util.ArrayList; import java.util.Collection; @@ -88,7 +89,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon +public class SegmentDataCacheConcurrencyTest extends SegmentMetadataCacheCommon { private static final String DATASOURCE = "datasource"; @@ -96,7 +97,7 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon private SpecificSegmentsQuerySegmentWalker walker; private TestServerInventoryView inventoryView; private BrokerServerView serverView; - private DruidSchema schema; + private SegmentMetadataCache schema; private ExecutorService exec; @Before @@ -119,30 +120,33 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon } /** - * This tests the contention between 3 components, DruidSchema, InventoryView, and BrokerServerView. - * It first triggers refreshing DruidSchema. To mimic some heavy work done with {@link DruidSchema#lock}, - * {@link DruidSchema#buildDruidTable} is overriden to sleep before doing real work. While refreshing DruidSchema, - * more new segments are added to InventoryView, which triggers updates of BrokerServerView. Finally, while - * BrokerServerView is updated, {@link BrokerServerView#getTimeline} is continuously called to mimic user query + * This tests the contention between three components, {@link SegmentMetadataCache}, + * {@code InventoryView}, and {@link BrokerServerView}. It first triggers + * refreshing {@code SegmentMetadataCache}. To mimic some heavy work done with + * {@link SegmentMetadataCache#lock}, {@link SegmentMetadataCache#buildDruidTable} + * is overridden to sleep before doing real work. While refreshing + * {@code SegmentMetadataCache}, more new segments are added to + * {@code InventoryView}, which triggers updates of {@code BrokerServerView}. + * Finally, while {@code BrokerServerView} is updated, + * {@link BrokerServerView#getTimeline} is continuously called to mimic user query * processing. All these calls must return without heavy contention. */ @Test(timeout = 30000L) - public void testDruidSchemaRefreshAndInventoryViewAddSegmentAndBrokerServerViewGetTimeline() + public void testSegmentMetadataRefreshAndInventoryViewAddSegmentAndBrokerServerViewGetTimeline() throws InterruptedException, ExecutionException, TimeoutException { - schema = new DruidSchema( + schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override - DruidTable buildDruidTable(final String dataSource) + DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(final String dataSource) { doInLock(() -> { try { @@ -194,8 +198,9 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon // Wait for all segments to be loaded in BrokerServerView Assert.assertTrue(segmentLoadLatch.await(5, TimeUnit.SECONDS)); - // Trigger refresh of DruidSchema. 
This will internally run the heavy work mimicked by the overriden buildDruidTable - Future refreshFuture = exec.submit(() -> { + // Trigger refresh of DruidSchema. This will internally run the heavy work + // mimicked by the overridden buildDruidTable + Future refreshFuture = exec.submit(() -> { schema.refresh( walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()), Sets.newHashSet(DATASOURCE) @@ -225,33 +230,36 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon } /** - * This tests the contention between 2 methods of DruidSchema, {@link DruidSchema#refresh} and - * {@link DruidSchema#getSegmentMetadataSnapshot()}. It first triggers refreshing DruidSchema. - * To mimic some heavy work done with {@link DruidSchema#lock}, {@link DruidSchema#buildDruidTable} is overriden - * to sleep before doing real work. While refreshing DruidSchema, getSegmentMetadataSnapshot() is continuously - * called to mimic reading the segments table of SystemSchema. All these calls must return without heavy contention. + * This tests the contention between two methods of {@link SegmentMetadataCache}: + * {@link SegmentMetadataCache#refresh} and + * {@link SegmentMetadataCache#getSegmentMetadataSnapshot()}. It first triggers + * refreshing {@code SegmentMetadataCache}. To mimic some heavy work done with + * {@link SegmentMetadataCache#lock}, {@link SegmentMetadataCache#buildDruidTable} + * is overridden to sleep before doing real work. While refreshing + * {@code SegmentMetadataCache}, {@code getSegmentMetadataSnapshot()} is continuously + * called to mimic reading the segments table of SystemSchema. All these calls + * must return without heavy contention. */ @Test(timeout = 30000L) - public void testDruidSchemaRefreshAndDruidSchemaGetSegmentMetadata() + public void testSegmentMetadataRefreshAndDruidSchemaGetSegmentMetadata() throws InterruptedException, ExecutionException, TimeoutException { - schema = new DruidSchema( + schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override - DruidTable buildDruidTable(final String dataSource) + DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(final String dataSource) { doInLock(() -> { try { - // Mimic some heavy work done in lock in DruidSchema + // Mimic some heavy work done in lock in SegmentMetadataCache Thread.sleep(5000); } catch (InterruptedException e) { @@ -299,8 +307,9 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon // Wait for all segments to be loaded in BrokerServerView Assert.assertTrue(segmentLoadLatch.await(5, TimeUnit.SECONDS)); - // Trigger refresh of DruidSchema. This will internally run the heavy work mimicked by the overriden buildDruidTable - Future refreshFuture = exec.submit(() -> { + // Trigger refresh of SegmentMetadataCache. 
This will internally run the heavy work mimicked + // by the overridden buildDruidTable + Future refreshFuture = exec.submit(() -> { schema.refresh( walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()), Sets.newHashSet(DATASOURCE) @@ -438,6 +447,7 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon segmentCallbacks.forEach(pair -> pair.rhs.execute(() -> pair.lhs.segmentRemoved(server.getMetadata(), segment))); } + @SuppressWarnings("unused") private void removeServer(DruidServer server) { serverMap.remove(server.getName()); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTestCommon.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheCommon.java similarity index 98% rename from sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTestCommon.java rename to sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheCommon.java index 2511d972101..edfa22a1e8a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTestCommon.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheCommon.java @@ -50,7 +50,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; -public abstract class DruidSchemaTestCommon extends CalciteTestBase +public abstract class SegmentMetadataCacheCommon extends CalciteTestBase { static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java similarity index 91% rename from sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java rename to sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java index 36e575e2bdb..39029fa9a48 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java @@ -27,7 +27,6 @@ import com.google.common.collect.Sets; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.schema.Table; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.ImmutableDruidServer; @@ -59,6 +58,7 @@ import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AllowAllAuthenticator; import org.apache.druid.server.security.NoopEscalator; +import org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; @@ -86,13 +86,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -public class DruidSchemaTest extends DruidSchemaTestCommon +public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon { private SpecificSegmentsQuerySegmentWalker walker; private TestServerInventoryView serverView; private List druidServers; - private DruidSchema schema; - private DruidSchema schema2; + private SegmentMetadataCache schema; + private SegmentMetadataCache schema2; private 
CountDownLatch buildTableLatch = new CountDownLatch(1); private CountDownLatch markDataSourceLatch = new CountDownLatch(1); private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper(); @@ -173,7 +173,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments); druidServers = serverView.getDruidServers(); - schema = new DruidSchema( + schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, @@ -183,14 +183,13 @@ public class DruidSchemaTest extends DruidSchemaTestCommon ), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override - protected DruidTable buildDruidTable(String dataSource) + protected DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(String dataSource) { - DruidTable table = super.buildDruidTable(dataSource); + DatasourceTable.PhysicalDatasourceMetadata table = super.buildDruidTable(dataSource); buildTableLatch.countDown(); return table; } @@ -203,7 +202,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon } }; - schema2 = new DruidSchema( + schema2 = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, @@ -213,17 +212,15 @@ public class DruidSchemaTest extends DruidSchemaTestCommon ), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { - boolean throwException = true; @Override - protected DruidTable buildDruidTable(String dataSource) + protected DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(String dataSource) { - DruidTable table = super.buildDruidTable(dataSource); + DatasourceTable.PhysicalDatasourceMetadata table = super.buildDruidTable(dataSource); buildTableLatch.countDown(); return table; } @@ -261,10 +258,10 @@ public class DruidSchemaTest extends DruidSchemaTestCommon @Test public void testGetTableMap() { - Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema.getTableNames()); + Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema.getDatasourceNames()); - final Map tableMap = schema.getTableMap(); - Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableMap.keySet()); + final Set tableNames = schema.getDatasourceNames(); + Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableNames); } @Test @@ -272,18 +269,15 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { schema2.start(); schema2.awaitInitialization(); - Map tableMap = schema2.getTableMap(); - Assert.assertEquals(2, tableMap.size()); - Assert.assertTrue(tableMap.containsKey("foo")); - Assert.assertTrue(tableMap.containsKey("foo2")); + Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema2.getDatasourceNames()); schema2.stop(); } - @Test public void testGetTableMapFoo() { - final DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); + final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo"); + final DruidTable fooTable = new DatasourceTable(fooDs); final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl()); final List fields = rowType.getFieldList(); @@ -311,7 +305,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon @Test public void testGetTableMapFoo2() { - final DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo2"); + final DatasourceTable.PhysicalDatasourceMetadata fooDs = 
schema.getDatasource("foo2"); + final DruidTable fooTable = new DatasourceTable(fooDs); final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl()); final List fields = rowType.getFieldList(); @@ -329,7 +324,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon /** * This tests that {@link AvailableSegmentMetadata#getNumRows()} is correct in case - * of multiple replicas i.e. when {@link DruidSchema#addSegment(DruidServerMetadata, DataSegment)} + * of multiple replicas i.e. when {@link SegmentMetadataCache#addSegment(DruidServerMetadata, DataSegment)} * is called more than once for same segment */ @Test @@ -366,7 +361,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon final ImmutableDruidServer server = pair.lhs; Assert.assertNotNull(server); final DruidServerMetadata druidServerMetadata = server.getMetadata(); - // invoke DruidSchema#addSegment on existingSegment + // invoke SegmentMetadataCache#addSegment on existingSegment schema.addSegment(druidServerMetadata, existingSegment); segmentsMetadata = schema.getSegmentMetadataSnapshot(); // get the only segment with datasource "foo2" @@ -399,7 +394,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Assert.assertNotNull(segmentToRemove); schema.removeSegment(segmentToRemove); - // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource + // The following line can cause NPE without segmentMetadata null check in + // SegmentMetadataCache#refreshSegmentsForDataSource schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size()); } @@ -421,7 +417,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Assert.assertNotNull(segmentToRemove); schema.removeSegment(segmentToRemove); - // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource + // The following line can cause NPE without segmentMetadata null check in + // SegmentMetadataCache#refreshSegmentsForDataSource schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size()); } @@ -485,15 +482,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { String datasource = "newSegmentAddTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -528,15 +524,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { String datasource = "newSegmentAddTest"; CountDownLatch addSegmentLatch = new CountDownLatch(2); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -575,15 +570,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { String datasource = "newSegmentAddTest"; 
CountDownLatch addSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -619,15 +613,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { String datasource = "newSegmentAddTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -660,15 +653,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon String datasource = "segmentRemoveTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); CountDownLatch removeSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -709,7 +701,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segment.getId())); Assert.assertFalse(schema.getMutableSegments().contains(segment.getId())); Assert.assertFalse(schema.getDataSourcesNeedingRebuild().contains(datasource)); - Assert.assertFalse(schema.getTableNames().contains(datasource)); + Assert.assertFalse(schema.getDatasourceNames().contains(datasource)); } @Test @@ -718,15 +710,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon String datasource = "segmentRemoveTest"; CountDownLatch addSegmentLatch = new CountDownLatch(2); CountDownLatch removeSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -771,7 +762,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId())); Assert.assertFalse(schema.getMutableSegments().contains(segments.get(0).getId())); Assert.assertTrue(schema.getDataSourcesNeedingRebuild().contains(datasource)); - Assert.assertTrue(schema.getTableNames().contains(datasource)); + Assert.assertTrue(schema.getDatasourceNames().contains(datasource)); } @Test @@ -779,15 +770,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { String datasource = "serverSegmentRemoveTest"; CountDownLatch removeServerSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( 
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -814,15 +804,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon String datasource = "serverSegmentRemoveTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); CountDownLatch removeServerSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -862,15 +851,14 @@ public class DruidSchemaTest extends DruidSchemaTestCommon String datasource = "serverSegmentRemoveTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); CountDownLatch removeServerSegmentLatch = new CountDownLatch(1); - DruidSchema schema = new DruidSchema( + SegmentMetadataCache schema = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ) { @Override @@ -917,10 +905,10 @@ public class DruidSchemaTest extends DruidSchemaTestCommon @Test public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws InterruptedException { - DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); + DatasourceTable.PhysicalDatasourceMetadata fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); - Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertFalse(fooTable.isJoinable()); Assert.assertFalse(fooTable.isBroadcast()); @@ -949,10 +937,10 @@ public class DruidSchemaTest extends DruidSchemaTestCommon // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); - fooTable = (DruidTable) schema.getTableMap().get("foo"); + fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); - Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertTrue(fooTable.isJoinable()); Assert.assertTrue(fooTable.isBroadcast()); @@ -970,10 +958,10 @@ public class DruidSchemaTest extends DruidSchemaTestCommon // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); - fooTable = (DruidTable) schema.getTableMap().get("foo"); + fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - 
Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); - Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertFalse(fooTable.isJoinable()); Assert.assertFalse(fooTable.isBroadcast()); } @@ -981,10 +969,10 @@ public class DruidSchemaTest extends DruidSchemaTestCommon @Test public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throws InterruptedException { - DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); + DatasourceTable.PhysicalDatasourceMetadata fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); - Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertFalse(fooTable.isJoinable()); Assert.assertFalse(fooTable.isBroadcast()); @@ -1013,12 +1001,12 @@ public class DruidSchemaTest extends DruidSchemaTestCommon // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); - fooTable = (DruidTable) schema.getTableMap().get("foo"); + fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); // should not be a GlobalTableDataSource for now, because isGlobal is couple with joinability. idealy this will be // changed in the future and we should expect - Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertTrue(fooTable.isBroadcast()); Assert.assertFalse(fooTable.isJoinable()); @@ -1035,10 +1023,10 @@ public class DruidSchemaTest extends DruidSchemaTestCommon // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); - fooTable = (DruidTable) schema.getTableMap().get("foo"); + fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); - Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); - Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource); Assert.assertFalse(fooTable.isBroadcast()); Assert.assertFalse(fooTable.isJoinable()); } @@ -1082,7 +1070,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class); // Need to create schema for this test because the available schemas don't mock the QueryLifecycleFactory, which I need for this test. 
- DruidSchema mySchema = new DruidSchema( + SegmentMetadataCache mySchema = new SegmentMetadataCache( factoryMock, serverView, segmentManager, @@ -1092,8 +1080,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon ), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - brokerInternalQueryConfig, - null + brokerInternalQueryConfig ); EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once(); @@ -1129,7 +1116,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new ColumnAnalysis(ColumnType.DOUBLE, ColumnType.DOUBLE.asTypeString(), false, true, 1234, 26, null, null, null) ); - RowSignature signature = DruidSchema.analysisToRowSignature( + RowSignature signature = SegmentMetadataCache.analysisToRowSignature( new SegmentAnalysis( "id", ImmutableList.of(Intervals.utc(1L, 2L)), @@ -1157,7 +1144,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon @Test public void testSegmentMetadataFallbackType() { - RowSignature signature = DruidSchema.analysisToRowSignature( + RowSignature signature = SegmentMetadataCache.analysisToRowSignature( new SegmentAnalysis( "id", ImmutableList.of(Intervals.utc(1L, 2L)), @@ -1209,9 +1196,9 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Set segments = new HashSet<>(); Set datasources = new HashSet<>(); datasources.add("wat"); - Assert.assertNull(schema.getTable("wat")); + Assert.assertNull(schema.getDatasource("wat")); schema.refresh(segments, datasources); - Assert.assertNull(schema.getTable("wat")); + Assert.assertNull(schema.getDatasource("wat")); } private static DataSegment newSegment(String datasource, int partitionId) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 4b76fbf8e21..a9ce9d987fb 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -245,18 +245,18 @@ public class SystemSchemaTest extends CalciteTestBase .add(segment2, index2) .add(segment3, index3); - druidSchema = new DruidSchema( + SegmentMetadataCache cache = new SegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments(), realtimeSegments), new SegmentManager(EasyMock.createMock(SegmentLoader.class)), new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig(), - null + new BrokerInternalQueryConfig() ); - druidSchema.start(); - druidSchema.awaitInitialization(); + cache.start(); + cache.awaitInitialization(); + druidSchema = new DruidSchema(cache, null); metadataView = EasyMock.createMock(MetadataSegmentView.class); druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); serverInventoryView = EasyMock.createMock(FilteredServerInventoryView.class); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 29de594350a..b4bb127ffe8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -130,6 +130,7 @@ import org.apache.druid.sql.calcite.schema.NamedSchema; import org.apache.druid.sql.calcite.schema.NamedSystemSchema; import org.apache.druid.sql.calcite.schema.NamedViewSchema; import 
org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager; +import org.apache.druid.sql.calcite.schema.SegmentMetadataCache; import org.apache.druid.sql.calcite.schema.SystemSchema; import org.apache.druid.sql.calcite.schema.ViewSchema; import org.apache.druid.sql.calcite.view.DruidViewMacroFactory; @@ -1228,34 +1229,33 @@ public class CalciteTests final DruidSchemaManager druidSchemaManager ) { - final DruidSchema schema = new DruidSchema( - CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + final SegmentMetadataCache cache = new SegmentMetadataCache( + createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments()), new SegmentManager(EasyMock.createMock(SegmentLoader.class)) { @Override public Set getDataSourceNames() { - return ImmutableSet.of(BROADCAST_DATASOURCE); + return ImmutableSet.of(CalciteTests.BROADCAST_DATASOURCE); } }, createDefaultJoinableFactory(), plannerConfig, - TEST_AUTHENTICATOR_ESCALATOR, - new BrokerInternalQueryConfig(), - druidSchemaManager + CalciteTests.TEST_AUTHENTICATOR_ESCALATOR, + new BrokerInternalQueryConfig() ); try { - schema.start(); - schema.awaitInitialization(); + cache.start(); + cache.awaitInitialization(); } catch (InterruptedException e) { throw new RuntimeException(e); } - schema.stop(); - return schema; + cache.stop(); + return new DruidSchema(cache, druidSchemaManager); } /**
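To summarize the wiring pattern that the test hunks above repeat: SegmentMetadataCache is constructed with the query and server-view collaborators, started, and then handed to DruidSchema together with an optional DruidSchemaManager, while datasource tables are built from PhysicalDatasourceMetadata rather than the old DruidTable constructor. The sketch below is assembled from the hunks in this patch, not new API surface; the collaborator variables (queryLifecycleFactory, serverView, segmentManager, joinableFactory, plannerConfig, escalator, druidSchemaManager) are placeholders for whatever the surrounding test already provides.

    // Build the segment metadata cache; the trailing DruidSchemaManager
    // argument of the old DruidSchema constructor is gone.
    SegmentMetadataCache cache = new SegmentMetadataCache(
        queryLifecycleFactory,   // e.g. CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate)
        serverView,
        segmentManager,
        joinableFactory,         // e.g. new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
        plannerConfig,
        escalator,               // e.g. new NoopEscalator()
        new BrokerInternalQueryConfig()
    );
    cache.start();
    cache.awaitInitialization();  // throws InterruptedException

    // DruidSchema now wraps the cache plus an optional DruidSchemaManager
    // (null in most tests).
    DruidSchema druidSchema = new DruidSchema(cache, druidSchemaManager);

    // Datasource tables are built from physical metadata; the two booleans are
    // the same joinable/broadcast hints the old DruidTable constructor took.
    DruidTable fooTable = new DatasourceTable(
        new DatasourceTable.PhysicalDatasourceMetadata(
            new TableDataSource("foo"),
            RowSignature.builder()
                .addTimeColumn()
                .add("col1", ColumnType.STRING)
                .add("col2", ColumnType.LONG)
                .build(),
            false,
            false
        )
    );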