Refactor DruidSchema & DruidTable (#12835)

Refactors the DruidSchema and DruidTable abstractions to prepare for the Druid Catalog.

As we add the catalog, we’ll want to combine physical segment metadata information with “hints” provided by the catalog. This is best done if we tidy up the existing code to more clearly separate responsibilities.

This PR is purely a refactoring: no user-visible functionality or external APIs change. Functional changes will come later as we add the catalog itself.

DruidSchema
In the present code, DruidSchema does three tasks:

Holds the segment metadata cache
Interfaces with an external schema manager
Acts as a schema to Calcite
This PR splits those responsibilities.

DruidSchema holds the Calcite schema for the druid namespace, combining information from the segment metadata cache, from the external schema manager, and (later) from the catalog (a condensed view of the new class appears below).
SegmentMetadataCache holds the segment metadata cache formerly embedded in DruidSchema.
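
For orientation, here is the new DruidSchema condensed from the large hunk later in this diff (package declaration, javadoc, and the protected cache() accessor are omitted; the inline comments are added here for explanation, the rest matches the added code):

import org.apache.calcite.schema.Table;
import org.apache.druid.sql.calcite.table.DatasourceTable;

import javax.inject.Inject;
import java.util.Set;

public class DruidSchema extends AbstractTableSchema
{
  private final SegmentMetadataCache segmentCache;
  private final DruidSchemaManager druidSchemaManager;

  @Inject
  public DruidSchema(
      SegmentMetadataCache segmentCache,
      final DruidSchemaManager druidSchemaManager
  )
  {
    this.segmentCache = segmentCache;
    // Treat the no-op manager the same as "no manager configured".
    if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) {
      this.druidSchemaManager = druidSchemaManager;
    } else {
      this.druidSchemaManager = null;
    }
  }

  @Override
  public Table getTable(String name)
  {
    if (druidSchemaManager != null) {
      return druidSchemaManager.getTable(name);
    } else {
      // Physical metadata from the cache, wrapped in a Calcite table on demand.
      DatasourceTable.PhysicalDatasourceMetadata dsMetadata = segmentCache.getDatasource(name);
      return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
    }
  }

  @Override
  public Set<String> getTableNames()
  {
    if (druidSchemaManager != null) {
      return druidSchemaManager.getTableNames();
    } else {
      return segmentCache.getDatasourceNames();
    }
  }
}
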
DruidTable
The present DruidTable class is a bit of a kitchen sink: it represents every kind of table Druid supports and uses if-statements to handle behavior that differs between types, even though any given DruidTable instance handles only one such table type. To model the actual table types more clearly, we split DruidTable into several classes:

DruidTable becomes an abstract base class to hold Druid-specific methods.
DatasourceTable represents a datasource.
ExternalTable represents an external table, such as from EXTERN or (later) from the catalog.
InlineTable represents the internal case in which we attach data directly to a table.
LookupTable represents Druid’s lookup table mechanism.
The new subclasses are more focused: each holds only the data, and implements only the predicates, relevant to its single table type. This matters because catalog information will differ by table type, and the new structure makes adding that logic cleaner. A simplified sketch of the split follows.
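
The sketch below is illustrative only: field types are simplified placeholders (the real subclasses take Druid DataSource and RowSignature objects, as the changed call sites in this diff show), and only one subclass is spelled out. It conveys the shape of the split rather than the actual class bodies.

// Simplified sketch: one focused subclass per table type, instead of one
// catch-all class configured with boolean flags.
abstract class DruidTable
{
  private final String dataSourceName;   // placeholder: the real classes hold a Druid DataSource

  protected DruidTable(String dataSourceName)
  {
    this.dataSourceName = dataSourceName;
  }

  public String getDataSourceName()
  {
    return dataSourceName;
  }

  // Per-type predicates replace the boolean flags formerly passed to one
  // catch-all constructor.
  public abstract boolean isJoinable();
  public abstract boolean isBroadcast();
}

// Lookup tables: always joinable and effectively broadcast, per the old call site.
class LookupTable extends DruidTable
{
  LookupTable(String lookupName)
  {
    super(lookupName);
  }

  @Override
  public boolean isJoinable()
  {
    return true;
  }

  @Override
  public boolean isBroadcast()
  {
    return true;
  }
}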

DatasourceMetadata
Previously, the DruidSchema segment cache worked directly with DruidTable objects. With the catalog, we need a layer between the segment metadata and the table presented to Calcite. The new SegmentMetadataCache class therefore uses a new DatasourceMetadata class as its cache entry, holding only the "physical" segment metadata; it is up to the DruidTable to combine this with catalog information in a later PR.
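
The hunks below show the cache entry type as DatasourceTable.PhysicalDatasourceMetadata but not its body, so the following is only a guess at the kind of purely physical information it carries: the fields mirror what the old buildDruidTable() computed (table datasource, merged row signature, joinable/broadcast flags) and are assumptions, not the real definition.

import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.column.RowSignature;

// Assumed sketch of a purely "physical" cache entry: nothing here comes from
// the catalog; everything is derived from segment metadata.
class PhysicalDatasourceMetadataSketch
{
  final TableDataSource dataSource;   // assumed field
  final RowSignature rowSignature;    // assumed: merged from per-segment signatures
  final boolean isJoinable;           // assumed: from the broadcast/joinable checks
  final boolean isBroadcast;          // assumed

  PhysicalDatasourceMetadataSketch(
      TableDataSource dataSource,
      RowSignature rowSignature,
      boolean isJoinable,
      boolean isBroadcast
  )
  {
    this.dataSource = dataSource;
    this.rowSignature = rowSignature;
    this.isJoinable = isJoinable;
    this.isBroadcast = isBroadcast;
  }
}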

More Efficient Table Resolution
Calcite provides a convenient base class for schema objects: AbstractSchema. However, this class is a bit too convenient: all we have to do is provide a map of tables and Calcite does the rest. This means that, to resolve any single datasource, say foo, we need to cache segment metadata, external schema information, and catalog information for all tables, just so Calcite can do a map lookup.

There is nothing special about AbstractSchema: we can handle table lookups ourselves, and the new AbstractTableSchema does exactly that. All Calcite really needs is to resolve individual tables by name and, for commands we don't use, to provide a list of table names.

DruidSchema now extends AbstractTableSchema, and SegmentMetadataCache resolves individual tables (and provides table names). A hypothetical example of the reduced contract follows.
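
To illustrate the reduced contract, here is a hypothetical schema built on the new base class (AbstractTableSchema itself is added as a new file below; this example class does not exist in the PR): only getTable() and getTableNames() need real implementations, while everything else keeps the no-op defaults.

import org.apache.calcite.schema.Table;
import org.apache.druid.sql.calcite.schema.AbstractTableSchema;

import java.util.Collections;
import java.util.Set;

// Hypothetical example: a schema exposing exactly one table, resolved lazily.
public class SingleTableSchema extends AbstractTableSchema
{
  private final String tableName;
  private final Table table;

  public SingleTableSchema(String tableName, Table table)
  {
    this.tableName = tableName;
    this.table = table;
  }

  @Override
  public Table getTable(String name)
  {
    // Resolve only the requested table; no full table map is ever built.
    return tableName.equals(name) ? table : null;
  }

  @Override
  public Set<String> getTableNames()
  {
    return Collections.singleton(tableName);
  }
}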

DruidSchemaManager
DruidSchemaManager provides a way to specify table schemas externally. In this sense it is similar to the catalog, but only for datasources. It originally followed the AbstractSchema pattern: it provided a map of all tables. This PR adds optional methods for the individual table-lookup and table-names operations. The default implementations work the way AbstractSchema did: fetch the entire map and pick out the information we need. Extensions that use this API should be revised to implement the individual operations instead; Druid code no longer calls the original getTables() method.

The PR has one breaking change: since the DruidSchemaManager map is read-only to the rest of Druid, getTables() now returns a plain Map rather than a ConcurrentMap. A hypothetical implementation of the revised interface follows.
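
For extension authors, an implementation of the revised interface might look like the sketch below. The interface itself, with its deprecated getTables() and the new default methods, appears in the DruidSchemaManager hunk later in the diff; this class is illustrative, not part of the PR.

import org.apache.druid.sql.calcite.schema.DruidSchemaManager;
import org.apache.druid.sql.calcite.table.DruidTable;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical extension: serves externally defined schemas through the new
// per-table operations; the deprecated bulk method remains for compatibility.
public class MySchemaManager implements DruidSchemaManager
{
  private final ConcurrentHashMap<String, DruidTable> tables = new ConcurrentHashMap<>();

  @Override
  public Map<String, DruidTable> getTables()
  {
    // Read-only to callers, hence the plain Map return type.
    return tables;
  }

  @Override
  public DruidTable getTable(String name)
  {
    // Preferred path: resolve a single table without materializing all of them.
    return tables.get(name);
  }

  @Override
  public Set<String> getTableNames()
  {
    return tables.keySet();
  }
}
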
Commit 8ad8582dc8 (parent ee41cc770f) by Paul Rogers, 2022-08-09 21:54:04 -07:00, committed by GitHub.
31 changed files with 1774 additions and 1210 deletions

View File

@ -37,7 +37,7 @@ import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.Escalator; import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.schema.DruidSchema; import org.apache.druid.sql.calcite.schema.SegmentMetadataCache;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.LinearShardSpec;
@ -67,12 +67,11 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 10) @Measurement(iterations = 10)
public class DruidSchemaInternRowSignatureBenchmark public class DruidSchemaInternRowSignatureBenchmark
{ {
private SegmentMetadataCacheForBenchmark cache;
private DruidSchemaForBenchmark druidSchema; private static class SegmentMetadataCacheForBenchmark extends SegmentMetadataCache
private static class DruidSchemaForBenchmark extends DruidSchema
{ {
public DruidSchemaForBenchmark( public SegmentMetadataCacheForBenchmark(
final QueryLifecycleFactory queryLifecycleFactory, final QueryLifecycleFactory queryLifecycleFactory,
final TimelineServerView serverView, final TimelineServerView serverView,
final SegmentManager segmentManager, final SegmentManager segmentManager,
@ -89,8 +88,7 @@ public class DruidSchemaInternRowSignatureBenchmark
joinableFactory, joinableFactory,
config, config,
escalator, escalator,
brokerInternalQueryConfig, brokerInternalQueryConfig
null
); );
} }
@ -101,7 +99,6 @@ public class DruidSchemaInternRowSignatureBenchmark
return super.refreshSegments(segments); return super.refreshSegments(segments);
} }
@Override @Override
public void addSegment(final DruidServerMetadata server, final DataSegment segment) public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{ {
@ -173,8 +170,7 @@ public class DruidSchemaInternRowSignatureBenchmark
@Setup @Setup
public void setup() public void setup()
{ {
cache = new SegmentMetadataCacheForBenchmark(
druidSchema = new DruidSchemaForBenchmark(
EasyMock.mock(QueryLifecycleFactory.class), EasyMock.mock(QueryLifecycleFactory.class),
EasyMock.mock(TimelineServerView.class), EasyMock.mock(TimelineServerView.class),
null, null,
@ -202,7 +198,7 @@ public class DruidSchemaInternRowSignatureBenchmark
for (int i = 0; i < 10000; ++i) { for (int i = 0; i < 10000; ++i) {
DataSegment dataSegment = builder.interval(Intervals.of(i + "/" + (i + 1))) DataSegment dataSegment = builder.interval(Intervals.of(i + "/" + (i + 1)))
.build(); .build();
druidSchema.addSegment(serverMetadata, dataSegment); cache.addSegment(serverMetadata, dataSegment);
} }
} }
@ -211,6 +207,6 @@ public class DruidSchemaInternRowSignatureBenchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS) @OutputTimeUnit(TimeUnit.MICROSECONDS)
public void addSegments(MyState state, Blackhole blackhole) throws IOException public void addSegments(MyState state, Blackhole blackhole) throws IOException
{ {
blackhole.consume(druidSchema.refreshSegments(state.segmentIds)); blackhole.consume(cache.refreshSegments(state.segmentIds));
} }
} }

View File

@ -26,7 +26,6 @@ import org.junit.Test;
public class HttpInputSourceConfigTest public class HttpInputSourceConfigTest
{ {
@Test @Test
public void testEquals() public void testEquals()
{ {

View File

@ -66,8 +66,8 @@ import java.util.Set;
*/ */
@Command( @Command(
name = "router", name = "router",
description = "Experimental! Understands tiers and routes things to different brokers, " description = "Understands tiers and routes requests to Druid nodes. "
+ "see https://druid.apache.org/docs/latest/development/router.html for a description" + "See https://druid.apache.org/docs/latest/design/router.html"
) )
public class CliRouter extends ServerRunnable public class CliRouter extends ServerRunnable
{ {

View File

@ -36,6 +36,7 @@ import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.table.ExternalTable;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -73,12 +74,10 @@ public class ExternalTableMacro implements TableMacro
+ "Please change the column name to something other than __time"); + "Please change the column name to something other than __time");
} }
return new DruidTable( return new ExternalTable(
new ExternalDataSource(inputSource, inputFormat, signature), new ExternalDataSource(inputSource, inputFormat, signature),
signature, signature,
jsonMapper, jsonMapper
false,
false
); );
} }
catch (JsonProcessingException e) { catch (JsonProcessingException e) {

View File

@ -27,22 +27,24 @@ import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataType;
import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.table.ExternalTable;
/** /**
* Represents a scan of an external table. Generated by {@link DruidTable} when its datasource is an * Represents a scan of an external table. Generated by {@link ExternalTable},
* {@link ExternalDataSource}. * which represents an {@link ExternalDataSource}.
* *
* This class is exercised in CalciteInsertDmlTest but is not currently exposed to end users. * This class is exercised in CalciteInsertDmlTest but is not currently
* exposed to end users.
*/ */
public class ExternalTableScan extends AbstractRelNode public class ExternalTableScan extends AbstractRelNode
{ {
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final DruidTable druidTable; private final ExternalTable druidTable;
public ExternalTableScan( public ExternalTableScan(
final RelOptCluster cluster, final RelOptCluster cluster,
final ObjectMapper jsonMapper, final ObjectMapper jsonMapper,
final DruidTable druidTable final ExternalTable druidTable
) )
{ {
super(cluster, cluster.traitSetOf(Convention.NONE)); super(cluster, cluster.traitSetOf(Convention.NONE));

View File

@ -37,6 +37,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.DruidTable;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Set; import java.util.Set;
/** /**

View File

@ -32,9 +32,11 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.UnsupportedSQLQueryException; import org.apache.druid.sql.calcite.planner.UnsupportedSQLQueryException;
import org.apache.druid.sql.calcite.rel.DruidQueryRel; import org.apache.druid.sql.calcite.rel.DruidQueryRel;
import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.table.InlineTable;
import org.apache.druid.sql.calcite.table.RowSignatures; import org.apache.druid.sql.calcite.table.RowSignatures;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -76,12 +78,9 @@ public class DruidLogicalValuesRule extends RelOptRule
values.getRowType().getFieldNames(), values.getRowType().getFieldNames(),
values.getRowType() values.getRowType()
); );
final DruidTable druidTable = new DruidTable( final DruidTable druidTable = new InlineTable(
InlineDataSource.fromIterable(objectTuples, rowSignature), InlineDataSource.fromIterable(objectTuples, rowSignature),
rowSignature, rowSignature
null,
true,
false
); );
call.transformTo( call.transformTo(
DruidQueryRel.scanValues(values, druidTable, plannerContext) DruidQueryRel.scanValues(values, druidTable, plannerContext)

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.schema;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
/**
* Calcite provides two "schema" abstractions. {@link Schema} is an interface,
* while {@link org.apache.calcite.schema.impl.AbstractSchema} is a base class
* that "locks down" the {@link #getTable} method to obtain the table from a
* map. This forces the extensions of that class to materialize all tables in
* the schema, so that Calcite can pick the one it wants. This class, by
* contrast, provides the same defaults as {@link
* org.apache.calcite.schema.impl.AbstractSchema AbstractSchema}, but assumes
its subclasses will implement {@code getTable()} to directly look up that
* one table, ignoring all others. Doing so lowers the cost of table resolution,
* especially when the system has to fetch catalog information for the table:
* we only fetch the information we need, not information for all tables.
*/
public abstract class AbstractTableSchema implements Schema
{
protected static final Set<String> EMPTY_NAMES = ImmutableSet.of();
@Override
public RelProtoDataType getType(String name)
{
return null;
}
@Override
public Set<String> getTypeNames()
{
return EMPTY_NAMES;
}
@Override
public Collection<Function> getFunctions(String name)
{
return Collections.emptyList();
}
@Override
public Set<String> getFunctionNames()
{
return EMPTY_NAMES;
}
@Override
public Schema getSubSchema(String name)
{
return null;
}
@Override
public Set<String> getSubSchemaNames()
{
return EMPTY_NAMES;
}
@Override
public Expression getExpression(SchemaPlus parentSchema, String name)
{
return null;
}
@Override
public boolean isMutable()
{
return false;
}
@Override
public Schema snapshot(SchemaVersion version)
{
return this;
}
}

View File

@ -49,8 +49,9 @@ public class DruidCalciteSchemaModule implements Module
.toProvider(RootSchemaProvider.class) .toProvider(RootSchemaProvider.class)
.in(Scopes.SINGLETON); .in(Scopes.SINGLETON);
// DruidSchema needs to listen to changes for incoming segments // SegmentMetadataCache needs to listen to changes for incoming segments
LifecycleModule.register(binder, DruidSchema.class); LifecycleModule.register(binder, SegmentMetadataCache.class);
binder.bind(DruidSchema.class).in(Scopes.SINGLETON);
binder.bind(SystemSchema.class).in(Scopes.SINGLETON); binder.bind(SystemSchema.class).in(Scopes.SINGLETON);
binder.bind(InformationSchema.class).in(Scopes.SINGLETON); binder.bind(InformationSchema.class).in(Scopes.SINGLETON);
binder.bind(LookupSchema.class).in(Scopes.SINGLETON); binder.bind(LookupSchema.class).in(Scopes.SINGLETON);

View File

@ -19,952 +19,54 @@
package org.apache.druid.sql.calcite.schema; package org.apache.druid.sql.calcite.schema;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.calcite.schema.Table; import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.ServerView; import javax.inject.Inject;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ManageLifecycle public class DruidSchema extends AbstractTableSchema
public class DruidSchema extends AbstractSchema
{ {
// Newest segments first, so they override older ones. private final SegmentMetadataCache segmentCache;
private static final Comparator<SegmentId> SEGMENT_ORDER = Comparator
.comparing((SegmentId segmentId) -> segmentId.getInterval().getStart())
.reversed()
.thenComparing(Function.identity());
private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
private static final int MAX_SEGMENTS_PER_QUERY = 15000;
private static final long DEFAULT_NUM_ROWS = 0;
private final QueryLifecycleFactory queryLifecycleFactory;
private final PlannerConfig config;
// Escalator, so we can attach an authentication result to queries we generate.
private final Escalator escalator;
private final SegmentManager segmentManager;
private final JoinableFactory joinableFactory;
private final ExecutorService cacheExec;
private final ExecutorService callbackExec;
private final DruidSchemaManager druidSchemaManager; private final DruidSchemaManager druidSchemaManager;
/**
* Map of DataSource -> DruidTable.
* This map can be accessed by {@link #cacheExec} and {@link #callbackExec} threads.
*/
private final ConcurrentMap<String, DruidTable> tables = new ConcurrentHashMap<>();
private static final Interner<RowSignature> ROW_SIGNATURE_INTERNER = Interners.newWeakInterner();
/**
* DataSource -> Segment -> AvailableSegmentMetadata(contains RowSignature) for that segment.
* Use SortedMap for segments so they are merged in deterministic order, from older to newer.
*
* This map is updated by these two threads.
*
* - {@link #callbackExec} can update it in {@link #addSegment}, {@link #removeServerSegment},
* and {@link #removeSegment}.
* - {@link #cacheExec} can update it in {@link #refreshSegmentsForDataSource}.
*
* While it is being updated, this map is read by these two types of thread.
*
* - {@link #cacheExec} can iterate all {@link AvailableSegmentMetadata}s per datasource.
* See {@link #buildDruidTable}.
* - Query threads can create a snapshot of the entire map for processing queries on the system table.
* See {@link #getSegmentMetadataSnapshot()}.
*
* As the access pattern of this map is read-intensive, we should minimize the contention between writers and readers.
* Since there are two threads that can update this map at the same time, those writers should lock the inner map
* first and then lock the entry before it updates segment metadata. This can be done using
* {@link ConcurrentMap#compute} as below. Note that, if you need to update the variables guarded by {@link #lock}
* inside of compute(), you should get the lock before calling compute() to keep the function executed in compute()
* not expensive.
*
* <pre>
* segmentMedataInfo.compute(
* datasourceParam,
* (datasource, segmentsMap) -> {
* if (segmentsMap == null) return null;
* else {
* segmentsMap.compute(
* segmentIdParam,
* (segmentId, segmentMetadata) -> {
* // update segmentMetadata
* }
* );
* return segmentsMap;
* }
* }
* );
* </pre>
*
* Readers can simply delegate the locking to the concurrent map and iterate map entries.
*/
private final ConcurrentHashMap<String, ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata>> segmentMetadataInfo
= new ConcurrentHashMap<>();
// For awaitInitialization.
private final CountDownLatch initialized = new CountDownLatch(1);
/**
* This lock coordinates the access from multiple threads to those variables guarded by this lock.
* Currently, there are 2 threads that can access these variables.
*
* - {@link #callbackExec} executes the timeline callbacks whenever BrokerServerView changes.
* - {@link #cacheExec} periodically refreshes segment metadata and {@link DruidTable} if necessary
* based on the information collected via timeline callbacks.
*/
private final Object lock = new Object();
// All mutable segments.
@GuardedBy("lock")
private final TreeSet<SegmentId> mutableSegments = new TreeSet<>(SEGMENT_ORDER);
// All dataSources that need tables regenerated.
@GuardedBy("lock")
private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();
// All segments that need to be refreshed.
@GuardedBy("lock")
private final TreeSet<SegmentId> segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER);
// Configured context to attach to internally generated queries.
private final BrokerInternalQueryConfig brokerInternalQueryConfig;
@GuardedBy("lock")
private boolean refreshImmediately = false;
@GuardedBy("lock")
private boolean isServerViewInitialized = false;
/**
* Counts the total number of known segments. This variable is used only for the segments table in the system schema
* to initialize a map with a more proper size when it creates a snapshot. As a result, it doesn't have to be exact,
* and thus there is no concurrency control for this variable.
*/
private int totalSegments = 0;
@Inject @Inject
public DruidSchema( public DruidSchema(
final QueryLifecycleFactory queryLifecycleFactory, SegmentMetadataCache segmentCache,
final TimelineServerView serverView, final DruidSchemaManager druidSchemaManager)
final SegmentManager segmentManager,
final JoinableFactory joinableFactory,
final PlannerConfig config,
final Escalator escalator,
final BrokerInternalQueryConfig brokerInternalQueryConfig,
final DruidSchemaManager druidSchemaManager
)
{ {
this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); this.segmentCache = segmentCache;
Preconditions.checkNotNull(serverView, "serverView"); if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) {
this.segmentManager = segmentManager; this.druidSchemaManager = druidSchemaManager;
this.joinableFactory = joinableFactory; } else {
this.config = Preconditions.checkNotNull(config, "config"); this.druidSchemaManager = null;
this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d");
this.escalator = escalator;
this.brokerInternalQueryConfig = brokerInternalQueryConfig;
this.druidSchemaManager = druidSchemaManager;
initServerViewTimelineCallback(serverView);
}
private void initServerViewTimelineCallback(final TimelineServerView serverView)
{
serverView.registerTimelineCallback(
callbackExec,
new TimelineServerView.TimelineCallback()
{
@Override
public ServerView.CallbackAction timelineInitialized()
{
synchronized (lock) {
isServerViewInitialized = true;
lock.notifyAll();
}
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment)
{
addSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentRemoved(final DataSegment segment)
{
removeSegment(segment);
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction serverSegmentRemoved(
final DruidServerMetadata server,
final DataSegment segment
)
{
removeServerSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
private void startCacheExec()
{
cacheExec.submit(
() -> {
long lastRefresh = 0L;
long lastFailure = 0L;
try {
while (!Thread.currentThread().isInterrupted()) {
final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
final Set<String> dataSourcesToRebuild = new TreeSet<>();
try {
synchronized (lock) {
final long nextRefreshNoFuzz = DateTimes
.utc(lastRefresh)
.plus(config.getMetadataRefreshPeriod())
.getMillis();
// Fuzz a bit to spread load out when we have multiple brokers.
final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
while (true) {
// Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
final boolean wasRecentFailure = DateTimes.utc(lastFailure)
.plus(config.getMetadataRefreshPeriod())
.isAfterNow();
if (isServerViewInitialized &&
!wasRecentFailure &&
(!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
(refreshImmediately || nextRefresh < System.currentTimeMillis())) {
// We need to do a refresh. Break out of the waiting loop.
break;
}
// lastFailure != 0L means exceptions happened before and there're some refresh work was not completed.
// so that even ServerView is initialized, we can't let broker complete initialization.
if (isServerViewInitialized && lastFailure == 0L) {
// Server view is initialized, but we don't need to do a refresh. Could happen if there are
// no segments in the system yet. Just mark us as initialized, then.
initialized.countDown();
}
// Wait some more, we'll wake up when it might be time to do another refresh.
lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
}
segmentsToRefresh.addAll(segmentsNeedingRefresh);
segmentsNeedingRefresh.clear();
// Mutable segments need a refresh every period, since new columns could be added dynamically.
segmentsNeedingRefresh.addAll(mutableSegments);
lastFailure = 0L;
lastRefresh = System.currentTimeMillis();
refreshImmediately = false;
}
refresh(segmentsToRefresh, dataSourcesToRebuild);
initialized.countDown();
}
catch (InterruptedException e) {
// Fall through.
throw e;
}
catch (Exception e) {
log.warn(e, "Metadata refresh failed, trying again soon.");
synchronized (lock) {
// Add our segments and dataSources back to their refresh and rebuild lists.
segmentsNeedingRefresh.addAll(segmentsToRefresh);
dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
lastFailure = System.currentTimeMillis();
}
}
}
}
catch (InterruptedException e) {
// Just exit.
}
catch (Throwable e) {
// Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
// OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
log.makeAlert(e, "Metadata refresh failed permanently").emit();
throw e;
}
finally {
log.info("Metadata refresh stopped.");
}
}
);
}
@LifecycleStart
public void start() throws InterruptedException
{
startCacheExec();
if (config.isAwaitInitializationOnStart()) {
final long startNanos = System.nanoTime();
log.debug("%s waiting for initialization.", getClass().getSimpleName());
awaitInitialization();
log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), (System.nanoTime() - startNanos) / 1000000);
} }
} }
@VisibleForTesting protected SegmentMetadataCache cache()
void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> dataSourcesToRebuild) throws IOException
{ {
// Refresh the segments. return segmentCache;
final Set<SegmentId> refreshed = refreshSegments(segmentsToRefresh);
synchronized (lock) {
// Add missing segments back to the refresh list.
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
// Compute the list of dataSources to rebuild tables for.
dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
dataSourcesNeedingRebuild.clear();
}
// Rebuild the dataSources.
for (String dataSource : dataSourcesToRebuild) {
final DruidTable druidTable = buildDruidTable(dataSource);
if (druidTable == null) {
log.info("dataSource[%s] no longer exists, all metadata removed.", dataSource);
tables.remove(dataSource);
continue;
}
final DruidTable oldTable = tables.put(dataSource, druidTable);
final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource";
if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.getRowSignature());
} else {
log.debug("%s [%s] signature is unchanged.", description, dataSource);
}
}
}
@LifecycleStop
public void stop()
{
cacheExec.shutdownNow();
callbackExec.shutdownNow();
}
public void awaitInitialization() throws InterruptedException
{
initialized.await();
} }
@Override @Override
protected Map<String, Table> getTableMap() public Table getTable(String name)
{ {
if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) { if (druidSchemaManager != null) {
return ImmutableMap.copyOf(druidSchemaManager.getTables()); return druidSchemaManager.getTable(name);
} else { } else {
return ImmutableMap.copyOf(tables); DatasourceTable.PhysicalDatasourceMetadata dsMetadata = segmentCache.getDatasource(name);
return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
} }
} }
@VisibleForTesting @Override
protected void addSegment(final DruidServerMetadata server, final DataSegment segment) public Set<String> getTableNames()
{ {
// Get lock first so that we won't wait in ConcurrentMap.compute(). if (druidSchemaManager != null) {
synchronized (lock) { return druidSchemaManager.getTableNames();
// someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
// broker served segments in the timeline, to ensure that removeSegment the event is triggered accurately
if (server.getType().equals(ServerType.BROKER)) {
// a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the
// historical, however mark the datasource for refresh because it needs to be globalized
markDataSourceAsNeedRebuild(segment.getDataSource());
} else {
segmentMetadataInfo.compute(
segment.getDataSource(),
(datasource, segmentsMap) -> {
if (segmentsMap == null) {
segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER);
}
segmentsMap.compute(
segment.getId(),
(segmentId, segmentMetadata) -> {
if (segmentMetadata == null) {
// Unknown segment.
totalSegments++;
// segmentReplicatable is used to determine if segments are served by historical or realtime servers
long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1;
segmentMetadata = AvailableSegmentMetadata
.builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh
.build();
markSegmentAsNeedRefresh(segment.getId());
if (!server.isSegmentReplicationTarget()) {
log.debug("Added new mutable segment[%s].", segment.getId());
markSegmentAsMutable(segment.getId());
} else {
log.debug("Added new immutable segment[%s].", segment.getId());
}
} else {
// We know this segment.
final Set<DruidServerMetadata> segmentServers = segmentMetadata.getReplicas();
final ImmutableSet<DruidServerMetadata> servers = new ImmutableSet.Builder<DruidServerMetadata>()
.addAll(segmentServers)
.add(server)
.build();
segmentMetadata = AvailableSegmentMetadata
.from(segmentMetadata)
.withReplicas(servers)
.withRealtime(recomputeIsRealtime(servers))
.build();
if (server.isSegmentReplicationTarget()) {
// If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
// even if it's also available on non-replicatable (realtime) servers.
unmarkSegmentAsMutable(segment.getId());
log.debug("Segment[%s] has become immutable.", segment.getId());
}
}
assert segmentMetadata != null;
return segmentMetadata;
}
);
return segmentsMap;
}
);
}
if (!tables.containsKey(segment.getDataSource())) {
refreshImmediately = true;
}
lock.notifyAll();
}
}
@VisibleForTesting
void removeSegment(final DataSegment segment)
{
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
log.debug("Segment[%s] is gone.", segment.getId());
segmentsNeedingRefresh.remove(segment.getId());
unmarkSegmentAsMutable(segment.getId());
segmentMetadataInfo.compute(
segment.getDataSource(),
(dataSource, segmentsMap) -> {
if (segmentsMap == null) {
log.warn("Unknown segment[%s] was removed from the cluster. Ignoring this event.", segment.getId());
return null;
} else {
if (segmentsMap.remove(segment.getId()) == null) {
log.warn("Unknown segment[%s] was removed from the cluster. Ignoring this event.", segment.getId());
} else {
totalSegments--;
}
if (segmentsMap.isEmpty()) {
tables.remove(segment.getDataSource());
log.info("dataSource[%s] no longer exists, all metadata removed.", segment.getDataSource());
return null;
} else {
markDataSourceAsNeedRebuild(segment.getDataSource());
return segmentsMap;
}
}
}
);
lock.notifyAll();
}
}
@VisibleForTesting
void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
{
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
segmentMetadataInfo.compute(
segment.getDataSource(),
(datasource, knownSegments) -> {
if (knownSegments == null) {
log.warn(
"Unknown segment[%s] is removed from server[%s]. Ignoring this event",
segment.getId(),
server.getHost()
);
return null;
}
if (server.getType().equals(ServerType.BROKER)) {
// for brokers, if the segment drops from all historicals before the broker this could be null.
if (!knownSegments.isEmpty()) {
// a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the
// historical, however mark the datasource for refresh because it might no longer be broadcast or something
markDataSourceAsNeedRebuild(segment.getDataSource());
}
} else {
knownSegments.compute(
segment.getId(),
(segmentId, segmentMetadata) -> {
if (segmentMetadata == null) {
log.warn(
"Unknown segment[%s] is removed from server[%s]. Ignoring this event",
segment.getId(),
server.getHost()
);
return null;
} else {
final Set<DruidServerMetadata> segmentServers = segmentMetadata.getReplicas();
final ImmutableSet<DruidServerMetadata> servers = FluentIterable
.from(segmentServers)
.filter(Predicates.not(Predicates.equalTo(server)))
.toSet();
return AvailableSegmentMetadata
.from(segmentMetadata)
.withReplicas(servers)
.withRealtime(recomputeIsRealtime(servers))
.build();
}
}
);
}
if (knownSegments.isEmpty()) {
return null;
} else {
return knownSegments;
}
}
);
lock.notifyAll();
}
}
private void markSegmentAsNeedRefresh(SegmentId segmentId)
{
synchronized (lock) {
segmentsNeedingRefresh.add(segmentId);
}
}
private void markSegmentAsMutable(SegmentId segmentId)
{
synchronized (lock) {
mutableSegments.add(segmentId);
}
}
private void unmarkSegmentAsMutable(SegmentId segmentId)
{
synchronized (lock) {
mutableSegments.remove(segmentId);
}
}
@VisibleForTesting
void markDataSourceAsNeedRebuild(String datasource)
{
synchronized (lock) {
dataSourcesNeedingRebuild.add(datasource);
}
}
/**
* Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed,
* which may be a subset of the asked-for set.
*/
@VisibleForTesting
protected Set<SegmentId> refreshSegments(final Set<SegmentId> segments) throws IOException
{
final Set<SegmentId> retVal = new HashSet<>();
// Organize segments by dataSource.
final Map<String, TreeSet<SegmentId>> segmentMap = new TreeMap<>();
for (SegmentId segmentId : segments) {
segmentMap.computeIfAbsent(segmentId.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER))
.add(segmentId);
}
for (Map.Entry<String, TreeSet<SegmentId>> entry : segmentMap.entrySet()) {
final String dataSource = entry.getKey();
retVal.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue()));
}
return retVal;
}
private long recomputeIsRealtime(ImmutableSet<DruidServerMetadata> servers)
{
if (servers.isEmpty()) {
return 0;
}
final Optional<DruidServerMetadata> historicalServer = servers
.stream()
// Ideally, this filter should have checked whether it's a broadcast segment loaded in brokers.
// However, we don't current track of the broadcast segments loaded in brokers, so this filter is still valid.
// See addSegment(), removeServerSegment(), and removeSegment()
.filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL))
.findAny();
// if there is any historical server in the replicas, isRealtime flag should be unset
return historicalServer.isPresent() ? 0 : 1;
}
/**
* Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of
* segments actually refreshed, which may be a subset of the asked-for set.
*/
private Set<SegmentId> refreshSegmentsForDataSource(final String dataSource, final Set<SegmentId> segments)
throws IOException
{
if (!segments.stream().allMatch(segmentId -> segmentId.getDataSource().equals(dataSource))) {
// Sanity check. We definitely expect this to pass.
throw new ISE("'segments' must all match 'dataSource'!");
}
log.debug("Refreshing metadata for dataSource[%s].", dataSource);
final long startTime = System.currentTimeMillis();
// Segment id string -> SegmentId object.
final Map<String, SegmentId> segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString);
final Set<SegmentId> retVal = new HashSet<>();
final Sequence<SegmentAnalysis> sequence = runSegmentMetadataQuery(
Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY)
);
Yielder<SegmentAnalysis> yielder = Yielders.each(sequence);
try {
while (!yielder.isDone()) {
final SegmentAnalysis analysis = yielder.get();
final SegmentId segmentId = segmentIdMap.get(analysis.getId());
if (segmentId == null) {
log.warn("Got analysis for segment[%s] we didn't ask for, ignoring.", analysis.getId());
} else {
final RowSignature rowSignature = analysisToRowSignature(analysis);
log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature);
segmentMetadataInfo.compute(
dataSource,
(datasourceKey, dataSourceSegments) -> {
if (dataSourceSegments == null) {
// Datasource may have been removed or become unavailable while this refresh was ongoing.
log.warn(
"No segment map found with datasource[%s], skipping refresh of segment[%s]",
datasourceKey,
segmentId
);
return null;
} else {
dataSourceSegments.compute(
segmentId,
(segmentIdKey, segmentMetadata) -> {
if (segmentMetadata == null) {
log.warn("No segment[%s] found, skipping refresh", segmentId);
return null;
} else {
final AvailableSegmentMetadata updatedSegmentMetadata = AvailableSegmentMetadata
.from(segmentMetadata)
.withRowSignature(rowSignature)
.withNumRows(analysis.getNumRows())
.build();
retVal.add(segmentId);
return updatedSegmentMetadata;
}
}
);
if (dataSourceSegments.isEmpty()) {
return null;
} else {
return dataSourceSegments;
}
}
}
);
}
yielder = yielder.next(null);
}
}
finally {
yielder.close();
}
log.debug(
"Refreshed metadata for dataSource[%s] in %,d ms (%d segments queried, %d segments left).",
dataSource,
System.currentTimeMillis() - startTime,
retVal.size(),
segments.size() - retVal.size()
);
return retVal;
}
@VisibleForTesting
@Nullable
DruidTable buildDruidTable(final String dataSource)
{
ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> segmentsMap = segmentMetadataInfo.get(dataSource);
// Preserve order.
final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
if (segmentsMap != null && !segmentsMap.isEmpty()) {
for (AvailableSegmentMetadata availableSegmentMetadata : segmentsMap.values()) {
final RowSignature rowSignature = availableSegmentMetadata.getRowSignature();
if (rowSignature != null) {
for (String column : rowSignature.getColumnNames()) {
// Newer column types should override older ones.
final ColumnType columnType =
rowSignature.getColumnType(column)
.orElseThrow(() -> new ISE("Encountered null type for column[%s]", column));
columnTypes.putIfAbsent(column, columnType);
}
}
}
} else { } else {
// table has no segments return segmentCache.getDatasourceNames();
return null;
}
final RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);
final TableDataSource tableDataSource;
// to be a GlobalTableDataSource instead of a TableDataSource, it must appear on all servers (inferred by existing
// in the segment cache, which in this case belongs to the broker meaning only broadcast segments live here)
// to be joinable, it must be possibly joinable according to the factory. we only consider broadcast datasources
// at this time, and isGlobal is currently strongly coupled with joinable, so only make a global table datasource
// if also joinable
final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource);
final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal);
final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource);
if (isBroadcast && isJoinable) {
tableDataSource = maybeGlobal;
} else {
tableDataSource = new TableDataSource(dataSource);
}
return new DruidTable(tableDataSource, builder.build(), null, isJoinable, isBroadcast);
}
@VisibleForTesting
Map<SegmentId, AvailableSegmentMetadata> getSegmentMetadataSnapshot()
{
final Map<SegmentId, AvailableSegmentMetadata> segmentMetadata = Maps.newHashMapWithExpectedSize(totalSegments);
for (ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
segmentMetadata.putAll(val);
}
return segmentMetadata;
}
/**
* Returns total number of segments. This method doesn't use the lock intentionally to avoid expensive contention.
* As a result, the returned value might be inexact.
*/
int getTotalSegments()
{
return totalSegments;
}
@VisibleForTesting
TreeSet<SegmentId> getSegmentsNeedingRefresh()
{
synchronized (lock) {
return segmentsNeedingRefresh;
}
}
@VisibleForTesting
TreeSet<SegmentId> getMutableSegments()
{
synchronized (lock) {
return mutableSegments;
}
}
@VisibleForTesting
Set<String> getDataSourcesNeedingRebuild()
{
synchronized (lock) {
return dataSourcesNeedingRebuild;
}
}
/**
* Execute a SegmentMetadata query and return a {@link Sequence} of {@link SegmentAnalysis}.
*
* @param segments Iterable of {@link SegmentId} objects that are subject of the SegmentMetadata query.
* @return {@link Sequence} of {@link SegmentAnalysis} objects
*/
@VisibleForTesting
protected Sequence<SegmentAnalysis> runSegmentMetadataQuery(
final Iterable<SegmentId> segments
)
{
// Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource.
final String dataSource = Iterables.getOnlyElement(
StreamSupport.stream(segments.spliterator(), false)
.map(SegmentId::getDataSource).collect(Collectors.toSet())
);
final MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec(
StreamSupport.stream(segments.spliterator(), false)
.map(SegmentId::toDescriptor).collect(Collectors.toList())
);
final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
new TableDataSource(dataSource),
querySegmentSpec,
new AllColumnIncluderator(),
false,
brokerInternalQueryConfig.getContext(),
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
false,
false
);
return queryLifecycleFactory
.factorize()
.runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK);
}
@VisibleForTesting
static RowSignature analysisToRowSignature(final SegmentAnalysis analysis)
{
final RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
for (Map.Entry<String, ColumnAnalysis> entry : analysis.getColumns().entrySet()) {
if (entry.getValue().isError()) {
// Skip columns with analysis errors.
continue;
}
ColumnType valueType = entry.getValue().getTypeSignature();
// this shouldn't happen, but if it does, first try to fall back to legacy type information field in case
// standard upgrade order was not followed for 0.22 to 0.23+, and if that also fails, then assume types are some
// flavor of COMPLEX.
if (valueType == null) {
// at some point in the future this can be simplified to the contents of the catch clause here, once the
// likelyhood of upgrading from some version lower than 0.23 is low
try {
valueType = ColumnType.fromString(entry.getValue().getType());
}
catch (IllegalArgumentException ignored) {
valueType = ColumnType.UNKNOWN_COMPLEX;
}
}
rowSignatureBuilder.add(entry.getKey(), valueType);
}
return ROW_SIGNATURE_INTERNER.intern(rowSignatureBuilder.build());
}
/**
* This method is not thread-safe and must be used only in unit tests.
*/
@VisibleForTesting
void setAvailableSegmentMetadata(final SegmentId segmentId, final AvailableSegmentMetadata availableSegmentMetadata)
{
final ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo
.computeIfAbsent(
segmentId.getDataSource(),
k -> new ConcurrentSkipListMap<>(SEGMENT_ORDER)
);
if (dataSourceSegments.put(segmentId, availableSegmentMetadata) == null) {
totalSegments++;
}
}
/**
* This is a helper method for unit tests to emulate heavy work done with {@link #lock}.
* It must be used only in unit tests.
*/
@VisibleForTesting
void doInLock(Runnable runnable)
{
synchronized (lock) {
runnable.run();
} }
} }
} }

View File

@ -23,16 +23,41 @@ import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.guice.annotations.UnstableApi; import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.DruidTable;
import java.util.concurrent.ConcurrentMap; import java.util.Map;
import java.util.Set;
/** /**
* This interface provides a map of datasource names to {@link DruidTable} objects, used by the {@link DruidSchema} * This interface provides a map of datasource names to {@link DruidTable}
* class as the SQL planner's view of Druid datasource schemas. If a non-default implementation is provided, * objects, used by the {@link DruidSchema} class as the SQL planner's
* the segment metadata polling-based view of the Druid tables will not be built in DruidSchema. * view of Druid datasource schemas. If a non-default implementation is
* provided, the segment metadata polling-based view of the Druid tables
* will not be built in DruidSchema.
*/ */
@ExtensionPoint @ExtensionPoint
@UnstableApi @UnstableApi
public interface DruidSchemaManager public interface DruidSchemaManager
{ {
ConcurrentMap<String, DruidTable> getTables(); /**
* Return all tables known to this schema manager. Deprecated because getting
* all tables is never actually needed in the current code. Calcite asks for
* the information for a single table (via {@link #getTable(String)}, or
* the list of all table <i>names</i> (via {@link #getTableNames()}. This
* method was originally used to allow Calcite's {@link
* org.apache.calcite.schema.impl.AbstractSchema AbstractSchema} class to do
* the lookup. The current code implements the operations directly. For
* backward compatibility, the default methods emulate the old behavior.
* Newer implementations should omit this method and implement the other two.
*/
@Deprecated
Map<String, DruidTable> getTables();
default DruidTable getTable(String name)
{
return getTables().get(name);
}
default Set<String> getTableNames()
{
return getTables().keySet();
}
} }

View File

@ -59,6 +59,7 @@ import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.table.RowSignatures; import org.apache.druid.sql.calcite.table.RowSignatures;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;

View File

@ -28,7 +28,7 @@ import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.lookup.LookupColumnSelectorFactory; import org.apache.druid.segment.join.lookup.LookupColumnSelectorFactory;
import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.LookupTable;
import java.util.Map; import java.util.Map;
@ -57,11 +57,12 @@ public class LookupSchema extends AbstractSchema
final ImmutableMap.Builder<String, Table> tableMapBuilder = ImmutableMap.builder(); final ImmutableMap.Builder<String, Table> tableMapBuilder = ImmutableMap.builder();
for (final String lookupName : lookupProvider.getAllLookupNames()) { for (final String lookupName : lookupProvider.getAllLookupNames()) {
// all lookups should be also joinable through lookup joinable factory, and lookups are effectively broadcast
// (if we ignore lookup tiers...)
tableMapBuilder.put( tableMapBuilder.put(
lookupName, lookupName,
new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE, null, true, true) new LookupTable(
new LookupDataSource(lookupName),
ROW_SIGNATURE
)
); );
} }

View File

@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.schema;
import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.DruidTable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -31,7 +32,7 @@ public class NoopDruidSchemaManager implements DruidSchemaManager
private static ConcurrentMap<String, DruidTable> MAP = new ConcurrentHashMap<>(); private static ConcurrentMap<String, DruidTable> MAP = new ConcurrentHashMap<>();
@Override @Override
public ConcurrentMap<String, DruidTable> getTables() public Map<String, DruidTable> getTables()
{ {
return MAP; return MAP;
} }

View File

@ -0,0 +1,971 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.schema;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* Broker-side cache of segment metadata which combines segments to identify
* datasources which become "tables" in Calcite. This cache provides the "physical"
* metadata about a datasource which is blended with catalog "logical" metadata
* to provide the final user-view of each datasource.
*/
@ManageLifecycle
public class SegmentMetadataCache
{
// Newest segments first, so they override older ones.
private static final Comparator<SegmentId> SEGMENT_ORDER = Comparator
.comparing((SegmentId segmentId) -> segmentId.getInterval().getStart())
.reversed()
.thenComparing(Function.identity());
private static final EmittingLogger log = new EmittingLogger(SegmentMetadataCache.class);
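// Maximum number of segments to include in a single SegmentMetadata query during a refresh pass.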
private static final int MAX_SEGMENTS_PER_QUERY = 15000;
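// Placeholder row count for newly discovered segments, used until the first metadata refresh reports the real value.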
private static final long DEFAULT_NUM_ROWS = 0;
private final QueryLifecycleFactory queryLifecycleFactory;
private final PlannerConfig config;
// Escalator, so we can attach an authentication result to queries we generate.
private final Escalator escalator;
private final SegmentManager segmentManager;
private final JoinableFactory joinableFactory;
private final ExecutorService cacheExec;
private final ExecutorService callbackExec;
/**
* Map of datasource name -> {@link DatasourceTable.PhysicalDatasourceMetadata}.
* This map can be accessed by {@link #cacheExec} and {@link #callbackExec} threads.
*/
private final ConcurrentMap<String, DatasourceTable.PhysicalDatasourceMetadata> tables = new ConcurrentHashMap<>();
private static final Interner<RowSignature> ROW_SIGNATURE_INTERNER = Interners.newWeakInterner();
/**
* DataSource -> Segment -> AvailableSegmentMetadata(contains RowSignature) for that segment.
* Use SortedMap for segments so that they are merged in a deterministic order, from newest to oldest.
*
* This map is updated by these two threads.
*
* - {@link #callbackExec} can update it in {@link #addSegment}, {@link #removeServerSegment},
* and {@link #removeSegment}.
* - {@link #cacheExec} can update it in {@link #refreshSegmentsForDataSource}.
*
* While it is being updated, this map is read by these two types of thread.
*
* - {@link #cacheExec} can iterate all {@link AvailableSegmentMetadata}s per datasource.
* See {@link #buildDruidTable}.
* - Query threads can create a snapshot of the entire map for processing queries on the system table.
* See {@link #getSegmentMetadataSnapshot()}.
*
* As the access pattern of this map is read-intensive, we should minimize the contention between writers and readers.
* Since there are two threads that can update this map at the same time, writers should lock the outer map's
* entry (the inner map) first, and then lock the inner map's entry, before updating segment metadata. This can
* be done using {@link ConcurrentMap#compute} as below. Note that, if you need to update the variables guarded
* by {@link #lock} inside of compute(), you should acquire the lock before calling compute() so that the
* function executed inside compute() does not block waiting for it.
*
* <pre>
*   segmentMetadataInfo.compute(
* datasourceParam,
* (datasource, segmentsMap) -> {
* if (segmentsMap == null) return null;
* else {
* segmentsMap.compute(
* segmentIdParam,
* (segmentId, segmentMetadata) -> {
* // update segmentMetadata
* }
* );
* return segmentsMap;
* }
* }
* );
* </pre>
*
* Readers can simply delegate the locking to the concurrent map and iterate map entries.
*/
private final ConcurrentHashMap<String, ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata>> segmentMetadataInfo
= new ConcurrentHashMap<>();
// For awaitInitialization.
private final CountDownLatch initialized = new CountDownLatch(1);
/**
* This lock coordinates the access from multiple threads to those variables guarded by this lock.
* Currently, there are 2 threads that can access these variables.
*
* - {@link #callbackExec} executes the timeline callbacks whenever BrokerServerView changes.
* - {@link #cacheExec} periodically refreshes segment metadata and {@link DatasourceTable} if necessary
* based on the information collected via timeline callbacks.
*/
private final Object lock = new Object();
// All mutable segments.
@GuardedBy("lock")
private final TreeSet<SegmentId> mutableSegments = new TreeSet<>(SEGMENT_ORDER);
// All dataSources that need tables regenerated.
@GuardedBy("lock")
private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();
// All segments that need to be refreshed.
@GuardedBy("lock")
private final TreeSet<SegmentId> segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER);
// Configured context to attach to internally generated queries.
private final BrokerInternalQueryConfig brokerInternalQueryConfig;
@GuardedBy("lock")
private boolean refreshImmediately = false;
@GuardedBy("lock")
private boolean isServerViewInitialized = false;
/**
* Counts the total number of known segments. This variable is used only for the segments table in the system schema
* to initialize a map of an appropriate size when it creates a snapshot. As a result, it doesn't have to be exact,
* and thus there is no concurrency control for this variable.
*/
private int totalSegments = 0;
@Inject
public SegmentMetadataCache(
final QueryLifecycleFactory queryLifecycleFactory,
final TimelineServerView serverView,
final SegmentManager segmentManager,
final JoinableFactory joinableFactory,
final PlannerConfig config,
final Escalator escalator,
final BrokerInternalQueryConfig brokerInternalQueryConfig
)
{
this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
Preconditions.checkNotNull(serverView, "serverView");
this.segmentManager = segmentManager;
this.joinableFactory = joinableFactory;
this.config = Preconditions.checkNotNull(config, "config");
this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d");
this.escalator = escalator;
this.brokerInternalQueryConfig = brokerInternalQueryConfig;
initServerViewTimelineCallback(serverView);
}
private void initServerViewTimelineCallback(final TimelineServerView serverView)
{
serverView.registerTimelineCallback(
callbackExec,
new TimelineServerView.TimelineCallback()
{
@Override
public ServerView.CallbackAction timelineInitialized()
{
synchronized (lock) {
isServerViewInitialized = true;
lock.notifyAll();
}
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment)
{
addSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentRemoved(final DataSegment segment)
{
removeSegment(segment);
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction serverSegmentRemoved(
final DruidServerMetadata server,
final DataSegment segment
)
{
removeServerSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
private void startCacheExec()
{
cacheExec.submit(
() -> {
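// Refresh loop: wait until there is work to do and the refresh period has elapsed, then refresh segment
// metadata and rebuild datasource tables. Runs until the cache executor is shut down.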
long lastRefresh = 0L;
long lastFailure = 0L;
try {
while (!Thread.currentThread().isInterrupted()) {
final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
final Set<String> dataSourcesToRebuild = new TreeSet<>();
try {
synchronized (lock) {
final long nextRefreshNoFuzz = DateTimes
.utc(lastRefresh)
.plus(config.getMetadataRefreshPeriod())
.getMillis();
// Fuzz a bit to spread load out when we have multiple brokers.
final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
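// Wait until the server view is initialized, there is something to refresh or rebuild, and either an
// immediate refresh was requested or the (fuzzed) refresh period has elapsed.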
while (true) {
// Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
final boolean wasRecentFailure = DateTimes.utc(lastFailure)
.plus(config.getMetadataRefreshPeriod())
.isAfterNow();
if (isServerViewInitialized &&
!wasRecentFailure &&
(!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
(refreshImmediately || nextRefresh < System.currentTimeMillis())) {
// We need to do a refresh. Break out of the waiting loop.
break;
}
// lastFailure != 0L means an exception happened before and some refresh work was not completed,
// so even though the ServerView is initialized, we can't let the broker complete initialization yet.
if (isServerViewInitialized && lastFailure == 0L) {
// Server view is initialized, but we don't need to do a refresh. Could happen if there are
// no segments in the system yet. Just mark us as initialized, then.
initialized.countDown();
}
// Wait some more, we'll wake up when it might be time to do another refresh.
lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
}
segmentsToRefresh.addAll(segmentsNeedingRefresh);
segmentsNeedingRefresh.clear();
// Mutable segments need a refresh every period, since new columns could be added dynamically.
segmentsNeedingRefresh.addAll(mutableSegments);
lastFailure = 0L;
lastRefresh = System.currentTimeMillis();
refreshImmediately = false;
}
refresh(segmentsToRefresh, dataSourcesToRebuild);
initialized.countDown();
}
catch (InterruptedException e) {
// Fall through.
throw e;
}
catch (Exception e) {
log.warn(e, "Metadata refresh failed, trying again soon.");
synchronized (lock) {
// Add our segments and dataSources back to their refresh and rebuild lists.
segmentsNeedingRefresh.addAll(segmentsToRefresh);
dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
lastFailure = System.currentTimeMillis();
}
}
}
}
catch (InterruptedException e) {
// Just exit.
}
catch (Throwable e) {
// Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
// OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
log.makeAlert(e, "Metadata refresh failed permanently").emit();
throw e;
}
finally {
log.info("Metadata refresh stopped.");
}
}
);
}
@LifecycleStart
public void start() throws InterruptedException
{
startCacheExec();
if (config.isAwaitInitializationOnStart()) {
final long startNanos = System.nanoTime();
log.debug("%s waiting for initialization.", getClass().getSimpleName());
awaitInitialization();
log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), (System.nanoTime() - startNanos) / 1000000);
}
}
@VisibleForTesting
void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> dataSourcesToRebuild) throws IOException
{
// Refresh the segments.
final Set<SegmentId> refreshed = refreshSegments(segmentsToRefresh);
synchronized (lock) {
// Add missing segments back to the refresh list.
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
// Compute the list of dataSources to rebuild tables for.
dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
dataSourcesNeedingRebuild.clear();
}
// Rebuild the dataSources.
for (String dataSource : dataSourcesToRebuild) {
final DatasourceTable.PhysicalDatasourceMetadata druidTable = buildDruidTable(dataSource);
if (druidTable == null) {
log.info("dataSource [%s] no longer exists, all metadata removed.", dataSource);
tables.remove(dataSource);
continue;
}
final DatasourceTable.PhysicalDatasourceMetadata oldTable = tables.put(dataSource, druidTable);
final String description = druidTable.dataSource().isGlobal() ? "global dataSource" : "dataSource";
if (oldTable == null || !oldTable.rowSignature().equals(druidTable.rowSignature())) {
log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.rowSignature());
} else {
log.debug("%s [%s] signature is unchanged.", description, dataSource);
}
}
}
@LifecycleStop
public void stop()
{
cacheExec.shutdownNow();
callbackExec.shutdownNow();
}
public void awaitInitialization() throws InterruptedException
{
initialized.await();
}
protected DatasourceTable.PhysicalDatasourceMetadata getDatasource(String name)
{
return tables.get(name);
}
protected Set<String> getDatasourceNames()
{
return tables.keySet();
}
@VisibleForTesting
protected void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
// someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
// broker-served segments in the timeline, to ensure that the removeSegment event is triggered accurately
if (server.getType().equals(ServerType.BROKER)) {
// a segment on a broker means a broadcast datasource; skip the metadata because we'll also see this segment
// on a historical, but mark the datasource for rebuild because it needs to be globalized
markDataSourceAsNeedRebuild(segment.getDataSource());
} else {
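// Segment served by a historical or realtime server: record it in the metadata map and, if it is new,
// mark it for refresh.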
segmentMetadataInfo.compute(
segment.getDataSource(),
(datasource, segmentsMap) -> {
if (segmentsMap == null) {
segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER);
}
segmentsMap.compute(
segment.getId(),
(segmentId, segmentMetadata) -> {
if (segmentMetadata == null) {
// Unknown segment.
totalSegments++;
// segmentReplicatable is used to determine if segments are served by historical or realtime servers
long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1;
segmentMetadata = AvailableSegmentMetadata
.builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh
.build();
markSegmentAsNeedRefresh(segment.getId());
if (!server.isSegmentReplicationTarget()) {
log.debug("Added new mutable segment [%s].", segment.getId());
markSegmentAsMutable(segment.getId());
} else {
log.debug("Added new immutable segment [%s].", segment.getId());
}
} else {
// We know this segment.
final Set<DruidServerMetadata> segmentServers = segmentMetadata.getReplicas();
final ImmutableSet<DruidServerMetadata> servers = new ImmutableSet.Builder<DruidServerMetadata>()
.addAll(segmentServers)
.add(server)
.build();
segmentMetadata = AvailableSegmentMetadata
.from(segmentMetadata)
.withReplicas(servers)
.withRealtime(recomputeIsRealtime(servers))
.build();
if (server.isSegmentReplicationTarget()) {
// If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
// even if it's also available on non-replicatable (realtime) servers.
unmarkSegmentAsMutable(segment.getId());
log.debug("Segment[%s] has become immutable.", segment.getId());
}
}
assert segmentMetadata != null;
return segmentMetadata;
}
);
return segmentsMap;
}
);
}
if (!tables.containsKey(segment.getDataSource())) {
refreshImmediately = true;
}
lock.notifyAll();
}
}
@VisibleForTesting
void removeSegment(final DataSegment segment)
{
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
log.debug("Segment [%s] is gone.", segment.getId());
segmentsNeedingRefresh.remove(segment.getId());
unmarkSegmentAsMutable(segment.getId());
segmentMetadataInfo.compute(
segment.getDataSource(),
(dataSource, segmentsMap) -> {
if (segmentsMap == null) {
log.warn("Unknown segment [%s] was removed from the cluster. Ignoring this event.", segment.getId());
return null;
} else {
if (segmentsMap.remove(segment.getId()) == null) {
log.warn("Unknown segment [%s] was removed from the cluster. Ignoring this event.", segment.getId());
} else {
totalSegments--;
}
if (segmentsMap.isEmpty()) {
tables.remove(segment.getDataSource());
log.info("dataSource [%s] no longer exists, all metadata removed.", segment.getDataSource());
return null;
} else {
markDataSourceAsNeedRebuild(segment.getDataSource());
return segmentsMap;
}
}
}
);
lock.notifyAll();
}
}
@VisibleForTesting
void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
{
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
log.debug("Segment [%s] is gone from server [%s]", segment.getId(), server.getName());
segmentMetadataInfo.compute(
segment.getDataSource(),
(datasource, knownSegments) -> {
if (knownSegments == null) {
log.warn(
"Unknown segment [%s] is removed from server [%s]. Ignoring this event",
segment.getId(),
server.getHost()
);
return null;
}
if (server.getType().equals(ServerType.BROKER)) {
// for brokers: if the segment drops from all historicals before the broker, this map could already be empty.
if (!knownSegments.isEmpty()) {
// a segment on a broker means a broadcast datasource; skip the metadata because we'll also see this segment
// on a historical, but mark the datasource for rebuild because it might no longer be broadcast
markDataSourceAsNeedRebuild(segment.getDataSource());
}
} else {
knownSegments.compute(
segment.getId(),
(segmentId, segmentMetadata) -> {
if (segmentMetadata == null) {
log.warn(
"Unknown segment [%s] is removed from server [%s]. Ignoring this event",
segment.getId(),
server.getHost()
);
return null;
} else {
final Set<DruidServerMetadata> segmentServers = segmentMetadata.getReplicas();
final ImmutableSet<DruidServerMetadata> servers = FluentIterable
.from(segmentServers)
.filter(Predicates.not(Predicates.equalTo(server)))
.toSet();
return AvailableSegmentMetadata
.from(segmentMetadata)
.withReplicas(servers)
.withRealtime(recomputeIsRealtime(servers))
.build();
}
}
);
}
if (knownSegments.isEmpty()) {
return null;
} else {
return knownSegments;
}
}
);
lock.notifyAll();
}
}
private void markSegmentAsNeedRefresh(SegmentId segmentId)
{
synchronized (lock) {
segmentsNeedingRefresh.add(segmentId);
}
}
private void markSegmentAsMutable(SegmentId segmentId)
{
synchronized (lock) {
mutableSegments.add(segmentId);
}
}
private void unmarkSegmentAsMutable(SegmentId segmentId)
{
synchronized (lock) {
mutableSegments.remove(segmentId);
}
}
@VisibleForTesting
void markDataSourceAsNeedRebuild(String datasource)
{
synchronized (lock) {
dataSourcesNeedingRebuild.add(datasource);
}
}
/**
* Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed,
* which may be a subset of the asked-for set.
*/
@VisibleForTesting
protected Set<SegmentId> refreshSegments(final Set<SegmentId> segments) throws IOException
{
final Set<SegmentId> retVal = new HashSet<>();
// Organize segments by dataSource.
final Map<String, TreeSet<SegmentId>> segmentMap = new TreeMap<>();
for (SegmentId segmentId : segments) {
segmentMap.computeIfAbsent(segmentId.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER))
.add(segmentId);
}
for (Map.Entry<String, TreeSet<SegmentId>> entry : segmentMap.entrySet()) {
final String dataSource = entry.getKey();
retVal.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue()));
}
return retVal;
}
private long recomputeIsRealtime(ImmutableSet<DruidServerMetadata> servers)
{
if (servers.isEmpty()) {
return 0;
}
final Optional<DruidServerMetadata> historicalServer = servers
.stream()
// Ideally, this filter should have checked whether it's a broadcast segment loaded in brokers.
// However, we don't currently keep track of the broadcast segments loaded in brokers, so this filter is still valid.
// See addSegment(), removeServerSegment(), and removeSegment()
.filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL))
.findAny();
// if there is any historical server in the replicas, isRealtime flag should be unset
return historicalServer.isPresent() ? 0 : 1;
}
/**
* Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of
* segments actually refreshed, which may be a subset of the asked-for set.
*/
private Set<SegmentId> refreshSegmentsForDataSource(final String dataSource, final Set<SegmentId> segments)
throws IOException
{
if (!segments.stream().allMatch(segmentId -> segmentId.getDataSource().equals(dataSource))) {
// Sanity check. We definitely expect this to pass.
throw new ISE("'segments' must all match 'dataSource'!");
}
log.debug("Refreshing metadata for dataSource[%s].", dataSource);
final long startTime = System.currentTimeMillis();
// Segment id string -> SegmentId object.
final Map<String, SegmentId> segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString);
final Set<SegmentId> retVal = new HashSet<>();
final Sequence<SegmentAnalysis> sequence = runSegmentMetadataQuery(
Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY)
);
Yielder<SegmentAnalysis> yielder = Yielders.each(sequence);
try {
while (!yielder.isDone()) {
final SegmentAnalysis analysis = yielder.get();
final SegmentId segmentId = segmentIdMap.get(analysis.getId());
if (segmentId == null) {
log.warn("Got analysis for segment [%s] we didn't ask for, ignoring.", analysis.getId());
} else {
final RowSignature rowSignature = analysisToRowSignature(analysis);
log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature);
segmentMetadataInfo.compute(
dataSource,
(datasourceKey, dataSourceSegments) -> {
if (dataSourceSegments == null) {
// Datasource may have been removed or become unavailable while this refresh was ongoing.
log.warn(
"No segment map found with datasource [%s], skipping refresh of segment [%s]",
datasourceKey,
segmentId
);
return null;
} else {
dataSourceSegments.compute(
segmentId,
(segmentIdKey, segmentMetadata) -> {
if (segmentMetadata == null) {
log.warn("No segment [%s] found, skipping refresh", segmentId);
return null;
} else {
final AvailableSegmentMetadata updatedSegmentMetadata = AvailableSegmentMetadata
.from(segmentMetadata)
.withRowSignature(rowSignature)
.withNumRows(analysis.getNumRows())
.build();
retVal.add(segmentId);
return updatedSegmentMetadata;
}
}
);
if (dataSourceSegments.isEmpty()) {
return null;
} else {
return dataSourceSegments;
}
}
}
);
}
yielder = yielder.next(null);
}
}
finally {
yielder.close();
}
log.debug(
"Refreshed metadata for dataSource [%s] in %,d ms (%d segments queried, %d segments left).",
dataSource,
System.currentTimeMillis() - startTime,
retVal.size(),
segments.size() - retVal.size()
);
return retVal;
}
@VisibleForTesting
@Nullable
DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(final String dataSource)
{
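// Merge the row signatures of all available segments for this datasource. Segments iterate newest-first,
// so the most recently observed type for each column wins (via putIfAbsent below).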
ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> segmentsMap = segmentMetadataInfo.get(dataSource);
// Preserve order.
final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
if (segmentsMap != null && !segmentsMap.isEmpty()) {
for (AvailableSegmentMetadata availableSegmentMetadata : segmentsMap.values()) {
final RowSignature rowSignature = availableSegmentMetadata.getRowSignature();
if (rowSignature != null) {
for (String column : rowSignature.getColumnNames()) {
// Newer column types should override older ones.
final ColumnType columnType =
rowSignature.getColumnType(column)
.orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
columnTypes.putIfAbsent(column, columnType);
}
}
}
} else {
// table has no segments
return null;
}
final RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);
final TableDataSource tableDataSource;
// To be a GlobalTableDataSource instead of a TableDataSource, the datasource must appear on all servers (inferred
// from its presence in the segment cache, which on the broker holds only broadcast segments). To be joinable, it
// must be directly joinable according to the factory. We only consider broadcast datasources at this time, and
// isGlobal is currently strongly coupled with joinable, so only make a GlobalTableDataSource if it is also joinable.
final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource);
final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal);
final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource);
if (isBroadcast && isJoinable) {
tableDataSource = maybeGlobal;
} else {
tableDataSource = new TableDataSource(dataSource);
}
return new DatasourceTable.PhysicalDatasourceMetadata(tableDataSource, builder.build(), isJoinable, isBroadcast);
}
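/**
* Returns a flattened copy of the per-datasource segment metadata maps, keyed by segment id. Used by the
* system schema's segments table; it reads the concurrent maps directly and does not take {@link #lock}.
*/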
@VisibleForTesting
Map<SegmentId, AvailableSegmentMetadata> getSegmentMetadataSnapshot()
{
final Map<SegmentId, AvailableSegmentMetadata> segmentMetadata = Maps.newHashMapWithExpectedSize(totalSegments);
for (ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
segmentMetadata.putAll(val);
}
return segmentMetadata;
}
/**
* Returns the total number of segments. This method intentionally does not use the lock, to avoid expensive contention.
* As a result, the returned value might be inexact.
*/
int getTotalSegments()
{
return totalSegments;
}
@VisibleForTesting
TreeSet<SegmentId> getSegmentsNeedingRefresh()
{
synchronized (lock) {
return segmentsNeedingRefresh;
}
}
@VisibleForTesting
TreeSet<SegmentId> getMutableSegments()
{
synchronized (lock) {
return mutableSegments;
}
}
@VisibleForTesting
Set<String> getDataSourcesNeedingRebuild()
{
synchronized (lock) {
return dataSourcesNeedingRebuild;
}
}
/**
* Execute a SegmentMetadata query and return a {@link Sequence} of {@link SegmentAnalysis}.
*
* @param segments Iterable of {@link SegmentId} objects that are the subject of the SegmentMetadata query.
* @return {@link Sequence} of {@link SegmentAnalysis} objects
*/
@VisibleForTesting
protected Sequence<SegmentAnalysis> runSegmentMetadataQuery(
final Iterable<SegmentId> segments
)
{
// Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource.
final String dataSource = Iterables.getOnlyElement(
StreamSupport.stream(segments.spliterator(), false)
.map(SegmentId::getDataSource).collect(Collectors.toSet())
);
final MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec(
StreamSupport.stream(segments.spliterator(), false)
.map(SegmentId::toDescriptor).collect(Collectors.toList())
);
final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
new TableDataSource(dataSource),
querySegmentSpec,
new AllColumnIncluderator(),
false,
brokerInternalQueryConfig.getContext(),
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
false,
false
);
return queryLifecycleFactory
.factorize()
.runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK);
}
@VisibleForTesting
static RowSignature analysisToRowSignature(final SegmentAnalysis analysis)
{
final RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
for (Map.Entry<String, ColumnAnalysis> entry : analysis.getColumns().entrySet()) {
if (entry.getValue().isError()) {
// Skip columns with analysis errors.
continue;
}
ColumnType valueType = entry.getValue().getTypeSignature();
// This shouldn't happen, but if it does, first try to fall back to the legacy type information field in case the
// standard upgrade order was not followed for 0.22 to 0.23+, and if that also fails, then assume the type is some
// flavor of COMPLEX.
if (valueType == null) {
// At some point in the future this can be simplified to the contents of the catch clause here, once the
// likelihood of upgrading from a version lower than 0.23 is low.
try {
valueType = ColumnType.fromString(entry.getValue().getType());
}
catch (IllegalArgumentException ignored) {
valueType = ColumnType.UNKNOWN_COMPLEX;
}
}
rowSignatureBuilder.add(entry.getKey(), valueType);
}
return ROW_SIGNATURE_INTERNER.intern(rowSignatureBuilder.build());
}
/**
* This method is not thread-safe and must be used only in unit tests.
*/
@VisibleForTesting
void setAvailableSegmentMetadata(final SegmentId segmentId, final AvailableSegmentMetadata availableSegmentMetadata)
{
final ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo
.computeIfAbsent(
segmentId.getDataSource(),
k -> new ConcurrentSkipListMap<>(SEGMENT_ORDER)
);
if (dataSourceSegments.put(segmentId, availableSegmentMetadata) == null) {
totalSegments++;
}
}
/**
* This is a helper method for unit tests to emulate heavy work done with {@link #lock}.
* It must be used only in unit tests.
*/
@VisibleForTesting
void doInLock(Runnable runnable)
{
synchronized (lock) {
runnable.run();
}
}
}


@@ -273,13 +273,13 @@ public class SystemSchema extends AbstractSchema
   {
     //get available segments from druidSchema
     final Map<SegmentId, AvailableSegmentMetadata> availableSegmentMetadata =
-        druidSchema.getSegmentMetadataSnapshot();
+        druidSchema.cache().getSegmentMetadataSnapshot();
     final Iterator<Entry<SegmentId, AvailableSegmentMetadata>> availableSegmentEntries =
         availableSegmentMetadata.entrySet().iterator();
     // in memory map to store segment data from available segments
     final Map<SegmentId, PartialSegmentData> partialSegmentDataMap =
-        Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments());
+        Maps.newHashMapWithExpectedSize(druidSchema.cache().getTotalSegments());
     for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) {
       PartialSegmentData partialSegmentData =
           new PartialSegmentData(IS_AVAILABLE_TRUE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
@@ -290,7 +290,7 @@ public class SystemSchema extends AbstractSchema
     // Coordinator.
     final Iterator<SegmentWithOvershadowedStatus> metadataStoreSegments = metadataView.getPublishedSegments();
-    final Set<SegmentId> segmentsAlreadySeen = Sets.newHashSetWithExpectedSize(druidSchema.getTotalSegments());
+    final Set<SegmentId> segmentsAlreadySeen = Sets.newHashSetWithExpectedSize(druidSchema.cache().getTotalSegments());
     final FluentIterable<Object[]> publishedSegments = FluentIterable
         .from(() -> getAuthorizedPublishedSegments(metadataStoreSegments, root))


@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.table;
import com.google.common.base.Preconditions;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.column.RowSignature;
import java.util.Objects;
/**
* Represents a SQL table that models a Druid datasource.
* <p>
* Once the catalog code is merged, this class will combine physical information
* from the segment cache with logical information from the catalog to produce
* the SQL-user's view of the table. The resulting merged view is used to plan
* queries and is the source of table information in the {@code INFORMATION_SCHEMA}.
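* <p>
* At this stage, the table simply wraps the {@link PhysicalDatasourceMetadata} computed by the broker's
* segment metadata cache.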
*/
public class DatasourceTable extends DruidTable
{
/**
* The physical metadata for a datasource, derived from the list of segments
* published in the Coordinator. Used only for datasources, since only
* datasources are computed from segments.
*/
public static class PhysicalDatasourceMetadata
{
private final TableDataSource dataSource;
private final RowSignature rowSignature;
private final boolean joinable;
private final boolean broadcast;
public PhysicalDatasourceMetadata(
final TableDataSource dataSource,
final RowSignature rowSignature,
final boolean isJoinable,
final boolean isBroadcast
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature");
this.joinable = isJoinable;
this.broadcast = isBroadcast;
}
public TableDataSource dataSource()
{
return dataSource;
}
public RowSignature rowSignature()
{
return rowSignature;
}
public boolean isJoinable()
{
return joinable;
}
public boolean isBroadcast()
{
return broadcast;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PhysicalDatasourceMetadata that = (PhysicalDatasourceMetadata) o;
if (!Objects.equals(dataSource, that.dataSource)) {
return false;
}
return Objects.equals(rowSignature, that.rowSignature);
}
@Override
public int hashCode()
{
int result = dataSource != null ? dataSource.hashCode() : 0;
result = 31 * result + (rowSignature != null ? rowSignature.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "DatasourceMetadata{" +
"dataSource=" + dataSource +
", rowSignature=" + rowSignature +
'}';
}
}
private final PhysicalDatasourceMetadata physicalMetadata;
public DatasourceTable(
final PhysicalDatasourceMetadata physicalMetadata
)
{
super(physicalMetadata.rowSignature());
this.physicalMetadata = physicalMetadata;
}
@Override
public DataSource getDataSource()
{
return physicalMetadata.dataSource();
}
@Override
public boolean isJoinable()
{
return physicalMetadata.isJoinable();
}
@Override
public boolean isBroadcast()
{
return physicalMetadata.isBroadcast();
}
@Override
public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable table)
{
return LogicalTableScan.create(context.getCluster(), table);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DatasourceTable that = (DatasourceTable) o;
if (!Objects.equals(physicalMetadata, that.physicalMetadata)) {
return false;
}
return Objects.equals(getRowSignature(), that.getRowSignature());
}
@Override
public int hashCode()
{
return physicalMetadata.hashCode();
}
@Override
public String toString()
{
// Don't include the row signature: it is the same as in
// physicalMetadata.
return "DruidTable{" +
physicalMetadata +
'}';
}
}


@@ -19,12 +19,8 @@
 package org.apache.druid.sql.calcite.table;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.Schema;
@@ -35,45 +31,19 @@ import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.external.ExternalDataSource;
-import org.apache.druid.sql.calcite.external.ExternalTableScan;
-import javax.annotation.Nullable;
-import java.util.Objects;
-public class DruidTable implements TranslatableTable
+/**
+ * Abstract base class for the various kinds of tables which Druid supports.
+ */
+public abstract class DruidTable implements TranslatableTable
 {
-  private final DataSource dataSource;
   private final RowSignature rowSignature;
-  @Nullable
-  private final ObjectMapper objectMapper;
-  private final boolean joinable;
-  private final boolean broadcast;
   public DruidTable(
-      final DataSource dataSource,
-      final RowSignature rowSignature,
-      @Nullable final ObjectMapper objectMapper,
-      final boolean isJoinable,
-      final boolean isBroadcast
+      final RowSignature rowSignature
   )
   {
-    this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature");
-    this.objectMapper = objectMapper;
-    this.joinable = isJoinable;
-    this.broadcast = isBroadcast;
-    if (dataSource instanceof ExternalDataSource && objectMapper == null) {
-      // objectMapper is used by ExternalTableScan to generate its digest.
-      throw new NullPointerException("ObjectMapper is required for external datasources");
-    }
-  }
-  public DataSource getDataSource()
-  {
-    return dataSource;
   }
   public RowSignature getRowSignature()
@@ -81,15 +51,11 @@ public class DruidTable implements TranslatableTable
     return rowSignature;
   }
-  public boolean isJoinable()
-  {
-    return joinable;
-  }
-  public boolean isBroadcast()
-  {
-    return broadcast;
-  }
+  public abstract DataSource getDataSource();
+  public abstract boolean isJoinable();
+  public abstract boolean isBroadcast();
   @Override
   public Schema.TableType getJdbcTableType()
@@ -106,12 +72,7 @@
   @Override
   public RelDataType getRowType(final RelDataTypeFactory typeFactory)
   {
-    // For external datasources, the row type should be determined by whatever the row signature has been explicitly
-    // passed in. Typecasting directly to SqlTypeName.TIMESTAMP will lead to inconsistencies with the Calcite functions
-    // For example, TIME_PARSE(__time) where __time is specified to be a string field in the external datasource
-    // would lead to an exception because __time would be interpreted as timestamp if we typecast it.
-    boolean typecastTimeColumn = !(dataSource instanceof ExternalDataSource);
-    return RowSignatures.toRelDataType(rowSignature, typeFactory, typecastTimeColumn);
+    return RowSignatures.toRelDataType(getRowSignature(), typeFactory);
   }
   @Override
@@ -130,51 +91,4 @@ public class DruidTable implements TranslatableTable
   {
     return true;
   }
-  @Override
-  public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable table)
-  {
-    if (dataSource instanceof ExternalDataSource) {
-      // Cannot use LogicalTableScan here, because its digest is solely based on the name of the table macro.
-      // Must use our own class that computes its own digest.
-      return new ExternalTableScan(context.getCluster(), objectMapper, this);
-    } else {
-      return LogicalTableScan.create(context.getCluster(), table);
-    }
-  }
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    DruidTable that = (DruidTable) o;
-    if (!Objects.equals(dataSource, that.dataSource)) {
-      return false;
-    }
-    return Objects.equals(rowSignature, that.rowSignature);
-  }
-  @Override
-  public int hashCode()
-  {
-    int result = dataSource != null ? dataSource.hashCode() : 0;
-    result = 31 * result + (rowSignature != null ? rowSignature.hashCode() : 0);
-    return result;
-  }
-  @Override
-  public String toString()
-  {
-    return "DruidTable{" +
-           "dataSource=" + dataSource +
-           ", rowSignature=" + rowSignature +
-           '}';
-  }
 }


@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.table;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptTable.ToRelContext;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.druid.query.DataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.external.ExternalTableScan;
/**
* Represents a source of data external to Druid: a CSV file, an HTTP request, etc.
* Each such table represents one of Druid's {@link DataSource} types. Since SQL
* requires knowledge of the schema of that input source, the user must provide
* that information in SQL (via the {@code EXTERN} or upcoming {@code STAGED} function) or
* from the upcoming Druid Catalog.
*/
public class ExternalTable extends DruidTable
{
private final DataSource dataSource;
private final ObjectMapper objectMapper;
public ExternalTable(
final DataSource dataSource,
final RowSignature rowSignature,
final ObjectMapper objectMapper
)
{
super(rowSignature);
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
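// The mapper is used by ExternalTableScan to generate the scan's digest.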
this.objectMapper = objectMapper;
}
@Override
public DataSource getDataSource()
{
return dataSource;
}
@Override
public boolean isJoinable()
{
return false;
}
@Override
public boolean isBroadcast()
{
return false;
}
@Override
public RelDataType getRowType(final RelDataTypeFactory typeFactory)
{
// For external datasources, the row type should be determined by whatever the row signature has been explicitly
// passed in. Typecasting directly to SqlTypeName.TIMESTAMP will lead to inconsistencies with the Calcite functions
// For example, TIME_PARSE(__time) where __time is specified to be a string field in the external datasource
// would lead to an exception because __time would be interpreted as timestamp if we typecast it.
return RowSignatures.toRelDataType(getRowSignature(), typeFactory, true);
}
@Override
public RelNode toRel(ToRelContext context, RelOptTable relOptTable)
{
return new ExternalTableScan(context.getCluster(), objectMapper, this);
}
@Override
public String toString()
{
return "ExternalTable{" +
"dataSource=" + dataSource +
", rowSignature=" + getRowSignature() +
'}';
}
}


@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.table;
import com.google.common.base.Preconditions;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptTable.ToRelContext;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.column.RowSignature;
/**
* Represents a specialized table used within Druid's Calcite-based planner.
* Used in {@link org.apache.druid.sql.calcite.rule.DruidLogicalValuesRule DruidLogicalValuesRule}
* to implement trivial {@code SELECT 1} style queries that return constant values represented
* by this inline table.
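* The backing {@link InlineDataSource} carries the literal rows directly.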
*/
public class InlineTable extends DruidTable
{
private final DataSource dataSource;
public InlineTable(
final InlineDataSource dataSource,
final RowSignature rowSignature
)
{
super(rowSignature);
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
}
@Override
public DataSource getDataSource()
{
return dataSource;
}
@Override
public boolean isJoinable()
{
return false;
}
@Override
public boolean isBroadcast()
{
return false;
}
@Override
public RelNode toRel(ToRelContext context, RelOptTable table)
{
return LogicalTableScan.create(context.getCluster(), table);
}
@Override
public String toString()
{
return "DatasourceMetadata{" +
"dataSource=" + dataSource +
", rowSignature=" + getRowSignature() +
'}';
}
}


@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.table;
import com.google.common.base.Preconditions;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptTable.ToRelContext;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.segment.column.RowSignature;
/**
* Represents a Druid lookup as a Calcite table, allowing queries to read from a lookup.
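* Lookup tables are broadcast to all servers and are directly joinable.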
*/
public class LookupTable extends DruidTable
{
private final DataSource dataSource;
public LookupTable(
final LookupDataSource dataSource,
final RowSignature rowSignature
)
{
super(rowSignature);
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
}
@Override
public DataSource getDataSource()
{
return dataSource;
}
@Override
public boolean isJoinable()
{
return true;
}
@Override
public boolean isBroadcast()
{
return true;
}
@Override
public RelNode toRel(ToRelContext context, RelOptTable table)
{
return LogicalTableScan.create(context.getCluster(), table);
}
@Override
public String toString()
{
return "LookupTable{" +
"dataSource=" + dataSource +
", rowSignature=" + getRowSignature() +
'}';
}
}


@@ -63,6 +63,7 @@ import org.joda.time.DateTimeZone;
 import org.junit.Assert;
 import javax.annotation.Nullable;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collections;


@@ -44,7 +44,6 @@ public class ExternalTableScanRuleTest
   @Test
   public void testMatchesWhenExternalScanUnsupported() throws ValidationException
   {
     final PlannerContext plannerContext = PlannerContext.create(
         "DUMMY", // The actual query isn't important for this test
         CalciteTests.createOperatorTable(),


@@ -32,6 +32,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.function.Consumer;
@@ -328,7 +329,6 @@ public class DruidRelsTest
     EasyMock.expect(mockPartialQuery.getWhereFilter()).andReturn(whereFilter).anyTimes();
     final RelOptTable mockRelOptTable = EasyMock.mock(RelOptTable.class);
-    EasyMock.expect(mockRelOptTable.unwrap(DruidTable.class)).andReturn(druidTable).anyTimes();
     final T mockRel = EasyMock.mock(clazz);
     EasyMock.expect(mockRel.getPartialDruidQuery()).andReturn(mockPartialQuery).anyTimes();


@@ -32,6 +32,8 @@ import org.apache.druid.sql.calcite.rel.DruidRel;
 import org.apache.druid.sql.calcite.rel.DruidRelsTest;
 import org.apache.druid.sql.calcite.rel.DruidUnionDataSourceRel;
 import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.sql.calcite.table.DatasourceTable.PhysicalDatasourceMetadata;
 import org.apache.druid.sql.calcite.table.DruidTable;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -43,16 +45,17 @@ import java.util.Optional;
 public class DruidUnionDataSourceRuleTest
 {
-  private final DruidTable fooDruidTable = new DruidTable(
-      new TableDataSource("foo"),
-      RowSignature.builder()
-                  .addTimeColumn()
-                  .add("col1", ColumnType.STRING)
-                  .add("col2", ColumnType.LONG)
-                  .build(),
-      null,
-      false,
-      false
+  private final DruidTable fooDruidTable = new DatasourceTable(
+      new PhysicalDatasourceMetadata(
+          new TableDataSource("foo"),
+          RowSignature.builder()
+                      .addTimeColumn()
+                      .add("col1", ColumnType.STRING)
+                      .add("col2", ColumnType.LONG)
+                      .build(),
+          false,
+          false
+      )
   );
   @Test


@@ -49,7 +49,7 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase
   {
     try (final Closer closer = Closer.create()) {
       final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
-      final DruidSchema druidSchema = new DruidSchema(
+      final SegmentMetadataCache cache = new SegmentMetadataCache(
          CalciteTests.createMockQueryLifecycleFactory(
              new SpecificSegmentsQuerySegmentWalker(conglomerate),
              conglomerate
@@ -59,14 +59,14 @@
          new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
          PLANNER_CONFIG_DEFAULT,
          new NoopEscalator(),
-         new BrokerInternalQueryConfig(),
-         null
+         new BrokerInternalQueryConfig()
      );
-      druidSchema.start();
-      druidSchema.awaitInitialization();
-      Assert.assertEquals(ImmutableMap.of(), druidSchema.getTableMap());
+      cache.start();
+      cache.awaitInitialization();
+      final DruidSchema druidSchema = new DruidSchema(cache, null);
+      Assert.assertEquals(ImmutableSet.of(), druidSchema.getTableNames());
     }
   }
 }


@@ -57,7 +57,7 @@ import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.NoopEscalator;
-import org.apache.druid.sql.calcite.table.DruidTable;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import org.apache.druid.timeline.DataSegment;
@@ -71,6 +71,7 @@ import org.junit.Before;
 import org.junit.Test;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -88,7 +89,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
-public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon
+public class SegmentDataCacheConcurrencyTest extends SegmentMetadataCacheCommon
 {
   private static final String DATASOURCE = "datasource";
@@ -96,7 +97,7 @@
   private SpecificSegmentsQuerySegmentWalker walker;
   private TestServerInventoryView inventoryView;
   private BrokerServerView serverView;
-  private DruidSchema schema;
+  private SegmentMetadataCache schema;
   private ExecutorService exec;
   @Before
@@ -119,30 +120,33 @@
   }
   /**
-   * This tests the contention between 3 components, DruidSchema, InventoryView, and BrokerServerView.
-   * It first triggers refreshing DruidSchema. To mimic some heavy work done with {@link DruidSchema#lock},
-   * {@link DruidSchema#buildDruidTable} is overriden to sleep before doing real work. While refreshing DruidSchema,
-   * more new segments are added to InventoryView, which triggers updates of BrokerServerView. Finally, while
-   * BrokerServerView is updated, {@link BrokerServerView#getTimeline} is continuously called to mimic user query
+   * This tests the contention between three components, {@link SegmentMetadataCache},
+   * {@code InventoryView}, and {@link BrokerServerView}. It first triggers
+   * refreshing {@code SegmentMetadataCache}. To mimic some heavy work done with
+   * {@link SegmentMetadataCache#lock}, {@link SegmentMetadataCache#buildDruidTable}
+   * is overridden to sleep before doing real work. While refreshing
+   * {@code SegmentMetadataCache}, more new segments are added to
+   * {@code InventoryView}, which triggers updates of {@code BrokerServerView}.
+   * Finally, while {@code BrokerServerView} is updated,
+   * {@link BrokerServerView#getTimeline} is continuously called to mimic user query
    * processing. All these calls must return without heavy contention.
   */
  @Test(timeout = 30000L)
- public void testDruidSchemaRefreshAndInventoryViewAddSegmentAndBrokerServerViewGetTimeline()
+ public void testSegmentMetadataRefreshAndInventoryViewAddSegmentAndBrokerServerViewGetTimeline()
      throws InterruptedException, ExecutionException, TimeoutException
  {
-   schema = new DruidSchema(
+   schema = new SegmentMetadataCache(
        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
        serverView,
        segmentManager,
        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
        PLANNER_CONFIG_DEFAULT,
        new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      @Override
-     DruidTable buildDruidTable(final String dataSource)
+     DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(final String dataSource)
      {
        doInLock(() -> {
          try {
@@ -194,8 +198,9 @@
    // Wait for all segments to be loaded in BrokerServerView
    Assert.assertTrue(segmentLoadLatch.await(5, TimeUnit.SECONDS));
-   // Trigger refresh of DruidSchema. This will internally run the heavy work mimicked by the overriden buildDruidTable
-   Future refreshFuture = exec.submit(() -> {
+   // Trigger refresh of DruidSchema. This will internally run the heavy work
+   // mimicked by the overridden buildDruidTable
+   Future<?> refreshFuture = exec.submit(() -> {
      schema.refresh(
          walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()),
          Sets.newHashSet(DATASOURCE)
@@ -225,33 +230,36 @@
  }
  /**
-  * This tests the contention between 2 methods of DruidSchema, {@link DruidSchema#refresh} and
-  * {@link DruidSchema#getSegmentMetadataSnapshot()}. It first triggers refreshing DruidSchema.
-  * To mimic some heavy work done with {@link DruidSchema#lock}, {@link DruidSchema#buildDruidTable} is overriden
-  * to sleep before doing real work. While refreshing DruidSchema, getSegmentMetadataSnapshot() is continuously
-  * called to mimic reading the segments table of SystemSchema. All these calls must return without heavy contention.
+  * This tests the contention between two methods of {@link SegmentMetadataCache}:
+  * {@link SegmentMetadataCache#refresh} and
+  * {@link SegmentMetadataCache#getSegmentMetadataSnapshot()}. It first triggers
+  * refreshing {@code SegmentMetadataCache}. To mimic some heavy work done with
+  * {@link SegmentMetadataCache#lock}, {@link SegmentMetadataCache#buildDruidTable}
+  * is overridden to sleep before doing real work. While refreshing
+  * {@code SegmentMetadataCache}, {@code getSegmentMetadataSnapshot()} is continuously
+  * called to mimic reading the segments table of SystemSchema. All these calls
+  * must return without heavy contention.
  */
 @Test(timeout = 30000L)
- public void testDruidSchemaRefreshAndDruidSchemaGetSegmentMetadata()
+ public void testSegmentMetadataRefreshAndDruidSchemaGetSegmentMetadata()
     throws InterruptedException, ExecutionException, TimeoutException
 {
-   schema = new DruidSchema(
+   schema = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
       new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-      new BrokerInternalQueryConfig(),
-      null
+      new BrokerInternalQueryConfig()
   )
   {
     @Override
-    DruidTable buildDruidTable(final String dataSource)
+    DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(final String dataSource)
     {
       doInLock(() -> {
         try {
-          // Mimic some heavy work done in lock in DruidSchema
+          // Mimic some heavy work done in lock in SegmentMetadataCache
           Thread.sleep(5000);
         }
         catch (InterruptedException e) {
@@ -299,8 +307,9 @@
    // Wait for all segments to be loaded in BrokerServerView
    Assert.assertTrue(segmentLoadLatch.await(5, TimeUnit.SECONDS));
-   // Trigger refresh of DruidSchema. This will internally run the heavy work mimicked by the overriden buildDruidTable
+   // Trigger refresh of SegmentMetadataCache. This will internally run the heavy work mimicked
Future refreshFuture = exec.submit(() -> { // by the overridden buildDruidTable
Future<?> refreshFuture = exec.submit(() -> {
schema.refresh( schema.refresh(
walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()), walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()),
Sets.newHashSet(DATASOURCE) Sets.newHashSet(DATASOURCE)
@ -438,6 +447,7 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon
segmentCallbacks.forEach(pair -> pair.rhs.execute(() -> pair.lhs.segmentRemoved(server.getMetadata(), segment))); segmentCallbacks.forEach(pair -> pair.rhs.execute(() -> pair.lhs.segmentRemoved(server.getMetadata(), segment)));
} }
@SuppressWarnings("unused")
private void removeServer(DruidServer server) private void removeServer(DruidServer server)
{ {
serverMap.remove(server.getName()); serverMap.remove(server.getName());
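Both javadoc comments in this file describe the same contention pattern: a deliberately slow rebuild (the overridden `buildDruidTable` sleeping while it holds the cache's lock) runs on a background executor while readers keep calling into the cache, and every read must still return promptly. The following is a minimal, self-contained sketch of that pattern in plain `java.util.concurrent` terms; `SlowRefreshingCache` and its methods are hypothetical stand-ins for illustration, not the real Druid classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the cache under test: the expensive rebuild happens
// off to the side, and readers only ever touch a snapshot reference.
class SlowRefreshingCache
{
  private volatile Map<String, String> snapshot = new ConcurrentHashMap<>();

  void refresh() throws InterruptedException
  {
    // Mimic the overridden buildDruidTable() sleeping to simulate heavy work.
    Thread.sleep(2000);
    Map<String, String> rebuilt = new ConcurrentHashMap<>();
    rebuilt.put("foo", "rebuilt");
    snapshot = rebuilt; // single cheap reference swap
  }

  Map<String, String> getSnapshot()
  {
    return snapshot; // must never block while refresh() is running
  }
}

public class ContentionSketch
{
  public static void main(String[] args) throws Exception
  {
    SlowRefreshingCache cache = new SlowRefreshingCache();
    ExecutorService exec = Executors.newSingleThreadExecutor();

    // Kick off the slow refresh in the background, as the tests do via exec.submit().
    Future<?> refreshFuture = exec.submit(() -> {
      try {
        cache.refresh();
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    // While the refresh runs, reads must return without heavy contention.
    while (!refreshFuture.isDone()) {
      long start = System.nanoTime();
      cache.getSnapshot();
      long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      if (elapsedMs > 100) {
        throw new AssertionError("reader blocked for " + elapsedMs + " ms");
      }
      Thread.sleep(10);
    }
    exec.shutdown();
  }
}
```

In the sketch, readers only dereference a `volatile` snapshot, so the slow rebuild never blocks them; that is the property the real tests assert through their timeouts and latches.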


@@ -50,7 +50,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-public abstract class DruidSchemaTestCommon extends CalciteTestBase
+public abstract class SegmentMetadataCacheCommon extends CalciteTestBase
 {
   static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig()
   {


@@ -27,7 +27,6 @@ import com.google.common.collect.Sets;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.druid.client.BrokerInternalQueryConfig;
 import org.apache.druid.client.ImmutableDruidServer;
@@ -59,6 +58,7 @@ import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.AllowAllAuthenticator;
 import org.apache.druid.server.security.NoopEscalator;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
 import org.apache.druid.sql.calcite.table.DruidTable;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@@ -86,13 +86,13 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-public class DruidSchemaTest extends DruidSchemaTestCommon
+public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
 {
  private SpecificSegmentsQuerySegmentWalker walker;
  private TestServerInventoryView serverView;
  private List<ImmutableDruidServer> druidServers;
-  private DruidSchema schema;
-  private DruidSchema schema2;
+  private SegmentMetadataCache schema;
+  private SegmentMetadataCache schema2;
  private CountDownLatch buildTableLatch = new CountDownLatch(1);
  private CountDownLatch markDataSourceLatch = new CountDownLatch(1);
  private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
@@ -173,7 +173,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
    serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments);
    druidServers = serverView.getDruidServers();
-    schema = new DruidSchema(
+    schema = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
@@ -183,14 +183,13 @@
       ),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      @Override
-      protected DruidTable buildDruidTable(String dataSource)
+      protected DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(String dataSource)
      {
-        DruidTable table = super.buildDruidTable(dataSource);
+        DatasourceTable.PhysicalDatasourceMetadata table = super.buildDruidTable(dataSource);
        buildTableLatch.countDown();
        return table;
      }
@@ -203,7 +202,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
      }
    };
-    schema2 = new DruidSchema(
+    schema2 = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
@@ -213,17 +212,15 @@
       ),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      boolean throwException = true;
      @Override
-      protected DruidTable buildDruidTable(String dataSource)
+      protected DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(String dataSource)
      {
-        DruidTable table = super.buildDruidTable(dataSource);
+        DatasourceTable.PhysicalDatasourceMetadata table = super.buildDruidTable(dataSource);
        buildTableLatch.countDown();
        return table;
      }
@@ -261,10 +258,10 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
  @Test
  public void testGetTableMap()
  {
-    Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema.getTableNames());
-    final Map<String, Table> tableMap = schema.getTableMap();
-    Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableMap.keySet());
+    Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema.getDatasourceNames());
+    final Set<String> tableNames = schema.getDatasourceNames();
+    Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableNames);
  }
  @Test
@@ -272,18 +269,15 @@
  {
    schema2.start();
    schema2.awaitInitialization();
-    Map<String, Table> tableMap = schema2.getTableMap();
-    Assert.assertEquals(2, tableMap.size());
-    Assert.assertTrue(tableMap.containsKey("foo"));
-    Assert.assertTrue(tableMap.containsKey("foo2"));
+    Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema2.getDatasourceNames());
    schema2.stop();
  }
  @Test
  public void testGetTableMapFoo()
  {
-    final DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
+    final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo");
+    final DruidTable fooTable = new DatasourceTable(fooDs);
    final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl());
    final List<RelDataTypeField> fields = rowType.getFieldList();
@@ -311,7 +305,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
  @Test
  public void testGetTableMapFoo2()
  {
-    final DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo2");
+    final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo2");
+    final DruidTable fooTable = new DatasourceTable(fooDs);
    final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl());
    final List<RelDataTypeField> fields = rowType.getFieldList();
@@ -329,7 +324,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
  /**
   * This tests that {@link AvailableSegmentMetadata#getNumRows()} is correct in case
-   * of multiple replicas i.e. when {@link DruidSchema#addSegment(DruidServerMetadata, DataSegment)}
+   * of multiple replicas i.e. when {@link SegmentMetadataCache#addSegment(DruidServerMetadata, DataSegment)}
   * is called more than once for same segment
   */
  @Test
@@ -366,7 +361,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
    final ImmutableDruidServer server = pair.lhs;
    Assert.assertNotNull(server);
    final DruidServerMetadata druidServerMetadata = server.getMetadata();
-    // invoke DruidSchema#addSegment on existingSegment
+    // invoke SegmentMetadataCache#addSegment on existingSegment
    schema.addSegment(druidServerMetadata, existingSegment);
    segmentsMetadata = schema.getSegmentMetadataSnapshot();
    // get the only segment with datasource "foo2"
@@ -399,7 +394,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
    Assert.assertNotNull(segmentToRemove);
    schema.removeSegment(segmentToRemove);
-    // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
+    // The following line can cause NPE without segmentMetadata null check in
+    // SegmentMetadataCache#refreshSegmentsForDataSource
    schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
    Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size());
  }
@@ -421,7 +417,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
    Assert.assertNotNull(segmentToRemove);
    schema.removeSegment(segmentToRemove);
-    // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
+    // The following line can cause NPE without segmentMetadata null check in
+    // SegmentMetadataCache#refreshSegmentsForDataSource
    schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
    Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size());
  }
@@ -485,15 +482,14 @@
  {
    String datasource = "newSegmentAddTest";
    CountDownLatch addSegmentLatch = new CountDownLatch(1);
-    DruidSchema schema = new DruidSchema(
+    SegmentMetadataCache schema = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
       new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      @Override
@@ -528,15 +524,14 @@
  {
    String datasource = "newSegmentAddTest";
    CountDownLatch addSegmentLatch = new CountDownLatch(2);
-    DruidSchema schema = new DruidSchema(
+    SegmentMetadataCache schema = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
       new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      @Override
@@ -575,15 +570,14 @@
  {
    String datasource = "newSegmentAddTest";
    CountDownLatch addSegmentLatch = new CountDownLatch(1);
-    DruidSchema schema = new DruidSchema(
+    SegmentMetadataCache schema = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
       new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      @Override
@@ -619,15 +613,14 @@
  {
    String datasource = "newSegmentAddTest";
    CountDownLatch addSegmentLatch = new CountDownLatch(1);
-    DruidSchema schema = new DruidSchema(
+    SegmentMetadataCache schema = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
       new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      @Override
@@ -660,15 +653,14 @@
    String datasource = "segmentRemoveTest";
    CountDownLatch addSegmentLatch = new CountDownLatch(1);
    CountDownLatch removeSegmentLatch = new CountDownLatch(1);
-    DruidSchema schema = new DruidSchema(
+    SegmentMetadataCache schema = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
       new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      @Override
@@ -709,7 +701,7 @@
    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segment.getId()));
    Assert.assertFalse(schema.getMutableSegments().contains(segment.getId()));
    Assert.assertFalse(schema.getDataSourcesNeedingRebuild().contains(datasource));
-    Assert.assertFalse(schema.getTableNames().contains(datasource));
+    Assert.assertFalse(schema.getDatasourceNames().contains(datasource));
  }
  @Test
@@ -718,15 +710,14 @@
    String datasource = "segmentRemoveTest";
    CountDownLatch addSegmentLatch = new CountDownLatch(2);
    CountDownLatch removeSegmentLatch = new CountDownLatch(1);
-    DruidSchema schema = new DruidSchema(
+    SegmentMetadataCache schema = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
       new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      @Override
@@ -771,7 +762,7 @@
    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
    Assert.assertFalse(schema.getMutableSegments().contains(segments.get(0).getId()));
    Assert.assertTrue(schema.getDataSourcesNeedingRebuild().contains(datasource));
-    Assert.assertTrue(schema.getTableNames().contains(datasource));
+    Assert.assertTrue(schema.getDatasourceNames().contains(datasource));
  }
  @Test
@@ -779,15 +770,14 @@
  {
    String datasource = "serverSegmentRemoveTest";
    CountDownLatch removeServerSegmentLatch = new CountDownLatch(1);
-    DruidSchema schema = new DruidSchema(
+    SegmentMetadataCache schema = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
       new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      @Override
@@ -814,15 +804,14 @@
    String datasource = "serverSegmentRemoveTest";
    CountDownLatch addSegmentLatch = new CountDownLatch(1);
    CountDownLatch removeServerSegmentLatch = new CountDownLatch(1);
-    DruidSchema schema = new DruidSchema(
+    SegmentMetadataCache schema = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
       new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      @Override
@@ -862,15 +851,14 @@
    String datasource = "serverSegmentRemoveTest";
    CountDownLatch addSegmentLatch = new CountDownLatch(1);
    CountDownLatch removeServerSegmentLatch = new CountDownLatch(1);
-    DruidSchema schema = new DruidSchema(
+    SegmentMetadataCache schema = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       serverView,
       segmentManager,
       new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    )
    {
      @Override
@@ -917,10 +905,10 @@
  @Test
  public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws InterruptedException
  {
-    DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
+    DatasourceTable.PhysicalDatasourceMetadata fooTable = schema.getDatasource("foo");
    Assert.assertNotNull(fooTable);
-    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
-    Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource);
+    Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource);
    Assert.assertFalse(fooTable.isJoinable());
    Assert.assertFalse(fooTable.isBroadcast());
@@ -949,10 +937,10 @@
    // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
    Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
-    fooTable = (DruidTable) schema.getTableMap().get("foo");
+    fooTable = schema.getDatasource("foo");
    Assert.assertNotNull(fooTable);
-    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
-    Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource);
+    Assert.assertTrue(fooTable.dataSource() instanceof GlobalTableDataSource);
    Assert.assertTrue(fooTable.isJoinable());
    Assert.assertTrue(fooTable.isBroadcast());
@@ -970,10 +958,10 @@
    // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
    Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
-    fooTable = (DruidTable) schema.getTableMap().get("foo");
+    fooTable = schema.getDatasource("foo");
    Assert.assertNotNull(fooTable);
-    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
-    Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource);
+    Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource);
    Assert.assertFalse(fooTable.isJoinable());
    Assert.assertFalse(fooTable.isBroadcast());
  }
@@ -981,10 +969,10 @@
  @Test
  public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throws InterruptedException
  {
-    DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
+    DatasourceTable.PhysicalDatasourceMetadata fooTable = schema.getDatasource("foo");
    Assert.assertNotNull(fooTable);
-    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
-    Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource);
+    Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource);
    Assert.assertFalse(fooTable.isJoinable());
    Assert.assertFalse(fooTable.isBroadcast());
@@ -1013,12 +1001,12 @@
    // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
    Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
-    fooTable = (DruidTable) schema.getTableMap().get("foo");
+    fooTable = schema.getDatasource("foo");
    Assert.assertNotNull(fooTable);
-    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
+    Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource);
    // should not be a GlobalTableDataSource for now, because isGlobal is couple with joinability. idealy this will be
    // changed in the future and we should expect
-    Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource);
    Assert.assertTrue(fooTable.isBroadcast());
    Assert.assertFalse(fooTable.isJoinable());
@@ -1035,10 +1023,10 @@
    // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
    Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
-    fooTable = (DruidTable) schema.getTableMap().get("foo");
+    fooTable = schema.getDatasource("foo");
    Assert.assertNotNull(fooTable);
-    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
-    Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource);
+    Assert.assertFalse(fooTable.dataSource() instanceof GlobalTableDataSource);
    Assert.assertFalse(fooTable.isBroadcast());
    Assert.assertFalse(fooTable.isJoinable());
  }
@@ -1082,7 +1070,7 @@
    QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
    // Need to create schema for this test because the available schemas don't mock the QueryLifecycleFactory, which I need for this test.
-    DruidSchema mySchema = new DruidSchema(
+    SegmentMetadataCache mySchema = new SegmentMetadataCache(
       factoryMock,
       serverView,
       segmentManager,
@@ -1092,8 +1080,7 @@
       ),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       brokerInternalQueryConfig,
-       null
+       brokerInternalQueryConfig
    );
    EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
@@ -1129,7 +1116,7 @@
       new ColumnAnalysis(ColumnType.DOUBLE, ColumnType.DOUBLE.asTypeString(), false, true, 1234, 26, null, null, null)
    );
-    RowSignature signature = DruidSchema.analysisToRowSignature(
+    RowSignature signature = SegmentMetadataCache.analysisToRowSignature(
       new SegmentAnalysis(
          "id",
          ImmutableList.of(Intervals.utc(1L, 2L)),
@@ -1157,7 +1144,7 @@
  @Test
  public void testSegmentMetadataFallbackType()
  {
-    RowSignature signature = DruidSchema.analysisToRowSignature(
+    RowSignature signature = SegmentMetadataCache.analysisToRowSignature(
       new SegmentAnalysis(
          "id",
          ImmutableList.of(Intervals.utc(1L, 2L)),
@@ -1209,9 +1196,9 @@
    Set<SegmentId> segments = new HashSet<>();
    Set<String> datasources = new HashSet<>();
    datasources.add("wat");
-    Assert.assertNull(schema.getTable("wat"));
+    Assert.assertNull(schema.getDatasource("wat"));
    schema.refresh(segments, datasources);
-    Assert.assertNull(schema.getTable("wat"));
+    Assert.assertNull(schema.getDatasource("wat"));
  }
  private static DataSegment newSegment(String datasource, int partitionId)


@@ -245,18 +245,18 @@ public class SystemSchemaTest extends CalciteTestBase
       .add(segment2, index2)
       .add(segment3, index3);
-    druidSchema = new DruidSchema(
+    SegmentMetadataCache cache = new SegmentMetadataCache(
       CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
       new TestServerInventoryView(walker.getSegments(), realtimeSegments),
       new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
       new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
       PLANNER_CONFIG_DEFAULT,
       new NoopEscalator(),
-       new BrokerInternalQueryConfig(),
-       null
+       new BrokerInternalQueryConfig()
    );
-    druidSchema.start();
-    druidSchema.awaitInitialization();
+    cache.start();
+    cache.awaitInitialization();
+    druidSchema = new DruidSchema(cache, null);
    metadataView = EasyMock.createMock(MetadataSegmentView.class);
    druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
    serverInventoryView = EasyMock.createMock(FilteredServerInventoryView.class);
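As the test wiring above illustrates, call sites that used to construct a `DruidSchema` directly now build and warm a `SegmentMetadataCache` first, then wrap it. Schematically (the argument names below are illustrative placeholders; the concrete mock arguments are the ones shown in the diff above):

```java
// Build and warm the physical-metadata cache. The constructor takes the same
// seven arguments the tests pass above; the names here are placeholders.
SegmentMetadataCache cache = new SegmentMetadataCache(
    queryLifecycleFactory,
    serverView,
    segmentManager,
    joinableFactory,
    plannerConfig,
    escalator,
    brokerInternalQueryConfig
);
cache.start();
cache.awaitInitialization();

// DruidSchema is now a thin Calcite-facing wrapper over the cache plus an
// optional external schema manager (null when none is configured).
DruidSchema druidSchema = new DruidSchema(cache, null);
```

Passing `null` for the schema manager mirrors this test; the `CalciteTests` helper in the next file passes its `DruidSchemaManager` through to the wrapper instead.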


@@ -130,6 +130,7 @@ import org.apache.druid.sql.calcite.schema.NamedSchema;
 import org.apache.druid.sql.calcite.schema.NamedSystemSchema;
 import org.apache.druid.sql.calcite.schema.NamedViewSchema;
 import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager;
+import org.apache.druid.sql.calcite.schema.SegmentMetadataCache;
 import org.apache.druid.sql.calcite.schema.SystemSchema;
 import org.apache.druid.sql.calcite.schema.ViewSchema;
 import org.apache.druid.sql.calcite.view.DruidViewMacroFactory;
@@ -1228,34 +1229,33 @@ public class CalciteTests
      final DruidSchemaManager druidSchemaManager
  )
  {
-    final DruidSchema schema = new DruidSchema(
-        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+    final SegmentMetadataCache cache = new SegmentMetadataCache(
+        createMockQueryLifecycleFactory(walker, conglomerate),
       new TestServerInventoryView(walker.getSegments()),
       new SegmentManager(EasyMock.createMock(SegmentLoader.class))
       {
          @Override
          public Set<String> getDataSourceNames()
          {
-            return ImmutableSet.of(BROADCAST_DATASOURCE);
+            return ImmutableSet.of(CalciteTests.BROADCAST_DATASOURCE);
          }
       },
       createDefaultJoinableFactory(),
       plannerConfig,
-       TEST_AUTHENTICATOR_ESCALATOR,
-       new BrokerInternalQueryConfig(),
-       druidSchemaManager
+       CalciteTests.TEST_AUTHENTICATOR_ESCALATOR,
+       new BrokerInternalQueryConfig()
    );
    try {
-      schema.start();
-      schema.awaitInitialization();
+      cache.start();
+      cache.awaitInitialization();
    }
    catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
-    schema.stop();
-    return schema;
+    cache.stop();
+    return new DruidSchema(cache, druidSchemaManager);
  }
  /**