diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 0615c839a62..c8474893db8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -29,23 +29,23 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; import com.google.common.collect.Sets; -import com.google.common.collect.TreeMultiset; -import com.google.common.primitives.Ints; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; import com.metamx.common.ISE; import com.metamx.common.guava.Comparators; import com.metamx.common.logger.Logger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; +import io.druid.data.input.Rows; import io.druid.granularity.QueryGranularity; +import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.index.YeOldePlumberSchool; -import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.IOConfig; import io.druid.segment.indexing.IngestionSpec; @@ -59,7 +59,6 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.ShardSpec; -import io.druid.timeline.partition.SingleDimensionShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -67,7 +66,6 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.concurrent.CopyOnWriteArrayList; @@ -76,6 +74,26 @@ public class IndexTask extends AbstractFixedIntervalTask { private static final Logger log = new Logger(IndexTask.class); + private static HashFunction hashFunction = Hashing.murmur3_128(); + + /** + * Should we index this inputRow? Decision is based on our interval and shardSpec. 
+ * + * @param inputRow the row to check + * + * @return true or false + */ + private static boolean shouldIndex( + final ShardSpec shardSpec, + final Interval interval, + final InputRow inputRow, + final QueryGranularity rollupGran + ) + { + return interval.contains(inputRow.getTimestampFromEpoch()) + && shardSpec.isInChunk(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow); + } + private static String makeId(String id, IndexIngestionSpec ingestionSchema) { if (id == null) { @@ -153,7 +171,7 @@ public class IndexTask extends AbstractFixedIntervalTask for (final Interval bucket : validIntervals) { final List shardSpecs; if (targetPartitionSize > 0) { - shardSpecs = determinePartitions(bucket, targetPartitionSize); + shardSpecs = determinePartitions(bucket, targetPartitionSize, granularitySpec.getQueryGranularity()); } else { int numShards = ingestionSchema.getTuningConfig().getNumShards(); if (numShards > 0) { @@ -200,7 +218,8 @@ public class IndexTask extends AbstractFixedIntervalTask private List determinePartitions( final Interval interval, - final int targetPartitionSize + final int targetPartitionSize, + final QueryGranularity queryGranularity ) throws IOException { log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize); @@ -208,113 +227,49 @@ public class IndexTask extends AbstractFixedIntervalTask final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); // The implementation of this determine partitions stuff is less than optimal. Should be done better. - - // Blacklist dimensions that have multiple values per row - final Set unusableDimensions = com.google.common.collect.Sets.newHashSet(); - // Track values of all non-blacklisted dimensions - final Map> dimensionValueMultisets = Maps.newHashMap(); + // Use HLL to estimate number of rows + HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); // Load data try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) { while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); if (interval.contains(inputRow.getTimestampFromEpoch())) { - // Extract dimensions from event - for (final String dim : inputRow.getDimensions()) { - final List dimValues = inputRow.getDimension(dim); - if (!unusableDimensions.contains(dim)) { - if (dimValues.size() == 1) { - // Track this value - TreeMultiset dimensionValueMultiset = dimensionValueMultisets.get(dim); - if (dimensionValueMultiset == null) { - dimensionValueMultiset = TreeMultiset.create(); - dimensionValueMultisets.put(dim, dimensionValueMultiset); - } - dimensionValueMultiset.add(dimValues.get(0)); - } else { - // Only single-valued dimensions can be used for partitions - unusableDimensions.add(dim); - dimensionValueMultisets.remove(dim); - } - } - } + final List groupKey = Rows.toGroupKey( + queryGranularity.truncate(inputRow.getTimestampFromEpoch()), + inputRow + ); + collector.add( + hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey)) + .asBytes() + ); } } } + final double numRows = collector.estimateCardinality(); + log.info("Estimated approximately [%,f] rows of data.", numRows); + + int numberOfShards = (int) Math.ceil(numRows / targetPartitionSize); + if ((double) numberOfShards > numRows) { + numberOfShards = (int) numRows; + } + log.info("Will require [%,d] shard(s).", numberOfShards); + // ShardSpecs we will return final List shardSpecs = Lists.newArrayList(); - // Select 
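For reference, the arithmetic the new code applies to the cardinality estimate, as a runnable sketch. The `HashSet` of group keys is a stand-in for `HyperLogLogCollector` (an exact count instead of a bounded-memory estimate), and `ShardCountSketch`/`numberOfShards` are names invented for this example:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Self-contained sketch of the new determinePartitions() arithmetic.
// A HashSet of group keys stands in for HyperLogLogCollector: both answer
// "how many distinct (truncated timestamp, dimensions) keys are there?",
// the HLL just does it in bounded memory, at the cost of being approximate.
public class ShardCountSketch
{
  static int numberOfShards(double estimatedRows, int targetPartitionSize)
  {
    int shards = (int) Math.ceil(estimatedRows / targetPartitionSize);
    // Guard from the hunk above: never plan more shards than rows.
    if ((double) shards > estimatedRows) {
      shards = (int) estimatedRows;
    }
    return shards;
  }

  public static void main(String[] args)
  {
    Set<List<String>> groupKeys = new HashSet<>();
    // Hypothetical rows, timestamps already truncated to the rollup granularity.
    groupKeys.add(Arrays.asList("2014-01-01T00:00Z", "a"));
    groupKeys.add(Arrays.asList("2014-01-01T01:00Z", "b"));
    groupKeys.add(Arrays.asList("2014-01-01T02:00Z", "c"));

    // targetPartitionSize = 2 over 3 distinct keys -> ceil(3 / 2) = 2 shards.
    System.out.println(numberOfShards(groupKeys.size(), 2));
  }
}
```

With a target partition size of 2 and three distinct rows this yields two shards, which is exactly what the new IndexTaskTest further down asserts.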
highest-cardinality dimension - Ordering>> byCardinalityOrdering = new Ordering>>() - { - @Override - public int compare( - Map.Entry> left, - Map.Entry> right - ) - { - return Ints.compare(left.getValue().elementSet().size(), right.getValue().elementSet().size()); - } - }; - - if (dimensionValueMultisets.isEmpty()) { - // No suitable partition dimension. We'll make one big segment and hope for the best. - log.info("No suitable partition dimension found"); + if (numberOfShards == 1) { shardSpecs.add(new NoneShardSpec()); } else { - // Find best partition dimension (heuristic: highest cardinality). - final Map.Entry> partitionEntry = - byCardinalityOrdering.max(dimensionValueMultisets.entrySet()); - - final String partitionDim = partitionEntry.getKey(); - final TreeMultiset partitionDimValues = partitionEntry.getValue(); - - log.info( - "Partitioning on dimension[%s] with cardinality[%d] over rows[%d]", - partitionDim, - partitionDimValues.elementSet().size(), - partitionDimValues.size() - ); - - // Iterate over unique partition dimension values in sorted order - String currentPartitionStart = null; - int currentPartitionSize = 0; - for (final String partitionDimValue : partitionDimValues.elementSet()) { - currentPartitionSize += partitionDimValues.count(partitionDimValue); - if (currentPartitionSize >= targetPartitionSize) { - final ShardSpec shardSpec = new SingleDimensionShardSpec( - partitionDim, - currentPartitionStart, - partitionDimValue, - shardSpecs.size() - ); - - log.info("Adding shard: %s", shardSpec); - shardSpecs.add(shardSpec); - - currentPartitionSize = partitionDimValues.count(partitionDimValue); - currentPartitionStart = partitionDimValue; - } - } - - if (currentPartitionSize > 0) { - // One last shard to go - final ShardSpec shardSpec; - - if (shardSpecs.isEmpty()) { - shardSpec = new NoneShardSpec(); - } else { - shardSpec = new SingleDimensionShardSpec( - partitionDim, - currentPartitionStart, - null, - shardSpecs.size() - ); - } - - log.info("Adding shard: %s", shardSpec); - shardSpecs.add(shardSpec); + for (int i = 0; i < numberOfShards; ++i) { + shardSpecs.add( + new HashBasedNumberedShardSpec( + i, + numberOfShards, + HadoopDruidIndexerConfig.jsonMapper + ) + ); } } @@ -437,24 +392,6 @@ public class IndexTask extends AbstractFixedIntervalTask return Iterables.getOnlyElement(pushedSegments); } - /** - * Should we index this inputRow? Decision is based on our interval and shardSpec. - * - * @param inputRow the row to check - * - * @return true or false - */ - private static boolean shouldIndex( - final ShardSpec shardSpec, - final Interval interval, - final InputRow inputRow, - final QueryGranularity rollupGran - ) - { - return interval.contains(inputRow.getTimestampFromEpoch()) - && shardSpec.isInChunk(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow); - } - public static class IndexIngestionSpec extends IngestionSpec { private final DataSchema dataSchema; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java new file mode 100644 index 00000000000..06b161951da --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -0,0 +1,159 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. 
+ * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.common.task; + +import com.google.api.client.util.Lists; +import com.metamx.common.Granularity; +import io.druid.data.input.impl.CSVParseSpec; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.SpatialDimensionSchema; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.TaskAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.actions.TaskActionClientFactory; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.realtime.firehose.LocalFirehoseFactory; +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Arrays; +import java.util.List; + +public class IndexTaskTest +{ + @Test + public void testDeterminePartitions() throws Exception + { + File tmpFile = File.createTempFile("druid", "index"); + tmpFile.deleteOnExit(); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("2014-01-01T00:00:10Z,a,1"); + writer.println("2014-01-01T01:00:20Z,b,1"); + writer.println("2014-01-01T02:00:30Z,c,1"); + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + new IndexTask.IndexIngestionSpec( + new DataSchema( + "test", + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec( + "ts", + "auto" + ), + new DimensionsSpec( + Arrays.asList("ts"), + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("ts", "dim", "val") + ) + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("val", "val") + }, + new UniformGranularitySpec( + Granularity.DAY, + QueryGranularity.MINUTE, + Arrays.asList(new Interval("2014/2015")) + ) + ), + new IndexTask.IndexIOConfig( + new LocalFirehoseFactory( + tmpFile.getParentFile(), + "druid*", + null + ) + ), + new IndexTask.IndexTuningConfig( + 2, + 0, + null + ) + ), + new DefaultObjectMapper() + ); + + final List segments = Lists.newArrayList(); + + indexTask.run( + new TaskToolbox( + null, null, new TaskActionClientFactory() + { + @Override + public TaskActionClient create(Task task) + { + return new TaskActionClient() 
+ { + @Override + public RetType submit(TaskAction taskAction) throws IOException + { + if (taskAction instanceof LockListAction) { + return (RetType) Arrays.asList( + new TaskLock( + "", "", null, new DateTime().toString() + ) + ); + } + return null; + } + }; + } + }, null, new DataSegmentPusher() + { + @Override + public String getPathForHadoop(String dataSource) + { + return null; + } + + @Override + public DataSegment push(File file, DataSegment segment) throws IOException + { + segments.add(segment); + return segment; + } + }, null, null, null, null, null, null, null, null, null, null, null + ) + ); + + Assert.assertEquals(2, segments.size()); + } +} diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java index c1cf7886bfc..ec780942485 100644 --- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java @@ -49,11 +49,11 @@ public class BySegmentQueryRunner implements QueryRunner @Override @SuppressWarnings("unchecked") - public Sequence run(final Query query, Map context) + public Sequence run(final Query query, Map responseContext) { if (query.getContextBySegment(false)) { - final Sequence baseSequence = base.run(query, context); + final Sequence baseSequence = base.run(query, responseContext); final List results = Sequences.toList(baseSequence, Lists.newArrayList()); return Sequences.simple( Arrays.asList( @@ -68,6 +68,6 @@ public class BySegmentQueryRunner implements QueryRunner ) ); } - return base.run(query, context); + return base.run(query, responseContext); } } diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index 5f9651f5222..303abebb575 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -37,13 +37,13 @@ public abstract class BySegmentSkippingQueryRunner implements QueryRunner } @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { if (query.getContextBySegment(false)) { - return baseRunner.run(query, context); + return baseRunner.run(query, responseContext); } - return doRun(baseRunner, query, context); + return doRun(baseRunner, query, responseContext); } protected abstract Sequence doRun(QueryRunner baseRunner, Query query, Map context); diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index e5edfd3d4cf..196e6e5cb39 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -20,7 +20,6 @@ package io.druid.query; import com.google.common.base.Function; -import com.google.common.base.Predicates; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -89,12 +88,12 @@ public class ChainedExecutionQueryRunner implements QueryRunner // since it already implements ListeningExecutorService this.exec = MoreExecutors.listeningDecorator(exec); this.ordering = ordering; - this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); + this.queryables = Iterables.unmodifiableIterable(queryables); 
this.queryWatcher = queryWatcher; } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { final int priority = query.getContextPriority(0); @@ -125,11 +124,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner public Iterable call() throws Exception { try { - if (input == null) { - throw new ISE("Input is null?! How is this possible?!"); - } - - Sequence result = input.run(query, context); + Sequence result = input.run(query, responseContext); if (result == null) { throw new ISE("Got a null result! Segments are missing!"); } diff --git a/processing/src/main/java/io/druid/query/ConcatQueryRunner.java b/processing/src/main/java/io/druid/query/ConcatQueryRunner.java index 74c4a6481f5..0870ea87cf3 100644 --- a/processing/src/main/java/io/druid/query/ConcatQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ConcatQueryRunner.java @@ -39,7 +39,7 @@ public class ConcatQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { return Sequences.concat( Sequences.map( @@ -49,7 +49,7 @@ public class ConcatQueryRunner implements QueryRunner @Override public Sequence apply(final QueryRunner input) { - return input.run(query, context); + return input.run(query, responseContext); } } ) diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 1794024b60c..d9c27c02dfc 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -49,7 +49,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, Map context) + public Sequence run(final Query query, Map responseContext) { final boolean isBySegment = query.getContextBySegment(false); final boolean shouldFinalize = query.getContextFinalize(true); @@ -102,7 +102,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner return Sequences.map( - baseRunner.run(queryToRun, context), + baseRunner.run(queryToRun, responseContext), finalizerFn ); diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 5997518dddd..1ec7c596132 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -78,7 +78,7 @@ public class GroupByParallelQueryRunner implements QueryRunner } @Override - public Sequence run(final Query queryParam, final Map context) + public Sequence run(final Query queryParam, final Map responseContext) { final GroupByQuery query = (GroupByQuery) queryParam; final Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( @@ -111,10 +111,10 @@ public class GroupByParallelQueryRunner implements QueryRunner { try { if (bySegment) { - input.run(queryParam, context) + input.run(queryParam, responseContext) .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs); } else { - input.run(queryParam, context) + input.run(queryParam, responseContext) .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); } diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java 
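Most of the remaining hunks are this one mechanical rename. The `Map` passed to `QueryRunner.run()` is not the query's own context (that travels on the `Query` object, cf. `getContextBySegment`); it is a mutable, per-request channel through which runners report facts about execution back up the stack, and the rename to `responseContext` makes that explicit. A toy model, with a simplified `Runner` interface standing in for Druid's `QueryRunner<T>`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Results flow down through the return value; side-channel facts flow up
// through the shared mutable map. "Runner" is a stand-in, not Druid's API.
interface Runner
{
  List<String> run(String query, Map<String, Object> responseContext);
}

public class ResponseContextSketch
{
  public static void main(String[] args)
  {
    Runner leaf = (query, responseContext) -> {
      // A leaf runner records how the query actually executed...
      responseContext.put("segmentsScanned", 1);
      return Arrays.asList("row-1");
    };

    Map<String, Object> responseContext = new HashMap<>();
    List<String> results = leaf.run("some query", responseContext);

    // ...and the caller reads the report after consuming the results.
    System.out.println(results + " scanned=" + responseContext.get("segmentsScanned"));
  }
}
```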
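`ChainedExecutionQueryRunner` also loses its null-input guard here; after the `ServerManager` changes later in this patch, `mergeRunners` no longer receives null runners, so the check was dead code. The fan-out shape the class implements, sketched with plain JDK executors instead of Guava's `ListeningExecutorService`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Run every per-segment runner on the executor, wait for all of them,
// then merge the per-segment results into one stream.
public class FanOutSketch
{
  public static void main(String[] args) throws Exception
  {
    ExecutorService exec = Executors.newFixedThreadPool(2);

    Callable<List<Integer>> segmentA = () -> Arrays.asList(1, 2);
    Callable<List<Integer>> segmentB = () -> Arrays.asList(3);

    List<Integer> merged = new ArrayList<>();
    for (Future<List<Integer>> f : exec.invokeAll(Arrays.asList(segmentA, segmentB))) {
      // A null result here would now be a bug worth an exception,
      // not an input to be silently filtered out.
      merged.addAll(f.get());
    }
    exec.shutdown();

    System.out.println(merged); // [1, 2, 3]
  }
}
```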
b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java index 557420aa377..59ec44ba8e7 100644 --- a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java @@ -49,10 +49,10 @@ public class IntervalChunkingQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { if (period.getMillis() == 0) { - return baseRunner.run(query, context); + return baseRunner.run(query, responseContext); } return Sequences.concat( @@ -76,7 +76,7 @@ public class IntervalChunkingQueryRunner implements QueryRunner { return baseRunner.run( query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))), - context + responseContext ); } } diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index 5a8005185a7..a2f6863d326 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -84,7 +84,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { final ServiceMetricEvent.Builder builder = builderFn.apply(query); String queryId = query.getId(); @@ -102,7 +102,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner long startTime = System.currentTimeMillis(); try { - retVal = queryRunner.run(query, context).accumulate(outType, accumulator); + retVal = queryRunner.run(query, responseContext).accumulate(outType, accumulator); } catch (RuntimeException e) { builder.setUser10("failed"); @@ -132,7 +132,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner long startTime = System.currentTimeMillis(); try { - retVal = queryRunner.run(query, context).toYielder(initValue, accumulator); + retVal = queryRunner.run(query, responseContext).toYielder(initValue, accumulator); } catch (RuntimeException e) { builder.setUser10("failed"); diff --git a/processing/src/main/java/io/druid/query/NoopQueryRunner.java b/processing/src/main/java/io/druid/query/NoopQueryRunner.java index d2f3863ab62..63b8ffd3a4e 100644 --- a/processing/src/main/java/io/druid/query/NoopQueryRunner.java +++ b/processing/src/main/java/io/druid/query/NoopQueryRunner.java @@ -30,7 +30,7 @@ import java.util.Map; public class NoopQueryRunner implements QueryRunner { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { return Sequences.empty(); } diff --git a/processing/src/main/java/io/druid/query/QueryRunner.java b/processing/src/main/java/io/druid/query/QueryRunner.java index d7a3f8af36f..9ed07edb915 100644 --- a/processing/src/main/java/io/druid/query/QueryRunner.java +++ b/processing/src/main/java/io/druid/query/QueryRunner.java @@ -27,5 +27,5 @@ import java.util.Map; */ public interface QueryRunner { - public Sequence run(Query query, Map context); + public Sequence run(Query query, Map responseContext); } \ No newline at end of file diff --git a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java index 736c60f76ab..fc393ab2a0c 100644 --- 
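`IntervalChunkingQueryRunner`, touched above only by the rename, splits the query interval into period-sized pieces (passing straight through when the period is zero) and concatenates the per-piece result sequences. The splitting step, sketched with `java.time` in place of Joda time:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Cut [start, end) into period-sized sub-intervals; each one becomes its
// own query via withQuerySegmentSpec, and the sequences are concatenated.
public class ChunkSketch
{
  static List<Instant[]> chunk(Instant start, Instant end, Duration period)
  {
    List<Instant[]> chunks = new ArrayList<>();
    Instant s = start;
    while (s.isBefore(end)) {
      Instant e = s.plus(period).isBefore(end) ? s.plus(period) : end;
      chunks.add(new Instant[]{s, e});
      s = e;
    }
    return chunks;
  }

  public static void main(String[] args)
  {
    List<Instant[]> chunks = chunk(
        Instant.parse("2014-01-01T00:00:00Z"),
        Instant.parse("2014-01-03T12:00:00Z"),
        Duration.ofDays(1)
    );
    for (Instant[] c : chunks) {
      System.out.println(c[0] + "/" + c[1]); // two full days, then a half-day remainder
    }
  }
}
```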
a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java @@ -45,11 +45,11 @@ public class ReferenceCountingSegmentQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, Map context) + public Sequence run(final Query query, Map responseContext) { final Closeable closeable = adapter.increment(); try { - final Sequence baseSequence = factory.createRunner(adapter).run(query, context); + final Sequence baseSequence = factory.createRunner(adapter).run(query, responseContext); return new ResourceClosingSequence(baseSequence, closeable); } diff --git a/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java new file mode 100644 index 00000000000..e41419f75ae --- /dev/null +++ b/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java @@ -0,0 +1,53 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
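`ReferenceCountingSegmentQueryRunner` pins the segment for the lifetime of the returned sequence: `increment()` hands back a `Closeable`, and `ResourceClosingSequence` closes it only once the results are fully consumed, so a concurrent segment drop cannot unmap data mid-query. In miniature, with an `AtomicInteger` standing in for the segment's reference count:

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;

// Take a reference before running; release it when the result sequence is
// closed. Segment and its counter are simplified stand-ins.
public class RefCountSketch
{
  static class Segment
  {
    final AtomicInteger refs = new AtomicInteger();

    Closeable increment()
    {
      refs.incrementAndGet();
      return refs::decrementAndGet; // releasing the reference = closing
    }
  }

  public static void main(String[] args) throws Exception
  {
    Segment segment = new Segment();
    try (Closeable ref = segment.increment()) {
      // The query "runs" while the reference is held.
      System.out.println("refs during query: " + segment.refs.get()); // 1
    }
    System.out.println("refs after close: " + segment.refs.get()); // 0
  }
}
```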
+ */ + +package io.druid.query; + +import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; + +import java.util.List; +import java.util.Map; + +/** + */ +public class ReportTimelineMissingSegmentQueryRunner implements QueryRunner +{ + private final SegmentDescriptor descriptor; + + public ReportTimelineMissingSegmentQueryRunner(SegmentDescriptor descriptor) + { + this.descriptor = descriptor; + } + + @Override + public Sequence run( + Query query, Map responseContext + ) + { + List missingSegments = (List) responseContext.get(Result.MISSING_SEGMENTS_KEY); + if (missingSegments == null) { + missingSegments = Lists.newArrayList(); + responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments); + } + missingSegments.add(descriptor); + return Sequences.empty(); + } +} diff --git a/processing/src/main/java/io/druid/query/Result.java b/processing/src/main/java/io/druid/query/Result.java index bb299d51c78..a948e01a2ca 100644 --- a/processing/src/main/java/io/druid/query/Result.java +++ b/processing/src/main/java/io/druid/query/Result.java @@ -27,6 +27,8 @@ import org.joda.time.DateTime; */ public class Result implements Comparable> { + public static String MISSING_SEGMENTS_KEY = "missingSegments"; + private final DateTime timestamp; private final T value; diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunner.java b/processing/src/main/java/io/druid/query/RetryQueryRunner.java index 9f6fd5d474a..1cf8593bf63 100644 --- a/processing/src/main/java/io/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/RetryQueryRunner.java @@ -36,7 +36,6 @@ import java.util.Map; public class RetryQueryRunner implements QueryRunner { - public static String MISSING_SEGMENTS_KEY = "missingSegments"; private static final EmittingLogger log = new EmittingLogger(RetryQueryRunner.class); private final QueryRunner baseRunner; @@ -76,7 +75,7 @@ public class RetryQueryRunner implements QueryRunner for (int i = 0; i < config.getNumTries(); i++) { log.info("[%,d] missing segments found. 
Retry attempt [%,d]", missingSegments.size(), i); - context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); final Query retryQuery = query.withQuerySegmentSpec( new MultipleSpecificSegmentSpec( missingSegments @@ -102,7 +101,7 @@ public class RetryQueryRunner implements QueryRunner private List getMissingSegments(final Map context) { - final Object maybeMissingSegments = context.get(MISSING_SEGMENTS_KEY); + final Object maybeMissingSegments = context.get(Result.MISSING_SEGMENTS_KEY); if (maybeMissingSegments == null) { return Lists.newArrayList(); } @@ -115,4 +114,3 @@ public class RetryQueryRunner implements QueryRunner ); } } - diff --git a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java index d16a660e25a..983779cf073 100644 --- a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java @@ -39,13 +39,13 @@ public class SubqueryQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, Map context) + public Sequence run(final Query query, Map responseContext) { DataSource dataSource = query.getDataSource(); if (dataSource instanceof QueryDataSource) { - return run((Query) ((QueryDataSource) dataSource).getQuery(), context); + return run((Query) ((QueryDataSource) dataSource).getQuery(), responseContext); } else { - return baseRunner.run(query, context); + return baseRunner.run(query, responseContext); } } } diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java index 2f26f6890f1..ce3fff5fcda 100644 --- a/processing/src/main/java/io/druid/query/TimewarpOperator.java +++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java @@ -81,7 +81,7 @@ public class TimewarpOperator implements PostProcessingOperator return new QueryRunner() { @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { final long offset = computeOffset(now); @@ -93,7 +93,7 @@ public class TimewarpOperator implements PostProcessingOperator return Sequences.map( baseRunner.run( query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))), - context + responseContext ), new Function() { diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 2426bda9310..42b65d48da0 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -44,7 +44,7 @@ public class UnionQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { @@ -59,7 +59,7 @@ public class UnionQueryRunner implements QueryRunner { return baseRunner.run( query.withDataSource(singleSource), - context + responseContext ); } } @@ -67,7 +67,7 @@ public class UnionQueryRunner implements QueryRunner ) ); } else { - return baseRunner.run(query, context); + return baseRunner.run(query, responseContext); } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java 
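The new `ReportTimelineMissingSegmentQueryRunner` and `RetryQueryRunner` cooperate through the response context: the reporter appends its `SegmentDescriptor` under `Result.MISSING_SEGMENTS_KEY` and returns an empty sequence, and the retry runner reads that list after the base run, resets it, and re-issues the query for only the missing segments. A condensed model, with `String` standing in for `SegmentDescriptor`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// The report/retry handshake over the shared response context.
public class RetrySketch
{
  static final String MISSING_SEGMENTS_KEY = "missingSegments";

  // What the reporting runner does: record the miss, return no rows.
  @SuppressWarnings("unchecked")
  static void reportMissing(Map<String, Object> responseContext, String descriptor)
  {
    List<String> missing = (List<String>) responseContext.get(MISSING_SEGMENTS_KEY);
    if (missing == null) {
      missing = new ArrayList<>();
      responseContext.put(MISSING_SEGMENTS_KEY, missing);
    }
    missing.add(descriptor);
  }

  @SuppressWarnings("unchecked")
  public static void main(String[] args)
  {
    Map<String, Object> responseContext = new HashMap<>();
    reportMissing(responseContext, "2014-01-01/2014-01-02, partition 0");

    // What the retry runner does afterwards: inspect, reset, re-query.
    List<String> missing = (List<String>) responseContext.get(MISSING_SEGMENTS_KEY);
    if (missing != null && !missing.isEmpty()) {
      responseContext.put(MISSING_SEGMENTS_KEY, new ArrayList<String>());
      System.out.println("retrying only: " + missing);
    }
  }
}
```

Moving the constant from `RetryQueryRunner` to `Result` lets `SpecificSegmentQueryRunner` and the new reporter write the key without importing the retry machinery, as the import changes in these hunks show.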
b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 790d83482c3..e35c876971d 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -111,16 +111,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest() { @Override - public Sequence run(Query input, Map context) + public Sequence run(Query input, Map responseContext) { if (input.getContextBySegment(false)) { - return runner.run(input, context); + return runner.run(input, responseContext); } if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) { - return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner, context); + return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner, + responseContext + ); } - return runner.run(input, context); + return runner.run(input, responseContext); } }; } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index cd86d16a92b..72cdcb27b73 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -108,7 +108,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory() { @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { final GroupByQuery queryParam = (GroupByQuery) query; final Pair> indexAccumulatorPair = GroupByQueryHelper @@ -128,13 +128,13 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory run(Query input, Map context) + public Sequence run(Query input, Map responseContext) { if (!(input instanceof GroupByQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index ac46c49d2a2..c0efdcdd502 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -77,7 +77,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory() { @Override - public Sequence run(Query inQ, Map context) + public Sequence run(Query inQ, Map responseContext) { SegmentMetadataQuery query = (SegmentMetadataQuery) inQ; @@ -138,7 +138,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory run( final Query query, - final Map context + final Map responseContext ) { final int priority = query.getContextPriority(0); @@ -148,7 +148,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory call() throws Exception { - return input.run(query, context); + return input.run(query, responseContext); } } ); diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 79d9686cee8..f4c38c87507 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -280,7 
+280,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof SearchQuery)) { @@ -289,13 +289,13 @@ public class SearchQueryQueryToolChest extends QueryToolChest, Result>() { @Override diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java index ec2c576e0e3..2b91847da57 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java @@ -73,7 +73,7 @@ public class SearchQueryRunner implements QueryRunner> @Override public Sequence> run( final Query> input, - Map context + Map responseContext ) { if (!(input instanceof SearchQuery)) { diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java index 5210a56ae6a..7efb8d4e2df 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java @@ -92,7 +92,7 @@ public class SelectQueryRunnerFactory @Override public Sequence> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof SelectQuery)) { diff --git a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java index b8b68f86895..65eb5e4d4e4 100644 --- a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java @@ -27,7 +27,7 @@ import com.metamx.common.guava.Yielder; import com.metamx.common.guava.YieldingAccumulator; import io.druid.query.Query; import io.druid.query.QueryRunner; -import io.druid.query.RetryQueryRunner; +import io.druid.query.Result; import io.druid.query.SegmentDescriptor; import io.druid.segment.SegmentMissingException; @@ -53,7 +53,7 @@ public class SpecificSegmentQueryRunner implements QueryRunner } @Override - public Sequence run(final Query input, final Map context) + public Sequence run(final Query input, final Map responseContext) { final Query query = input.withQuerySegmentSpec(specificSpec); @@ -67,7 +67,7 @@ public class SpecificSegmentQueryRunner implements QueryRunner @Override public Sequence call() throws Exception { - return base.run(query, context); + return base.run(query, responseContext); } } ); @@ -87,10 +87,10 @@ public class SpecificSegmentQueryRunner implements QueryRunner return baseSequence.accumulate(initValue, accumulator); } catch (SegmentMissingException e) { - List missingSegments = (List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY); + List missingSegments = (List) responseContext.get(Result.MISSING_SEGMENTS_KEY); if (missingSegments == null) { missingSegments = Lists.newArrayList(); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missingSegments); + responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments); } missingSegments.add(specificSpec.getDescriptor()); return initValue; diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 7c302706b34..af6bd529c44 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ 
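`SpecificSegmentQueryRunner` covers the other failure mode: the segment disappears mid-scan. The hunk above converts `SegmentMissingException` into a response-context entry and resumes accumulation from the initial value, so one vanished segment degrades into a retry instead of a failed query. The catch block's shape, with stand-in types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Convert a mid-scan failure into a recorded miss plus an empty result.
public class MissingMidScanSketch
{
  static class SegmentMissingException extends RuntimeException {}

  @SuppressWarnings("unchecked")
  static int accumulate(Map<String, Object> responseContext, String descriptor, int initValue)
  {
    try {
      // Pretend the underlying scan blew up partway through.
      throw new SegmentMissingException();
    }
    catch (SegmentMissingException e) {
      List<String> missing = (List<String>) responseContext.get("missingSegments");
      if (missing == null) {
        missing = new ArrayList<>();
        responseContext.put("missingSegments", missing);
      }
      missing.add(descriptor);
      return initValue; // swallow the failure; the retry layer takes it from here
    }
  }

  public static void main(String[] args)
  {
    Map<String, Object> responseContext = new HashMap<>();
    System.out.println(accumulate(responseContext, "segment#3", 0));
    System.out.println(responseContext);
  }
}
```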
b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -87,7 +87,7 @@ public class TimeBoundaryQueryRunnerFactory @Override public Sequence> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof TimeBoundaryQuery)) { diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index 3da84f7ff24..322e779c385 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -93,7 +93,7 @@ public class TimeseriesQueryRunnerFactory @Override public Sequence> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof TimeseriesQuery)) { diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index b52a1c127d4..39244d090c0 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -414,7 +414,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof TopNQuery)) { @@ -423,13 +423,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest minTopNThreshold) { - return runner.run(query, context); + return runner.run(query, responseContext); } final boolean isBySegment = query.getContextBySegment(false); return Sequences.map( - runner.run(query.withThreshold(minTopNThreshold), context), + runner.run(query.withThreshold(minTopNThreshold), responseContext), new Function, Result>() { @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java index 54d5286254b..d8e096f457d 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java @@ -67,7 +67,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof TopNQuery)) { diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index 489c36e799f..9ba455eaed8 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -279,7 +279,7 @@ public class ChainedExecutionQueryRunnerTest } @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { hasStarted = true; start.countDown(); diff --git a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java index f79e9279026..ec3a95a4cca 100644 --- a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java @@ -46,14 +46,14 @@ public class RetryQueryRunnerTest public void testRunWithMissingSegments() throws Exception { Map context = new MapMaker().makeMap(); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + 
context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @Override public Sequence> run(Query query, Map context) { - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -70,7 +70,8 @@ public class RetryQueryRunnerTest new RetryQueryRunnerConfig() { @Override - public int getNumTries() { + public int getNumTries() + { return 0; } @@ -90,7 +91,7 @@ public class RetryQueryRunnerTest Assert.assertTrue( "Should have one entry in the list of missing segments", - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 1 + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1 ); Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0); } @@ -101,7 +102,7 @@ public class RetryQueryRunnerTest { Map context = new MapMaker().makeMap(); context.put("count", 0); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @@ -112,7 +113,7 @@ public class RetryQueryRunnerTest ) { if ((int) context.get("count") == 0) { - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -159,7 +160,7 @@ public class RetryQueryRunnerTest Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1); Assert.assertTrue( "Should have nothing in missingSegment list", - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 0 + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 0 ); } @@ -168,7 +169,7 @@ public class RetryQueryRunnerTest { Map context = new MapMaker().makeMap(); context.put("count", 0); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @@ -179,7 +180,7 @@ public class RetryQueryRunnerTest ) { if ((int) context.get("count") < 3) { - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -226,7 +227,7 @@ public class RetryQueryRunnerTest Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1); Assert.assertTrue( "Should have nothing in missingSegment list", - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 0 + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 0 ); } @@ -234,7 +235,7 @@ public class RetryQueryRunnerTest public void testException() throws Exception { Map context = new MapMaker().makeMap(); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @@ -244,7 +245,7 @@ public class RetryQueryRunnerTest Map context ) { - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -277,7 +278,7 @@ public class RetryQueryRunnerTest Assert.assertTrue( "Should have one entry in the list of missing segments", - ((List) 
context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 1 + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1 ); } } diff --git a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java index 73f5c39dcb6..d183e7dd716 100644 --- a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java +++ b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java @@ -82,7 +82,7 @@ public class TimewarpOperatorTest @Override public Sequence> run( Query> query, - Map context + Map responseContext ) { return Sequences.simple( @@ -144,7 +144,7 @@ public class TimewarpOperatorTest @Override public Sequence> run( Query> query, - Map context + Map responseContext ) { return Sequences.simple( @@ -194,7 +194,7 @@ public class TimewarpOperatorTest @Override public Sequence> run( Query> query, - Map context + Map responseContext ) { return Sequences.simple( diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index e4bca779dbc..24e586a5d20 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -556,7 +556,7 @@ public class GroupByQueryRunnerTest { @Override public Sequence run( - Query query, Map context + Query query, Map responseContext ) { // simulate two daily segments @@ -566,7 +566,7 @@ public class GroupByQueryRunnerTest final Query query2 = query.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); - return Sequences.concat(runner.run(query1, context), runner.run(query2, context)); + return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext)); } } ); @@ -752,7 +752,7 @@ public class GroupByQueryRunnerTest { @Override public Sequence run( - Query query, Map context + Query query, Map responseContext ) { // simulate two daily segments @@ -762,7 +762,7 @@ public class GroupByQueryRunnerTest final Query query2 = query.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); - return Sequences.concat(runner.run(query1, context), runner.run(query2, context)); + return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext)); } } ); @@ -1109,7 +1109,7 @@ public class GroupByQueryRunnerTest { @Override public Sequence run( - Query query, Map context + Query query, Map responseContext ) { // simulate two daily segments @@ -1119,7 +1119,7 @@ public class GroupByQueryRunnerTest final Query query2 = query.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); - return Sequences.concat(runner.run(query1, context), runner.run(query2, context)); + return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext)); } } ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 78c936e0a57..edd2a719381 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -95,7 +95,7 @@ public class GroupByTimeseriesQueryRunnerTest extends 
TimeseriesQueryRunnerTest QueryRunner timeseriesRunner = new QueryRunner() { @Override - public Sequence run(Query query, Map metadata) + public Sequence run(Query query, Map responseContext) { TimeseriesQuery tsQuery = (TimeseriesQuery) query; @@ -109,7 +109,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest .setAggregatorSpecs(tsQuery.getAggregatorSpecs()) .setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs()) .build(), - metadata + responseContext ), new Function>() { diff --git a/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java b/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java index 01127e06876..afd9e87d307 100644 --- a/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java @@ -69,7 +69,7 @@ public class SpecificSegmentQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { return new Sequence() { @@ -112,7 +112,7 @@ public class SpecificSegmentQueryRunnerTest ); Sequences.toList(results, Lists.newArrayList()); - Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY); + Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY); Assert.assertTrue(missingSegments != null); Assert.assertTrue(missingSegments instanceof List); @@ -149,7 +149,7 @@ public class SpecificSegmentQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { return Sequences.withEffect( Sequences.simple(Arrays.asList(value)), @@ -196,7 +196,7 @@ public class SpecificSegmentQueryRunnerTest Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows")); - Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY); + Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY); Assert.assertTrue(missingSegments != null); Assert.assertTrue(missingSegments instanceof List); diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index ba93287b828..156adc1b7aa 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -93,7 +93,7 @@ public class TimeBoundaryQueryRunnerTest .bound(TimeBoundaryQuery.MAX_TIME) .build(); Map context = new MapMaker().makeMap(); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); Iterable> results = Sequences.toList( runner.run(timeBoundaryQuery, context), Lists.>newArrayList() @@ -115,7 +115,7 @@ public class TimeBoundaryQueryRunnerTest .bound(TimeBoundaryQuery.MIN_TIME) .build(); Map context = new MapMaker().makeMap(); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); Iterable> results = Sequences.toList( runner.run(timeBoundaryQuery, context), Lists.>newArrayList() diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 
57b06a31ff5..146a4b6dbe0 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -146,7 +146,8 @@ public class TimeSeriesUnionQueryRunnerTest { @Override public Sequence> run(Query> query, - Map context) + Map responseContext + ) { if (query.getDataSource().equals(new TableDataSource("ds1"))) { return Sequences.simple( diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 8a43cf6ddfb..45b0ca0c25d 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -117,7 +117,7 @@ public class CachingClusteredClient implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { final QueryToolChest> toolChest = warehouse.getToolChest(query); final CacheStrategy> strategy = toolChest.getCacheStrategy(query); @@ -319,13 +319,13 @@ public class CachingClusteredClient implements QueryRunner List intervals = segmentSpec.getIntervals(); if (!server.isAssignable() || !populateCache || isBySegment) { - resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), context); + resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext); } else { // this could be more efficient, since we only need to reorder results // for batches of segments with the same segment start time. resultSeqToAdd = toolChest.mergeSequencesUnordered( Sequences.map( - clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), context), + clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), responseContext), new Function>() { private final Function cacheFn = strategy.prepareForCache(); diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index f76dcbb9dd2..cb0616d98d3 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -73,7 +73,7 @@ public class CachingQueryRunner implements QueryRunner } @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { final CacheStrategy strategy = toolChest.getCacheStrategy(query); @@ -143,7 +143,7 @@ public class CachingQueryRunner implements QueryRunner return Sequences.withEffect( Sequences.map( - base.run(query, context), + base.run(query, responseContext), new Function() { @Override @@ -165,7 +165,7 @@ public class CachingQueryRunner implements QueryRunner MoreExecutors.sameThreadExecutor() ); } else { - return base.run(query, context); + return base.run(query, responseContext); } } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index dca9f440bb3..d2dcf606ee1 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -54,6 +54,7 @@ public class LocalFirehoseFactory implements FirehoseFactory() { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { try { 
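`CachingQueryRunner` (touched here only by the rename) follows a check-cache-else-run-and-populate pattern driven by the query's cache flags. A minimal sketch under simplified assumptions — a plain `Map` instead of Druid's `Cache`, and whole result lists instead of per-row cache values:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Serve from cache when allowed, otherwise run and (optionally) populate.
public class CacheOrRunSketch
{
  static final Map<String, List<String>> CACHE = new HashMap<>();

  static List<String> run(String cacheKey, boolean useCache, boolean populateCache)
  {
    if (useCache && CACHE.containsKey(cacheKey)) {
      return CACHE.get(cacheKey);
    }
    // Stands in for base.run(query, responseContext).
    List<String> fresh = Arrays.asList("computed-row");
    if (populateCache) {
      CACHE.put(cacheKey, fresh);
    }
    return fresh;
  }

  public static void main(String[] args)
  {
    System.out.println(run("segment-key", true, true)); // computed, then cached
    System.out.println(run("segment-key", true, true)); // served from cache
  }
}
```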
Server instance = brokerSelector.pick(); diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index f3d22df8fe6..292cad4b26e 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -21,7 +21,6 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.inject.Inject; @@ -48,6 +47,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.ReferenceCountingSegmentQueryRunner; +import io.druid.query.ReportTimelineMissingSegmentQueryRunner; import io.druid.query.SegmentDescriptor; import io.druid.query.TableDataSource; import io.druid.query.spec.SpecificSegmentQueryRunner; @@ -261,7 +261,7 @@ public class ServerManager implements QuerySegmentWalker return new NoopQueryRunner(); } - FunctionalIterable> adapters = FunctionalIterable + FunctionalIterable> queryRunners = FunctionalIterable .create(intervals) .transformCat( new Function>>() @@ -278,7 +278,8 @@ public class ServerManager implements QuerySegmentWalker { @Override public Iterable> apply( - @Nullable final TimelineObjectHolder holder + @Nullable + final TimelineObjectHolder holder ) { if (holder == null) { @@ -306,17 +307,15 @@ public class ServerManager implements QuerySegmentWalker ); } } - ) - .filter(Predicates.>notNull()); + ); } } - ) - .filter( - Predicates.>notNull() ); - - return new FinalizeResultsQueryRunner(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); + return new FinalizeResultsQueryRunner( + toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)), + toolChest + ); } private String getDataSourceName(DataSource dataSource) @@ -339,13 +338,15 @@ public class ServerManager implements QuerySegmentWalker String dataSourceName = getDataSourceName(query.getDataSource()); - final VersionedIntervalTimeline timeline = dataSources.get(dataSourceName); + final VersionedIntervalTimeline timeline = dataSources.get( + dataSourceName + ); if (timeline == null) { return new NoopQueryRunner(); } - FunctionalIterable> adapters = FunctionalIterable + FunctionalIterable> queryRunners = FunctionalIterable .create(specs) .transformCat( new Function>>() @@ -359,12 +360,12 @@ public class ServerManager implements QuerySegmentWalker ); if (entry == null) { - return null; + return Arrays.>asList(new ReportTimelineMissingSegmentQueryRunner(input)); } final PartitionChunk chunk = entry.getChunk(input.getPartitionNumber()); if (chunk == null) { - return null; + return Arrays.>asList(new ReportTimelineMissingSegmentQueryRunner(input)); } final ReferenceCountingSegment adapter = chunk.getObject(); @@ -373,12 +374,12 @@ public class ServerManager implements QuerySegmentWalker ); } } - ) - .filter( - Predicates.>notNull() ); - return new FinalizeResultsQueryRunner(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); + return new FinalizeResultsQueryRunner( + toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)), + toolChest + ); } private QueryRunner buildAndDecorateQueryRunner( diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java 
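The substantive `ServerManager` change: a `SegmentDescriptor` whose timeline entry or partition chunk is gone now maps to a `ReportTimelineMissingSegmentQueryRunner` instead of to `null`, which is what made the `Predicates.notNull()` filters (and `ChainedExecutionQueryRunner`'s null guard) removable. The lookup decision in miniature, with strings standing in for the runner types:

```java
import java.util.HashMap;
import java.util.Map;

// Unknown segments are reported, not silently filtered out.
public class TimelineLookupSketch
{
  static String runnerFor(Map<String, String> timeline, String descriptor)
  {
    String segment = timeline.get(descriptor);
    if (segment == null) {
      // Old code: return null and filter later. New code: report the miss.
      return "ReportTimelineMissingSegmentQueryRunner(" + descriptor + ")";
    }
    return "SpecificSegmentQueryRunner(" + segment + ")";
  }

  public static void main(String[] args)
  {
    Map<String, String> timeline = new HashMap<>();
    timeline.put("2014-01-01/2014-01-02, partition 0", "segment-A");

    System.out.println(runnerFor(timeline, "2014-01-01/2014-01-02, partition 0"));
    System.out.println(runnerFor(timeline, "2014-01-02/2014-01-03, partition 0"));
  }
}
```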
b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java index 56ff5159c62..3d6fdaea13a 100644 --- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java @@ -87,4 +87,4 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec } }; } -} \ No newline at end of file +} diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index a358ec44798..4975e0b5f01 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -124,7 +124,7 @@ public class CachingQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { return resultSeq; } @@ -214,7 +214,7 @@ public class CachingQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { return Sequences.empty(); } diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index f6a139ffb9c..12fdeabe7da 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -685,9 +685,9 @@ public class ServerManagerTest } @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { - return new BlockingSequence(runner.run(query, context), waitLatch, waitYieldLatch, notifyLatch); + return new BlockingSequence(runner.run(query, responseContext), waitLatch, waitYieldLatch, notifyLatch); } }
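Finally, how rows land in the shards that the new `determinePartitions()` creates. `HashBasedNumberedShardSpec.isInChunk()` (which this diff only touches for the missing newline) hashes the row's group key and places the row by hash modulo shard count; the `shouldIndex()` predicate moved to the top of `IndexTask` then keeps only rows that fall in this task's chunk. A miniature version — `murmur3_128` as in the `IndexTask` hunk, but hashing a joined string where the real code hashes the Jackson-serialized group key:

```java
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;

// Deterministic hash placement: every task computes the same shard for the
// same (truncated timestamp, dimensions) group key, so rows are partitioned
// consistently without any coordination between shards.
public class ShardAssignSketch
{
  private static final HashFunction HASH = Hashing.murmur3_128();

  static int shardFor(String groupKey, int numShards)
  {
    int hash = HASH.hashString(groupKey, StandardCharsets.UTF_8).asInt();
    return ((hash % numShards) + numShards) % numShards; // normalize negatives
  }

  public static void main(String[] args)
  {
    int numShards = 2;
    for (String key : new String[]{"1388534400000|a", "1388538000000|b", "1388541600000|c"}) {
      System.out.println(key + " -> shard " + shardFor(key, numShards));
    }
  }
}
```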