From 2be7068f6e20b086731cdc7df44998d838f63493 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Thu, 20 Jul 2017 10:14:15 -0700
Subject: [PATCH] Fixes and improvements to SQL metadata caching. (#4551)

* Fixes and improvements to SQL metadata caching.

Also adds support for MultipleSpecificSegmentSpec to CachingClusteredClient.

SQL changes:

- Cache metadata on a per-segment level, in addition to per-dataSource, so
  we don't need to re-query all segments whenever a single new one appears.
  This should lower the load placed on the cluster by metadata queries.

- Fix race condition in DruidSchema that can cause us to miss metadata. It
  was possible to notice new segments, then issue a query, and have that
  query not actually hit those segments, and not notice that it didn't hit
  those segments. Then, the metadata from those segments would be ignored.

- Fix assumption in DruidSchema that all segments are immutable. Now,
  mutable segments are periodically re-queried.

- Fix inappropriate re-use of SchemaPlus. Now we create one for each
  planning cycle, rather than sharing one. It caches table objects, which
  we want to avoid, since it can cause stale metadata. We do the caching in
  DruidSchema so we don't need the SchemaPlus caching.

Server changes:

- Add a TimelineCallback to TimelineServerView, for callers that want to
  get updates when the timeline has been modified.

- Change CachingClusteredClient from a QueryRunner to a QuerySegmentWalker.
  This allows it to accept queries that are segment-descriptor-based rather
  than intervals-based. In particular it will now support
  MultipleSpecificSegmentSpec.

* Fix DruidSchema, and unused imports.

* Remove unused import.

* Fix SqlBenchmark.
---
 .../druid/benchmark/query/SqlBenchmark.java   |  32 +-
 .../sql/QuantileSqlAggregatorTest.java        |  17 +-
 .../io/druid/client/BrokerServerView.java     |  26 +
 .../druid/client/CachingClusteredClient.java  |  98 +++-
 .../io/druid/client/TimelineServerView.java   |  44 ++
 .../server/ClientQuerySegmentWalker.java      |   8 +-
 ...chingClusteredClientFunctionalityTest.java |  38 +-
 .../client/CachingClusteredClientTest.java    |  92 ++--
 .../sql/calcite/planner/PlannerFactory.java   |   8 +-
 .../druid/sql/calcite/schema/DruidSchema.java | 456 ++++++++++++------
 .../java/io/druid/sql/guice/SqlModule.java    |  17 +-
 .../sql/avatica/DruidAvaticaHandlerTest.java  |  20 +-
 .../druid/sql/avatica/DruidStatementTest.java |  12 +-
 .../druid/sql/calcite/CalciteQueryTest.java   |   4 +-
 .../sql/calcite/http/SqlResourceTest.java     |   8 +-
 .../sql/calcite/schema/DruidSchemaTest.java   |  37 +-
 .../calcite/util/TestServerInventoryView.java |  43 +-
 17 files changed, 632 insertions(+), 328 deletions(-)

diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java
index 13ce245495d..1cfe559ee1b 100644
--- a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java
+++ b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java
@@ -19,7 +19,6 @@
 
 package io.druid.benchmark.query;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
@@ -33,29 +32,21 @@ import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.query.QueryRunnerFactoryConglomerate;
-import io.druid.query.TableDataSource;
 import io.druid.query.aggregation.AggregatorFactory;
 import
io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.GroupByQuery; import io.druid.segment.QueryableIndex; -import io.druid.segment.column.ValueType; import io.druid.server.initialization.ServerConfig; -import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidPlanner; import io.druid.sql.calcite.planner.PlannerConfig; import io.druid.sql.calcite.planner.PlannerFactory; import io.druid.sql.calcite.planner.PlannerResult; -import io.druid.sql.calcite.table.DruidTable; -import io.druid.sql.calcite.table.RowSignature; import io.druid.sql.calcite.util.CalciteTests; import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.LinearShardSpec; -import org.apache.calcite.schema.Schema; -import org.apache.calcite.schema.Table; -import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.commons.io.FileUtils; import org.joda.time.Interval; import org.openjdk.jmh.annotations.Benchmark; @@ -76,7 +67,6 @@ import org.openjdk.jmh.infra.Blackhole; import java.io.File; import java.util.ArrayList; import java.util.Arrays; -import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -124,28 +114,8 @@ public class SqlBenchmark this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index); - final Map tableMap = ImmutableMap.of( - "foo", - new DruidTable( - new TableDataSource("foo"), - RowSignature.builder() - .add("__time", ValueType.LONG) - .add("dimSequential", ValueType.STRING) - .add("dimZipf", ValueType.STRING) - .add("dimUniform", ValueType.STRING) - .build() - ) - ); - final Schema druidSchema = new AbstractSchema() - { - @Override - protected Map getTableMap() - { - return tableMap; - } - }; plannerFactory = new PlannerFactory( - Calcites.createRootSchema(druidSchema), + CalciteTests.createMockSchema(walker, plannerConfig), walker, CalciteTests.createOperatorTable(), CalciteTests.createExprMacroTable(), diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java index be2a4699e53..b38c4c93ffa 100644 --- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java +++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java @@ -46,8 +46,6 @@ import io.druid.segment.column.ValueType; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.virtual.ExpressionVirtualColumn; import io.druid.server.initialization.ServerConfig; -import io.druid.sql.calcite.aggregation.SqlAggregator; -import io.druid.sql.calcite.expression.SqlOperatorConversion; import io.druid.sql.calcite.filtration.Filtration; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; @@ -55,12 +53,12 @@ import io.druid.sql.calcite.planner.DruidPlanner; import io.druid.sql.calcite.planner.PlannerConfig; import io.druid.sql.calcite.planner.PlannerFactory; import io.druid.sql.calcite.planner.PlannerResult; +import io.druid.sql.calcite.schema.DruidSchema; import io.druid.sql.calcite.util.CalciteTests; import io.druid.sql.calcite.util.QueryLogHook; import 
io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.LinearShardSpec; -import org.apache.calcite.schema.SchemaPlus; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -126,18 +124,13 @@ public class QuantileSqlAggregatorTest ); final PlannerConfig plannerConfig = new PlannerConfig(); - final SchemaPlus rootSchema = Calcites.createRootSchema( - CalciteTests.createMockSchema( - walker, - plannerConfig - ) - ); + final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig); final DruidOperatorTable operatorTable = new DruidOperatorTable( - ImmutableSet.of(new QuantileSqlAggregator()), - ImmutableSet.of() + ImmutableSet.of(new QuantileSqlAggregator()), + ImmutableSet.of() ); plannerFactory = new PlannerFactory( - rootSchema, + druidSchema, walker, operatorTable, CalciteTests.createExprMacroTable(), diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index a1e30a23a01..a1d8ed04ca0 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -46,9 +46,11 @@ import io.druid.timeline.partition.PartitionChunk; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.function.Function; /** */ @@ -61,6 +63,7 @@ public class BrokerServerView implements TimelineServerView private final ConcurrentMap clients; private final Map selectors; private final Map> timelines; + private final ConcurrentMap timelineCallbacks = new ConcurrentHashMap<>(); private final QueryToolChestWarehouse warehouse; private final QueryWatcher queryWatcher; @@ -139,6 +142,7 @@ public class BrokerServerView implements TimelineServerView public CallbackAction segmentViewInitialized() { initialized = true; + runTimelineCallbacks(TimelineCallback::timelineInitialized); return ServerView.CallbackAction.CONTINUE; } }, @@ -237,6 +241,7 @@ public class BrokerServerView implements TimelineServerView queryableDruidServer = addServer(baseView.getInventoryValue(server.getName())); } selector.addServerAndUpdateSegment(queryableDruidServer, segment); + runTimelineCallbacks(callback -> callback.segmentAdded(server, segment)); } } @@ -278,6 +283,8 @@ public class BrokerServerView implements TimelineServerView segment.getInterval(), segment.getVersion() ); + } else { + runTimelineCallbacks(callback -> callback.segmentRemoved(segment)); } } } @@ -293,6 +300,12 @@ public class BrokerServerView implements TimelineServerView } } + @Override + public void registerTimelineCallback(final Executor exec, final TimelineCallback callback) + { + timelineCallbacks.put(callback, exec); + } + @Override public QueryRunner getQueryRunner(DruidServer server) { @@ -317,4 +330,17 @@ public class BrokerServerView implements TimelineServerView { baseView.registerSegmentCallback(exec, callback, segmentFilter); } + + private void runTimelineCallbacks(final Function function) + { + for (Map.Entry entry : timelineCallbacks.entrySet()) { + entry.getValue().execute( + () -> { + if (CallbackAction.UNREGISTER == function.apply(entry.getKey())) { + timelineCallbacks.remove(entry.getKey()); + } + } + ); + } + } } diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java 
b/server/src/main/java/io/druid/client/CachingClusteredClient.java index e885376a777..6cf2d27f6aa 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -21,6 +21,7 @@ package io.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Optional; @@ -31,6 +32,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import com.google.common.collect.RangeSet; import com.google.common.collect.Sets; import com.google.common.hash.Hasher; @@ -62,6 +64,7 @@ import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; +import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; import io.druid.query.Result; @@ -74,7 +77,9 @@ import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineLookup; import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; +import io.druid.timeline.partition.PartitionHolder; import io.druid.timeline.partition.ShardSpec; import org.apache.commons.codec.binary.Base64; import org.joda.time.Interval; @@ -82,6 +87,7 @@ import org.joda.time.Interval; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; @@ -89,10 +95,12 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** */ -public class CachingClusteredClient implements QueryRunner +public class CachingClusteredClient implements QuerySegmentWalker { private static final EmittingLogger log = new EmittingLogger(CachingClusteredClient.class); private final QueryToolChestWarehouse warehouse; @@ -141,7 +149,69 @@ public class CachingClusteredClient implements QueryRunner } @Override - public Sequence run(final QueryPlus queryPlus, final Map responseContext) + public QueryRunner getQueryRunnerForIntervals( + final Query query, + final Iterable intervals + ) + { + return new QueryRunner() + { + @Override + public Sequence run(final QueryPlus queryPlus, final Map responseContext) + { + return CachingClusteredClient.this.run( + queryPlus, + responseContext, + timeline -> timeline + ); + } + }; + } + + @Override + public QueryRunner getQueryRunnerForSegments( + final Query query, + final Iterable specs + ) + { + return new QueryRunner() + { + @Override + public Sequence run(final QueryPlus queryPlus, final Map responseContext) + { + return CachingClusteredClient.this.run( + queryPlus, + responseContext, + timeline -> { + final List> retVal = new LinkedList<>(); + final VersionedIntervalTimeline timeline2 = new VersionedIntervalTimeline<>( + Ordering.natural() + ); + for (SegmentDescriptor spec : specs) { + final PartitionHolder entry = 
timeline.findEntry(spec.getInterval(), spec.getVersion()); + if (entry != null) { + final PartitionChunk chunk = entry.getChunk(spec.getPartitionNumber()); + if (chunk != null) { + timeline2.add(spec.getInterval(), spec.getVersion(), chunk); + } + } + } + return timeline2; + } + ); + } + }; + } + + /** + * Run a query. The timelineFunction will be given the "master" timeline and can be used to return a different + * timeline, if desired. This is used by getQueryRunnerForSegments. + */ + private Sequence run( + final QueryPlus queryPlus, + final Map responseContext, + final Function, TimelineLookup> timelineFunction + ) { final Query query = queryPlus.getQuery(); final QueryToolChest> toolChest = warehouse.getToolChest(query); @@ -176,7 +246,11 @@ public class CachingClusteredClient implements QueryRunner // build set of segments to query Set> segments = Sets.newLinkedHashSet(); - List> serversLookup = Lists.newLinkedList(); + timeline = timelineFunction.apply(timeline); + List> serversLookup = CachingClusteredClient.lookupIntervalsInTimeline( + timeline, + query.getIntervals() + ); // Note that enabling this leads to putting uncovered intervals information in the response headers // and might blow up in some cases https://github.com/druid-io/druid/issues/2108 @@ -201,7 +275,6 @@ public class CachingClusteredClient implements QueryRunner } } startMillis = holderInterval.getEndMillis(); - serversLookup.add(holder); } if (!uncoveredIntervalsOverflowed && startMillis < endMillis) { @@ -221,10 +294,6 @@ public class CachingClusteredClient implements QueryRunner responseContext.put("uncoveredIntervals", uncoveredIntervals); responseContext.put("uncoveredIntervalsOverflowed", uncoveredIntervalsOverflowed); } - } else { - for (Interval interval : query.getIntervals()) { - Iterables.addAll(serversLookup, timeline.lookup(interval)); - } } // Let tool chest filter out unneeded segments @@ -571,7 +640,18 @@ public class CachingClusteredClient implements QueryRunner ); } - protected Sequence mergeCachedAndUncachedSequences( + private static List> lookupIntervalsInTimeline( + final TimelineLookup timeline, + final Iterable intervals + ) + { + return StreamSupport.stream(intervals.spliterator(), false) + .flatMap(interval -> StreamSupport.stream(timeline.lookup(interval).spliterator(), false)) + .collect(Collectors.toList()); + } + + @VisibleForTesting + static Sequence mergeCachedAndUncachedSequences( Query query, List> sequencesByInterval ) diff --git a/server/src/main/java/io/druid/client/TimelineServerView.java b/server/src/main/java/io/druid/client/TimelineServerView.java index 79a7917262e..54366707c09 100644 --- a/server/src/main/java/io/druid/client/TimelineServerView.java +++ b/server/src/main/java/io/druid/client/TimelineServerView.java @@ -22,12 +22,56 @@ package io.druid.client; import io.druid.client.selector.ServerSelector; import io.druid.query.DataSource; import io.druid.query.QueryRunner; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineLookup; +import java.util.concurrent.Executor; + /** */ public interface TimelineServerView extends ServerView { TimelineLookup getTimeline(DataSource dataSource); + QueryRunner getQueryRunner(DruidServer server); + + /** + * Register a callback for state changes in the timeline managed by this TimelineServerView. The callback will be + * called after the relevant change is made to this TimelineServerView's timeline. 
+ * + * @param exec executor in which to run the callback + * @param callback the callback + */ + void registerTimelineCallback(Executor exec, TimelineCallback callback); + + interface TimelineCallback + { + /** + * Called once, when the timeline has been initialized. + * + * @return continue or unregister + */ + CallbackAction timelineInitialized(); + + /** + * Called when a segment on a particular server has been added to the timeline. May be called multiple times for + * the same segment, if that segment is added on multiple servers. + * + * @param server the server + * @param segment the segment + * + * @return continue or unregister + */ + CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment); + + /** + * Called when a segment has been removed from all servers and is no longer present in the timeline. + * + * @param segment the segment + * + * @return continue or unregister + */ + CallbackAction segmentRemoved(final DataSegment segment); + } } diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index 3af0014439b..defd17e3452 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -65,16 +65,16 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker @Override public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) { - return makeRunner(query); + return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals)); } @Override public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) { - return makeRunner(query); + return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs)); } - private QueryRunner makeRunner(Query query) + private QueryRunner makeRunner(Query query, QueryRunner baseClientRunner) { QueryToolChest> toolChest = warehouse.getToolChest(query); PostProcessingOperator postProcessing = objectMapper.convertValue( @@ -87,7 +87,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker return new FluentQueryRunnerBuilder<>(toolChest) .create( new RetryQueryRunner<>( - baseClient, + baseClientRunner, toolChest, retryConfig, objectMapper diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java index 176d99c2f2a..e6814f166ed 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java @@ -29,9 +29,11 @@ import io.druid.client.cache.MapCache; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelector; import io.druid.client.selector.TierSelectorStrategy; +import io.druid.java.util.common.guava.Sequence; import io.druid.query.DataSource; import io.druid.query.Druids; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -95,47 +97,47 @@ public class CachingClusteredClientFunctionalityTest )); Map responseContext = new HashMap<>(); - client.run(builder.build(), responseContext); + runQuery(client, builder.build(), responseContext); Assert.assertNull(responseContext.get("uncoveredIntervals")); 
builder.intervals("2015-01-01/2015-01-03"); responseContext = new HashMap<>(); - client.run(builder.build(), responseContext); + runQuery(client, builder.build(), responseContext); assertUncovered(responseContext, false, "2015-01-01/2015-01-02"); builder.intervals("2015-01-01/2015-01-04"); responseContext = new HashMap<>(); - client.run(builder.build(), responseContext); + runQuery(client, builder.build(), responseContext); assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04"); builder.intervals("2015-01-02/2015-01-04"); responseContext = new HashMap<>(); - client.run(builder.build(), responseContext); + runQuery(client, builder.build(), responseContext); assertUncovered(responseContext, false, "2015-01-03/2015-01-04"); builder.intervals("2015-01-01/2015-01-30"); responseContext = new HashMap<>(); - client.run(builder.build(), responseContext); + runQuery(client, builder.build(), responseContext); assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30"); builder.intervals("2015-01-02/2015-01-30"); responseContext = new HashMap<>(); - client.run(builder.build(), responseContext); + runQuery(client, builder.build(), responseContext); assertUncovered(responseContext, false, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30"); builder.intervals("2015-01-04/2015-01-30"); responseContext = new HashMap<>(); - client.run(builder.build(), responseContext); + runQuery(client, builder.build(), responseContext); assertUncovered(responseContext, false, "2015-01-05/2015-01-30"); builder.intervals("2015-01-10/2015-01-30"); responseContext = new HashMap<>(); - client.run(builder.build(), responseContext); + runQuery(client, builder.build(), responseContext); assertUncovered(responseContext, false, "2015-01-10/2015-01-30"); builder.intervals("2015-01-01/2015-02-25"); responseContext = new HashMap<>(); - client.run(builder.build(), responseContext); + runQuery(client, builder.build(), responseContext); assertUncovered(responseContext, true, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-02-04"); } @@ -223,6 +225,12 @@ public class CachingClusteredClientFunctionalityTest return timeline; } + @Override + public void registerTimelineCallback(final Executor exec, final TimelineCallback callback) + { + throw new UnsupportedOperationException(); + } + @Override public QueryRunner getQueryRunner(DruidServer server) { @@ -266,4 +274,16 @@ public class CachingClusteredClientFunctionalityTest } ); } + + private static Sequence runQuery( + CachingClusteredClient client, + final Query query, + final Map responseContext + ) + { + return client.getQueryRunnerForIntervals(query, query.getIntervals()).run( + QueryPlus.wrap(query), + responseContext + ); + } } diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index f19c1faeedb..e72d54b98ec 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -457,7 +457,7 @@ public class CachingClusteredClientTest .context(CONTEXT); QueryRunner runner = new FinalizeResultsQueryRunner( - client, new TimeseriesQueryQueryToolChest( + getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest( QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) ); @@ -496,7 +496,7 @@ public class CachingClusteredClientTest .context(CONTEXT); QueryRunner runner = new 
FinalizeResultsQueryRunner( - client, new TimeseriesQueryQueryToolChest( + getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest( QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) ); @@ -591,7 +591,7 @@ public class CachingClusteredClientTest selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), dataSegment); timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector)); - client.run(query, context); + getDefaultQueryRunner().run(query, context); Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured()); Assert.assertTrue("Cache key below limit", ImmutableList.copyOf(cacheKeyCapture.getValue()).size() <= limit); @@ -605,7 +605,7 @@ public class CachingClusteredClientTest .once(); EasyMock.replay(cache); client = makeClient(MoreExecutors.sameThreadExecutor(), cache, 0); - client.run(query, context); + getDefaultQueryRunner().run(query, context); EasyMock.verify(cache); EasyMock.verify(dataSegment); Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured()); @@ -625,7 +625,7 @@ public class CachingClusteredClientTest .context(CONTEXT); QueryRunner runner = new FinalizeResultsQueryRunner( - client, new TimeseriesQueryQueryToolChest( + getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest( QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) ); @@ -688,7 +688,7 @@ public class CachingClusteredClientTest .context(CONTEXT); QueryRunner runner = new FinalizeResultsQueryRunner( - client, new TimeseriesQueryQueryToolChest( + getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest( QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) ); @@ -734,7 +734,7 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); QueryRunner runner = new FinalizeResultsQueryRunner( - client, new TimeseriesQueryQueryToolChest( + getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest( QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) ); @@ -775,7 +775,7 @@ public class CachingClusteredClientTest Assert.assertEquals(0, cache.getStats().getNumMisses()); testQueryCaching( - client, + getDefaultQueryRunner(), 1, false, builder.context( @@ -809,7 +809,7 @@ public class CachingClusteredClientTest .context(CONTEXT); QueryRunner runner = new FinalizeResultsQueryRunner( - client, + getDefaultQueryRunner(), new TopNQueryQueryToolChest( new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() @@ -887,7 +887,7 @@ public class CachingClusteredClientTest .context(CONTEXT); QueryRunner runner = new FinalizeResultsQueryRunner( - client, new TopNQueryQueryToolChest( + getDefaultQueryRunner(), new TopNQueryQueryToolChest( new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) @@ -988,7 +988,7 @@ public class CachingClusteredClientTest .context(CONTEXT); QueryRunner runner = new FinalizeResultsQueryRunner( - client, new TopNQueryQueryToolChest( + getDefaultQueryRunner(), new TopNQueryQueryToolChest( new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) @@ -1062,7 +1062,7 @@ public class CachingClusteredClientTest .context(CONTEXT); QueryRunner runner = new FinalizeResultsQueryRunner( - client, new TopNQueryQueryToolChest( + getDefaultQueryRunner(), new TopNQueryQueryToolChest( new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) @@ -1134,7 +1134,7 @@ public class CachingClusteredClientTest .context(CONTEXT); testQueryCaching( - 
client, + getDefaultQueryRunner(), builder.build(), new Interval("2011-01-01/2011-01-02"), makeSearchResults(TOP_DIM, new DateTime("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4), @@ -1164,7 +1164,7 @@ public class CachingClusteredClientTest ); QueryRunner runner = new FinalizeResultsQueryRunner( - client, new SearchQueryQueryToolChest( + getDefaultQueryRunner(), new SearchQueryQueryToolChest( new SearchQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) @@ -1208,7 +1208,7 @@ public class CachingClusteredClientTest .context(CONTEXT); testQueryCaching( - client, + getDefaultQueryRunner(), builder.build(), new Interval("2011-01-01/2011-01-02"), makeSearchResults(TOP_DIM, new DateTime("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4), @@ -1238,7 +1238,7 @@ public class CachingClusteredClientTest ); QueryRunner runner = new FinalizeResultsQueryRunner( - client, new SearchQueryQueryToolChest( + getDefaultQueryRunner(), new SearchQueryQueryToolChest( new SearchQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) @@ -1311,7 +1311,7 @@ public class CachingClusteredClientTest .context(CONTEXT); testQueryCaching( - client, + getDefaultQueryRunner(), builder.build(), new Interval("2011-01-01/2011-01-02"), makeSelectResults(dimensions, metrics, new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1)), @@ -1337,7 +1337,7 @@ public class CachingClusteredClientTest ); QueryRunner runner = new FinalizeResultsQueryRunner( - client, + getDefaultQueryRunner(), new SelectQueryQueryToolChest( jsonMapper, QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(), @@ -1384,7 +1384,7 @@ public class CachingClusteredClientTest .context(CONTEXT); testQueryCaching( - client, + getDefaultQueryRunner(), builder.build(), new Interval("2011-01-01/2011-01-02"), makeSelectResults(dimensions, metrics, new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1)), @@ -1414,7 +1414,7 @@ public class CachingClusteredClientTest ); QueryRunner runner = new FinalizeResultsQueryRunner( - client, + getDefaultQueryRunner(), new SelectQueryQueryToolChest( jsonMapper, QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(), @@ -1495,7 +1495,7 @@ public class CachingClusteredClientTest collector.add(hashFn.hashString("123abc", Charsets.UTF_8).asBytes()); testQueryCaching( - client, + getDefaultQueryRunner(), builder.build(), new Interval("2011-01-01/2011-01-02"), makeGroupByResults( @@ -1539,7 +1539,7 @@ public class CachingClusteredClientTest ); QueryRunner runner = new FinalizeResultsQueryRunner( - client, + getDefaultQueryRunner(), GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig()).getToolchest() ); HashMap context = new HashMap(); @@ -1579,7 +1579,7 @@ public class CachingClusteredClientTest public void testTimeBoundaryCaching() throws Exception { testQueryCaching( - client, + getDefaultQueryRunner(), Druids.newTimeBoundaryQueryBuilder() .dataSource(CachingClusteredClientTest.DATA_SOURCE) .intervals(CachingClusteredClientTest.SEG_SPEC) @@ -1599,7 +1599,7 @@ public class CachingClusteredClientTest ); testQueryCaching( - client, + getDefaultQueryRunner(), Druids.newTimeBoundaryQueryBuilder() .dataSource(CachingClusteredClientTest.DATA_SOURCE) .intervals(CachingClusteredClientTest.SEG_SPEC) @@ -1620,7 +1620,7 @@ public class CachingClusteredClientTest ); testQueryCaching( - client, + getDefaultQueryRunner(), Druids.newTimeBoundaryQueryBuilder() .dataSource(CachingClusteredClientTest.DATA_SOURCE) 
.intervals(CachingClusteredClientTest.SEG_SPEC) @@ -1680,7 +1680,7 @@ public class CachingClusteredClientTest .context(CONTEXT); QueryRunner runner = new FinalizeResultsQueryRunner( - client, new TimeseriesQueryQueryToolChest( + getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest( QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) ); @@ -1759,14 +1759,14 @@ public class CachingClusteredClientTest .postAggregators(RENAMED_POST_AGGS); TimeseriesQuery query = builder.build(); - Map context = new HashMap<>(); + Map context = new HashMap<>(); final Interval interval1 = new Interval("2011-01-06/2011-01-07"); final Interval interval2 = new Interval("2011-01-07/2011-01-08"); final Interval interval3 = new Interval("2011-01-08/2011-01-09"); QueryRunner runner = new FinalizeResultsQueryRunner( - client, new TimeseriesQueryQueryToolChest( + getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest( QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) ); @@ -1787,7 +1787,7 @@ public class CachingClusteredClientTest timeline.add(interval3, "v", new StringPartitionChunk<>(null, null, 6, selector6)); final Capture capture = Capture.newInstance(); - final Capture> contextCap = Capture.newInstance(); + final Capture> contextCap = Capture.newInstance(); QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class); EasyMock.expect(mockRunner.run(EasyMock.capture(capture), EasyMock.capture(contextCap))) @@ -1962,7 +1962,7 @@ public class CachingClusteredClientTest @Override public void run() { - HashMap context = new HashMap(); + HashMap context = new HashMap<>(); for (int i = 0; i < numTimesToQuery; ++i) { TestHelper.assertExpectedResults( expected, @@ -2755,6 +2755,12 @@ public class CachingClusteredClientTest return serverView.getQueryRunner(server); } + @Override + public void registerTimelineCallback(final Executor exec, final TimelineCallback callback) + { + throw new UnsupportedOperationException(); + } + @Override public void registerServerCallback(Executor exec, ServerCallback callback) { @@ -3000,7 +3006,7 @@ public class CachingClusteredClientTest public void testTimeBoundaryCachingWhenTimeIsInteger() throws Exception { testQueryCaching( - client, + getDefaultQueryRunner(), Druids.newTimeBoundaryQueryBuilder() .dataSource(CachingClusteredClientTest.DATA_SOURCE) .intervals(CachingClusteredClientTest.SEG_SPEC) @@ -3020,7 +3026,7 @@ public class CachingClusteredClientTest ); testQueryCaching( - client, + getDefaultQueryRunner(), Druids.newTimeBoundaryQueryBuilder() .dataSource(CachingClusteredClientTest.DATA_SOURCE) .intervals(CachingClusteredClientTest.SEG_SPEC) @@ -3041,7 +3047,7 @@ public class CachingClusteredClientTest ); testQueryCaching( - client, + getDefaultQueryRunner(), Druids.newTimeBoundaryQueryBuilder() .dataSource(CachingClusteredClientTest.DATA_SOURCE) .intervals(CachingClusteredClientTest.SEG_SPEC) @@ -3075,7 +3081,7 @@ public class CachingClusteredClientTest .setContext(CONTEXT); testQueryCaching( - client, + getDefaultQueryRunner(), builder.build(), new Interval("2011-01-01/2011-01-02"), makeGroupByResults( @@ -3109,7 +3115,7 @@ public class CachingClusteredClientTest ); QueryRunner runner = new FinalizeResultsQueryRunner( - client, + getDefaultQueryRunner(), GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig()).getToolchest() ); HashMap context = new HashMap(); @@ -3190,10 +3196,22 @@ public class CachingClusteredClientTest .build(); - Map responseContext = new HashMap<>(); + Map responseContext = new HashMap<>(); - 
client.run(query, responseContext); + getDefaultQueryRunner().run(query, responseContext); Assert.assertEquals("Z/eS4rQz5v477iq7Aashr6JPZa0=", responseContext.get("ETag")); } + @SuppressWarnings("unchecked") + private QueryRunner getDefaultQueryRunner() + { + return new QueryRunner() { + @Override + public Sequence run(final QueryPlus queryPlus, final Map responseContext) + { + return client.getQueryRunnerForIntervals(queryPlus.getQuery(), queryPlus.getQuery().getIntervals()) + .run(queryPlus, responseContext); + } + }; + } } diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java index 6314f530d2f..7ce0254b8a3 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java @@ -49,7 +49,7 @@ public class PlannerFactory .setConformance(DruidConformance.instance()) .build(); - private final SchemaPlus rootSchema; + private final DruidSchema druidSchema; private final QuerySegmentWalker walker; private final DruidOperatorTable operatorTable; private final ExprMacroTable macroTable; @@ -58,7 +58,7 @@ public class PlannerFactory @Inject public PlannerFactory( - final SchemaPlus rootSchema, + final DruidSchema druidSchema, final QuerySegmentWalker walker, final DruidOperatorTable operatorTable, final ExprMacroTable macroTable, @@ -66,7 +66,7 @@ public class PlannerFactory final ServerConfig serverConfig ) { - this.rootSchema = rootSchema; + this.druidSchema = druidSchema; this.walker = walker; this.operatorTable = operatorTable; this.macroTable = macroTable; @@ -76,12 +76,12 @@ public class PlannerFactory public DruidPlanner createPlanner(final Map queryContext) { + final SchemaPlus rootSchema = Calcites.createRootSchema(druidSchema); final PlannerContext plannerContext = PlannerContext.create(operatorTable, macroTable, plannerConfig, queryContext); final QueryMaker queryMaker = new QueryMaker(walker, plannerContext, serverConfig); final FrameworkConfig frameworkConfig = Frameworks .newConfigBuilder() .parserConfig(PARSER_CONFIG) - .defaultSchema(rootSchema) .traitDefs(ConventionTraitDef.INSTANCE, RelCollationTraitDef.INSTANCE) .convertletTable(new DruidConvertletTable(plannerContext)) .operatorTable(operatorTable) diff --git a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java index 48d9af42aa9..0f0d5b41181 100644 --- a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java @@ -20,11 +20,10 @@ package io.druid.sql.calcite.schema; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.Lists; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@ -32,24 +31,24 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.client.DirectDruidClient; -import io.druid.client.DruidDataSource; -import io.druid.client.DruidServer; import io.druid.client.ServerView; import io.druid.client.TimelineServerView; -import io.druid.common.utils.JodaUtils; import 
io.druid.guice.ManageLifecycle; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.java.util.common.guava.Sequence; -import io.druid.java.util.common.guava.Sequences; +import io.druid.java.util.common.guava.Yielder; +import io.druid.java.util.common.guava.Yielders; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.query.QueryPlus; import io.druid.query.QuerySegmentWalker; import io.druid.query.TableDataSource; +import io.druid.query.metadata.metadata.AllColumnIncluderator; import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.SegmentAnalysis; import io.druid.query.metadata.metadata.SegmentMetadataQuery; +import io.druid.query.spec.MultipleSpecificSegmentSpec; import io.druid.segment.column.ValueType; import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ServerConfig; @@ -59,42 +58,66 @@ import io.druid.sql.calcite.table.RowSignature; import io.druid.sql.calcite.view.DruidViewMacro; import io.druid.sql.calcite.view.ViewManager; import io.druid.timeline.DataSegment; -import org.apache.calcite.schema.Function; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; import org.joda.time.DateTime; +import java.io.IOException; +import java.util.Comparator; import java.util.EnumSet; -import java.util.List; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; @ManageLifecycle public class DruidSchema extends AbstractSchema { + // Newest segments first, so they override older ones. + private static final Comparator SEGMENT_ORDER = Comparator + .comparing((DataSegment segment) -> segment.getInterval().getStart()).reversed() + .thenComparing(Function.identity()); + public static final String NAME = "druid"; private static final EmittingLogger log = new EmittingLogger(DruidSchema.class); + private static final int MAX_SEGMENTS_PER_QUERY = 15000; private final QuerySegmentWalker walker; private final TimelineServerView serverView; private final PlannerConfig config; private final ViewManager viewManager; private final ExecutorService cacheExec; - private final ConcurrentMap tables; + private final ConcurrentMap tables; private final ServerConfig serverConfig; // For awaitInitialization. private final CountDownLatch initializationLatch = new CountDownLatch(1); - // Protects access to dataSourcesNeedingRefresh, lastRefresh, isServerViewInitialized + // Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized private final Object lock = new Object(); - // List of dataSources that need metadata refreshes. - private final Set dataSourcesNeedingRefresh = Sets.newHashSet(); + // DataSource -> Segment -> RowSignature for that segment. + // Use TreeMap for segments so they are merged in deterministic order, from older to newer. + private final Map> segmentSignatures = new HashMap<>(); + + // All mutable segments. + private final Set mutableSegments = new TreeSet<>(SEGMENT_ORDER); + + // All dataSources that need tables regenerated. 
+ private final Set dataSourcesNeedingRebuild = new HashSet<>(); + + // All segments that need to be refreshed. + private final TreeSet segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER); + private boolean refreshImmediately = false; private long lastRefresh = 0L; private boolean isServerViewInitialized = false; @@ -128,43 +151,63 @@ public class DruidSchema extends AbstractSchema { try { while (!Thread.currentThread().isInterrupted()) { - final Set dataSources = Sets.newHashSet(); + final Set segmentsToRefresh = new TreeSet<>(); + final Set dataSourcesToRebuild = new TreeSet<>(); try { synchronized (lock) { - final long nextRefresh = new DateTime(lastRefresh).plus(config.getMetadataRefreshPeriod()) - .getMillis(); + final long nextRefreshNoFuzz = new DateTime(lastRefresh) + .plus(config.getMetadataRefreshPeriod()) + .getMillis(); + + // Fuzz a bit to spread load out when we have multiple brokers. + final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10); while (!( isServerViewInitialized - && !dataSourcesNeedingRefresh.isEmpty() + && (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) && (refreshImmediately || nextRefresh < System.currentTimeMillis()) )) { lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis())); } - dataSources.addAll(dataSourcesNeedingRefresh); - dataSourcesNeedingRefresh.clear(); + segmentsToRefresh.addAll(segmentsNeedingRefresh); + segmentsNeedingRefresh.clear(); + + // Mutable segments need a refresh every period, since new columns could be added dynamically. + segmentsNeedingRefresh.addAll(mutableSegments); + lastRefresh = System.currentTimeMillis(); refreshImmediately = false; } - // Refresh dataSources. - for (final String dataSource : dataSources) { - log.debug("Refreshing metadata for dataSource[%s].", dataSource); - final long startTime = System.currentTimeMillis(); - final DruidTable druidTable = computeTable(dataSource); - if (druidTable == null) { - if (tables.remove(dataSource) != null) { - log.info("Removed dataSource[%s] from the list of active dataSources.", dataSource); - } - } else { - tables.put(dataSource, druidTable); - log.info( - "Refreshed metadata for dataSource[%s] in %,dms.", + // Refresh the segments. + final Set refreshed = refreshSegments(segmentsToRefresh); + + synchronized (lock) { + // Add missing segments back to the refresh list. + segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed)); + + // Compute the list of dataSources to rebuild tables for. + dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild); + refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource())); + dataSourcesNeedingRebuild.clear(); + + lock.notifyAll(); + } + + // Rebuild the dataSources. 
+ for (String dataSource : dataSourcesToRebuild) { + final DruidTable druidTable = buildDruidTable(dataSource); + final DruidTable oldTable = tables.put(dataSource, druidTable); + if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) { + log.debug( + "Table for dataSource[%s] has new signature[%s].", dataSource, - System.currentTimeMillis() - startTime + druidTable.getRowSignature() ); + } else { + log.debug("Table for dataSource[%s] signature is unchanged.", dataSource); } } @@ -175,15 +218,12 @@ public class DruidSchema extends AbstractSchema throw e; } catch (Exception e) { - log.warn( - e, - "Metadata refresh failed for dataSources[%s], trying again soon.", - Joiner.on(", ").join(dataSources) - ); + log.warn(e, "Metadata refresh failed, trying again soon."); synchronized (lock) { - // Add dataSources back to the refresh list. - dataSourcesNeedingRefresh.addAll(dataSources); + // Add our segments and dataSources back to their refresh and rebuild lists. + segmentsNeedingRefresh.addAll(segmentsToRefresh); + dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild); lock.notifyAll(); } } @@ -205,12 +245,12 @@ public class DruidSchema extends AbstractSchema } ); - serverView.registerSegmentCallback( + serverView.registerTimelineCallback( MoreExecutors.sameThreadExecutor(), - new ServerView.SegmentCallback() + new TimelineServerView.TimelineCallback() { @Override - public ServerView.CallbackAction segmentViewInitialized() + public ServerView.CallbackAction timelineInitialized() { synchronized (lock) { isServerViewInitialized = true; @@ -221,50 +261,16 @@ public class DruidSchema extends AbstractSchema } @Override - public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) + public ServerView.CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment) { - synchronized (lock) { - dataSourcesNeedingRefresh.add(segment.getDataSource()); - if (!tables.containsKey(segment.getDataSource())) { - refreshImmediately = true; - } - - lock.notifyAll(); - } - + addSegment(server, segment); return ServerView.CallbackAction.CONTINUE; } @Override - public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) + public ServerView.CallbackAction segmentRemoved(final DataSegment segment) { - synchronized (lock) { - dataSourcesNeedingRefresh.add(segment.getDataSource()); - lock.notifyAll(); - } - - return ServerView.CallbackAction.CONTINUE; - } - } - ); - - serverView.registerServerCallback( - MoreExecutors.sameThreadExecutor(), - new ServerView.ServerCallback() - { - @Override - public ServerView.CallbackAction serverRemoved(DruidServer server) - { - final List dataSourceNames = Lists.newArrayList(); - for (DruidDataSource druidDataSource : server.getDataSources()) { - dataSourceNames.add(druidDataSource.getName()); - } - - synchronized (lock) { - dataSourcesNeedingRefresh.addAll(dataSourceNames); - lock.notifyAll(); - } - + removeSegment(segment); return ServerView.CallbackAction.CONTINUE; } } @@ -290,92 +296,248 @@ public class DruidSchema extends AbstractSchema } @Override - protected Multimap getFunctionMultimap() + protected Multimap getFunctionMultimap() { - final ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); + final ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); for (Map.Entry entry : viewManager.getViews().entrySet()) { builder.put(entry); } return builder.build(); } - private DruidTable computeTable(final String dataSource) + private 
void addSegment(final DruidServerMetadata server, final DataSegment segment) { - SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery( - new TableDataSource(dataSource), - null, - null, - false, - ImmutableMap.of("useCache", false, "populateCache", false), - EnumSet.of(SegmentMetadataQuery.AnalysisType.INTERVAL), - null, - true + synchronized (lock) { + final Map knownSegments = segmentSignatures.get(segment.getDataSource()); + if (knownSegments == null || !knownSegments.containsKey(segment)) { + // Unknown segment. + setSegmentSignature(segment, null); + segmentsNeedingRefresh.add(segment); + + if (!server.segmentReplicatable()) { + log.debug("Added new mutable segment[%s].", segment.getIdentifier()); + mutableSegments.add(segment); + } else { + log.debug("Added new immutable segment[%s].", segment.getIdentifier()); + } + } else if (server.segmentReplicatable()) { + // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable, + // even if it's also available on non-replicatable (realtime) servers. + mutableSegments.remove(segment); + log.debug("Segment[%s] has become immutable.", segment.getIdentifier()); + } + + if (!tables.containsKey(segment.getDataSource())) { + refreshImmediately = true; + } + + lock.notifyAll(); + } + } + + private void removeSegment(final DataSegment segment) + { + synchronized (lock) { + log.debug("Segment[%s] is gone.", segment.getIdentifier()); + + dataSourcesNeedingRebuild.add(segment.getDataSource()); + segmentsNeedingRefresh.remove(segment); + mutableSegments.remove(segment); + + final Map dataSourceSegments = segmentSignatures.get(segment.getDataSource()); + dataSourceSegments.remove(segment); + + if (dataSourceSegments.isEmpty()) { + segmentSignatures.remove(segment.getDataSource()); + tables.remove(segment.getDataSource()); + log.info("Removed all metadata for dataSource[%s].", segment.getDataSource()); + } + + lock.notifyAll(); + } + } + + /** + * Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed, + * which may be a subset of the asked-for set. + */ + private Set refreshSegments(final Set segments) throws IOException + { + final Set retVal = new HashSet<>(); + + // Organize segments by dataSource. + final Map> segmentMap = new TreeMap<>(); + + for (DataSegment segment : segments) { + segmentMap.computeIfAbsent(segment.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER)) + .add(segment); + } + + for (Map.Entry> entry : segmentMap.entrySet()) { + final String dataSource = entry.getKey(); + retVal.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue())); + } + + return retVal; + } + + /** + * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of + * segments actually refreshed, which may be a subset of the asked-for set. + */ + private Set refreshSegmentsForDataSource( + final String dataSource, + final Set segments + ) throws IOException + { + log.debug("Refreshing metadata for dataSource[%s].", dataSource); + + final long startTime = System.currentTimeMillis(); + + // Segment identifier -> segment object. 
+ final Map segmentMap = segments.stream().collect( + Collectors.toMap( + DataSegment::getIdentifier, + Function.identity() + ) ); - segmentMetadataQuery = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( - segmentMetadataQuery, + final Set retVal = new HashSet<>(); + final Sequence sequence = runSegmentMetadataQuery( + walker, + serverConfig, + Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY) + ); + + Yielder yielder = Yielders.each(sequence); + + try { + while (!yielder.isDone()) { + final SegmentAnalysis analysis = yielder.get(); + final DataSegment segment = segmentMap.get(analysis.getId()); + + if (segment == null) { + log.warn("Got analysis for segment[%s] we didn't ask for, ignoring.", analysis.getId()); + } else { + final RowSignature rowSignature = analysisToRowSignature(analysis); + log.debug("Segment[%s] has signature[%s].", segment.getIdentifier(), rowSignature); + setSegmentSignature(segment, rowSignature); + retVal.add(segment); + } + + yielder = yielder.next(null); + } + } + finally { + yielder.close(); + } + + log.info( + "Refreshed metadata for dataSource[%s] in %,d ms (%d segments queried, %d segments left).", + dataSource, + System.currentTimeMillis() - startTime, + retVal.size(), + segments.size() - retVal.size() + ); + + return retVal; + } + + private void setSegmentSignature(final DataSegment segment, final RowSignature rowSignature) + { + synchronized (lock) { + segmentSignatures.computeIfAbsent(segment.getDataSource(), x -> new TreeMap<>(SEGMENT_ORDER)) + .put(segment, rowSignature); + } + } + + private DruidTable buildDruidTable(final String dataSource) + { + synchronized (lock) { + final TreeMap segmentMap = segmentSignatures.get(dataSource); + final Map columnTypes = new TreeMap<>(); + + if (segmentMap != null) { + for (RowSignature rowSignature : segmentMap.values()) { + if (rowSignature != null) { + for (String column : rowSignature.getRowOrder()) { + // Newer column types should override older ones. + columnTypes.putIfAbsent(column, rowSignature.getColumnType(column)); + } + } + } + } + + final RowSignature.Builder builder = RowSignature.builder(); + columnTypes.forEach(builder::add); + return new DruidTable(new TableDataSource(dataSource), builder.build()); + } + } + + private static Sequence runSegmentMetadataQuery( + final QuerySegmentWalker walker, + final ServerConfig serverConfig, + final Iterable segments + ) + { + // Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource. 
+ final String dataSource = Iterables.getOnlyElement( + StreamSupport.stream(segments.spliterator(), false) + .map(DataSegment::getDataSource).collect(Collectors.toSet()) + ); + + final MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec( + StreamSupport.stream(segments.spliterator(), false) + .map(DataSegment::toDescriptor).collect(Collectors.toList()) + ); + + final SegmentMetadataQuery segmentMetadataQuery = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + new SegmentMetadataQuery( + new TableDataSource(dataSource), + querySegmentSpec, + new AllColumnIncluderator(), + false, + ImmutableMap.of(), + EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), + false, + false + ), serverConfig ); - final Sequence sequence = QueryPlus.wrap(segmentMetadataQuery) - .run( - walker, - DirectDruidClient.makeResponseContextForQuery( - segmentMetadataQuery, - System.currentTimeMillis() - ) - ); - final List results = Sequences.toList(sequence, Lists.newArrayList()); - if (results.isEmpty()) { - return null; - } + return QueryPlus.wrap(segmentMetadataQuery) + .run( + walker, + DirectDruidClient.makeResponseContextForQuery( + segmentMetadataQuery, + System.currentTimeMillis() + ) + ); + } - final Map columnTypes = Maps.newLinkedHashMap(); + private static RowSignature analysisToRowSignature(final SegmentAnalysis analysis) + { + final RowSignature.Builder rowSignatureBuilder = RowSignature.builder(); - // Resolve conflicts by taking the latest metadata. This aids in gradual schema evolution. - long maxTimestamp = JodaUtils.MIN_INSTANT; - - for (SegmentAnalysis analysis : results) { - final long timestamp; - - if (analysis.getIntervals() != null && analysis.getIntervals().size() > 0) { - timestamp = analysis.getIntervals().get(analysis.getIntervals().size() - 1).getEndMillis(); - } else { - timestamp = JodaUtils.MIN_INSTANT; + for (Map.Entry entry : analysis.getColumns().entrySet()) { + if (entry.getValue().isError()) { + // Skip columns with analysis errors. + continue; } - for (Map.Entry entry : analysis.getColumns().entrySet()) { - if (entry.getValue().isError()) { - // Skip columns with analysis errors. - continue; - } - - if (!columnTypes.containsKey(entry.getKey()) || timestamp >= maxTimestamp) { - ValueType valueType; - try { - valueType = ValueType.valueOf(StringUtils.toUpperCase(entry.getValue().getType())); - } - catch (IllegalArgumentException e) { - // Assume unrecognized types are some flavor of COMPLEX. This throws away information about exactly - // what kind of complex column it is, which we may want to preserve some day. - valueType = ValueType.COMPLEX; - } - - columnTypes.put(entry.getKey(), valueType); - - maxTimestamp = timestamp; - } + ValueType valueType; + try { + valueType = ValueType.valueOf(StringUtils.toUpperCase(entry.getValue().getType())); } + catch (IllegalArgumentException e) { + // Assume unrecognized types are some flavor of COMPLEX. This throws away information about exactly + // what kind of complex column it is, which we may want to preserve some day. 
+        valueType = ValueType.COMPLEX;
+      }
+
+      rowSignatureBuilder.add(entry.getKey(), valueType);
     }
-    final RowSignature.Builder rowSignature = RowSignature.builder();
-    for (Map.Entry<String, ValueType> entry : columnTypes.entrySet()) {
-      rowSignature.add(entry.getKey(), entry.getValue());
-    }
-
-    return new DruidTable(
-        new TableDataSource(dataSource),
-        rowSignature.build()
-    );
+    return rowSignatureBuilder.build();
   }
 }
diff --git a/sql/src/main/java/io/druid/sql/guice/SqlModule.java b/sql/src/main/java/io/druid/sql/guice/SqlModule.java
index 3e266cd3a93..c79028d2f1f 100644
--- a/sql/src/main/java/io/druid/sql/guice/SqlModule.java
+++ b/sql/src/main/java/io/druid/sql/guice/SqlModule.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
 import com.google.inject.Module;
-import com.google.inject.Provider;
 import io.druid.guice.Jerseys;
 import io.druid.guice.JsonConfigProvider;
 import io.druid.guice.LazySingleton;
@@ -57,7 +56,6 @@ import io.druid.sql.calcite.schema.DruidSchema;
 import io.druid.sql.calcite.view.NoopViewManager;
 import io.druid.sql.calcite.view.ViewManager;
 import io.druid.sql.http.SqlResource;
-import org.apache.calcite.schema.SchemaPlus;
 import java.util.List;
 import java.util.Properties;
@@ -106,8 +104,7 @@ public class SqlModule implements Module
     JsonConfigProvider.bind(binder, "druid.sql.planner", PlannerConfig.class);
     JsonConfigProvider.bind(binder, "druid.sql.avatica", AvaticaServerConfig.class);
     LifecycleModule.register(binder, DruidSchema.class);
-    binder.bind(ViewManager.class).to(NoopViewManager.class);
-    binder.bind(SchemaPlus.class).toProvider(SchemaPlusProvider.class);
+    binder.bind(ViewManager.class).to(NoopViewManager.class).in(LazySingleton.class);
     for (Class<? extends SqlAggregator> clazz : DEFAULT_AGGREGATOR_CLASSES) {
       SqlBindings.addAggregator(binder, clazz);
@@ -129,18 +126,6 @@ public class SqlModule implements Module
     }
   }
-  public static class SchemaPlusProvider implements Provider<SchemaPlus>
-  {
-    @Inject
-    private DruidSchema druidSchema;
-
-    @Override
-    public SchemaPlus get()
-    {
-      return Calcites.createRootSchema(druidSchema);
-    }
-  }
-
   private boolean isEnabled()
   {
     Preconditions.checkNotNull(props, "props");
diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java
index c46aa6ee1e6..212d9a4ffa6 100644
--- a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -39,6 +39,7 @@ import io.druid.sql.calcite.planner.Calcites;
 import io.druid.sql.calcite.planner.DruidOperatorTable;
 import io.druid.sql.calcite.planner.PlannerConfig;
 import io.druid.sql.calcite.planner.PlannerFactory;
+import io.druid.sql.calcite.schema.DruidSchema;
 import io.druid.sql.calcite.util.CalciteTests;
 import io.druid.sql.calcite.util.QueryLogHook;
 import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@@ -46,7 +47,6 @@ import org.apache.calcite.avatica.AvaticaClientRuntimeException;
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.MissingResultsException;
 import org.apache.calcite.avatica.NoSuchStatementException;
-import org.apache.calcite.schema.SchemaPlus;
 import org.eclipse.jetty.server.Server;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -116,16 +116,11 @@ public class DruidAvaticaHandlerTest
     Calcites.setSystemProperties();
     walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
     final PlannerConfig plannerConfig = new PlannerConfig();
-    final SchemaPlus rootSchema = Calcites.createRootSchema(
-        CalciteTests.createMockSchema(
-            walker,
-            plannerConfig
-        )
-    );
+    final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     druidMeta = new DruidMeta(
-        new PlannerFactory(rootSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()),
+        new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()),
         AVATICA_CONFIG
     );
     final DruidAvaticaHandler handler = new DruidAvaticaHandler(
@@ -544,18 +539,13 @@ public class DruidAvaticaHandlerTest
     };
     final PlannerConfig plannerConfig = new PlannerConfig();
-    final SchemaPlus rootSchema = Calcites.createRootSchema(
-        CalciteTests.createMockSchema(
-            walker,
-            plannerConfig
-        )
-    );
+    final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     final List<Meta.Frame> frames = new ArrayList<>();
     DruidMeta smallFrameDruidMeta = new DruidMeta(
-        new PlannerFactory(rootSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()),
+        new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()),
         smallFrameConfig
     )
     {
diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java
index 68552d4321c..7cde6e2012e 100644
--- a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java
@@ -27,12 +27,12 @@ import io.druid.sql.calcite.planner.Calcites;
 import io.druid.sql.calcite.planner.DruidOperatorTable;
 import io.druid.sql.calcite.planner.PlannerConfig;
 import io.druid.sql.calcite.planner.PlannerFactory;
+import io.druid.sql.calcite.schema.DruidSchema;
 import io.druid.sql.calcite.util.CalciteTests;
 import io.druid.sql.calcite.util.QueryLogHook;
 import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.avatica.Meta;
-import org.apache.calcite.schema.SchemaPlus;
 import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Assert;
@@ -60,16 +60,14 @@ public class DruidStatementTest
     Calcites.setSystemProperties();
     walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
     final PlannerConfig plannerConfig = new PlannerConfig();
-    final SchemaPlus rootSchema = Calcites.createRootSchema(
-        CalciteTests.createMockSchema(
-            walker,
-            plannerConfig
-        )
+    final DruidSchema druidSchema = CalciteTests.createMockSchema(
+        walker,
+        plannerConfig
     );
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     plannerFactory = new PlannerFactory(
-        rootSchema,
+        druidSchema,
         walker,
         operatorTable,
         macroTable,
diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
index f81f3047922..5d0ee07e1c3 100644
--- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
@@ -96,7 +96,6 @@ import io.druid.sql.calcite.util.QueryLogHook;
 import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import io.druid.sql.calcite.view.InProcessViewManager;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.schema.SchemaPlus;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
@@ -5351,11 +5350,10 @@ public class CalciteQueryTest
   {
     final InProcessViewManager viewManager = new InProcessViewManager();
     final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig, viewManager);
-    final SchemaPlus rootSchema = Calcites.createRootSchema(druidSchema);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     final PlannerFactory plannerFactory = new PlannerFactory(
-        rootSchema,
+        druidSchema,
         walker,
         operatorTable,
         macroTable,
diff --git a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java
index 83d5ae016ef..8280ab682b0 100644
--- a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java
@@ -35,12 +35,12 @@ import io.druid.sql.calcite.planner.DruidOperatorTable;
 import io.druid.sql.calcite.planner.PlannerConfig;
 import io.druid.sql.calcite.planner.PlannerContext;
 import io.druid.sql.calcite.planner.PlannerFactory;
+import io.druid.sql.calcite.schema.DruidSchema;
 import io.druid.sql.calcite.util.CalciteTests;
 import io.druid.sql.calcite.util.QueryLogHook;
 import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import io.druid.sql.http.SqlQuery;
 import io.druid.sql.http.SqlResource;
-import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.ValidationException;
 import org.junit.After;
 import org.junit.Assert;
@@ -75,14 +75,12 @@ public class SqlResourceTest
     Calcites.setSystemProperties();
     walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
     final PlannerConfig plannerConfig = new PlannerConfig();
-    final SchemaPlus rootSchema = Calcites.createRootSchema(
-        CalciteTests.createMockSchema(walker, plannerConfig)
-    );
+    final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     resource = new SqlResource(
         JSON_MAPPER,
-        new PlannerFactory(rootSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig())
+        new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig())
     );
   }
diff --git a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java
index 990f674c1f5..c3ad3ea8cf8 100644
--- a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -166,8 +166,12 @@ public class DruidSchemaTest
     final Map<String, Table> tableMap = schema.getTableMap();
     Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableMap.keySet());
+  }
-    final DruidTable fooTable = (DruidTable) tableMap.get("foo");
+  @Test
+  public void testGetTableMapFoo()
+  {
+    final DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
     final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl());
     final List<RelDataTypeField> fields = rowType.getFieldList();
@@ -182,13 +186,32 @@ public class DruidSchemaTest
     Assert.assertEquals("dim1", fields.get(2).getName());
     Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(2).getType().getSqlTypeName());
-    Assert.assertEquals("m1", fields.get(3).getName());
-    Assert.assertEquals(SqlTypeName.BIGINT, fields.get(3).getType().getSqlTypeName());
+    Assert.assertEquals("dim2", fields.get(3).getName());
+    Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getSqlTypeName());
-    Assert.assertEquals("unique_dim1", fields.get(4).getName());
-    Assert.assertEquals(SqlTypeName.OTHER, fields.get(4).getType().getSqlTypeName());
+    Assert.assertEquals("m1", fields.get(4).getName());
+    Assert.assertEquals(SqlTypeName.BIGINT, fields.get(4).getType().getSqlTypeName());
-    Assert.assertEquals("dim2", fields.get(5).getName());
-    Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(5).getType().getSqlTypeName());
+    Assert.assertEquals("unique_dim1", fields.get(5).getName());
+    Assert.assertEquals(SqlTypeName.OTHER, fields.get(5).getType().getSqlTypeName());
+  }
+
+  @Test
+  public void testGetTableMapFoo2()
+  {
+    final DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo2");
+    final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl());
+    final List<RelDataTypeField> fields = rowType.getFieldList();
+
+    Assert.assertEquals(3, fields.size());
+
+    Assert.assertEquals("__time", fields.get(0).getName());
+    Assert.assertEquals(SqlTypeName.TIMESTAMP, fields.get(0).getType().getSqlTypeName());
+
+    Assert.assertEquals("dim2", fields.get(1).getName());
+    Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(1).getType().getSqlTypeName());
+
+    Assert.assertEquals("m1", fields.get(2).getName());
+    Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName());
   }
 }
diff --git a/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java
index baccc0dd125..08888f1184d 100644
--- a/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java
+++ b/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java
@@ -36,6 +36,15 @@ import java.util.concurrent.Executor;
 public class TestServerInventoryView implements TimelineServerView
 {
+  private static final DruidServerMetadata DUMMY_SERVER = new DruidServerMetadata(
+      "dummy",
+      "dummy",
+      null,
+      0,
+      ServerType.HISTORICAL,
+      "dummy",
+      0
+  );
   private final List<DataSegment> segments;

   public TestServerInventoryView(List<DataSegment> segments)
@@ -52,31 +61,21 @@ public class TestServerInventoryView implements TimelineServerView
   @Override
   public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
   {
-    final DruidServerMetadata dummyServer = new DruidServerMetadata("dummy", "dummy", null, 0, ServerType.HISTORICAL, "dummy", 0);
-
     for (final DataSegment segment : segments) {
-      exec.execute(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              callback.segmentAdded(dummyServer, segment);
-            }
-          }
-      );
+      exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
     }
-    exec.execute(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            callback.segmentViewInitialized();
-          }
-        }
-    );
+    exec.execute(callback::segmentViewInitialized);
+  }
+
+  @Override
+  public void registerTimelineCallback(final Executor exec, final TimelineCallback callback)
+  {
+    for (DataSegment segment : segments) {
+      exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
+    }
+
+    exec.execute(callback::timelineInitialized);
   }

   @Override