Fixes and improvements to SQL metadata caching. (#4551)

* Fixes and improvements to SQL metadata caching.

Also adds support for MultipleSpecificSegmentSpec to CachingClusteredClient.

SQL changes:
- Cache metadata on a per-segment level, in addition to per-dataSource, so
  we don't need to re-query all segments whenever a single new one appears.
  This should lower the load placed on the cluster by metadata queries (see
  the sketch after this list).
- Fix race condition in DruidSchema that could cause metadata to be missed. It was
  possible to notice new segments, issue a metadata query that did not actually
  cover them, and never detect the gap, so the metadata for those segments was
  silently ignored.
- Fix assumption in DruidSchema that all segments are immutable. Now, mutable
  segments are periodically re-queried.
- Fix inappropriate re-use of SchemaPlus. We now create one per planning cycle
  rather than sharing a single instance, because SchemaPlus caches table objects
  and a shared instance can serve stale metadata. DruidSchema does its own
  caching, so the SchemaPlus cache is unnecessary.
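
The per-segment scheme is easiest to see in miniature. The following is a
simplified, hypothetical sketch (plain Java collections standing in for
DataSegment and RowSignature; the real implementation is in the DruidSchema
diff below): signatures are cached per segment, only unknown segments are
marked for refresh, and a dataSource's table signature is rebuilt by merging
the cached entries rather than re-querying the whole dataSource.

    import java.util.Map;
    import java.util.Set;
    import java.util.TreeMap;
    import java.util.TreeSet;

    class PerSegmentMetadataCache
    {
      // dataSource -> (segmentId -> column name/type signature). A null signature
      // means the segment is known but its metadata has not been fetched yet.
      private final Map<String, TreeMap<String, Map<String, String>>> signatures = new TreeMap<>();

      // Segments whose metadata must be (re)queried. New segments land here; the real
      // implementation also re-adds mutable segments every refresh period.
      private final Set<String> segmentsNeedingRefresh = new TreeSet<>();

      public synchronized void onSegmentAdded(String dataSource, String segmentId)
      {
        Map<String, Map<String, String>> known = signatures.computeIfAbsent(dataSource, ds -> new TreeMap<>());
        if (!known.containsKey(segmentId)) {
          known.put(segmentId, null);
          segmentsNeedingRefresh.add(segmentId); // only this one segment needs a metadata query
        }
      }

      public synchronized void onMetadataFetched(String dataSource, String segmentId, Map<String, String> columns)
      {
        signatures.computeIfAbsent(dataSource, ds -> new TreeMap<>()).put(segmentId, columns);
        segmentsNeedingRefresh.remove(segmentId);
      }

      // Merge the cached per-segment signatures into a table signature; no cluster
      // metadata query is needed at this point.
      public synchronized Map<String, String> tableSignature(String dataSource)
      {
        Map<String, String> merged = new TreeMap<>();
        for (Map<String, String> columns : signatures.getOrDefault(dataSource, new TreeMap<>()).values()) {
          if (columns != null) {
            columns.forEach(merged::putIfAbsent);
          }
        }
        return merged;
      }
    }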

Server changes:
- Add a TimelineCallback to TimelineServerView, for callers that want to get updates
  when the timeline has been modified.
- Change CachingClusteredClient from a QueryRunner to a QuerySegmentWalker. This
  allows it to accept queries that are segment-descriptor-based rather than
  interval-based. In particular, it now supports MultipleSpecificSegmentSpec (see
  the sketch after this list).
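
A rough usage sketch of the two new server-side hooks follows (assumed io.druid
package names as of this commit; the class and variable names in the sketch are
placeholders). It registers a TimelineCallback on a TimelineServerView and runs
a query against an explicit list of SegmentDescriptors through a
QuerySegmentWalker, which is how DruidSchema now issues its segment metadata
queries (via MultipleSpecificSegmentSpec).

    import com.google.common.util.concurrent.MoreExecutors;
    import io.druid.client.ServerView;
    import io.druid.client.TimelineServerView;
    import io.druid.java.util.common.guava.Sequence;
    import io.druid.query.Query;
    import io.druid.query.QueryPlus;
    import io.druid.query.QuerySegmentWalker;
    import io.druid.query.SegmentDescriptor;
    import io.druid.server.coordination.DruidServerMetadata;
    import io.druid.timeline.DataSegment;

    import java.util.HashMap;
    import java.util.List;

    class TimelineUsageSketch
    {
      // Get notified whenever the broker's view of the timeline changes. Returning
      // CONTINUE keeps the callback registered; UNREGISTER would drop it.
      static void watchTimeline(TimelineServerView serverView)
      {
        serverView.registerTimelineCallback(
            MoreExecutors.sameThreadExecutor(),
            new TimelineServerView.TimelineCallback()
            {
              @Override
              public ServerView.CallbackAction timelineInitialized()
              {
                return ServerView.CallbackAction.CONTINUE;
              }

              @Override
              public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
              {
                // e.g. mark segment.getIdentifier() as needing a metadata refresh
                return ServerView.CallbackAction.CONTINUE;
              }

              @Override
              public ServerView.CallbackAction segmentRemoved(DataSegment segment)
              {
                return ServerView.CallbackAction.CONTINUE;
              }
            }
        );
      }

      // Run a query against an explicit set of segments instead of a set of intervals.
      // CachingClusteredClient is now a QuerySegmentWalker, so it can serve this directly.
      static <T> Sequence<T> queryExactSegments(
          QuerySegmentWalker walker,
          Query<T> query,
          List<SegmentDescriptor> descriptors
      )
      {
        return walker.getQueryRunnerForSegments(query, descriptors)
                     .run(QueryPlus.wrap(query), new HashMap<>());
      }
    }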

* Fix DruidSchema, and unused imports.

* Remove unused import.

* Fix SqlBenchmark.
Gian Merlino 2017-07-20 10:14:15 -07:00 committed by Fangjin Yang
parent 71e7a4c054
commit 2be7068f6e
17 changed files with 632 additions and 328 deletions

View File: SqlBenchmark.java

@ -19,7 +19,6 @@
package io.druid.benchmark.query;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
@ -33,29 +32,21 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.segment.QueryableIndex;
import io.druid.segment.column.ValueType;
import io.druid.server.initialization.ServerConfig;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidPlanner;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerFactory;
import io.druid.sql.calcite.planner.PlannerResult;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.sql.calcite.table.RowSignature;
import io.druid.sql.calcite.util.CalciteTests;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
@ -76,7 +67,6 @@ import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@ -124,28 +114,8 @@ public class SqlBenchmark
this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index);
final Map<String, Table> tableMap = ImmutableMap.<String, Table>of(
"foo",
new DruidTable(
new TableDataSource("foo"),
RowSignature.builder()
.add("__time", ValueType.LONG)
.add("dimSequential", ValueType.STRING)
.add("dimZipf", ValueType.STRING)
.add("dimUniform", ValueType.STRING)
.build()
)
);
final Schema druidSchema = new AbstractSchema()
{
@Override
protected Map<String, Table> getTableMap()
{
return tableMap;
}
};
plannerFactory = new PlannerFactory(
Calcites.createRootSchema(druidSchema),
CalciteTests.createMockSchema(walker, plannerConfig),
walker,
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),

View File: QuantileSqlAggregatorTest.java

@ -46,8 +46,6 @@ import io.druid.segment.column.ValueType;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.virtual.ExpressionVirtualColumn;
import io.druid.server.initialization.ServerConfig;
import io.druid.sql.calcite.aggregation.SqlAggregator;
import io.druid.sql.calcite.expression.SqlOperatorConversion;
import io.druid.sql.calcite.filtration.Filtration;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
@ -55,12 +53,12 @@ import io.druid.sql.calcite.planner.DruidPlanner;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerFactory;
import io.druid.sql.calcite.planner.PlannerResult;
import io.druid.sql.calcite.schema.DruidSchema;
import io.druid.sql.calcite.util.CalciteTests;
import io.druid.sql.calcite.util.QueryLogHook;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.apache.calcite.schema.SchemaPlus;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -126,18 +124,13 @@ public class QuantileSqlAggregatorTest
);
final PlannerConfig plannerConfig = new PlannerConfig();
final SchemaPlus rootSchema = Calcites.createRootSchema(
CalciteTests.createMockSchema(
walker,
plannerConfig
)
);
final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.<SqlAggregator>of(new QuantileSqlAggregator()),
ImmutableSet.<SqlOperatorConversion>of()
ImmutableSet.of(new QuantileSqlAggregator()),
ImmutableSet.of()
);
plannerFactory = new PlannerFactory(
rootSchema,
druidSchema,
walker,
operatorTable,
CalciteTests.createExprMacroTable(),

View File: BrokerServerView.java

@ -46,9 +46,11 @@ import io.druid.timeline.partition.PartitionChunk;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
/**
*/
@ -61,6 +63,7 @@ public class BrokerServerView implements TimelineServerView
private final ConcurrentMap<String, QueryableDruidServer> clients;
private final Map<String, ServerSelector> selectors;
private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines;
private final ConcurrentMap<TimelineCallback, Executor> timelineCallbacks = new ConcurrentHashMap<>();
private final QueryToolChestWarehouse warehouse;
private final QueryWatcher queryWatcher;
@ -139,6 +142,7 @@ public class BrokerServerView implements TimelineServerView
public CallbackAction segmentViewInitialized()
{
initialized = true;
runTimelineCallbacks(TimelineCallback::timelineInitialized);
return ServerView.CallbackAction.CONTINUE;
}
},
@ -237,6 +241,7 @@ public class BrokerServerView implements TimelineServerView
queryableDruidServer = addServer(baseView.getInventoryValue(server.getName()));
}
selector.addServerAndUpdateSegment(queryableDruidServer, segment);
runTimelineCallbacks(callback -> callback.segmentAdded(server, segment));
}
}
@ -278,6 +283,8 @@ public class BrokerServerView implements TimelineServerView
segment.getInterval(),
segment.getVersion()
);
} else {
runTimelineCallbacks(callback -> callback.segmentRemoved(segment));
}
}
}
@ -293,6 +300,12 @@ public class BrokerServerView implements TimelineServerView
}
}
@Override
public void registerTimelineCallback(final Executor exec, final TimelineCallback callback)
{
timelineCallbacks.put(callback, exec);
}
@Override
public <T> QueryRunner<T> getQueryRunner(DruidServer server)
{
@ -317,4 +330,17 @@ public class BrokerServerView implements TimelineServerView
{
baseView.registerSegmentCallback(exec, callback, segmentFilter);
}
private void runTimelineCallbacks(final Function<TimelineCallback, CallbackAction> function)
{
for (Map.Entry<TimelineCallback, Executor> entry : timelineCallbacks.entrySet()) {
entry.getValue().execute(
() -> {
if (CallbackAction.UNREGISTER == function.apply(entry.getKey())) {
timelineCallbacks.remove(entry.getKey());
}
}
);
}
}
}

View File: CachingClusteredClient.java

@ -21,6 +21,7 @@ package io.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Optional;
@ -31,6 +32,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import com.google.common.hash.Hasher;
@ -62,6 +64,7 @@ import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.Result;
@ -74,7 +77,9 @@ import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
import io.druid.timeline.partition.ShardSpec;
import org.apache.commons.codec.binary.Base64;
import org.joda.time.Interval;
@ -82,6 +87,7 @@ import org.joda.time.Interval;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@ -89,10 +95,12 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
*/
public class CachingClusteredClient<T> implements QueryRunner<T>
public class CachingClusteredClient implements QuerySegmentWalker
{
private static final EmittingLogger log = new EmittingLogger(CachingClusteredClient.class);
private final QueryToolChestWarehouse warehouse;
@ -141,7 +149,69 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
public <T> QueryRunner<T> getQueryRunnerForIntervals(
final Query<T> query,
final Iterable<Interval> intervals
)
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
return CachingClusteredClient.this.run(
queryPlus,
responseContext,
timeline -> timeline
);
}
};
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(
final Query<T> query,
final Iterable<SegmentDescriptor> specs
)
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
return CachingClusteredClient.this.run(
queryPlus,
responseContext,
timeline -> {
final List<TimelineObjectHolder<String, ServerSelector>> retVal = new LinkedList<>();
final VersionedIntervalTimeline<String, ServerSelector> timeline2 = new VersionedIntervalTimeline<>(
Ordering.natural()
);
for (SegmentDescriptor spec : specs) {
final PartitionHolder<ServerSelector> entry = timeline.findEntry(spec.getInterval(), spec.getVersion());
if (entry != null) {
final PartitionChunk<ServerSelector> chunk = entry.getChunk(spec.getPartitionNumber());
if (chunk != null) {
timeline2.add(spec.getInterval(), spec.getVersion(), chunk);
}
}
}
return timeline2;
}
);
}
};
}
/**
* Run a query. The timelineFunction will be given the "master" timeline and can be used to return a different
* timeline, if desired. This is used by getQueryRunnerForSegments.
*/
private <T> Sequence<T> run(
final QueryPlus<T> queryPlus,
final Map<String, Object> responseContext,
final Function<TimelineLookup<String, ServerSelector>, TimelineLookup<String, ServerSelector>> timelineFunction
)
{
final Query<T> query = queryPlus.getQuery();
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
@ -176,7 +246,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// build set of segments to query
Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
timeline = timelineFunction.apply(timeline);
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = CachingClusteredClient.lookupIntervalsInTimeline(
timeline,
query.getIntervals()
);
// Note that enabling this leads to putting uncovered intervals information in the response headers
// and might blow up in some cases https://github.com/druid-io/druid/issues/2108
@ -201,7 +275,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
}
startMillis = holderInterval.getEndMillis();
serversLookup.add(holder);
}
if (!uncoveredIntervalsOverflowed && startMillis < endMillis) {
@ -221,10 +294,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
responseContext.put("uncoveredIntervals", uncoveredIntervals);
responseContext.put("uncoveredIntervalsOverflowed", uncoveredIntervalsOverflowed);
}
} else {
for (Interval interval : query.getIntervals()) {
Iterables.addAll(serversLookup, timeline.lookup(interval));
}
}
// Let tool chest filter out unneeded segments
@ -571,7 +640,18 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
);
}
protected Sequence<T> mergeCachedAndUncachedSequences(
private static <VersionType, ObjectType> List<TimelineObjectHolder<VersionType, ObjectType>> lookupIntervalsInTimeline(
final TimelineLookup<VersionType, ObjectType> timeline,
final Iterable<Interval> intervals
)
{
return StreamSupport.stream(intervals.spliterator(), false)
.flatMap(interval -> StreamSupport.stream(timeline.lookup(interval).spliterator(), false))
.collect(Collectors.toList());
}
@VisibleForTesting
static <T> Sequence<T> mergeCachedAndUncachedSequences(
Query<T> query,
List<Sequence<T>> sequencesByInterval
)

View File: TimelineServerView.java

@ -22,12 +22,56 @@ package io.druid.client;
import io.druid.client.selector.ServerSelector;
import io.druid.query.DataSource;
import io.druid.query.QueryRunner;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import java.util.concurrent.Executor;
/**
*/
public interface TimelineServerView extends ServerView
{
TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource);
<T> QueryRunner<T> getQueryRunner(DruidServer server);
/**
* Register a callback for state changes in the timeline managed by this TimelineServerView. The callback will be
* called after the relevant change is made to this TimelineServerView's timeline.
*
* @param exec executor in which to run the callback
* @param callback the callback
*/
void registerTimelineCallback(Executor exec, TimelineCallback callback);
interface TimelineCallback
{
/**
* Called once, when the timeline has been initialized.
*
* @return continue or unregister
*/
CallbackAction timelineInitialized();
/**
* Called when a segment on a particular server has been added to the timeline. May be called multiple times for
* the same segment, if that segment is added on multiple servers.
*
* @param server the server
* @param segment the segment
*
* @return continue or unregister
*/
CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment);
/**
* Called when a segment has been removed from all servers and is no longer present in the timeline.
*
* @param segment the segment
*
* @return continue or unregister
*/
CallbackAction segmentRemoved(final DataSegment segment);
}
}

View File: ClientQuerySegmentWalker.java

@ -65,16 +65,16 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
return makeRunner(query);
return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals));
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
return makeRunner(query);
return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs));
}
private <T> QueryRunner<T> makeRunner(Query<T> query)
private <T> QueryRunner<T> makeRunner(Query<T> query, QueryRunner<T> baseClientRunner)
{
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
PostProcessingOperator<T> postProcessing = objectMapper.convertValue(
@ -87,7 +87,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
return new FluentQueryRunnerBuilder<>(toolChest)
.create(
new RetryQueryRunner<>(
baseClient,
baseClientRunner,
toolChest,
retryConfig,
objectMapper

View File: CachingClusteredClientFunctionalityTest.java

@ -29,9 +29,11 @@ import io.druid.client.cache.MapCache;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.TierSelectorStrategy;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
@ -95,47 +97,47 @@ public class CachingClusteredClientFunctionalityTest
));
Map<String, Object> responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
runQuery(client, builder.build(), responseContext);
Assert.assertNull(responseContext.get("uncoveredIntervals"));
builder.intervals("2015-01-01/2015-01-03");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
runQuery(client, builder.build(), responseContext);
assertUncovered(responseContext, false, "2015-01-01/2015-01-02");
builder.intervals("2015-01-01/2015-01-04");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
runQuery(client, builder.build(), responseContext);
assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04");
builder.intervals("2015-01-02/2015-01-04");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
runQuery(client, builder.build(), responseContext);
assertUncovered(responseContext, false, "2015-01-03/2015-01-04");
builder.intervals("2015-01-01/2015-01-30");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
runQuery(client, builder.build(), responseContext);
assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
builder.intervals("2015-01-02/2015-01-30");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
runQuery(client, builder.build(), responseContext);
assertUncovered(responseContext, false, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
builder.intervals("2015-01-04/2015-01-30");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
runQuery(client, builder.build(), responseContext);
assertUncovered(responseContext, false, "2015-01-05/2015-01-30");
builder.intervals("2015-01-10/2015-01-30");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
runQuery(client, builder.build(), responseContext);
assertUncovered(responseContext, false, "2015-01-10/2015-01-30");
builder.intervals("2015-01-01/2015-02-25");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
runQuery(client, builder.build(), responseContext);
assertUncovered(responseContext, true, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-02-04");
}
@ -223,6 +225,12 @@ public class CachingClusteredClientFunctionalityTest
return timeline;
}
@Override
public void registerTimelineCallback(final Executor exec, final TimelineCallback callback)
{
throw new UnsupportedOperationException();
}
@Override
public <T> QueryRunner<T> getQueryRunner(DruidServer server)
{
@ -266,4 +274,16 @@ public class CachingClusteredClientFunctionalityTest
}
);
}
private static <T> Sequence<T> runQuery(
CachingClusteredClient client,
final Query<T> query,
final Map<String, Object> responseContext
)
{
return client.getQueryRunnerForIntervals(query, query.getIntervals()).run(
QueryPlus.wrap(query),
responseContext
);
}
}

View File: CachingClusteredClientTest.java

@ -457,7 +457,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
@ -496,7 +496,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
@ -591,7 +591,7 @@ public class CachingClusteredClientTest
selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), dataSegment);
timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector));
client.run(query, context);
getDefaultQueryRunner().run(query, context);
Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured());
Assert.assertTrue("Cache key below limit", ImmutableList.copyOf(cacheKeyCapture.getValue()).size() <= limit);
@ -605,7 +605,7 @@ public class CachingClusteredClientTest
.once();
EasyMock.replay(cache);
client = makeClient(MoreExecutors.sameThreadExecutor(), cache, 0);
client.run(query, context);
getDefaultQueryRunner().run(query, context);
EasyMock.verify(cache);
EasyMock.verify(dataSegment);
Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured());
@ -625,7 +625,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
@ -688,7 +688,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
@ -734,7 +734,7 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
@ -775,7 +775,7 @@ public class CachingClusteredClientTest
Assert.assertEquals(0, cache.getStats().getNumMisses());
testQueryCaching(
client,
getDefaultQueryRunner(),
1,
false,
builder.context(
@ -809,7 +809,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
client,
getDefaultQueryRunner(),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
@ -887,7 +887,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TopNQueryQueryToolChest(
getDefaultQueryRunner(), new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
@ -988,7 +988,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TopNQueryQueryToolChest(
getDefaultQueryRunner(), new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
@ -1062,7 +1062,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TopNQueryQueryToolChest(
getDefaultQueryRunner(), new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
@ -1134,7 +1134,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
testQueryCaching(
client,
getDefaultQueryRunner(),
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeSearchResults(TOP_DIM, new DateTime("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4),
@ -1164,7 +1164,7 @@ public class CachingClusteredClientTest
);
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new SearchQueryQueryToolChest(
getDefaultQueryRunner(), new SearchQueryQueryToolChest(
new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
@ -1208,7 +1208,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
testQueryCaching(
client,
getDefaultQueryRunner(),
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeSearchResults(TOP_DIM, new DateTime("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4),
@ -1238,7 +1238,7 @@ public class CachingClusteredClientTest
);
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new SearchQueryQueryToolChest(
getDefaultQueryRunner(), new SearchQueryQueryToolChest(
new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
@ -1311,7 +1311,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
testQueryCaching(
client,
getDefaultQueryRunner(),
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeSelectResults(dimensions, metrics, new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1)),
@ -1337,7 +1337,7 @@ public class CachingClusteredClientTest
);
QueryRunner runner = new FinalizeResultsQueryRunner(
client,
getDefaultQueryRunner(),
new SelectQueryQueryToolChest(
jsonMapper,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(),
@ -1384,7 +1384,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
testQueryCaching(
client,
getDefaultQueryRunner(),
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeSelectResults(dimensions, metrics, new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1)),
@ -1414,7 +1414,7 @@ public class CachingClusteredClientTest
);
QueryRunner runner = new FinalizeResultsQueryRunner(
client,
getDefaultQueryRunner(),
new SelectQueryQueryToolChest(
jsonMapper,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(),
@ -1495,7 +1495,7 @@ public class CachingClusteredClientTest
collector.add(hashFn.hashString("123abc", Charsets.UTF_8).asBytes());
testQueryCaching(
client,
getDefaultQueryRunner(),
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeGroupByResults(
@ -1539,7 +1539,7 @@ public class CachingClusteredClientTest
);
QueryRunner runner = new FinalizeResultsQueryRunner(
client,
getDefaultQueryRunner(),
GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig()).getToolchest()
);
HashMap<String, Object> context = new HashMap<String, Object>();
@ -1579,7 +1579,7 @@ public class CachingClusteredClientTest
public void testTimeBoundaryCaching() throws Exception
{
testQueryCaching(
client,
getDefaultQueryRunner(),
Druids.newTimeBoundaryQueryBuilder()
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
@ -1599,7 +1599,7 @@ public class CachingClusteredClientTest
);
testQueryCaching(
client,
getDefaultQueryRunner(),
Druids.newTimeBoundaryQueryBuilder()
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
@ -1620,7 +1620,7 @@ public class CachingClusteredClientTest
);
testQueryCaching(
client,
getDefaultQueryRunner(),
Druids.newTimeBoundaryQueryBuilder()
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
@ -1680,7 +1680,7 @@ public class CachingClusteredClientTest
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
@ -1759,14 +1759,14 @@ public class CachingClusteredClientTest
.postAggregators(RENAMED_POST_AGGS);
TimeseriesQuery query = builder.build();
Map<String, List> context = new HashMap<>();
Map<String, Object> context = new HashMap<>();
final Interval interval1 = new Interval("2011-01-06/2011-01-07");
final Interval interval2 = new Interval("2011-01-07/2011-01-08");
final Interval interval3 = new Interval("2011-01-08/2011-01-09");
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
@ -1787,7 +1787,7 @@ public class CachingClusteredClientTest
timeline.add(interval3, "v", new StringPartitionChunk<>(null, null, 6, selector6));
final Capture<QueryPlus> capture = Capture.newInstance();
final Capture<Map<String, List>> contextCap = Capture.newInstance();
final Capture<Map<String, Object>> contextCap = Capture.newInstance();
QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class);
EasyMock.expect(mockRunner.run(EasyMock.capture(capture), EasyMock.capture(contextCap)))
@ -1962,7 +1962,7 @@ public class CachingClusteredClientTest
@Override
public void run()
{
HashMap<String, List> context = new HashMap<String, List>();
HashMap<String, Object> context = new HashMap<>();
for (int i = 0; i < numTimesToQuery; ++i) {
TestHelper.assertExpectedResults(
expected,
@ -2755,6 +2755,12 @@ public class CachingClusteredClientTest
return serverView.getQueryRunner(server);
}
@Override
public void registerTimelineCallback(final Executor exec, final TimelineCallback callback)
{
throw new UnsupportedOperationException();
}
@Override
public void registerServerCallback(Executor exec, ServerCallback callback)
{
@ -3000,7 +3006,7 @@ public class CachingClusteredClientTest
public void testTimeBoundaryCachingWhenTimeIsInteger() throws Exception
{
testQueryCaching(
client,
getDefaultQueryRunner(),
Druids.newTimeBoundaryQueryBuilder()
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
@ -3020,7 +3026,7 @@ public class CachingClusteredClientTest
);
testQueryCaching(
client,
getDefaultQueryRunner(),
Druids.newTimeBoundaryQueryBuilder()
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
@ -3041,7 +3047,7 @@ public class CachingClusteredClientTest
);
testQueryCaching(
client,
getDefaultQueryRunner(),
Druids.newTimeBoundaryQueryBuilder()
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
@ -3075,7 +3081,7 @@ public class CachingClusteredClientTest
.setContext(CONTEXT);
testQueryCaching(
client,
getDefaultQueryRunner(),
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeGroupByResults(
@ -3109,7 +3115,7 @@ public class CachingClusteredClientTest
);
QueryRunner runner = new FinalizeResultsQueryRunner(
client,
getDefaultQueryRunner(),
GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig()).getToolchest()
);
HashMap<String, Object> context = new HashMap<String, Object>();
@ -3190,10 +3196,22 @@ public class CachingClusteredClientTest
.build();
Map<String, String> responseContext = new HashMap<>();
Map<String, Object> responseContext = new HashMap<>();
client.run(query, responseContext);
getDefaultQueryRunner().run(query, responseContext);
Assert.assertEquals("Z/eS4rQz5v477iq7Aashr6JPZa0=", responseContext.get("ETag"));
}
@SuppressWarnings("unchecked")
private QueryRunner getDefaultQueryRunner()
{
return new QueryRunner() {
@Override
public Sequence run(final QueryPlus queryPlus, final Map responseContext)
{
return client.getQueryRunnerForIntervals(queryPlus.getQuery(), queryPlus.getQuery().getIntervals())
.run(queryPlus, responseContext);
}
};
}
}

View File: PlannerFactory.java

@ -49,7 +49,7 @@ public class PlannerFactory
.setConformance(DruidConformance.instance())
.build();
private final SchemaPlus rootSchema;
private final DruidSchema druidSchema;
private final QuerySegmentWalker walker;
private final DruidOperatorTable operatorTable;
private final ExprMacroTable macroTable;
@ -58,7 +58,7 @@ public class PlannerFactory
@Inject
public PlannerFactory(
final SchemaPlus rootSchema,
final DruidSchema druidSchema,
final QuerySegmentWalker walker,
final DruidOperatorTable operatorTable,
final ExprMacroTable macroTable,
@ -66,7 +66,7 @@ public class PlannerFactory
final ServerConfig serverConfig
)
{
this.rootSchema = rootSchema;
this.druidSchema = druidSchema;
this.walker = walker;
this.operatorTable = operatorTable;
this.macroTable = macroTable;
@ -76,12 +76,12 @@ public class PlannerFactory
public DruidPlanner createPlanner(final Map<String, Object> queryContext)
{
final SchemaPlus rootSchema = Calcites.createRootSchema(druidSchema);
final PlannerContext plannerContext = PlannerContext.create(operatorTable, macroTable, plannerConfig, queryContext);
final QueryMaker queryMaker = new QueryMaker(walker, plannerContext, serverConfig);
final FrameworkConfig frameworkConfig = Frameworks
.newConfigBuilder()
.parserConfig(PARSER_CONFIG)
.defaultSchema(rootSchema)
.traitDefs(ConventionTraitDef.INSTANCE, RelCollationTraitDef.INSTANCE)
.convertletTable(new DruidConvertletTable(plannerContext))
.operatorTable(operatorTable)

View File: DruidSchema.java

@ -20,11 +20,10 @@
package io.druid.sql.calcite.schema;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@ -32,24 +31,24 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DirectDruidClient;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.client.TimelineServerView;
import io.druid.common.utils.JodaUtils;
import io.druid.guice.ManageLifecycle;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.Yielders;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.query.QueryPlus;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.TableDataSource;
import io.druid.query.metadata.metadata.AllColumnIncluderator;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.segment.column.ValueType;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ServerConfig;
@ -59,42 +58,66 @@ import io.druid.sql.calcite.table.RowSignature;
import io.druid.sql.calcite.view.DruidViewMacro;
import io.druid.sql.calcite.view.ViewManager;
import io.druid.timeline.DataSegment;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ManageLifecycle
public class DruidSchema extends AbstractSchema
{
// Newest segments first, so they override older ones.
private static final Comparator<DataSegment> SEGMENT_ORDER = Comparator
.comparing((DataSegment segment) -> segment.getInterval().getStart()).reversed()
.thenComparing(Function.identity());
public static final String NAME = "druid";
private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
private static final int MAX_SEGMENTS_PER_QUERY = 15000;
private final QuerySegmentWalker walker;
private final TimelineServerView serverView;
private final PlannerConfig config;
private final ViewManager viewManager;
private final ExecutorService cacheExec;
private final ConcurrentMap<String, Table> tables;
private final ConcurrentMap<String, DruidTable> tables;
private final ServerConfig serverConfig;
// For awaitInitialization.
private final CountDownLatch initializationLatch = new CountDownLatch(1);
// Protects access to dataSourcesNeedingRefresh, lastRefresh, isServerViewInitialized
// Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized
private final Object lock = new Object();
// List of dataSources that need metadata refreshes.
private final Set<String> dataSourcesNeedingRefresh = Sets.newHashSet();
// DataSource -> Segment -> RowSignature for that segment.
// Use TreeMap for segments so they are merged in deterministic order, from older to newer.
private final Map<String, TreeMap<DataSegment, RowSignature>> segmentSignatures = new HashMap<>();
// All mutable segments.
private final Set<DataSegment> mutableSegments = new TreeSet<>(SEGMENT_ORDER);
// All dataSources that need tables regenerated.
private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();
// All segments that need to be refreshed.
private final TreeSet<DataSegment> segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER);
private boolean refreshImmediately = false;
private long lastRefresh = 0L;
private boolean isServerViewInitialized = false;
@ -128,43 +151,63 @@ public class DruidSchema extends AbstractSchema
{
try {
while (!Thread.currentThread().isInterrupted()) {
final Set<String> dataSources = Sets.newHashSet();
final Set<DataSegment> segmentsToRefresh = new TreeSet<>();
final Set<String> dataSourcesToRebuild = new TreeSet<>();
try {
synchronized (lock) {
final long nextRefresh = new DateTime(lastRefresh).plus(config.getMetadataRefreshPeriod())
.getMillis();
final long nextRefreshNoFuzz = new DateTime(lastRefresh)
.plus(config.getMetadataRefreshPeriod())
.getMillis();
// Fuzz a bit to spread load out when we have multiple brokers.
final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
while (!(
isServerViewInitialized
&& !dataSourcesNeedingRefresh.isEmpty()
&& (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty())
&& (refreshImmediately || nextRefresh < System.currentTimeMillis())
)) {
lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
}
dataSources.addAll(dataSourcesNeedingRefresh);
dataSourcesNeedingRefresh.clear();
segmentsToRefresh.addAll(segmentsNeedingRefresh);
segmentsNeedingRefresh.clear();
// Mutable segments need a refresh every period, since new columns could be added dynamically.
segmentsNeedingRefresh.addAll(mutableSegments);
lastRefresh = System.currentTimeMillis();
refreshImmediately = false;
}
// Refresh dataSources.
for (final String dataSource : dataSources) {
log.debug("Refreshing metadata for dataSource[%s].", dataSource);
final long startTime = System.currentTimeMillis();
final DruidTable druidTable = computeTable(dataSource);
if (druidTable == null) {
if (tables.remove(dataSource) != null) {
log.info("Removed dataSource[%s] from the list of active dataSources.", dataSource);
}
} else {
tables.put(dataSource, druidTable);
log.info(
"Refreshed metadata for dataSource[%s] in %,dms.",
// Refresh the segments.
final Set<DataSegment> refreshed = refreshSegments(segmentsToRefresh);
synchronized (lock) {
// Add missing segments back to the refresh list.
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
// Compute the list of dataSources to rebuild tables for.
dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
dataSourcesNeedingRebuild.clear();
lock.notifyAll();
}
// Rebuild the dataSources.
for (String dataSource : dataSourcesToRebuild) {
final DruidTable druidTable = buildDruidTable(dataSource);
final DruidTable oldTable = tables.put(dataSource, druidTable);
if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
log.debug(
"Table for dataSource[%s] has new signature[%s].",
dataSource,
System.currentTimeMillis() - startTime
druidTable.getRowSignature()
);
} else {
log.debug("Table for dataSource[%s] signature is unchanged.", dataSource);
}
}
@ -175,15 +218,12 @@ public class DruidSchema extends AbstractSchema
throw e;
}
catch (Exception e) {
log.warn(
e,
"Metadata refresh failed for dataSources[%s], trying again soon.",
Joiner.on(", ").join(dataSources)
);
log.warn(e, "Metadata refresh failed, trying again soon.");
synchronized (lock) {
// Add dataSources back to the refresh list.
dataSourcesNeedingRefresh.addAll(dataSources);
// Add our segments and dataSources back to their refresh and rebuild lists.
segmentsNeedingRefresh.addAll(segmentsToRefresh);
dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
lock.notifyAll();
}
}
@ -205,12 +245,12 @@ public class DruidSchema extends AbstractSchema
}
);
serverView.registerSegmentCallback(
serverView.registerTimelineCallback(
MoreExecutors.sameThreadExecutor(),
new ServerView.SegmentCallback()
new TimelineServerView.TimelineCallback()
{
@Override
public ServerView.CallbackAction segmentViewInitialized()
public ServerView.CallbackAction timelineInitialized()
{
synchronized (lock) {
isServerViewInitialized = true;
@ -221,50 +261,16 @@ public class DruidSchema extends AbstractSchema
}
@Override
public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
public ServerView.CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment)
{
synchronized (lock) {
dataSourcesNeedingRefresh.add(segment.getDataSource());
if (!tables.containsKey(segment.getDataSource())) {
refreshImmediately = true;
}
lock.notifyAll();
}
addSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
public ServerView.CallbackAction segmentRemoved(final DataSegment segment)
{
synchronized (lock) {
dataSourcesNeedingRefresh.add(segment.getDataSource());
lock.notifyAll();
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
serverView.registerServerCallback(
MoreExecutors.sameThreadExecutor(),
new ServerView.ServerCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
{
final List<String> dataSourceNames = Lists.newArrayList();
for (DruidDataSource druidDataSource : server.getDataSources()) {
dataSourceNames.add(druidDataSource.getName());
}
synchronized (lock) {
dataSourcesNeedingRefresh.addAll(dataSourceNames);
lock.notifyAll();
}
removeSegment(segment);
return ServerView.CallbackAction.CONTINUE;
}
}
@ -290,92 +296,248 @@ public class DruidSchema extends AbstractSchema
}
@Override
protected Multimap<String, Function> getFunctionMultimap()
protected Multimap<String, org.apache.calcite.schema.Function> getFunctionMultimap()
{
final ImmutableMultimap.Builder<String, Function> builder = ImmutableMultimap.builder();
final ImmutableMultimap.Builder<String, org.apache.calcite.schema.Function> builder = ImmutableMultimap.builder();
for (Map.Entry<String, DruidViewMacro> entry : viewManager.getViews().entrySet()) {
builder.put(entry);
}
return builder.build();
}
private DruidTable computeTable(final String dataSource)
private void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
new TableDataSource(dataSource),
null,
null,
false,
ImmutableMap.<String, Object>of("useCache", false, "populateCache", false),
EnumSet.of(SegmentMetadataQuery.AnalysisType.INTERVAL),
null,
true
synchronized (lock) {
final Map<DataSegment, RowSignature> knownSegments = segmentSignatures.get(segment.getDataSource());
if (knownSegments == null || !knownSegments.containsKey(segment)) {
// Unknown segment.
setSegmentSignature(segment, null);
segmentsNeedingRefresh.add(segment);
if (!server.segmentReplicatable()) {
log.debug("Added new mutable segment[%s].", segment.getIdentifier());
mutableSegments.add(segment);
} else {
log.debug("Added new immutable segment[%s].", segment.getIdentifier());
}
} else if (server.segmentReplicatable()) {
// If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
// even if it's also available on non-replicatable (realtime) servers.
mutableSegments.remove(segment);
log.debug("Segment[%s] has become immutable.", segment.getIdentifier());
}
if (!tables.containsKey(segment.getDataSource())) {
refreshImmediately = true;
}
lock.notifyAll();
}
}
private void removeSegment(final DataSegment segment)
{
synchronized (lock) {
log.debug("Segment[%s] is gone.", segment.getIdentifier());
dataSourcesNeedingRebuild.add(segment.getDataSource());
segmentsNeedingRefresh.remove(segment);
mutableSegments.remove(segment);
final Map<DataSegment, RowSignature> dataSourceSegments = segmentSignatures.get(segment.getDataSource());
dataSourceSegments.remove(segment);
if (dataSourceSegments.isEmpty()) {
segmentSignatures.remove(segment.getDataSource());
tables.remove(segment.getDataSource());
log.info("Removed all metadata for dataSource[%s].", segment.getDataSource());
}
lock.notifyAll();
}
}
/**
* Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed,
* which may be a subset of the asked-for set.
*/
private Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
{
final Set<DataSegment> retVal = new HashSet<>();
// Organize segments by dataSource.
final Map<String, TreeSet<DataSegment>> segmentMap = new TreeMap<>();
for (DataSegment segment : segments) {
segmentMap.computeIfAbsent(segment.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER))
.add(segment);
}
for (Map.Entry<String, TreeSet<DataSegment>> entry : segmentMap.entrySet()) {
final String dataSource = entry.getKey();
retVal.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue()));
}
return retVal;
}
/**
* Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of
* segments actually refreshed, which may be a subset of the asked-for set.
*/
private Set<DataSegment> refreshSegmentsForDataSource(
final String dataSource,
final Set<DataSegment> segments
) throws IOException
{
log.debug("Refreshing metadata for dataSource[%s].", dataSource);
final long startTime = System.currentTimeMillis();
// Segment identifier -> segment object.
final Map<String, DataSegment> segmentMap = segments.stream().collect(
Collectors.toMap(
DataSegment::getIdentifier,
Function.identity()
)
);
segmentMetadataQuery = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
segmentMetadataQuery,
final Set<DataSegment> retVal = new HashSet<>();
final Sequence<SegmentAnalysis> sequence = runSegmentMetadataQuery(
walker,
serverConfig,
Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY)
);
Yielder<SegmentAnalysis> yielder = Yielders.each(sequence);
try {
while (!yielder.isDone()) {
final SegmentAnalysis analysis = yielder.get();
final DataSegment segment = segmentMap.get(analysis.getId());
if (segment == null) {
log.warn("Got analysis for segment[%s] we didn't ask for, ignoring.", analysis.getId());
} else {
final RowSignature rowSignature = analysisToRowSignature(analysis);
log.debug("Segment[%s] has signature[%s].", segment.getIdentifier(), rowSignature);
setSegmentSignature(segment, rowSignature);
retVal.add(segment);
}
yielder = yielder.next(null);
}
}
finally {
yielder.close();
}
log.info(
"Refreshed metadata for dataSource[%s] in %,d ms (%d segments queried, %d segments left).",
dataSource,
System.currentTimeMillis() - startTime,
retVal.size(),
segments.size() - retVal.size()
);
return retVal;
}
private void setSegmentSignature(final DataSegment segment, final RowSignature rowSignature)
{
synchronized (lock) {
segmentSignatures.computeIfAbsent(segment.getDataSource(), x -> new TreeMap<>(SEGMENT_ORDER))
.put(segment, rowSignature);
}
}
private DruidTable buildDruidTable(final String dataSource)
{
synchronized (lock) {
final TreeMap<DataSegment, RowSignature> segmentMap = segmentSignatures.get(dataSource);
final Map<String, ValueType> columnTypes = new TreeMap<>();
if (segmentMap != null) {
for (RowSignature rowSignature : segmentMap.values()) {
if (rowSignature != null) {
for (String column : rowSignature.getRowOrder()) {
// Newer column types should override older ones.
columnTypes.putIfAbsent(column, rowSignature.getColumnType(column));
}
}
}
}
final RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);
return new DruidTable(new TableDataSource(dataSource), builder.build());
}
}
private static Sequence<SegmentAnalysis> runSegmentMetadataQuery(
final QuerySegmentWalker walker,
final ServerConfig serverConfig,
final Iterable<DataSegment> segments
)
{
// Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource.
final String dataSource = Iterables.getOnlyElement(
StreamSupport.stream(segments.spliterator(), false)
.map(DataSegment::getDataSource).collect(Collectors.toSet())
);
final MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec(
StreamSupport.stream(segments.spliterator(), false)
.map(DataSegment::toDescriptor).collect(Collectors.toList())
);
final SegmentMetadataQuery segmentMetadataQuery = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
new SegmentMetadataQuery(
new TableDataSource(dataSource),
querySegmentSpec,
new AllColumnIncluderator(),
false,
ImmutableMap.of(),
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
false,
false
),
serverConfig
);
final Sequence<SegmentAnalysis> sequence = QueryPlus.wrap(segmentMetadataQuery)
.run(
walker,
DirectDruidClient.makeResponseContextForQuery(
segmentMetadataQuery,
System.currentTimeMillis()
)
);
final List<SegmentAnalysis> results = Sequences.toList(sequence, Lists.<SegmentAnalysis>newArrayList());
if (results.isEmpty()) {
return null;
}
return QueryPlus.wrap(segmentMetadataQuery)
.run(
walker,
DirectDruidClient.makeResponseContextForQuery(
segmentMetadataQuery,
System.currentTimeMillis()
)
);
}
final Map<String, ValueType> columnTypes = Maps.newLinkedHashMap();
private static RowSignature analysisToRowSignature(final SegmentAnalysis analysis)
{
final RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
// Resolve conflicts by taking the latest metadata. This aids in gradual schema evolution.
long maxTimestamp = JodaUtils.MIN_INSTANT;
for (SegmentAnalysis analysis : results) {
final long timestamp;
if (analysis.getIntervals() != null && analysis.getIntervals().size() > 0) {
timestamp = analysis.getIntervals().get(analysis.getIntervals().size() - 1).getEndMillis();
} else {
timestamp = JodaUtils.MIN_INSTANT;
for (Map.Entry<String, ColumnAnalysis> entry : analysis.getColumns().entrySet()) {
if (entry.getValue().isError()) {
// Skip columns with analysis errors.
continue;
}
for (Map.Entry<String, ColumnAnalysis> entry : analysis.getColumns().entrySet()) {
if (entry.getValue().isError()) {
// Skip columns with analysis errors.
continue;
}
if (!columnTypes.containsKey(entry.getKey()) || timestamp >= maxTimestamp) {
ValueType valueType;
try {
valueType = ValueType.valueOf(StringUtils.toUpperCase(entry.getValue().getType()));
}
catch (IllegalArgumentException e) {
// Assume unrecognized types are some flavor of COMPLEX. This throws away information about exactly
// what kind of complex column it is, which we may want to preserve some day.
valueType = ValueType.COMPLEX;
}
columnTypes.put(entry.getKey(), valueType);
maxTimestamp = timestamp;
}
ValueType valueType;
try {
valueType = ValueType.valueOf(StringUtils.toUpperCase(entry.getValue().getType()));
}
catch (IllegalArgumentException e) {
// Assume unrecognized types are some flavor of COMPLEX. This throws away information about exactly
// what kind of complex column it is, which we may want to preserve some day.
valueType = ValueType.COMPLEX;
}
rowSignatureBuilder.add(entry.getKey(), valueType);
}
final RowSignature.Builder rowSignature = RowSignature.builder();
for (Map.Entry<String, ValueType> entry : columnTypes.entrySet()) {
rowSignature.add(entry.getKey(), entry.getValue());
}
return new DruidTable(
new TableDataSource(dataSource),
rowSignature.build()
);
return rowSignatureBuilder.build();
}
}

View File: SqlModule.java

@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
@ -57,7 +56,6 @@ import io.druid.sql.calcite.schema.DruidSchema;
import io.druid.sql.calcite.view.NoopViewManager;
import io.druid.sql.calcite.view.ViewManager;
import io.druid.sql.http.SqlResource;
import org.apache.calcite.schema.SchemaPlus;
import java.util.List;
import java.util.Properties;
@ -106,8 +104,7 @@ public class SqlModule implements Module
JsonConfigProvider.bind(binder, "druid.sql.planner", PlannerConfig.class);
JsonConfigProvider.bind(binder, "druid.sql.avatica", AvaticaServerConfig.class);
LifecycleModule.register(binder, DruidSchema.class);
binder.bind(ViewManager.class).to(NoopViewManager.class);
binder.bind(SchemaPlus.class).toProvider(SchemaPlusProvider.class);
binder.bind(ViewManager.class).to(NoopViewManager.class).in(LazySingleton.class);
for (Class<? extends SqlAggregator> clazz : DEFAULT_AGGREGATOR_CLASSES) {
SqlBindings.addAggregator(binder, clazz);
@ -129,18 +126,6 @@ public class SqlModule implements Module
}
}
public static class SchemaPlusProvider implements Provider<SchemaPlus>
{
@Inject
private DruidSchema druidSchema;
@Override
public SchemaPlus get()
{
return Calcites.createRootSchema(druidSchema);
}
}
private boolean isEnabled()
{
Preconditions.checkNotNull(props, "props");

View File: DruidAvaticaHandlerTest.java

@ -39,6 +39,7 @@ import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerFactory;
import io.druid.sql.calcite.schema.DruidSchema;
import io.druid.sql.calcite.util.CalciteTests;
import io.druid.sql.calcite.util.QueryLogHook;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@ -46,7 +47,6 @@ import org.apache.calcite.avatica.AvaticaClientRuntimeException;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.MissingResultsException;
import org.apache.calcite.avatica.NoSuchStatementException;
import org.apache.calcite.schema.SchemaPlus;
import org.eclipse.jetty.server.Server;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -116,16 +116,11 @@ public class DruidAvaticaHandlerTest
Calcites.setSystemProperties();
walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
final PlannerConfig plannerConfig = new PlannerConfig();
final SchemaPlus rootSchema = Calcites.createRootSchema(
CalciteTests.createMockSchema(
walker,
plannerConfig
)
);
final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
druidMeta = new DruidMeta(
-        new PlannerFactory(rootSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()),
+        new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()),
AVATICA_CONFIG
);
final DruidAvaticaHandler handler = new DruidAvaticaHandler(
@ -544,18 +539,13 @@ public class DruidAvaticaHandlerTest
};
final PlannerConfig plannerConfig = new PlannerConfig();
-    final SchemaPlus rootSchema = Calcites.createRootSchema(
-        CalciteTests.createMockSchema(
-            walker,
-            plannerConfig
-        )
-    );
+    final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final List<Meta.Frame> frames = new ArrayList<>();
DruidMeta smallFrameDruidMeta = new DruidMeta(
-        new PlannerFactory(rootSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()),
+        new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()),
smallFrameConfig
)
{

View File

@ -27,12 +27,12 @@ import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerFactory;
+import io.druid.sql.calcite.schema.DruidSchema;
import io.druid.sql.calcite.util.CalciteTests;
import io.druid.sql.calcite.util.QueryLogHook;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.Meta;
-import org.apache.calcite.schema.SchemaPlus;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
@ -60,16 +60,14 @@ public class DruidStatementTest
Calcites.setSystemProperties();
walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
final PlannerConfig plannerConfig = new PlannerConfig();
-    final SchemaPlus rootSchema = Calcites.createRootSchema(
-        CalciteTests.createMockSchema(
-            walker,
-            plannerConfig
-        )
+    final DruidSchema druidSchema = CalciteTests.createMockSchema(
+        walker,
+        plannerConfig
);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
plannerFactory = new PlannerFactory(
-        rootSchema,
+        druidSchema,
walker,
operatorTable,
macroTable,

View File

@ -96,7 +96,6 @@ import io.druid.sql.calcite.util.QueryLogHook;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import io.druid.sql.calcite.view.InProcessViewManager;
import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.schema.SchemaPlus;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
@ -5351,11 +5350,10 @@ public class CalciteQueryTest
{
final InProcessViewManager viewManager = new InProcessViewManager();
final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig, viewManager);
-    final SchemaPlus rootSchema = Calcites.createRootSchema(druidSchema);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final PlannerFactory plannerFactory = new PlannerFactory(
-        rootSchema,
+        druidSchema,
walker,
operatorTable,
macroTable,

View File

@ -35,12 +35,12 @@ import io.druid.sql.calcite.planner.DruidOperatorTable;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.planner.PlannerFactory;
+import io.druid.sql.calcite.schema.DruidSchema;
import io.druid.sql.calcite.util.CalciteTests;
import io.druid.sql.calcite.util.QueryLogHook;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import io.druid.sql.http.SqlQuery;
import io.druid.sql.http.SqlResource;
-import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.ValidationException;
import org.junit.After;
import org.junit.Assert;
@ -75,14 +75,12 @@ public class SqlResourceTest
Calcites.setSystemProperties();
walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
final PlannerConfig plannerConfig = new PlannerConfig();
-    final SchemaPlus rootSchema = Calcites.createRootSchema(
-        CalciteTests.createMockSchema(walker, plannerConfig)
-    );
+    final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
resource = new SqlResource(
JSON_MAPPER,
-        new PlannerFactory(rootSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig())
+        new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig())
);
}

View File

@ -166,8 +166,12 @@ public class DruidSchemaTest
final Map<String, Table> tableMap = schema.getTableMap();
Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableMap.keySet());
}
-    final DruidTable fooTable = (DruidTable) tableMap.get("foo");
+  @Test
+  public void testGetTableMapFoo()
+  {
+    final DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl());
final List<RelDataTypeField> fields = rowType.getFieldList();
@ -182,13 +186,32 @@ public class DruidSchemaTest
Assert.assertEquals("dim1", fields.get(2).getName());
Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(2).getType().getSqlTypeName());
Assert.assertEquals("m1", fields.get(3).getName());
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(3).getType().getSqlTypeName());
Assert.assertEquals("dim2", fields.get(3).getName());
Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getSqlTypeName());
Assert.assertEquals("unique_dim1", fields.get(4).getName());
Assert.assertEquals(SqlTypeName.OTHER, fields.get(4).getType().getSqlTypeName());
Assert.assertEquals("m1", fields.get(4).getName());
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(4).getType().getSqlTypeName());
Assert.assertEquals("dim2", fields.get(5).getName());
Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(5).getType().getSqlTypeName());
Assert.assertEquals("unique_dim1", fields.get(5).getName());
Assert.assertEquals(SqlTypeName.OTHER, fields.get(5).getType().getSqlTypeName());
}
+  @Test
+  public void testGetTableMapFoo2()
+  {
+    final DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo2");
+    final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl());
+    final List<RelDataTypeField> fields = rowType.getFieldList();
+    Assert.assertEquals(3, fields.size());
+    Assert.assertEquals("__time", fields.get(0).getName());
+    Assert.assertEquals(SqlTypeName.TIMESTAMP, fields.get(0).getType().getSqlTypeName());
+    Assert.assertEquals("dim2", fields.get(1).getName());
+    Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(1).getType().getSqlTypeName());
+    Assert.assertEquals("m1", fields.get(2).getName());
+    Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName());
+  }
}
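The reshuffled expectations for "foo" (dim2 now at index 3, with m1 and unique_dim1 after it) and the new "foo2" test both read as columns reported in name order after __time. That ordering is an inference from the assertions above rather than something this hunk states; if it holds, merging per-segment columns through a sorted map would produce exactly this sequence, roughly as in the toy illustration below.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ColumnOrderSketch
{
  public static void main(String[] args)
  {
    // Hypothetical per-segment column sets for a "foo"-like dataSource.
    final List<List<String>> perSegmentColumns = Arrays.asList(
        Arrays.asList("__time", "cnt", "dim1", "m1", "unique_dim1"),
        Arrays.asList("__time", "cnt", "dim2", "m1", "unique_dim1")
    );

    // A TreeMap keyed by column name yields __time, cnt, dim1, dim2, m1, unique_dim1,
    // which is the order the updated test expects.
    final Map<String, Boolean> merged = new TreeMap<>();
    for (List<String> columns : perSegmentColumns) {
      for (String column : columns) {
        merged.put(column, true);
      }
    }
    System.out.println(merged.keySet());
  }
}
```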

View File

@ -36,6 +36,15 @@ import java.util.concurrent.Executor;
public class TestServerInventoryView implements TimelineServerView
{
+  private static final DruidServerMetadata DUMMY_SERVER = new DruidServerMetadata(
+      "dummy",
+      "dummy",
+      null,
+      0,
+      ServerType.HISTORICAL,
+      "dummy",
+      0
+  );
private final List<DataSegment> segments;
public TestServerInventoryView(List<DataSegment> segments)
@ -52,31 +61,21 @@ public class TestServerInventoryView implements TimelineServerView
@Override
public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
{
-    final DruidServerMetadata dummyServer = new DruidServerMetadata("dummy", "dummy", null, 0, ServerType.HISTORICAL, "dummy", 0);
for (final DataSegment segment : segments) {
-      exec.execute(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              callback.segmentAdded(dummyServer, segment);
-            }
-          }
-      );
+      exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
}
-    exec.execute(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            callback.segmentViewInitialized();
-          }
-        }
-    );
+    exec.execute(callback::segmentViewInitialized);
}
+  @Override
+  public void registerTimelineCallback(final Executor exec, final TimelineCallback callback)
+  {
+    for (DataSegment segment : segments) {
+      exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
+    }
+    exec.execute(callback::timelineInitialized);
+  }
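registerTimelineCallback is the notification path this commit adds to TimelineServerView: segment additions and an initialization signal are delivered on the caller-supplied Executor, mirroring the segment-callback pattern above. A self-contained sketch of that delivery pattern follows; the listener interface here is a simplified stand-in, since the real TimelineCallback interface may declare more methods and different return types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;

public class TimelineCallbackSketch
{
  // Simplified stand-in for TimelineServerView.TimelineCallback.
  interface Listener
  {
    void segmentAdded(String serverName, String segmentId);

    void timelineInitialized();
  }

  // Mirrors the registration pattern above: replay the known segments, then signal readiness,
  // all on the Executor chosen by the caller.
  static void register(final Executor exec, final List<String> segments, final Listener listener)
  {
    for (final String segment : segments) {
      exec.execute(() -> listener.segmentAdded("dummy", segment));
    }
    exec.execute(listener::timelineInitialized);
  }

  public static void main(String[] args)
  {
    final List<String> seen = new ArrayList<>();
    register(
        Runnable::run, // a same-thread Executor is enough for a sketch
        Arrays.asList("segment-1", "segment-2"),
        new Listener()
        {
          @Override
          public void segmentAdded(String serverName, String segmentId)
          {
            seen.add(segmentId);
          }

          @Override
          public void timelineInitialized()
          {
            System.out.println("timeline initialized with " + seen);
          }
        }
    );
  }
}
```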
@Override