Merge branch 'master' into refactor-examples

This commit is contained in:
fjy 2014-11-24 11:00:26 -08:00
commit 13cae41f6c
46 changed files with 401 additions and 251 deletions

View File

@ -29,23 +29,23 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultiset; import com.google.common.hash.HashFunction;
import com.google.common.primitives.Ints; import com.google.common.hash.Hashing;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators; import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool; import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IOConfig; import io.druid.segment.indexing.IOConfig;
import io.druid.segment.indexing.IngestionSpec; import io.druid.segment.indexing.IngestionSpec;
@ -59,7 +59,6 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -67,7 +66,6 @@ import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -76,6 +74,26 @@ public class IndexTask extends AbstractFixedIntervalTask
{ {
private static final Logger log = new Logger(IndexTask.class); private static final Logger log = new Logger(IndexTask.class);
private static HashFunction hashFunction = Hashing.murmur3_128();
/**
* Should we index this inputRow? Decision is based on our interval and shardSpec.
*
* @param inputRow the row to check
*
* @return true or false
*/
private static boolean shouldIndex(
final ShardSpec shardSpec,
final Interval interval,
final InputRow inputRow,
final QueryGranularity rollupGran
)
{
return interval.contains(inputRow.getTimestampFromEpoch())
&& shardSpec.isInChunk(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
}
private static String makeId(String id, IndexIngestionSpec ingestionSchema) private static String makeId(String id, IndexIngestionSpec ingestionSchema)
{ {
if (id == null) { if (id == null) {
@ -153,7 +171,7 @@ public class IndexTask extends AbstractFixedIntervalTask
for (final Interval bucket : validIntervals) { for (final Interval bucket : validIntervals) {
final List<ShardSpec> shardSpecs; final List<ShardSpec> shardSpecs;
if (targetPartitionSize > 0) { if (targetPartitionSize > 0) {
shardSpecs = determinePartitions(bucket, targetPartitionSize); shardSpecs = determinePartitions(bucket, targetPartitionSize, granularitySpec.getQueryGranularity());
} else { } else {
int numShards = ingestionSchema.getTuningConfig().getNumShards(); int numShards = ingestionSchema.getTuningConfig().getNumShards();
if (numShards > 0) { if (numShards > 0) {
@ -200,7 +218,8 @@ public class IndexTask extends AbstractFixedIntervalTask
private List<ShardSpec> determinePartitions( private List<ShardSpec> determinePartitions(
final Interval interval, final Interval interval,
final int targetPartitionSize final int targetPartitionSize,
final QueryGranularity queryGranularity
) throws IOException ) throws IOException
{ {
log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize); log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize);
@ -208,113 +227,49 @@ public class IndexTask extends AbstractFixedIntervalTask
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
// The implementation of this determine partitions stuff is less than optimal. Should be done better. // The implementation of this determine partitions stuff is less than optimal. Should be done better.
// Use HLL to estimate number of rows
// Blacklist dimensions that have multiple values per row HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
final Set<String> unusableDimensions = com.google.common.collect.Sets.newHashSet();
// Track values of all non-blacklisted dimensions
final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
// Load data // Load data
try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) { try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
while (firehose.hasMore()) { while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow(); final InputRow inputRow = firehose.nextRow();
if (interval.contains(inputRow.getTimestampFromEpoch())) { if (interval.contains(inputRow.getTimestampFromEpoch())) {
// Extract dimensions from event final List<Object> groupKey = Rows.toGroupKey(
for (final String dim : inputRow.getDimensions()) { queryGranularity.truncate(inputRow.getTimestampFromEpoch()),
final List<String> dimValues = inputRow.getDimension(dim); inputRow
if (!unusableDimensions.contains(dim)) { );
if (dimValues.size() == 1) { collector.add(
// Track this value hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
TreeMultiset<String> dimensionValueMultiset = dimensionValueMultisets.get(dim); .asBytes()
if (dimensionValueMultiset == null) { );
dimensionValueMultiset = TreeMultiset.create();
dimensionValueMultisets.put(dim, dimensionValueMultiset);
}
dimensionValueMultiset.add(dimValues.get(0));
} else {
// Only single-valued dimensions can be used for partitions
unusableDimensions.add(dim);
dimensionValueMultisets.remove(dim);
}
}
}
} }
} }
} }
final double numRows = collector.estimateCardinality();
log.info("Estimated approximately [%,f] rows of data.", numRows);
int numberOfShards = (int) Math.ceil(numRows / targetPartitionSize);
if ((double) numberOfShards > numRows) {
numberOfShards = (int) numRows;
}
log.info("Will require [%,d] shard(s).", numberOfShards);
// ShardSpecs we will return // ShardSpecs we will return
final List<ShardSpec> shardSpecs = Lists.newArrayList(); final List<ShardSpec> shardSpecs = Lists.newArrayList();
// Select highest-cardinality dimension if (numberOfShards == 1) {
Ordering<Map.Entry<String, TreeMultiset<String>>> byCardinalityOrdering = new Ordering<Map.Entry<String, TreeMultiset<String>>>()
{
@Override
public int compare(
Map.Entry<String, TreeMultiset<String>> left,
Map.Entry<String, TreeMultiset<String>> right
)
{
return Ints.compare(left.getValue().elementSet().size(), right.getValue().elementSet().size());
}
};
if (dimensionValueMultisets.isEmpty()) {
// No suitable partition dimension. We'll make one big segment and hope for the best.
log.info("No suitable partition dimension found");
shardSpecs.add(new NoneShardSpec()); shardSpecs.add(new NoneShardSpec());
} else { } else {
// Find best partition dimension (heuristic: highest cardinality). for (int i = 0; i < numberOfShards; ++i) {
final Map.Entry<String, TreeMultiset<String>> partitionEntry = shardSpecs.add(
byCardinalityOrdering.max(dimensionValueMultisets.entrySet()); new HashBasedNumberedShardSpec(
i,
final String partitionDim = partitionEntry.getKey(); numberOfShards,
final TreeMultiset<String> partitionDimValues = partitionEntry.getValue(); HadoopDruidIndexerConfig.jsonMapper
)
log.info( );
"Partitioning on dimension[%s] with cardinality[%d] over rows[%d]",
partitionDim,
partitionDimValues.elementSet().size(),
partitionDimValues.size()
);
// Iterate over unique partition dimension values in sorted order
String currentPartitionStart = null;
int currentPartitionSize = 0;
for (final String partitionDimValue : partitionDimValues.elementSet()) {
currentPartitionSize += partitionDimValues.count(partitionDimValue);
if (currentPartitionSize >= targetPartitionSize) {
final ShardSpec shardSpec = new SingleDimensionShardSpec(
partitionDim,
currentPartitionStart,
partitionDimValue,
shardSpecs.size()
);
log.info("Adding shard: %s", shardSpec);
shardSpecs.add(shardSpec);
currentPartitionSize = partitionDimValues.count(partitionDimValue);
currentPartitionStart = partitionDimValue;
}
}
if (currentPartitionSize > 0) {
// One last shard to go
final ShardSpec shardSpec;
if (shardSpecs.isEmpty()) {
shardSpec = new NoneShardSpec();
} else {
shardSpec = new SingleDimensionShardSpec(
partitionDim,
currentPartitionStart,
null,
shardSpecs.size()
);
}
log.info("Adding shard: %s", shardSpec);
shardSpecs.add(shardSpec);
} }
} }
@ -437,24 +392,6 @@ public class IndexTask extends AbstractFixedIntervalTask
return Iterables.getOnlyElement(pushedSegments); return Iterables.getOnlyElement(pushedSegments);
} }
/**
* Should we index this inputRow? Decision is based on our interval and shardSpec.
*
* @param inputRow the row to check
*
* @return true or false
*/
private static boolean shouldIndex(
final ShardSpec shardSpec,
final Interval interval,
final InputRow inputRow,
final QueryGranularity rollupGran
)
{
return interval.contains(inputRow.getTimestampFromEpoch())
&& shardSpec.isInChunk(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
}
public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig> public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig>
{ {
private final DataSchema dataSchema; private final DataSchema dataSchema;

View File

@ -0,0 +1,159 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.google.api.client.util.Lists;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.List;
public class IndexTaskTest
{
@Test
public void testDeterminePartitions() throws Exception
{
File tmpFile = File.createTempFile("druid", "index");
tmpFile.deleteOnExit();
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2014-01-01T00:00:10Z,a,1");
writer.println("2014-01-01T01:00:20Z,b,1");
writer.println("2014-01-01T02:00:30Z,c,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto"
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
Granularity.DAY,
QueryGranularity.MINUTE,
Arrays.asList(new Interval("2014/2015"))
)
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
tmpFile.getParentFile(),
"druid*",
null
)
),
new IndexTask.IndexTuningConfig(
2,
0,
null
)
),
new DefaultObjectMapper()
);
final List<DataSegment> segments = Lists.newArrayList();
indexTask.run(
new TaskToolbox(
null, null, new TaskActionClientFactory()
{
@Override
public TaskActionClient create(Task task)
{
return new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockListAction) {
return (RetType) Arrays.asList(
new TaskLock(
"", "", null, new DateTime().toString()
)
);
}
return null;
}
};
}
}, null, new DataSegmentPusher()
{
@Override
public String getPathForHadoop(String dataSource)
{
return null;
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
segments.add(segment);
return segment;
}
}, null, null, null, null, null, null, null, null, null, null, null
)
);
Assert.assertEquals(2, segments.size());
}
}

View File

@ -49,11 +49,11 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query, Map<String, Object> context) public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{ {
if (query.getContextBySegment(false)) { if (query.getContextBySegment(false)) {
final Sequence<T> baseSequence = base.run(query, context); final Sequence<T> baseSequence = base.run(query, responseContext);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList()); final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple( return Sequences.simple(
Arrays.asList( Arrays.asList(
@ -68,6 +68,6 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
) )
); );
} }
return base.run(query, context); return base.run(query, responseContext);
} }
} }

View File

@ -37,13 +37,13 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(Query<T> query, Map<String, Object> context) public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{ {
if (query.getContextBySegment(false)) { if (query.getContextBySegment(false)) {
return baseRunner.run(query, context); return baseRunner.run(query, responseContext);
} }
return doRun(baseRunner, query, context); return doRun(baseRunner, query, responseContext);
} }
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context); protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context);

View File

@ -20,7 +20,6 @@
package io.druid.query; package io.druid.query;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -89,12 +88,12 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
// since it already implements ListeningExecutorService // since it already implements ListeningExecutorService
this.exec = MoreExecutors.listeningDecorator(exec); this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering; this.ordering = ordering;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.queryables = Iterables.unmodifiableIterable(queryables);
this.queryWatcher = queryWatcher; this.queryWatcher = queryWatcher;
} }
@Override @Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context) public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{ {
final int priority = query.getContextPriority(0); final int priority = query.getContextPriority(0);
@ -125,11 +124,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Iterable<T> call() throws Exception public Iterable<T> call() throws Exception
{ {
try { try {
if (input == null) { Sequence<T> result = input.run(query, responseContext);
throw new ISE("Input is null?! How is this possible?!");
}
Sequence<T> result = input.run(query, context);
if (result == null) { if (result == null) {
throw new ISE("Got a null result! Segments are missing!"); throw new ISE("Got a null result! Segments are missing!");
} }

View File

@ -39,7 +39,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context) public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{ {
return Sequences.concat( return Sequences.concat(
Sequences.map( Sequences.map(
@ -49,7 +49,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
@Override @Override
public Sequence<T> apply(final QueryRunner<T> input) public Sequence<T> apply(final QueryRunner<T> input)
{ {
return input.run(query, context); return input.run(query, responseContext);
} }
} }
) )

View File

@ -49,7 +49,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query, Map<String, Object> context) public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{ {
final boolean isBySegment = query.getContextBySegment(false); final boolean isBySegment = query.getContextBySegment(false);
final boolean shouldFinalize = query.getContextFinalize(true); final boolean shouldFinalize = query.getContextFinalize(true);
@ -102,7 +102,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
return Sequences.map( return Sequences.map(
baseRunner.run(queryToRun, context), baseRunner.run(queryToRun, responseContext),
finalizerFn finalizerFn
); );

View File

@ -78,7 +78,7 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> context) public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> responseContext)
{ {
final GroupByQuery query = (GroupByQuery) queryParam; final GroupByQuery query = (GroupByQuery) queryParam;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
@ -111,10 +111,10 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
{ {
try { try {
if (bySegment) { if (bySegment) {
input.run(queryParam, context) input.run(queryParam, responseContext)
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs); .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
} else { } else {
input.run(queryParam, context) input.run(queryParam, responseContext)
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
} }

View File

@ -49,10 +49,10 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context) public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{ {
if (period.getMillis() == 0) { if (period.getMillis() == 0) {
return baseRunner.run(query, context); return baseRunner.run(query, responseContext);
} }
return Sequences.concat( return Sequences.concat(
@ -76,7 +76,7 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
{ {
return baseRunner.run( return baseRunner.run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))), query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
context responseContext
); );
} }
} }

View File

@ -84,7 +84,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context) public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{ {
final ServiceMetricEvent.Builder builder = builderFn.apply(query); final ServiceMetricEvent.Builder builder = builderFn.apply(query);
String queryId = query.getId(); String queryId = query.getId();
@ -102,7 +102,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
try { try {
retVal = queryRunner.run(query, context).accumulate(outType, accumulator); retVal = queryRunner.run(query, responseContext).accumulate(outType, accumulator);
} }
catch (RuntimeException e) { catch (RuntimeException e) {
builder.setUser10("failed"); builder.setUser10("failed");
@ -132,7 +132,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
try { try {
retVal = queryRunner.run(query, context).toYielder(initValue, accumulator); retVal = queryRunner.run(query, responseContext).toYielder(initValue, accumulator);
} }
catch (RuntimeException e) { catch (RuntimeException e) {
builder.setUser10("failed"); builder.setUser10("failed");

View File

@ -30,7 +30,7 @@ import java.util.Map;
public class NoopQueryRunner<T> implements QueryRunner<T> public class NoopQueryRunner<T> implements QueryRunner<T>
{ {
@Override @Override
public Sequence<T> run(Query<T> query, Map<String, Object> context) public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{ {
return Sequences.empty(); return Sequences.empty();
} }

View File

@ -27,5 +27,5 @@ import java.util.Map;
*/ */
public interface QueryRunner<T> public interface QueryRunner<T>
{ {
public Sequence<T> run(Query<T> query, Map<String, Object> context); public Sequence<T> run(Query<T> query, Map<String, Object> responseContext);
} }

View File

@ -45,11 +45,11 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query, Map<String, Object> context) public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{ {
final Closeable closeable = adapter.increment(); final Closeable closeable = adapter.increment();
try { try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, context); final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, responseContext);
return new ResourceClosingSequence<T>(baseSequence, closeable); return new ResourceClosingSequence<T>(baseSequence, closeable);
} }

View File

@ -0,0 +1,53 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import java.util.List;
import java.util.Map;
/**
*/
public class ReportTimelineMissingSegmentQueryRunner<T> implements QueryRunner<T>
{
private final SegmentDescriptor descriptor;
public ReportTimelineMissingSegmentQueryRunner(SegmentDescriptor descriptor)
{
this.descriptor = descriptor;
}
@Override
public Sequence<T> run(
Query<T> query, Map<String, Object> responseContext
)
{
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(descriptor);
return Sequences.empty();
}
}

View File

@ -27,6 +27,8 @@ import org.joda.time.DateTime;
*/ */
public class Result<T> implements Comparable<Result<T>> public class Result<T> implements Comparable<Result<T>>
{ {
public static String MISSING_SEGMENTS_KEY = "missingSegments";
private final DateTime timestamp; private final DateTime timestamp;
private final T value; private final T value;

View File

@ -36,7 +36,6 @@ import java.util.Map;
public class RetryQueryRunner<T> implements QueryRunner<T> public class RetryQueryRunner<T> implements QueryRunner<T>
{ {
public static String MISSING_SEGMENTS_KEY = "missingSegments";
private static final EmittingLogger log = new EmittingLogger(RetryQueryRunner.class); private static final EmittingLogger log = new EmittingLogger(RetryQueryRunner.class);
private final QueryRunner<T> baseRunner; private final QueryRunner<T> baseRunner;
@ -76,7 +75,7 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
for (int i = 0; i < config.getNumTries(); i++) { for (int i = 0; i < config.getNumTries(); i++) {
log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i); log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i);
context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList()); context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
final Query<T> retryQuery = query.withQuerySegmentSpec( final Query<T> retryQuery = query.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec( new MultipleSpecificSegmentSpec(
missingSegments missingSegments
@ -102,7 +101,7 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
private List<SegmentDescriptor> getMissingSegments(final Map<String, Object> context) private List<SegmentDescriptor> getMissingSegments(final Map<String, Object> context)
{ {
final Object maybeMissingSegments = context.get(MISSING_SEGMENTS_KEY); final Object maybeMissingSegments = context.get(Result.MISSING_SEGMENTS_KEY);
if (maybeMissingSegments == null) { if (maybeMissingSegments == null) {
return Lists.newArrayList(); return Lists.newArrayList();
} }
@ -115,4 +114,3 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
); );
} }
} }

View File

@ -39,13 +39,13 @@ public class SubqueryQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query, Map<String, Object> context) public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{ {
DataSource dataSource = query.getDataSource(); DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) { if (dataSource instanceof QueryDataSource) {
return run((Query<T>) ((QueryDataSource) dataSource).getQuery(), context); return run((Query<T>) ((QueryDataSource) dataSource).getQuery(), responseContext);
} else { } else {
return baseRunner.run(query, context); return baseRunner.run(query, responseContext);
} }
} }
} }

View File

@ -81,7 +81,7 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
return new QueryRunner<T>() return new QueryRunner<T>()
{ {
@Override @Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context) public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{ {
final long offset = computeOffset(now); final long offset = computeOffset(now);
@ -93,7 +93,7 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
return Sequences.map( return Sequences.map(
baseRunner.run( baseRunner.run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))), query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))),
context responseContext
), ),
new Function<T, T>() new Function<T, T>()
{ {

View File

@ -44,7 +44,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context) public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{ {
DataSource dataSource = query.getDataSource(); DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) { if (dataSource instanceof UnionDataSource) {
@ -59,7 +59,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
{ {
return baseRunner.run( return baseRunner.run(
query.withDataSource(singleSource), query.withDataSource(singleSource),
context responseContext
); );
} }
} }
@ -67,7 +67,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
) )
); );
} else { } else {
return baseRunner.run(query, context); return baseRunner.run(query, responseContext);
} }
} }

View File

@ -111,16 +111,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
return new QueryRunner<Row>() return new QueryRunner<Row>()
{ {
@Override @Override
public Sequence<Row> run(Query<Row> input, Map<String, Object> context) public Sequence<Row> run(Query<Row> input, Map<String, Object> responseContext)
{ {
if (input.getContextBySegment(false)) { if (input.getContextBySegment(false)) {
return runner.run(input, context); return runner.run(input, responseContext);
} }
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) { if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner, context); return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner,
responseContext
);
} }
return runner.run(input, context); return runner.run(input, responseContext);
} }
}; };
} }

View File

@ -108,7 +108,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
return new QueryRunner<Row>() return new QueryRunner<Row>()
{ {
@Override @Override
public Sequence<Row> run(final Query<Row> query, final Map<String, Object> context) public Sequence<Row> run(final Query<Row> query, final Map<String, Object> responseContext)
{ {
final GroupByQuery queryParam = (GroupByQuery) query; final GroupByQuery queryParam = (GroupByQuery) query;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper
@ -128,13 +128,13 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
public Void call() throws Exception public Void call() throws Exception
{ {
if (bySegment) { if (bySegment) {
input.run(queryParam, context) input.run(queryParam, responseContext)
.accumulate( .accumulate(
bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.lhs,
bySegmentAccumulatorPair.rhs bySegmentAccumulatorPair.rhs
); );
} else { } else {
input.run(query, context) input.run(query, responseContext)
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
} }
@ -203,7 +203,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
} }
@Override @Override
public Sequence<Row> run(Query<Row> input, Map<String, Object> context) public Sequence<Row> run(Query<Row> input, Map<String, Object> responseContext)
{ {
if (!(input instanceof GroupByQuery)) { if (!(input instanceof GroupByQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class); throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class);

View File

@ -77,7 +77,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
return new QueryRunner<SegmentAnalysis>() return new QueryRunner<SegmentAnalysis>()
{ {
@Override @Override
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> context) public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> responseContext)
{ {
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ; SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
@ -138,7 +138,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
@Override @Override
public Sequence<SegmentAnalysis> run( public Sequence<SegmentAnalysis> run(
final Query<SegmentAnalysis> query, final Query<SegmentAnalysis> query,
final Map<String, Object> context final Map<String, Object> responseContext
) )
{ {
final int priority = query.getContextPriority(0); final int priority = query.getContextPriority(0);
@ -148,7 +148,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
@Override @Override
public Sequence<SegmentAnalysis> call() throws Exception public Sequence<SegmentAnalysis> call() throws Exception
{ {
return input.run(query, context); return input.run(query, responseContext);
} }
} }
); );

View File

@ -280,7 +280,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
@Override @Override
public Sequence<Result<SearchResultValue>> run( public Sequence<Result<SearchResultValue>> run(
Query<Result<SearchResultValue>> input, Query<Result<SearchResultValue>> input,
Map<String, Object> context Map<String, Object> responseContext
) )
{ {
if (!(input instanceof SearchQuery)) { if (!(input instanceof SearchQuery)) {
@ -289,13 +289,13 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
final SearchQuery query = (SearchQuery) input; final SearchQuery query = (SearchQuery) input;
if (query.getLimit() < config.getMaxSearchLimit()) { if (query.getLimit() < config.getMaxSearchLimit()) {
return runner.run(query, context); return runner.run(query, responseContext);
} }
final boolean isBySegment = query.getContextBySegment(false); final boolean isBySegment = query.getContextBySegment(false);
return Sequences.map( return Sequences.map(
runner.run(query.withLimit(config.getMaxSearchLimit()), context), runner.run(query.withLimit(config.getMaxSearchLimit()), responseContext),
new Function<Result<SearchResultValue>, Result<SearchResultValue>>() new Function<Result<SearchResultValue>, Result<SearchResultValue>>()
{ {
@Override @Override

View File

@ -73,7 +73,7 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
@Override @Override
public Sequence<Result<SearchResultValue>> run( public Sequence<Result<SearchResultValue>> run(
final Query<Result<SearchResultValue>> input, final Query<Result<SearchResultValue>> input,
Map<String, Object> context Map<String, Object> responseContext
) )
{ {
if (!(input instanceof SearchQuery)) { if (!(input instanceof SearchQuery)) {

View File

@ -92,7 +92,7 @@ public class SelectQueryRunnerFactory
@Override @Override
public Sequence<Result<SelectResultValue>> run( public Sequence<Result<SelectResultValue>> run(
Query<Result<SelectResultValue>> input, Query<Result<SelectResultValue>> input,
Map<String, Object> context Map<String, Object> responseContext
) )
{ {
if (!(input instanceof SelectQuery)) { if (!(input instanceof SelectQuery)) {

View File

@ -27,7 +27,7 @@ import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.guava.YieldingAccumulator;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.RetryQueryRunner; import io.druid.query.Result;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import io.druid.segment.SegmentMissingException; import io.druid.segment.SegmentMissingException;
@ -53,7 +53,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> input, final Map<String, Object> context) public Sequence<T> run(final Query<T> input, final Map<String, Object> responseContext)
{ {
final Query<T> query = input.withQuerySegmentSpec(specificSpec); final Query<T> query = input.withQuerySegmentSpec(specificSpec);
@ -67,7 +67,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
@Override @Override
public Sequence<T> call() throws Exception public Sequence<T> call() throws Exception
{ {
return base.run(query, context); return base.run(query, responseContext);
} }
} }
); );
@ -87,10 +87,10 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
return baseSequence.accumulate(initValue, accumulator); return baseSequence.accumulate(initValue, accumulator);
} }
catch (SegmentMissingException e) { catch (SegmentMissingException e) {
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY); List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);
if (missingSegments == null) { if (missingSegments == null) {
missingSegments = Lists.newArrayList(); missingSegments = Lists.newArrayList();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missingSegments); responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
} }
missingSegments.add(specificSpec.getDescriptor()); missingSegments.add(specificSpec.getDescriptor());
return initValue; return initValue;

View File

@ -87,7 +87,7 @@ public class TimeBoundaryQueryRunnerFactory
@Override @Override
public Sequence<Result<TimeBoundaryResultValue>> run( public Sequence<Result<TimeBoundaryResultValue>> run(
Query<Result<TimeBoundaryResultValue>> input, Query<Result<TimeBoundaryResultValue>> input,
Map<String, Object> context Map<String, Object> responseContext
) )
{ {
if (!(input instanceof TimeBoundaryQuery)) { if (!(input instanceof TimeBoundaryQuery)) {

View File

@ -93,7 +93,7 @@ public class TimeseriesQueryRunnerFactory
@Override @Override
public Sequence<Result<TimeseriesResultValue>> run( public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> input, Query<Result<TimeseriesResultValue>> input,
Map<String, Object> context Map<String, Object> responseContext
) )
{ {
if (!(input instanceof TimeseriesQuery)) { if (!(input instanceof TimeseriesQuery)) {

View File

@ -414,7 +414,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override @Override
public Sequence<Result<TopNResultValue>> run( public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> input, Query<Result<TopNResultValue>> input,
Map<String, Object> context Map<String, Object> responseContext
) )
{ {
if (!(input instanceof TopNQuery)) { if (!(input instanceof TopNQuery)) {
@ -423,13 +423,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
final TopNQuery query = (TopNQuery) input; final TopNQuery query = (TopNQuery) input;
if (query.getThreshold() > minTopNThreshold) { if (query.getThreshold() > minTopNThreshold) {
return runner.run(query, context); return runner.run(query, responseContext);
} }
final boolean isBySegment = query.getContextBySegment(false); final boolean isBySegment = query.getContextBySegment(false);
return Sequences.map( return Sequences.map(
runner.run(query.withThreshold(minTopNThreshold), context), runner.run(query.withThreshold(minTopNThreshold), responseContext),
new Function<Result<TopNResultValue>, Result<TopNResultValue>>() new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{ {
@Override @Override

View File

@ -67,7 +67,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
@Override @Override
public Sequence<Result<TopNResultValue>> run( public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> input, Query<Result<TopNResultValue>> input,
Map<String, Object> context Map<String, Object> responseContext
) )
{ {
if (!(input instanceof TopNQuery)) { if (!(input instanceof TopNQuery)) {

View File

@ -279,7 +279,7 @@ public class ChainedExecutionQueryRunnerTest
} }
@Override @Override
public Sequence<Integer> run(Query<Integer> query, Map<String, Object> context) public Sequence<Integer> run(Query<Integer> query, Map<String, Object> responseContext)
{ {
hasStarted = true; hasStarted = true;
start.countDown(); start.countDown();

View File

@ -46,14 +46,14 @@ public class RetryQueryRunnerTest
public void testRunWithMissingSegments() throws Exception public void testRunWithMissingSegments() throws Exception
{ {
Map<String, Object> context = new MapMaker().makeMap(); Map<String, Object> context = new MapMaker().makeMap();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>( RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>() new QueryRunner<Result<TimeseriesResultValue>>()
{ {
@Override @Override
public Sequence<Result<TimeseriesResultValue>> run(Query query, Map context) public Sequence<Result<TimeseriesResultValue>> run(Query query, Map context)
{ {
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor( new SegmentDescriptor(
new Interval( new Interval(
178888, 178888,
@ -70,7 +70,8 @@ public class RetryQueryRunnerTest
new RetryQueryRunnerConfig() new RetryQueryRunnerConfig()
{ {
@Override @Override
public int getNumTries() { public int getNumTries()
{
return 0; return 0;
} }
@ -90,7 +91,7 @@ public class RetryQueryRunnerTest
Assert.assertTrue( Assert.assertTrue(
"Should have one entry in the list of missing segments", "Should have one entry in the list of missing segments",
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 1 ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1
); );
Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0); Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0);
} }
@ -101,7 +102,7 @@ public class RetryQueryRunnerTest
{ {
Map<String, Object> context = new MapMaker().makeMap(); Map<String, Object> context = new MapMaker().makeMap();
context.put("count", 0); context.put("count", 0);
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>( RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>() new QueryRunner<Result<TimeseriesResultValue>>()
{ {
@ -112,7 +113,7 @@ public class RetryQueryRunnerTest
) )
{ {
if ((int) context.get("count") == 0) { if ((int) context.get("count") == 0) {
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor( new SegmentDescriptor(
new Interval( new Interval(
178888, 178888,
@ -159,7 +160,7 @@ public class RetryQueryRunnerTest
Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1); Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1);
Assert.assertTrue( Assert.assertTrue(
"Should have nothing in missingSegment list", "Should have nothing in missingSegment list",
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 0 ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 0
); );
} }
@ -168,7 +169,7 @@ public class RetryQueryRunnerTest
{ {
Map<String, Object> context = new MapMaker().makeMap(); Map<String, Object> context = new MapMaker().makeMap();
context.put("count", 0); context.put("count", 0);
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>( RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>() new QueryRunner<Result<TimeseriesResultValue>>()
{ {
@ -179,7 +180,7 @@ public class RetryQueryRunnerTest
) )
{ {
if ((int) context.get("count") < 3) { if ((int) context.get("count") < 3) {
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor( new SegmentDescriptor(
new Interval( new Interval(
178888, 178888,
@ -226,7 +227,7 @@ public class RetryQueryRunnerTest
Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1); Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1);
Assert.assertTrue( Assert.assertTrue(
"Should have nothing in missingSegment list", "Should have nothing in missingSegment list",
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 0 ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 0
); );
} }
@ -234,7 +235,7 @@ public class RetryQueryRunnerTest
public void testException() throws Exception public void testException() throws Exception
{ {
Map<String, Object> context = new MapMaker().makeMap(); Map<String, Object> context = new MapMaker().makeMap();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>( RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>() new QueryRunner<Result<TimeseriesResultValue>>()
{ {
@ -244,7 +245,7 @@ public class RetryQueryRunnerTest
Map<String, Object> context Map<String, Object> context
) )
{ {
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor( new SegmentDescriptor(
new Interval( new Interval(
178888, 178888,
@ -277,7 +278,7 @@ public class RetryQueryRunnerTest
Assert.assertTrue( Assert.assertTrue(
"Should have one entry in the list of missing segments", "Should have one entry in the list of missing segments",
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 1 ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1
); );
} }
} }

View File

@ -82,7 +82,7 @@ public class TimewarpOperatorTest
@Override @Override
public Sequence<Result<TimeseriesResultValue>> run( public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query, Query<Result<TimeseriesResultValue>> query,
Map<String, Object> context Map<String, Object> responseContext
) )
{ {
return Sequences.simple( return Sequences.simple(
@ -144,7 +144,7 @@ public class TimewarpOperatorTest
@Override @Override
public Sequence<Result<TimeBoundaryResultValue>> run( public Sequence<Result<TimeBoundaryResultValue>> run(
Query<Result<TimeBoundaryResultValue>> query, Query<Result<TimeBoundaryResultValue>> query,
Map<String, Object> context Map<String, Object> responseContext
) )
{ {
return Sequences.simple( return Sequences.simple(
@ -194,7 +194,7 @@ public class TimewarpOperatorTest
@Override @Override
public Sequence<Result<TimeseriesResultValue>> run( public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query, Query<Result<TimeseriesResultValue>> query,
Map<String, Object> context Map<String, Object> responseContext
) )
{ {
return Sequences.simple( return Sequences.simple(

View File

@ -556,7 +556,7 @@ public class GroupByQueryRunnerTest
{ {
@Override @Override
public Sequence<Row> run( public Sequence<Row> run(
Query<Row> query, Map<String, Object> context Query<Row> query, Map<String, Object> responseContext
) )
{ {
// simulate two daily segments // simulate two daily segments
@ -566,7 +566,7 @@ public class GroupByQueryRunnerTest
final Query query2 = query.withQuerySegmentSpec( final Query query2 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
); );
return Sequences.concat(runner.run(query1, context), runner.run(query2, context)); return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext));
} }
} }
); );
@ -752,7 +752,7 @@ public class GroupByQueryRunnerTest
{ {
@Override @Override
public Sequence<Row> run( public Sequence<Row> run(
Query<Row> query, Map<String, Object> context Query<Row> query, Map<String, Object> responseContext
) )
{ {
// simulate two daily segments // simulate two daily segments
@ -762,7 +762,7 @@ public class GroupByQueryRunnerTest
final Query query2 = query.withQuerySegmentSpec( final Query query2 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
); );
return Sequences.concat(runner.run(query1, context), runner.run(query2, context)); return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext));
} }
} }
); );
@ -1109,7 +1109,7 @@ public class GroupByQueryRunnerTest
{ {
@Override @Override
public Sequence<Row> run( public Sequence<Row> run(
Query<Row> query, Map<String, Object> context Query<Row> query, Map<String, Object> responseContext
) )
{ {
// simulate two daily segments // simulate two daily segments
@ -1119,7 +1119,7 @@ public class GroupByQueryRunnerTest
final Query query2 = query.withQuerySegmentSpec( final Query query2 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
); );
return Sequences.concat(runner.run(query1, context), runner.run(query2, context)); return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext));
} }
} }
); );

View File

@ -95,7 +95,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
QueryRunner timeseriesRunner = new QueryRunner() QueryRunner timeseriesRunner = new QueryRunner()
{ {
@Override @Override
public Sequence run(Query query, Map metadata) public Sequence run(Query query, Map responseContext)
{ {
TimeseriesQuery tsQuery = (TimeseriesQuery) query; TimeseriesQuery tsQuery = (TimeseriesQuery) query;
@ -109,7 +109,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
.setAggregatorSpecs(tsQuery.getAggregatorSpecs()) .setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs()) .setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
.build(), .build(),
metadata responseContext
), ),
new Function<Row, Result<TimeseriesResultValue>>() new Function<Row, Result<TimeseriesResultValue>>()
{ {

View File

@ -69,7 +69,7 @@ public class SpecificSegmentQueryRunnerTest
new QueryRunner() new QueryRunner()
{ {
@Override @Override
public Sequence run(Query query, Map context) public Sequence run(Query query, Map responseContext)
{ {
return new Sequence() return new Sequence()
{ {
@ -112,7 +112,7 @@ public class SpecificSegmentQueryRunnerTest
); );
Sequences.toList(results, Lists.newArrayList()); Sequences.toList(results, Lists.newArrayList());
Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY); Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY);
Assert.assertTrue(missingSegments != null); Assert.assertTrue(missingSegments != null);
Assert.assertTrue(missingSegments instanceof List); Assert.assertTrue(missingSegments instanceof List);
@ -149,7 +149,7 @@ public class SpecificSegmentQueryRunnerTest
new QueryRunner() new QueryRunner()
{ {
@Override @Override
public Sequence run(Query query, Map context) public Sequence run(Query query, Map responseContext)
{ {
return Sequences.withEffect( return Sequences.withEffect(
Sequences.simple(Arrays.asList(value)), Sequences.simple(Arrays.asList(value)),
@ -196,7 +196,7 @@ public class SpecificSegmentQueryRunnerTest
Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows")); Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows"));
Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY); Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY);
Assert.assertTrue(missingSegments != null); Assert.assertTrue(missingSegments != null);
Assert.assertTrue(missingSegments instanceof List); Assert.assertTrue(missingSegments instanceof List);

View File

@ -93,7 +93,7 @@ public class TimeBoundaryQueryRunnerTest
.bound(TimeBoundaryQuery.MAX_TIME) .bound(TimeBoundaryQuery.MAX_TIME)
.build(); .build();
Map<String, Object> context = new MapMaker().makeMap(); Map<String, Object> context = new MapMaker().makeMap();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList( Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
runner.run(timeBoundaryQuery, context), runner.run(timeBoundaryQuery, context),
Lists.<Result<TimeBoundaryResultValue>>newArrayList() Lists.<Result<TimeBoundaryResultValue>>newArrayList()
@ -115,7 +115,7 @@ public class TimeBoundaryQueryRunnerTest
.bound(TimeBoundaryQuery.MIN_TIME) .bound(TimeBoundaryQuery.MIN_TIME)
.build(); .build();
Map<String, Object> context = new MapMaker().makeMap(); Map<String, Object> context = new MapMaker().makeMap();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList( Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
runner.run(timeBoundaryQuery, context), runner.run(timeBoundaryQuery, context),
Lists.<Result<TimeBoundaryResultValue>>newArrayList() Lists.<Result<TimeBoundaryResultValue>>newArrayList()

View File

@ -146,7 +146,8 @@ public class TimeSeriesUnionQueryRunnerTest
{ {
@Override @Override
public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query, public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query,
Map<String, Object> context) Map<String, Object> responseContext
)
{ {
if (query.getDataSource().equals(new TableDataSource("ds1"))) { if (query.getDataSource().equals(new TableDataSource("ds1"))) {
return Sequences.simple( return Sequences.simple(

View File

@ -117,7 +117,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context) public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{ {
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query); final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final CacheStrategy<T, Object, Query<T>> strategy = toolChest.getCacheStrategy(query); final CacheStrategy<T, Object, Query<T>> strategy = toolChest.getCacheStrategy(query);
@ -319,13 +319,13 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
List<Interval> intervals = segmentSpec.getIntervals(); List<Interval> intervals = segmentSpec.getIntervals();
if (!server.isAssignable() || !populateCache || isBySegment) { if (!server.isAssignable() || !populateCache || isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), context); resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext);
} else { } else {
// this could be more efficient, since we only need to reorder results // this could be more efficient, since we only need to reorder results
// for batches of segments with the same segment start time. // for batches of segments with the same segment start time.
resultSeqToAdd = toolChest.mergeSequencesUnordered( resultSeqToAdd = toolChest.mergeSequencesUnordered(
Sequences.map( Sequences.map(
clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), context), clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), responseContext),
new Function<Object, Sequence<T>>() new Function<Object, Sequence<T>>()
{ {
private final Function<T, Object> cacheFn = strategy.prepareForCache(); private final Function<T, Object> cacheFn = strategy.prepareForCache();

View File

@ -73,7 +73,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(Query<T> query, Map<String, Object> context) public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{ {
final CacheStrategy strategy = toolChest.getCacheStrategy(query); final CacheStrategy strategy = toolChest.getCacheStrategy(query);
@ -143,7 +143,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
return Sequences.withEffect( return Sequences.withEffect(
Sequences.map( Sequences.map(
base.run(query, context), base.run(query, responseContext),
new Function<T, T>() new Function<T, T>()
{ {
@Override @Override
@ -165,7 +165,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
MoreExecutors.sameThreadExecutor() MoreExecutors.sameThreadExecutor()
); );
} else { } else {
return base.run(query, context); return base.run(query, responseContext);
} }
} }

View File

@ -54,6 +54,7 @@ public class LocalFirehoseFactory implements FirehoseFactory<StringInputRowParse
public LocalFirehoseFactory( public LocalFirehoseFactory(
@JsonProperty("baseDir") File baseDir, @JsonProperty("baseDir") File baseDir,
@JsonProperty("filter") String filter, @JsonProperty("filter") String filter,
// Backwards compatible
@JsonProperty("parser") StringInputRowParser parser @JsonProperty("parser") StringInputRowParser parser
) )
{ {

View File

@ -87,7 +87,7 @@ public class BridgeQuerySegmentWalker implements QuerySegmentWalker
return new QueryRunner<T>() return new QueryRunner<T>()
{ {
@Override @Override
public Sequence<T> run(Query<T> query, Map<String, Object> context) public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{ {
try { try {
Server instance = brokerSelector.pick(); Server instance = brokerSelector.pick();

View File

@ -21,7 +21,6 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -48,6 +47,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.ReferenceCountingSegmentQueryRunner; import io.druid.query.ReferenceCountingSegmentQueryRunner;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource; import io.druid.query.TableDataSource;
import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentQueryRunner;
@ -261,7 +261,7 @@ public class ServerManager implements QuerySegmentWalker
return new NoopQueryRunner<T>(); return new NoopQueryRunner<T>();
} }
FunctionalIterable<QueryRunner<T>> adapters = FunctionalIterable FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(intervals) .create(intervals)
.transformCat( .transformCat(
new Function<Interval, Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>>>() new Function<Interval, Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>>>()
@ -278,7 +278,8 @@ public class ServerManager implements QuerySegmentWalker
{ {
@Override @Override
public Iterable<QueryRunner<T>> apply( public Iterable<QueryRunner<T>> apply(
@Nullable final TimelineObjectHolder<String, ReferenceCountingSegment> holder @Nullable
final TimelineObjectHolder<String, ReferenceCountingSegment> holder
) )
{ {
if (holder == null) { if (holder == null) {
@ -306,17 +307,15 @@ public class ServerManager implements QuerySegmentWalker
); );
} }
} }
) );
.filter(Predicates.<QueryRunner<T>>notNull());
} }
} }
)
.filter(
Predicates.<QueryRunner<T>>notNull()
); );
return new FinalizeResultsQueryRunner<T>(
return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
);
} }
private String getDataSourceName(DataSource dataSource) private String getDataSourceName(DataSource dataSource)
@ -339,13 +338,15 @@ public class ServerManager implements QuerySegmentWalker
String dataSourceName = getDataSourceName(query.getDataSource()); String dataSourceName = getDataSourceName(query.getDataSource());
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName); final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(
dataSourceName
);
if (timeline == null) { if (timeline == null) {
return new NoopQueryRunner<T>(); return new NoopQueryRunner<T>();
} }
FunctionalIterable<QueryRunner<T>> adapters = FunctionalIterable FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(specs) .create(specs)
.transformCat( .transformCat(
new Function<SegmentDescriptor, Iterable<QueryRunner<T>>>() new Function<SegmentDescriptor, Iterable<QueryRunner<T>>>()
@ -359,12 +360,12 @@ public class ServerManager implements QuerySegmentWalker
); );
if (entry == null) { if (entry == null) {
return null; return Arrays.<QueryRunner<T>>asList(new ReportTimelineMissingSegmentQueryRunner<T>(input));
} }
final PartitionChunk<ReferenceCountingSegment> chunk = entry.getChunk(input.getPartitionNumber()); final PartitionChunk<ReferenceCountingSegment> chunk = entry.getChunk(input.getPartitionNumber());
if (chunk == null) { if (chunk == null) {
return null; return Arrays.<QueryRunner<T>>asList(new ReportTimelineMissingSegmentQueryRunner<T>(input));
} }
final ReferenceCountingSegment adapter = chunk.getObject(); final ReferenceCountingSegment adapter = chunk.getObject();
@ -373,12 +374,12 @@ public class ServerManager implements QuerySegmentWalker
); );
} }
} }
)
.filter(
Predicates.<QueryRunner<T>>notNull()
); );
return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); return new FinalizeResultsQueryRunner<T>(
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
);
} }
private <T> QueryRunner<T> buildAndDecorateQueryRunner( private <T> QueryRunner<T> buildAndDecorateQueryRunner(

View File

@ -124,7 +124,7 @@ public class CachingQueryRunnerTest
new QueryRunner() new QueryRunner()
{ {
@Override @Override
public Sequence run(Query query, Map context) public Sequence run(Query query, Map responseContext)
{ {
return resultSeq; return resultSeq;
} }
@ -214,7 +214,7 @@ public class CachingQueryRunnerTest
new QueryRunner() new QueryRunner()
{ {
@Override @Override
public Sequence run(Query query, Map context) public Sequence run(Query query, Map responseContext)
{ {
return Sequences.empty(); return Sequences.empty();
} }

View File

@ -685,9 +685,9 @@ public class ServerManagerTest
} }
@Override @Override
public Sequence<T> run(Query<T> query, Map<String, Object> context) public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{ {
return new BlockingSequence<T>(runner.run(query, context), waitLatch, waitYieldLatch, notifyLatch); return new BlockingSequence<T>(runner.run(query, responseContext), waitLatch, waitYieldLatch, notifyLatch);
} }
} }