mirror of https://github.com/apache/druid.git
add retry feature on broker side; modify QueryRunner interface and tests
This commit is contained in:
parent
ada2d92c16
commit
00856f0fec
2
pom.xml
2
pom.xml
|
@ -244,7 +244,7 @@
|
|||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||
<artifactId>jackson-datatype-joda</artifactId>
|
||||
<version>2.2.3</version>
|
||||
<version>2.4.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
||||
|
|
|
@ -70,14 +70,14 @@ public abstract class BaseQuery<T> implements Query<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(QuerySegmentWalker walker)
|
||||
public Sequence<T> run(QuerySegmentWalker walker, Map<String, List> metadata)
|
||||
{
|
||||
return run(querySegmentSpec.lookup(this, walker));
|
||||
return run(querySegmentSpec.lookup(this, walker), metadata);
|
||||
}
|
||||
|
||||
public Sequence<T> run(QueryRunner<T> runner)
|
||||
public Sequence<T> run(QueryRunner<T> runner, Map<String, List> metadata)
|
||||
{
|
||||
return runner.run(this);
|
||||
return runner.run(this, metadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -29,6 +29,7 @@ import com.metamx.common.guava.YieldingAccumulator;
|
|||
import org.joda.time.DateTime;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -51,10 +52,10 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
|
|||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
public Sequence<T> run(final Query<T> query, Map<String, List> metadata)
|
||||
{
|
||||
if (query.getContextBySegment(false)) {
|
||||
final Sequence<T> baseSequence = base.run(query);
|
||||
final Sequence<T> baseSequence = base.run(query, metadata);
|
||||
return new Sequence<T>()
|
||||
{
|
||||
@Override
|
||||
|
@ -96,7 +97,6 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
return base.run(query);
|
||||
return base.run(query, metadata);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,9 @@ package io.druid.query;
|
|||
|
||||
import com.metamx.common.guava.Sequence;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
|
||||
|
@ -35,14 +38,14 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(Query<T> query)
|
||||
public Sequence<T> run(Query<T> query, Map<String, List> metadata)
|
||||
{
|
||||
if (query.getContextBySegment(false)) {
|
||||
return baseRunner.run(query);
|
||||
return baseRunner.run(query, metadata);
|
||||
}
|
||||
|
||||
return doRun(baseRunner, query);
|
||||
return doRun(baseRunner, query, metadata);
|
||||
}
|
||||
|
||||
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query);
|
||||
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, List> metadata);
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import com.metamx.common.logger.Logger;
|
|||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -93,7 +94,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
public Sequence<T> run(final Query<T> query, final Map<String, List> metadata)
|
||||
{
|
||||
final int priority = query.getContextPriority(0);
|
||||
|
||||
|
@ -124,7 +125,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
|
|||
throw new ISE("Input is null?! How is this possible?!");
|
||||
}
|
||||
|
||||
Sequence<T> result = input.run(query);
|
||||
Sequence<T> result = input.run(query, metadata);
|
||||
if (result == null) {
|
||||
throw new ISE("Got a null result! Segments are missing!");
|
||||
}
|
||||
|
|
|
@ -23,6 +23,9 @@ import com.google.common.base.Function;
|
|||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ConcatQueryRunner<T> implements QueryRunner<T>
|
||||
|
@ -36,7 +39,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
public Sequence<T> run(final Query<T> query, final Map<String, List> metadata)
|
||||
{
|
||||
return Sequences.concat(
|
||||
Sequences.map(
|
||||
|
@ -46,7 +49,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
|
|||
@Override
|
||||
public Sequence<T> apply(final QueryRunner<T> input)
|
||||
{
|
||||
return input.run(query);
|
||||
return input.run(query, metadata);
|
||||
}
|
||||
}
|
||||
)
|
||||
|
|
|
@ -28,6 +28,8 @@ import io.druid.query.aggregation.MetricManipulationFn;
|
|||
import io.druid.query.aggregation.MetricManipulatorFns;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -46,7 +48,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
public Sequence<T> run(final Query<T> query, Map<String, List> metadata)
|
||||
{
|
||||
final boolean isBySegment = query.getContextBySegment(false);
|
||||
final boolean shouldFinalize = query.getContextFinalize(true);
|
||||
|
@ -94,7 +96,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
|
|||
|
||||
|
||||
return Sequences.map(
|
||||
baseRunner.run(queryToRun),
|
||||
baseRunner.run(queryToRun, metadata),
|
||||
finalizerFn
|
||||
);
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ import io.druid.segment.incremental.IncrementalIndex;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -86,7 +87,7 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Row> run(final Query<Row> queryParam)
|
||||
public Sequence<Row> run(final Query<Row> queryParam, final Map<String, List> metadata)
|
||||
{
|
||||
|
||||
final GroupByQuery query = (GroupByQuery) queryParam;
|
||||
|
@ -115,7 +116,7 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
|
|||
public Boolean call() throws Exception
|
||||
{
|
||||
try {
|
||||
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
|
||||
input.run(queryParam, metadata).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
|
||||
return true;
|
||||
}
|
||||
catch (QueryInterruptedException e) {
|
||||
|
|
|
@ -33,6 +33,7 @@ import javax.annotation.Nullable;
|
|||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -48,10 +49,10 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
public Sequence<T> run(final Query<T> query, final Map<String, List> metadata)
|
||||
{
|
||||
if (period.getMillis() == 0) {
|
||||
return baseRunner.run(query);
|
||||
return baseRunner.run(query, metadata);
|
||||
}
|
||||
|
||||
return Sequences.concat(
|
||||
|
@ -74,7 +75,8 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
|
|||
public Sequence<T> apply(Interval singleInterval)
|
||||
{
|
||||
return baseRunner.run(
|
||||
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval)))
|
||||
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
|
||||
metadata
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,8 @@ import com.metamx.emitter.service.ServiceEmitter;
|
|||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -66,7 +68,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
public Sequence<T> run(final Query<T> query, final Map<String, List> metadata)
|
||||
{
|
||||
final ServiceMetricEvent.Builder builder = builderFn.apply(query);
|
||||
String queryId = query.getId();
|
||||
|
@ -84,7 +86,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
|
|||
|
||||
long startTime = System.currentTimeMillis();
|
||||
try {
|
||||
retVal = queryRunner.run(query).accumulate(outType, accumulator);
|
||||
retVal = queryRunner.run(query, metadata).accumulate(outType, accumulator);
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
builder.setUser10("failed");
|
||||
|
@ -114,7 +116,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
|
|||
|
||||
long startTime = System.currentTimeMillis();
|
||||
try {
|
||||
retVal = queryRunner.run(query).toYielder(initValue, accumulator);
|
||||
retVal = queryRunner.run(query, metadata).toYielder(initValue, accumulator);
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
builder.setUser10("failed");
|
||||
|
|
|
@ -22,12 +22,15 @@ package io.druid.query;
|
|||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class NoopQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
@Override
|
||||
public Sequence<T> run(Query query)
|
||||
public Sequence<T> run(Query<T> query, Map<String, List> metadata)
|
||||
{
|
||||
return Sequences.empty();
|
||||
}
|
||||
|
|
|
@ -62,9 +62,9 @@ public interface Query<T>
|
|||
|
||||
public String getType();
|
||||
|
||||
public Sequence<T> run(QuerySegmentWalker walker);
|
||||
public Sequence<T> run(QuerySegmentWalker walker, Map<String, List> metadata);
|
||||
|
||||
public Sequence<T> run(QueryRunner<T> runner);
|
||||
public Sequence<T> run(QueryRunner<T> runner, Map<String, List> metadata);
|
||||
|
||||
public List<Interval> getIntervals();
|
||||
|
||||
|
|
|
@ -21,9 +21,12 @@ package io.druid.query;
|
|||
|
||||
import com.metamx.common.guava.Sequence;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public interface QueryRunner<T>
|
||||
{
|
||||
public Sequence<T> run(Query<T> query);
|
||||
}
|
||||
public Sequence<T> run(Query<T> query, Map<String, List> metadata);
|
||||
}
|
|
@ -25,6 +25,8 @@ import com.metamx.common.guava.Sequence;
|
|||
import io.druid.segment.ReferenceCountingSegment;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -43,11 +45,11 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
public Sequence<T> run(final Query<T> query, Map<String, List> metadata)
|
||||
{
|
||||
final Closeable closeable = adapter.increment();
|
||||
try {
|
||||
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query);
|
||||
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, metadata);
|
||||
|
||||
return new ResourceClosingSequence<T>(baseSequence, closeable);
|
||||
}
|
||||
|
|
|
@ -24,6 +24,9 @@ import com.metamx.common.guava.Sequence;
|
|||
import com.metamx.common.guava.nary.BinaryFn;
|
||||
import io.druid.common.guava.CombiningSequence;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRunner<T>
|
||||
|
@ -36,9 +39,9 @@ public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRu
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query)
|
||||
public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, List> metadata)
|
||||
{
|
||||
return CombiningSequence.create(baseRunner.run(query), makeOrdering(query), createMergeFn(query));
|
||||
return CombiningSequence.create(baseRunner.run(query, metadata), makeOrdering(query), createMergeFn(query));
|
||||
}
|
||||
|
||||
protected abstract Ordering<T> makeOrdering(Query<T> query);
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.query;
|
||||
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.query.spec.QuerySegmentSpec;
|
||||
import io.druid.query.spec.SpecificSegmentSpec;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class RetryQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
private final QueryRunner<T> baseRunner;
|
||||
private final QueryToolChest<T, Query<T>> toolChest;
|
||||
|
||||
public RetryQueryRunner(QueryRunner<T> baseRunner, QueryToolChest<T, Query<T>> toolChest)
|
||||
{
|
||||
this.baseRunner = baseRunner;
|
||||
this.toolChest = toolChest;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query, Map<String, List> metadata)
|
||||
{
|
||||
Sequence<T> returningSeq = baseRunner.run(query, metadata);
|
||||
|
||||
for (int i = RetryQueryRunnerConfig.numTries(); i > 0; i--) {
|
||||
for (int j = metadata.get("missingSegments").size(); j > 0; j--) {
|
||||
QuerySegmentSpec segmentSpec = new SpecificSegmentSpec((SegmentDescriptor)metadata.get("missingSegments").remove(0));
|
||||
returningSeq = toolChest.mergeSequences(
|
||||
Sequences.simple(
|
||||
Arrays.asList(
|
||||
returningSeq,
|
||||
baseRunner.run(
|
||||
query.withQuerySegmentSpec(segmentSpec),
|
||||
metadata
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return returningSeq;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.query;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class RetryQueryRunnerConfig
|
||||
{
|
||||
@JsonProperty
|
||||
private static int numTries = 1;
|
||||
|
||||
public static int numTries() { return numTries; }
|
||||
}
|
|
@ -23,6 +23,9 @@ package io.druid.query;
|
|||
|
||||
import com.metamx.common.guava.Sequence;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* If there's a subquery, run it instead of the outer query
|
||||
*/
|
||||
|
@ -36,13 +39,13 @@ public class SubqueryQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
public Sequence<T> run(final Query<T> query, Map<String, List> metadata)
|
||||
{
|
||||
DataSource dataSource = query.getDataSource();
|
||||
if (dataSource instanceof QueryDataSource) {
|
||||
return run((Query<T>) ((QueryDataSource) dataSource).getQuery());
|
||||
return run((Query<T>) ((QueryDataSource) dataSource).getQuery(), metadata);
|
||||
} else {
|
||||
return baseRunner.run(query);
|
||||
return baseRunner.run(query, metadata);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,8 @@ import com.google.common.collect.Lists;
|
|||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class UnionQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
|
@ -43,7 +44,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
public Sequence<T> run(final Query<T> query, final Map<String, List> metadata)
|
||||
{
|
||||
DataSource dataSource = query.getDataSource();
|
||||
if (dataSource instanceof UnionDataSource) {
|
||||
|
@ -57,7 +58,8 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
|
|||
public Sequence<T> apply(DataSource singleSource)
|
||||
{
|
||||
return baseRunner.run(
|
||||
query.withDataSource(singleSource)
|
||||
query.withDataSource(singleSource),
|
||||
metadata
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -65,7 +67,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
|
|||
)
|
||||
);
|
||||
} else {
|
||||
return baseRunner.run(query);
|
||||
return baseRunner.run(query, metadata);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
|||
import org.joda.time.Interval;
|
||||
import org.joda.time.Minutes;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -79,18 +80,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
|||
return new QueryRunner<Row>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Row> run(Query<Row> input)
|
||||
public Sequence<Row> run(Query<Row> input, Map<String, List> metadata)
|
||||
{
|
||||
if (Boolean.valueOf((String) input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
|
||||
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
|
||||
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner, metadata);
|
||||
} else {
|
||||
return runner.run(input);
|
||||
return runner.run(input, metadata);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
|
||||
private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner, Map<String, List> metadata)
|
||||
{
|
||||
|
||||
Sequence<Row> result;
|
||||
|
@ -104,12 +105,12 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
|||
} catch (ClassCastException e) {
|
||||
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
|
||||
}
|
||||
Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner);
|
||||
Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, metadata);
|
||||
IncrementalIndexStorageAdapter adapter
|
||||
= new IncrementalIndexStorageAdapter(makeIncrementalIndex(subquery, subqueryResult));
|
||||
result = engine.process(query, adapter);
|
||||
} else {
|
||||
result = runner.run(query);
|
||||
result = runner.run(query, metadata);
|
||||
}
|
||||
|
||||
return postAggregate(query, makeIncrementalIndex(query, result));
|
||||
|
|
|
@ -45,6 +45,8 @@ import io.druid.query.QueryWatcher;
|
|||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -100,7 +102,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
|
|||
return new QueryRunner<Row>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Row> run(final Query<Row> query)
|
||||
public Sequence<Row> run(final Query<Row> query, final Map<String, List> metadata)
|
||||
{
|
||||
|
||||
ListenableFuture<Sequence<Row>> future = queryExecutor.submit(
|
||||
|
@ -110,7 +112,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
|
|||
public Sequence<Row> call() throws Exception
|
||||
{
|
||||
return new ExecutorExecutingSequence<Row>(
|
||||
input.run(query),
|
||||
input.run(query, metadata),
|
||||
queryExecutor
|
||||
);
|
||||
}
|
||||
|
@ -166,7 +168,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Row> run(Query<Row> input)
|
||||
public Sequence<Row> run(Query<Row> input, Map<String, List> metadata)
|
||||
{
|
||||
if (!(input instanceof GroupByQuery)) {
|
||||
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class);
|
||||
|
|
|
@ -45,6 +45,7 @@ import io.druid.segment.QueryableIndex;
|
|||
import io.druid.segment.Segment;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -74,7 +75,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
|
|||
return new QueryRunner<SegmentAnalysis>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ)
|
||||
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, List> metadata)
|
||||
{
|
||||
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
|
||||
|
||||
|
@ -133,7 +134,10 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
|
|||
return new QueryRunner<SegmentAnalysis>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
|
||||
public Sequence<SegmentAnalysis> run(
|
||||
final Query<SegmentAnalysis> query,
|
||||
final Map<String, List> metadata
|
||||
)
|
||||
{
|
||||
final int priority = query.getContextPriority(0);
|
||||
ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
|
||||
|
@ -142,7 +146,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
|
|||
@Override
|
||||
public Sequence<SegmentAnalysis> call() throws Exception
|
||||
{
|
||||
return input.run(query);
|
||||
return input.run(query, metadata);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -284,7 +284,10 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Result<SearchResultValue>> run(Query<Result<SearchResultValue>> input)
|
||||
public Sequence<Result<SearchResultValue>> run(
|
||||
Query<Result<SearchResultValue>> input,
|
||||
Map<String, List> metadata
|
||||
)
|
||||
{
|
||||
if (!(input instanceof SearchQuery)) {
|
||||
throw new ISE("Can only handle [%s], got [%s]", SearchQuery.class, input.getClass());
|
||||
|
@ -292,13 +295,13 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
|
|||
|
||||
final SearchQuery query = (SearchQuery) input;
|
||||
if (query.getLimit() < config.getMaxSearchLimit()) {
|
||||
return runner.run(query);
|
||||
return runner.run(query, metadata);
|
||||
}
|
||||
|
||||
final boolean isBySegment = query.getContextBySegment(false);
|
||||
|
||||
return Sequences.map(
|
||||
runner.run(query.withLimit(config.getMaxSearchLimit())),
|
||||
runner.run(query.withLimit(config.getMaxSearchLimit()), metadata),
|
||||
new Function<Result<SearchResultValue>, Result<SearchResultValue>>()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.query.search;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -52,7 +51,6 @@ import io.druid.segment.filter.Filters;
|
|||
import it.uniroma3.mat.extendedset.intset.ConciseSet;
|
||||
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeSet;
|
||||
|
@ -71,7 +69,10 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Result<SearchResultValue>> run(final Query<Result<SearchResultValue>> input)
|
||||
public Sequence<Result<SearchResultValue>> run(
|
||||
final Query<Result<SearchResultValue>> input,
|
||||
Map<String, List> metadata
|
||||
)
|
||||
{
|
||||
if (!(input instanceof SearchQuery)) {
|
||||
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SearchQuery.class);
|
||||
|
|
|
@ -19,14 +19,11 @@
|
|||
|
||||
package io.druid.query.select;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import io.druid.query.ChainedExecutionQueryRunner;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryConfig;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryToolChest;
|
||||
|
@ -34,6 +31,8 @@ import io.druid.query.QueryWatcher;
|
|||
import io.druid.query.Result;
|
||||
import io.druid.segment.Segment;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
|
@ -91,7 +90,10 @@ public class SelectQueryRunnerFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Result<SelectResultValue>> run(Query<Result<SelectResultValue>> input)
|
||||
public Sequence<Result<SelectResultValue>> run(
|
||||
Query<Result<SelectResultValue>> input,
|
||||
Map<String, List> metadata
|
||||
)
|
||||
{
|
||||
if (!(input instanceof SelectQuery)) {
|
||||
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SelectQuery.class);
|
||||
|
|
|
@ -22,12 +22,16 @@ package io.druid.query.spec;
|
|||
import com.google.common.base.Throwables;
|
||||
import com.metamx.common.guava.Accumulator;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.guava.Yielder;
|
||||
import com.metamx.common.guava.YieldingAccumulator;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.segment.NullStorageAdapterException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
|
@ -47,7 +51,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> input)
|
||||
public Sequence<T> run(final Query<T> input, final Map<String, List> metadata)
|
||||
{
|
||||
final Query<T> query = input.withQuerySegmentSpec(specificSpec);
|
||||
|
||||
|
@ -60,7 +64,14 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
|
|||
@Override
|
||||
public Sequence<T> call() throws Exception
|
||||
{
|
||||
return base.run(query);
|
||||
Sequence<T> returningSeq;
|
||||
try {
|
||||
returningSeq = base.run(query, metadata);
|
||||
} catch (NullStorageAdapterException e) {
|
||||
metadata.get("missingSegments").add(((SpecificSegmentSpec) specificSpec).getDescriptor());
|
||||
returningSeq = Sequences.empty();
|
||||
}
|
||||
return returningSeq;
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -52,6 +52,8 @@ public class SpecificSegmentSpec implements QuerySegmentSpec
|
|||
return walker.getQueryRunnerForSegments(query, Arrays.asList(descriptor));
|
||||
}
|
||||
|
||||
public SegmentDescriptor getDescriptor() { return descriptor; }
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.joda.time.DateTime;
|
|||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -95,13 +96,13 @@ public class TimeBoundaryQueryQueryToolChest
|
|||
{
|
||||
@Override
|
||||
protected Sequence<Result<TimeBoundaryResultValue>> doRun(
|
||||
QueryRunner<Result<TimeBoundaryResultValue>> baseRunner, Query<Result<TimeBoundaryResultValue>> input
|
||||
QueryRunner<Result<TimeBoundaryResultValue>> baseRunner, Query<Result<TimeBoundaryResultValue>> input, Map<String, List> metadata
|
||||
)
|
||||
{
|
||||
TimeBoundaryQuery query = (TimeBoundaryQuery) input;
|
||||
return Sequences.simple(
|
||||
query.mergeResults(
|
||||
Sequences.toList(baseRunner.run(query), Lists.<Result<TimeBoundaryResultValue>>newArrayList())
|
||||
Sequences.toList(baseRunner.run(query, metadata), Lists.<Result<TimeBoundaryResultValue>>newArrayList())
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -34,6 +34,8 @@ import io.druid.segment.Segment;
|
|||
import io.druid.segment.StorageAdapter;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
|
@ -82,7 +84,10 @@ public class TimeBoundaryQueryRunnerFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Result<TimeBoundaryResultValue>> run(Query<Result<TimeBoundaryResultValue>> input)
|
||||
public Sequence<Result<TimeBoundaryResultValue>> run(
|
||||
Query<Result<TimeBoundaryResultValue>> input,
|
||||
Map<String, List> metadata
|
||||
)
|
||||
{
|
||||
if (!(input instanceof TimeBoundaryQuery)) {
|
||||
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeBoundaryQuery.class);
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.query.timeseries;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import io.druid.query.QueryRunnerHelper;
|
||||
import io.druid.query.Result;
|
||||
|
@ -28,6 +27,7 @@ import io.druid.query.aggregation.Aggregator;
|
|||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.PostAggregator;
|
||||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.NullStorageAdapterException;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.filter.Filters;
|
||||
|
||||
|
@ -40,7 +40,7 @@ public class TimeseriesQueryEngine
|
|||
public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter)
|
||||
{
|
||||
if (adapter == null) {
|
||||
throw new ISE(
|
||||
throw new NullStorageAdapterException(
|
||||
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
|
||||
);
|
||||
}
|
||||
|
|
|
@ -19,13 +19,11 @@
|
|||
|
||||
package io.druid.query.timeseries;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import io.druid.query.ChainedExecutionQueryRunner;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryConfig;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryToolChest;
|
||||
|
@ -34,6 +32,8 @@ import io.druid.query.Result;
|
|||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
|
@ -91,7 +91,10 @@ public class TimeseriesQueryRunnerFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> input)
|
||||
public Sequence<Result<TimeseriesResultValue>> run(
|
||||
Query<Result<TimeseriesResultValue>> input,
|
||||
Map<String, List> metadata
|
||||
)
|
||||
{
|
||||
if (!(input instanceof TimeseriesQuery)) {
|
||||
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeseriesQuery.class);
|
||||
|
|
|
@ -22,8 +22,6 @@ package io.druid.query.topn;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.logger.Logger;
|
||||
|
@ -34,11 +32,11 @@ import io.druid.query.aggregation.AggregatorFactory;
|
|||
import io.druid.query.filter.Filter;
|
||||
import io.druid.segment.Capabilities;
|
||||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.NullStorageAdapterException;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.sql.rowset.Predicate;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -58,7 +56,7 @@ public class TopNQueryEngine
|
|||
public Sequence<Result<TopNResultValue>> query(final TopNQuery query, final StorageAdapter adapter)
|
||||
{
|
||||
if (adapter == null) {
|
||||
throw new ISE(
|
||||
throw new NullStorageAdapterException(
|
||||
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
|
||||
);
|
||||
}
|
||||
|
|
|
@ -410,7 +410,10 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Result<TopNResultValue>> run(Query<Result<TopNResultValue>> input)
|
||||
public Sequence<Result<TopNResultValue>> run(
|
||||
Query<Result<TopNResultValue>> input,
|
||||
Map<String, List> metadata
|
||||
)
|
||||
{
|
||||
if (!(input instanceof TopNQuery)) {
|
||||
throw new ISE("Can only handle [%s], got [%s]", TopNQuery.class, input.getClass());
|
||||
|
@ -418,13 +421,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
|||
|
||||
final TopNQuery query = (TopNQuery) input;
|
||||
if (query.getThreshold() > minTopNThreshold) {
|
||||
return runner.run(query);
|
||||
return runner.run(query, metadata);
|
||||
}
|
||||
|
||||
final boolean isBySegment = query.getContextBySegment(false);
|
||||
|
||||
return Sequences.map(
|
||||
runner.run(query.withThreshold(minTopNThreshold)),
|
||||
runner.run(query.withThreshold(minTopNThreshold), metadata),
|
||||
new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -21,7 +21,6 @@ package io.druid.query.topn;
|
|||
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.BaseSequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import io.druid.collections.StupidPool;
|
||||
import io.druid.guice.annotations.Global;
|
||||
|
@ -35,7 +34,8 @@ import io.druid.query.Result;
|
|||
import io.druid.segment.Segment;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
|
@ -65,7 +65,10 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
|
|||
return new QueryRunner<Result<TopNResultValue>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Result<TopNResultValue>> run(Query<Result<TopNResultValue>> input)
|
||||
public Sequence<Result<TopNResultValue>> run(
|
||||
Query<Result<TopNResultValue>> input,
|
||||
Map<String, List> metadata
|
||||
)
|
||||
{
|
||||
if (!(input instanceof TopNQuery)) {
|
||||
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TopNQuery.class);
|
||||
|
|
|
@ -31,6 +31,7 @@ public class IncrementalIndexSegment implements Segment
|
|||
{
|
||||
private final IncrementalIndex index;
|
||||
private final String segmentIdentifier;
|
||||
private boolean nullStorage = false;
|
||||
|
||||
public IncrementalIndexSegment(
|
||||
IncrementalIndex index,
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.segment;
|
||||
|
||||
public class NullStorageAdapterException extends IllegalStateException
|
||||
{
|
||||
public NullStorageAdapterException(String formatText, Object... arguments) {
|
||||
super(String.format(formatText, arguments));
|
||||
}
|
||||
}
|
|
@ -29,6 +29,7 @@ public class QueryableIndexSegment implements Segment
|
|||
{
|
||||
private final QueryableIndex index;
|
||||
private final String identifier;
|
||||
private boolean nullStorage = false;
|
||||
|
||||
public QueryableIndexSegment(final String segmentIdentifier, QueryableIndex index)
|
||||
{
|
||||
|
|
|
@ -35,6 +35,9 @@ import org.easymock.IAnswer;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -99,13 +102,14 @@ public class ChainedExecutionQueryRunnerTest
|
|||
runner3
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
final Sequence seq = chainedRunner.run(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("test")
|
||||
.intervals("2014/2015")
|
||||
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")))
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
);
|
||||
|
||||
Future resultFuture = Executors.newFixedThreadPool(1).submit(
|
||||
|
@ -202,14 +206,15 @@ public class ChainedExecutionQueryRunnerTest
|
|||
runner3
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
final Sequence seq = chainedRunner.run(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("test")
|
||||
.intervals("2014/2015")
|
||||
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")))
|
||||
.context(ImmutableMap.<String, Object>of("timeout", (100), "queryId", "test"))
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
);
|
||||
|
||||
Future resultFuture = Executors.newFixedThreadPool(1).submit(
|
||||
|
@ -263,7 +268,7 @@ public class ChainedExecutionQueryRunnerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Integer> run(Query<Integer> query)
|
||||
public Sequence<Integer> run(Query<Integer> query, Map<String, List> metadata)
|
||||
{
|
||||
hasStarted = true;
|
||||
latch.countDown();
|
||||
|
|
|
@ -74,6 +74,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -343,7 +344,7 @@ public class GroupByQueryRunnerTest
|
|||
new QueryRunner<Row>()
|
||||
{
|
||||
@Override
|
||||
public Sequence run(Query<Row> query)
|
||||
public Sequence run(Query<Row> query, Map<String, List> metadata)
|
||||
{
|
||||
// simulate two daily segments
|
||||
final Query query1 = query.withQuerySegmentSpec(
|
||||
|
@ -352,7 +353,7 @@ public class GroupByQueryRunnerTest
|
|||
final Query query2 = query.withQuerySegmentSpec(
|
||||
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
|
||||
);
|
||||
return Sequences.concat(runner.run(query1), runner.run(query2));
|
||||
return Sequences.concat(runner.run(query1, metadata), runner.run(query2, metadata));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -369,8 +370,9 @@ public class GroupByQueryRunnerTest
|
|||
createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct");
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery, metadata), "direct");
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery, metadata), "merged");
|
||||
|
||||
List<Row> allGranExpectedResults = Arrays.asList(
|
||||
createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L),
|
||||
|
@ -384,8 +386,8 @@ public class GroupByQueryRunnerTest
|
|||
createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery), "direct");
|
||||
TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery), "merged");
|
||||
TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery, metadata), "direct");
|
||||
TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery, metadata), "merged");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -427,9 +429,9 @@ public class GroupByQueryRunnerTest
|
|||
);
|
||||
|
||||
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedObjects(
|
||||
Iterables.limit(expectedResults, limit), mergeRunner.run(fullQuery), String.format("limit: %d", limit)
|
||||
Iterables.limit(expectedResults, limit), mergeRunner.run(fullQuery, metadata), String.format("limit: %d", limit)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -535,7 +537,7 @@ public class GroupByQueryRunnerTest
|
|||
new QueryRunner<Row>()
|
||||
{
|
||||
@Override
|
||||
public Sequence run(Query<Row> query)
|
||||
public Sequence run(Query<Row> query, Map<String, List> metadata)
|
||||
{
|
||||
// simulate two daily segments
|
||||
final Query query1 = query.withQuerySegmentSpec(
|
||||
|
@ -544,12 +546,12 @@ public class GroupByQueryRunnerTest
|
|||
final Query query2 = query.withQuerySegmentSpec(
|
||||
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
|
||||
);
|
||||
return Sequences.concat(runner.run(query1), runner.run(query2));
|
||||
return Sequences.concat(runner.run(query1, metadata), runner.run(query2, metadata));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery, metadata), "merged");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -585,10 +587,11 @@ public class GroupByQueryRunnerTest
|
|||
);
|
||||
|
||||
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, metadata), "no-limit");
|
||||
|
||||
TestHelper.assertExpectedObjects(
|
||||
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build()), "limited"
|
||||
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), metadata), "limited"
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -625,9 +628,10 @@ public class GroupByQueryRunnerTest
|
|||
);
|
||||
|
||||
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, metadata), "no-limit");
|
||||
TestHelper.assertExpectedObjects(
|
||||
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build()), "limited"
|
||||
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), metadata), "limited"
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -664,9 +668,10 @@ public class GroupByQueryRunnerTest
|
|||
);
|
||||
|
||||
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, metadata), "no-limit");
|
||||
TestHelper.assertExpectedObjects(
|
||||
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build()), "limited"
|
||||
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), metadata), "limited"
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -706,7 +711,7 @@ public class GroupByQueryRunnerTest
|
|||
new QueryRunner<Row>()
|
||||
{
|
||||
@Override
|
||||
public Sequence run(Query<Row> query)
|
||||
public Sequence run(Query<Row> query, Map<String, List> metadata)
|
||||
{
|
||||
// simulate two daily segments
|
||||
final Query query1 = query.withQuerySegmentSpec(
|
||||
|
@ -715,12 +720,13 @@ public class GroupByQueryRunnerTest
|
|||
final Query query2 = query.withQuerySegmentSpec(
|
||||
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
|
||||
);
|
||||
return Sequences.concat(runner.run(query1), runner.run(query2));
|
||||
return Sequences.concat(runner.run(query1, metadata), runner.run(query2, metadata));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery, metadata), "merged");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -759,8 +765,9 @@ public class GroupByQueryRunnerTest
|
|||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, metadata), "no-limit");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -792,8 +799,8 @@ public class GroupByQueryRunnerTest
|
|||
createExpectedRow("2011-04-01", "quality", "technology", "rows", 2L),
|
||||
createExpectedRow("2011-04-01", "quality", "travel", "rows", 2L)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedObjects(expectedResults, runner.run(query), "normal");
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedObjects(expectedResults, runner.run(query, metadata), "normal");
|
||||
final GroupByQueryEngine engine = new GroupByQueryEngine(
|
||||
configSupplier,
|
||||
new StupidPool<ByteBuffer>(
|
||||
|
@ -809,7 +816,7 @@ public class GroupByQueryRunnerTest
|
|||
);
|
||||
|
||||
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, metadata), "no-limit");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -842,7 +849,8 @@ public class GroupByQueryRunnerTest
|
|||
createExpectedRow("2011-04-01", "quality", "travel", "rows", 2L)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedObjects(expectedResults, runner.run(query), "normal");
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedObjects(expectedResults, runner.run(query, metadata), "normal");
|
||||
final GroupByQueryEngine engine = new GroupByQueryEngine(
|
||||
configSupplier,
|
||||
new StupidPool<ByteBuffer>(
|
||||
|
@ -858,7 +866,7 @@ public class GroupByQueryRunnerTest
|
|||
);
|
||||
|
||||
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, metadata), "no-limit");
|
||||
}
|
||||
|
||||
// A subquery identical to the query should yield identical results
|
||||
|
@ -1038,7 +1046,8 @@ public class GroupByQueryRunnerTest
|
|||
toolChest
|
||||
);
|
||||
|
||||
Sequence<Row> queryResult = theRunner.run(query);
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Sequence<Row> queryResult = theRunner.run(query, metadata);
|
||||
return Sequences.toList(queryResult, Lists.<Row>newArrayList());
|
||||
}
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -91,7 +92,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
|
|||
QueryRunner timeseriesRunner = new QueryRunner()
|
||||
{
|
||||
@Override
|
||||
public Sequence run(Query query)
|
||||
public Sequence run(Query query, Map metadata)
|
||||
{
|
||||
TimeseriesQuery tsQuery = (TimeseriesQuery) query;
|
||||
|
||||
|
@ -104,7 +105,8 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
|
|||
.setDimFilter(tsQuery.getDimensionsFilter())
|
||||
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
|
||||
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
),
|
||||
new Function<Row, Result<TimeseriesResultValue>>()
|
||||
{
|
||||
|
|
|
@ -25,7 +25,6 @@ import io.druid.query.LegacyDataSource;
|
|||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.QueryWatcher;
|
||||
import io.druid.query.metadata.metadata.ColumnAnalysis;
|
||||
import io.druid.query.metadata.metadata.SegmentAnalysis;
|
||||
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
|
||||
|
@ -38,6 +37,7 @@ import io.druid.segment.column.ValueType;
|
|||
import junit.framework.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -102,6 +102,7 @@ public class SegmentAnalyzerTest
|
|||
final SegmentMetadataQuery query = new SegmentMetadataQuery(
|
||||
new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null
|
||||
);
|
||||
return Sequences.toList(query.run(runner), Lists.<SegmentAnalysis>newArrayList());
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
return Sequences.toList(query.run(runner, metadata), Lists.<SegmentAnalysis>newArrayList());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,8 @@ import org.junit.Assert;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
public class SegmentMetadataQueryTest
|
||||
{
|
||||
|
@ -70,9 +72,9 @@ public class SegmentMetadataQueryTest
|
|||
.toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
|
||||
.merge(true)
|
||||
.build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<SegmentAnalysis> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<SegmentAnalysis>newArrayList()
|
||||
);
|
||||
SegmentAnalysis val = results.iterator().next();
|
||||
|
|
|
@ -23,13 +23,10 @@ import com.google.common.collect.ImmutableMap;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.QueryWatcher;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.search.search.FragmentSearchQuerySpec;
|
||||
|
@ -47,6 +44,7 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -371,8 +369,9 @@ public class SearchQueryRunnerTest
|
|||
|
||||
private void checkSearchQuery(SearchQuery searchQuery, Map<String, Set<String>> expectedResults)
|
||||
{
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<SearchResultValue>> results = Sequences.toList(
|
||||
runner.run(searchQuery),
|
||||
runner.run(searchQuery, metadata),
|
||||
Lists.<Result<SearchResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
|
|
|
@ -22,15 +22,12 @@ package io.druid.query.select;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryConfig;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.QueryWatcher;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.TableDataSource;
|
||||
import io.druid.query.filter.SelectorDimFilter;
|
||||
|
@ -45,6 +42,7 @@ import org.junit.runners.Parameterized;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -90,9 +88,9 @@ public class SelectQueryRunnerTest
|
|||
new PagingSpec(null, 3),
|
||||
null
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<SelectResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<SelectResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
|
@ -159,9 +157,9 @@ public class SelectQueryRunnerTest
|
|||
new PagingSpec(null, 3),
|
||||
null
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<SelectResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<SelectResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
|
@ -219,9 +217,9 @@ public class SelectQueryRunnerTest
|
|||
new PagingSpec(Maps.newLinkedHashMap(ImmutableMap.of(QueryRunnerTestHelper.segmentId, 3)), 3),
|
||||
null
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<SelectResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<SelectResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
|
@ -279,9 +277,9 @@ public class SelectQueryRunnerTest
|
|||
new PagingSpec(Maps.newLinkedHashMap(ImmutableMap.of(QueryRunnerTestHelper.segmentId, 3)), 3),
|
||||
null
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<SelectResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<SelectResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
|
|
|
@ -20,13 +20,10 @@
|
|||
package io.druid.query.timeboundary;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.QueryWatcher;
|
||||
import io.druid.query.Result;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Assert;
|
||||
|
@ -36,6 +33,8 @@ import org.junit.runners.Parameterized;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -66,9 +65,9 @@ public class TimeBoundaryQueryRunnerTest
|
|||
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
|
||||
.dataSource("testing")
|
||||
.build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
|
||||
runner.run(timeBoundaryQuery),
|
||||
runner.run(timeBoundaryQuery, metadata),
|
||||
Lists.<Result<TimeBoundaryResultValue>>newArrayList()
|
||||
);
|
||||
TimeBoundaryResultValue val = results.iterator().next().getValue();
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.junit.runners.Parameterized;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
|
@ -97,9 +98,9 @@ public class TimeSeriesUnionQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.joda.time.DateTime;
|
|||
import org.joda.time.Interval;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
public class TimeseriesQueryRunnerBonusTest
|
||||
|
@ -110,9 +111,9 @@ public class TimeseriesQueryRunnerBonusTest
|
|||
)
|
||||
)
|
||||
.build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
return Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
}
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.junit.runners.Parameterized;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -100,9 +101,10 @@ public class TimeseriesQueryRunnerTest
|
|||
|
||||
DateTime expectedEarliest = new DateTime("2011-01-12");
|
||||
DateTime expectedLast = new DateTime("2011-04-15");
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
|
@ -166,8 +168,9 @@ public class TimeseriesQueryRunnerTest
|
|||
DateTime expectedEarliest = new DateTime("2011-01-12");
|
||||
DateTime expectedLast = new DateTime("2011-04-15");
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
Result<TimeseriesResultValue> result = results.iterator().next();
|
||||
|
@ -212,9 +215,9 @@ public class TimeseriesQueryRunnerTest
|
|||
|
||||
DateTime expectedEarliest = new DateTime("2011-01-12");
|
||||
DateTime expectedLast = new DateTime("2011-04-15");
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
|
@ -278,9 +281,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
|
@ -325,9 +328,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
|
@ -367,9 +370,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results1 = Sequences.toList(
|
||||
runner.run(query1),
|
||||
runner.run(query1, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults1, results1);
|
||||
|
@ -406,7 +409,7 @@ public class TimeseriesQueryRunnerTest
|
|||
);
|
||||
|
||||
Iterable<Result<TimeseriesResultValue>> results2 = Sequences.toList(
|
||||
runner.run(query2),
|
||||
runner.run(query2, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults2, results2);
|
||||
|
@ -457,9 +460,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results1 = Sequences.toList(
|
||||
runner.run(query1),
|
||||
runner.run(query1, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults1, results1);
|
||||
|
@ -499,9 +502,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results1 = Sequences.toList(
|
||||
runner.run(query1),
|
||||
runner.run(query1, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults1, results1);
|
||||
|
@ -539,7 +542,7 @@ public class TimeseriesQueryRunnerTest
|
|||
);
|
||||
|
||||
Iterable<Result<TimeseriesResultValue>> results2 = Sequences.toList(
|
||||
runner.run(query2),
|
||||
runner.run(query2, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults2, results2);
|
||||
|
@ -572,9 +575,9 @@ public class TimeseriesQueryRunnerTest
|
|||
.build();
|
||||
|
||||
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -622,9 +625,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -672,9 +675,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -722,9 +725,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -772,9 +775,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -822,9 +825,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -880,9 +883,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -938,9 +941,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -996,9 +999,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -1052,9 +1055,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -1114,9 +1117,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -1158,9 +1161,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -1202,9 +1205,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -1260,9 +1263,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
|
@ -1300,8 +1303,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, actualResults);
|
||||
|
@ -1340,8 +1344,9 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, actualResults);
|
||||
|
@ -1358,7 +1363,7 @@ public class TimeseriesQueryRunnerTest
|
|||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> expectedResults = Sequences.toList(
|
||||
runner.run(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
|
@ -1367,12 +1372,13 @@ public class TimeseriesQueryRunnerTest
|
|||
.intervals(QueryRunnerTestHelper.firstToThird)
|
||||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, actualResults);
|
||||
|
@ -1389,7 +1395,7 @@ public class TimeseriesQueryRunnerTest
|
|||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> expectedResults = Sequences.toList(
|
||||
runner.run(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
|
@ -1399,12 +1405,13 @@ public class TimeseriesQueryRunnerTest
|
|||
.intervals(QueryRunnerTestHelper.firstToThird)
|
||||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, actualResults);
|
||||
|
@ -1450,7 +1457,7 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
.build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> expectedResults = Sequences.toList(
|
||||
runner.run(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
|
@ -1460,12 +1467,13 @@ public class TimeseriesQueryRunnerTest
|
|||
.intervals(QueryRunnerTestHelper.firstToThird)
|
||||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, actualResults);
|
||||
|
@ -1513,7 +1521,7 @@ public class TimeseriesQueryRunnerTest
|
|||
)
|
||||
)
|
||||
.build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Iterable<Result<TimeseriesResultValue>> expectedResults = Sequences.toList(
|
||||
runner.run(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
|
@ -1523,12 +1531,13 @@ public class TimeseriesQueryRunnerTest
|
|||
.intervals(QueryRunnerTestHelper.firstToThird)
|
||||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
|
||||
runner.run(query),
|
||||
runner.run(query, metadata),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, actualResults);
|
||||
|
|
|
@ -52,6 +52,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -166,8 +167,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -230,8 +231,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
|
||||
|
@ -295,8 +296,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
|
||||
|
@ -345,8 +346,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -394,8 +395,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -443,8 +444,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -485,8 +486,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -520,8 +521,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -569,8 +570,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -622,8 +623,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -664,8 +665,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -682,7 +683,7 @@ public class TopNQueryRunnerTest
|
|||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(
|
||||
Lists.<Result<TopNResultValue>>newArrayList(
|
||||
new Result<TopNResultValue>(
|
||||
|
@ -690,7 +691,7 @@ public class TopNQueryRunnerTest
|
|||
new TopNResultValue(Lists.<Map<String, Object>>newArrayList())
|
||||
)
|
||||
),
|
||||
runner.run(query)
|
||||
runner.run(query, metadata)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -721,7 +722,7 @@ public class TopNQueryRunnerTest
|
|||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(
|
||||
Lists.<Result<TopNResultValue>>newArrayList(
|
||||
new Result<TopNResultValue>(
|
||||
|
@ -729,7 +730,7 @@ public class TopNQueryRunnerTest
|
|||
new TopNResultValue(Lists.<Map<String, Object>>newArrayList())
|
||||
)
|
||||
),
|
||||
runner.run(query)
|
||||
runner.run(query, metadata)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -747,7 +748,7 @@ public class TopNQueryRunnerTest
|
|||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(
|
||||
Sequences.toList(
|
||||
runner.run(
|
||||
|
@ -761,9 +762,10 @@ public class TopNQueryRunnerTest
|
|||
.intervals(QueryRunnerTestHelper.firstToThird)
|
||||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
), Lists.<Result<TopNResultValue>>newArrayList()
|
||||
), runner.run(query)
|
||||
), runner.run(query, metadata)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -781,7 +783,7 @@ public class TopNQueryRunnerTest
|
|||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(
|
||||
Sequences.toList(
|
||||
runner.run(
|
||||
|
@ -795,10 +797,11 @@ public class TopNQueryRunnerTest
|
|||
.intervals(QueryRunnerTestHelper.firstToThird)
|
||||
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
), Lists.<Result<TopNResultValue>>newArrayList()
|
||||
)
|
||||
, runner.run(query)
|
||||
, runner.run(query, metadata)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -840,8 +843,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -889,8 +892,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -945,8 +948,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -993,8 +996,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1034,8 +1037,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1075,8 +1078,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1116,8 +1119,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1157,8 +1160,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1209,8 +1212,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1261,8 +1264,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1313,8 +1316,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1358,8 +1361,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
|
||||
|
@ -1404,8 +1407,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1449,8 +1452,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1498,8 +1501,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1583,8 +1586,8 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1666,6 +1669,7 @@ public class TopNQueryRunnerTest
|
|||
)
|
||||
)
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -173,8 +174,8 @@ public class TopNUnionQueryTest
|
|||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -61,6 +61,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
|
@ -448,8 +449,8 @@ public class SpatialFilterBonusTest
|
|||
factory.createRunner(segment),
|
||||
factory.getToolchest()
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
@ -535,8 +536,8 @@ public class SpatialFilterBonusTest
|
|||
factory.createRunner(segment),
|
||||
factory.getToolchest()
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
|
|
@ -61,6 +61,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
|
@ -478,8 +479,8 @@ public class SpatialFilterTest
|
|||
factory.createRunner(segment),
|
||||
factory.getToolchest()
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
@ -565,8 +566,8 @@ public class SpatialFilterTest
|
|||
factory.createRunner(segment),
|
||||
factory.getToolchest()
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query, metadata));
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
|
|
@ -115,7 +115,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
public Sequence<T> run(final Query<T> query, final Map<String, List> metadata)
|
||||
{
|
||||
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
|
||||
final CacheStrategy<T, Object, Query<T>> strategy = toolChest.getCacheStrategy(query);
|
||||
|
@ -327,11 +327,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
List<Interval> intervals = segmentSpec.getIntervals();
|
||||
|
||||
if (!server.isAssignable() || !populateCache || isBySegment) {
|
||||
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec));
|
||||
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), metadata);
|
||||
} else {
|
||||
resultSeqToAdd = toolChest.mergeSequences(
|
||||
Sequences.map(
|
||||
clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec)),
|
||||
clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), metadata),
|
||||
new Function<Object, Sequence<T>>()
|
||||
{
|
||||
private final Function<T, Object> cacheFn = strategy.prepareForCache();
|
||||
|
|
|
@ -40,6 +40,7 @@ import io.druid.query.SegmentDescriptor;
|
|||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class CachingQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
|
@ -72,7 +73,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(Query<T> query)
|
||||
public Sequence<T> run(Query<T> query, Map<String, List> metadata)
|
||||
{
|
||||
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
|
||||
|
||||
|
@ -140,7 +141,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
|
|||
|
||||
return Sequences.withEffect(
|
||||
Sequences.map(
|
||||
base.run(query),
|
||||
base.run(query, metadata),
|
||||
new Function<T, T>()
|
||||
{
|
||||
@Override
|
||||
|
@ -162,7 +163,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
|
|||
MoreExecutors.sameThreadExecutor()
|
||||
);
|
||||
} else {
|
||||
return base.run(query);
|
||||
return base.run(query, metadata);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ import io.druid.query.QueryToolChest;
|
|||
import io.druid.query.QueryToolChestWarehouse;
|
||||
import io.druid.query.QueryWatcher;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import io.druid.query.aggregation.MetricManipulatorFns;
|
||||
import org.jboss.netty.handler.codec.http.HttpChunk;
|
||||
import org.jboss.netty.handler.codec.http.HttpHeaders;
|
||||
|
@ -64,6 +65,8 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.net.URL;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -111,7 +114,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
public Sequence<T> run(final Query<T> query, final Map<String, List> metadata)
|
||||
{
|
||||
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
|
||||
boolean isBySegment = query.getContextBySegment(false);
|
||||
|
@ -156,6 +159,20 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
log.debug("Initial response from url[%s]", url);
|
||||
startTime = System.currentTimeMillis();
|
||||
byteCount += response.getContent().readableBytes();
|
||||
|
||||
if (!response.getHeader("Missing-Segments").equals("")) {
|
||||
LinkedList missingSegments = new LinkedList();
|
||||
try {
|
||||
missingSegments = objectMapper.readValue(response.getHeader("Missing-Segments"), LinkedList.class);
|
||||
for (int i = missingSegments.size(); i > 0; i--) {
|
||||
missingSegments.add(objectMapper.convertValue(missingSegments.remove(0), SegmentDescriptor.class));
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
}
|
||||
metadata.get("missingSegments").addAll(missingSegments);
|
||||
}
|
||||
|
||||
return super.handleResponse(response);
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import io.druid.query.QueryRunner;
|
|||
import io.druid.query.QuerySegmentWalker;
|
||||
import io.druid.query.QueryToolChest;
|
||||
import io.druid.query.QueryToolChestWarehouse;
|
||||
import io.druid.query.RetryQueryRunner;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import io.druid.query.UnionQueryRunner;
|
||||
import org.joda.time.Interval;
|
||||
|
@ -86,7 +87,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
|
|||
return toolChest.makeMetricBuilder(query);
|
||||
}
|
||||
},
|
||||
toolChest.preMergeQueryDecoration(baseClient)
|
||||
toolChest.preMergeQueryDecoration(new RetryQueryRunner<T>(baseClient, toolChest)
|
||||
)
|
||||
).withWaitMeasuredFromNow(),
|
||||
toolChest
|
||||
)
|
||||
|
|
|
@ -22,20 +22,15 @@ package io.druid.server;
|
|||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.ObjectWriter;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.io.Closeables;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.guava.Accumulator;
|
||||
import com.metamx.common.guava.Accumulators;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.guava.Yielder;
|
||||
import com.metamx.common.guava.YieldingAccumulator;
|
||||
import com.metamx.common.guava.YieldingAccumulators;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
|
@ -51,7 +46,6 @@ import org.joda.time.DateTime;
|
|||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.Path;
|
||||
|
@ -59,12 +53,13 @@ import javax.ws.rs.PathParam;
|
|||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.WebApplicationException;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
|
@ -147,7 +142,9 @@ public class QueryResource
|
|||
log.debug("Got query [%s]", query);
|
||||
}
|
||||
|
||||
Sequence results = query.run(texasRanger);
|
||||
HashMap<String, List> metadata = new HashMap<String, List>();
|
||||
metadata.put("missingSegments", new LinkedList());
|
||||
Sequence results = query.run(texasRanger, metadata);
|
||||
|
||||
if (results == null) {
|
||||
results = Sequences.empty();
|
||||
|
@ -167,6 +164,12 @@ public class QueryResource
|
|||
}
|
||||
)
|
||||
) {
|
||||
|
||||
String missingSegments = "";
|
||||
if (!metadata.get("missingSegments").isEmpty()) {
|
||||
missingSegments = jsonMapper.writeValueAsString(metadata.get("missingSegments"));
|
||||
}
|
||||
|
||||
long requestTime = System.currentTimeMillis() - start;
|
||||
|
||||
emitter.emit(
|
||||
|
@ -209,6 +212,7 @@ public class QueryResource
|
|||
isSmile ? APPLICATION_JSON : APPLICATION_SMILE
|
||||
)
|
||||
.header("X-Druid-Query-Id", queryId)
|
||||
.header("Missing-Segments", missingSegments)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.joda.time.Interval;
|
|||
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -86,7 +87,7 @@ public class BridgeQuerySegmentWalker implements QuerySegmentWalker
|
|||
return new QueryRunner<T>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<T> run(Query<T> query)
|
||||
public Sequence<T> run(Query<T> query, Map<String, List> metadata)
|
||||
{
|
||||
try {
|
||||
Server instance = brokerSelector.pick();
|
||||
|
|
|
@ -106,6 +106,7 @@ import javax.annotation.Nullable;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -262,7 +263,7 @@ public class CachingClusteredClientTest
|
|||
new DateTime("2011-01-09T01"), 181, 52
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(
|
||||
makeRenamedTimeResults(
|
||||
new DateTime("2011-01-01"), 50, 5000,
|
||||
|
@ -283,7 +284,8 @@ public class CachingClusteredClientTest
|
|||
builder.intervals("2011-01-01/2011-01-10")
|
||||
.aggregators(RENAMED_AGGS)
|
||||
.postAggregators(RENAMED_POST_AGGS)
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -314,7 +316,7 @@ public class CachingClusteredClientTest
|
|||
new DateTime("2011-11-07", TIMEZONE), 85, 102
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(
|
||||
makeRenamedTimeResults(
|
||||
new DateTime("2011-11-04", TIMEZONE), 50, 5000,
|
||||
|
@ -326,7 +328,8 @@ public class CachingClusteredClientTest
|
|||
builder.intervals("2011-11-04/2011-11-08")
|
||||
.aggregators(RENAMED_AGGS)
|
||||
.postAggregators(RENAMED_POST_AGGS)
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -442,7 +445,7 @@ public class CachingClusteredClientTest
|
|||
new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(
|
||||
makeRenamedTopNResults(
|
||||
new DateTime("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998,
|
||||
|
@ -463,7 +466,8 @@ public class CachingClusteredClientTest
|
|||
.metric("imps")
|
||||
.aggregators(RENAMED_AGGS)
|
||||
.postAggregators(RENAMED_POST_AGGS)
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -497,7 +501,7 @@ public class CachingClusteredClientTest
|
|||
new DateTime("2011-11-07", TIMEZONE), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986
|
||||
)
|
||||
);
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(
|
||||
makeRenamedTopNResults(
|
||||
|
||||
|
@ -511,7 +515,8 @@ public class CachingClusteredClientTest
|
|||
.metric("imps")
|
||||
.aggregators(RENAMED_AGGS)
|
||||
.postAggregators(RENAMED_POST_AGGS)
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -561,7 +566,7 @@ public class CachingClusteredClientTest
|
|||
)
|
||||
);
|
||||
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(
|
||||
makeRenamedTopNResults(
|
||||
new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
|
||||
|
@ -580,7 +585,8 @@ public class CachingClusteredClientTest
|
|||
.metric("imps")
|
||||
.aggregators(RENAMED_AGGS)
|
||||
.postAggregators(RENAMED_POST_AGGS)
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -629,7 +635,7 @@ public class CachingClusteredClientTest
|
|||
)
|
||||
);
|
||||
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
TestHelper.assertExpectedResults(
|
||||
makeTopNResults(
|
||||
new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
|
||||
|
@ -648,7 +654,8 @@ public class CachingClusteredClientTest
|
|||
.metric("avg_imps_per_row_double")
|
||||
.aggregators(AGGS)
|
||||
.postAggregators(POST_AGGS)
|
||||
.build()
|
||||
.build(),
|
||||
metadata
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -756,6 +763,7 @@ public class CachingClusteredClientTest
|
|||
.once();
|
||||
|
||||
final Capture<? extends Query> capture = new Capture();
|
||||
final Capture<? extends Map> metadata = new Capture();
|
||||
queryCaptures.add(capture);
|
||||
QueryRunner queryable = expectations.getQueryRunner();
|
||||
|
||||
|
@ -768,8 +776,7 @@ public class CachingClusteredClientTest
|
|||
intervals.add(expectation.getInterval());
|
||||
results.add(expectation.getResults());
|
||||
}
|
||||
|
||||
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
|
||||
EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(metadata)))
|
||||
.andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
|
||||
.once();
|
||||
|
||||
|
@ -782,7 +789,7 @@ public class CachingClusteredClientTest
|
|||
intervals.add(expectation.getInterval());
|
||||
results.add(expectation.getResults());
|
||||
}
|
||||
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
|
||||
EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(metadata)))
|
||||
.andReturn(toQueryableTopNResults(segmentIds, intervals, results))
|
||||
.once();
|
||||
} else if (query instanceof SearchQuery) {
|
||||
|
@ -794,7 +801,7 @@ public class CachingClusteredClientTest
|
|||
intervals.add(expectation.getInterval());
|
||||
results.add(expectation.getResults());
|
||||
}
|
||||
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
|
||||
EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(metadata)))
|
||||
.andReturn(toQueryableSearchResults(segmentIds, intervals, results))
|
||||
.once();
|
||||
} else if (query instanceof TimeBoundaryQuery) {
|
||||
|
@ -806,7 +813,7 @@ public class CachingClusteredClientTest
|
|||
intervals.add(expectation.getInterval());
|
||||
results.add(expectation.getResults());
|
||||
}
|
||||
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
|
||||
EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(metadata)))
|
||||
.andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results))
|
||||
.once();
|
||||
} else {
|
||||
|
@ -830,6 +837,7 @@ public class CachingClusteredClientTest
|
|||
@Override
|
||||
public void run()
|
||||
{
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
for (int i = 0; i < numTimesToQuery; ++i) {
|
||||
TestHelper.assertExpectedResults(
|
||||
new MergeIterable<>(
|
||||
|
@ -863,7 +871,8 @@ public class CachingClusteredClientTest
|
|||
actualQueryInterval
|
||||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
metadata
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -57,6 +57,7 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -123,7 +124,7 @@ public class CachingQueryRunnerTest
|
|||
new QueryRunner()
|
||||
{
|
||||
@Override
|
||||
public Sequence run(Query query)
|
||||
public Sequence run(Query query, Map metadata)
|
||||
{
|
||||
return resultSeq;
|
||||
}
|
||||
|
@ -140,8 +141,8 @@ public class CachingQueryRunnerTest
|
|||
cacheStrategy.computeCacheKey(query)
|
||||
);
|
||||
|
||||
|
||||
Sequence res = runner.run(query);
|
||||
HashMap<String,Object> metadata = new HashMap<String, Object>();
|
||||
Sequence res = runner.run(query, metadata);
|
||||
// base sequence is not closed yet
|
||||
Assert.assertFalse("sequence must not be closed", closable.isClosed());
|
||||
Assert.assertNull("cache must be empty", cache.get(cacheKey));
|
||||
|
@ -213,7 +214,7 @@ public class CachingQueryRunnerTest
|
|||
new QueryRunner()
|
||||
{
|
||||
@Override
|
||||
public Sequence run(Query query)
|
||||
public Sequence run(Query query, Map metadata)
|
||||
{
|
||||
return Sequences.empty();
|
||||
}
|
||||
|
@ -221,8 +222,8 @@ public class CachingQueryRunnerTest
|
|||
new CacheConfig()
|
||||
|
||||
);
|
||||
|
||||
List<Object> results = Sequences.toList(runner.run(query), new ArrayList());
|
||||
HashMap<String,Object> metadata = new HashMap<String, Object>();
|
||||
List<Object> results = Sequences.toList(runner.run(query, metadata), new ArrayList());
|
||||
Assert.assertEquals(expectedResults, results);
|
||||
}
|
||||
|
||||
|
|
|
@ -36,10 +36,8 @@ import io.druid.client.selector.QueryableDruidServer;
|
|||
import io.druid.client.selector.ServerSelector;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryInterruptedException;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.QueryWatcher;
|
||||
import io.druid.query.ReflectionQueryToolChestWarehouse;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.timeboundary.TimeBoundaryQuery;
|
||||
|
@ -57,6 +55,7 @@ import org.junit.Test;
|
|||
import java.io.ByteArrayInputStream;
|
||||
import java.io.InputStream;
|
||||
import java.net.URL;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
public class DirectDruidClientTest
|
||||
|
@ -118,20 +117,20 @@ public class DirectDruidClientTest
|
|||
serverSelector.addServer(queryableDruidServer2);
|
||||
|
||||
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
|
||||
|
||||
Sequence s1 = client1.run(query);
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
Sequence s1 = client1.run(query, metadata);
|
||||
Assert.assertEquals(1, client1.getNumOpenConnections());
|
||||
|
||||
// simulate read timeout
|
||||
Sequence s2 = client1.run(query);
|
||||
Sequence s2 = client1.run(query, metadata);
|
||||
Assert.assertEquals(2, client1.getNumOpenConnections());
|
||||
futureException.setException(new ReadTimeoutException());
|
||||
Assert.assertEquals(1, client1.getNumOpenConnections());
|
||||
|
||||
// subsequent connections should work
|
||||
Sequence s3 = client1.run(query);
|
||||
Sequence s4 = client1.run(query);
|
||||
Sequence s5 = client1.run(query);
|
||||
Sequence s3 = client1.run(query, metadata);
|
||||
Sequence s4 = client1.run(query, metadata);
|
||||
Sequence s5 = client1.run(query, metadata);
|
||||
|
||||
Assert.assertTrue(client1.getNumOpenConnections() == 4);
|
||||
|
||||
|
@ -142,8 +141,8 @@ public class DirectDruidClientTest
|
|||
Assert.assertEquals(new DateTime("2014-01-01T01:02:03Z"), results.get(0).getTimestamp());
|
||||
Assert.assertEquals(3, client1.getNumOpenConnections());
|
||||
|
||||
client2.run(query);
|
||||
client2.run(query);
|
||||
client2.run(query, metadata);
|
||||
client2.run(query, metadata);
|
||||
|
||||
Assert.assertTrue(client2.getNumOpenConnections() == 2);
|
||||
|
||||
|
@ -201,9 +200,9 @@ public class DirectDruidClientTest
|
|||
serverSelector.addServer(queryableDruidServer1);
|
||||
|
||||
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
|
||||
|
||||
HashMap<String,List> metadata = new HashMap<String, List>();
|
||||
cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
|
||||
Sequence results = client1.run(query);
|
||||
Sequence results = client1.run(query, metadata);
|
||||
Assert.assertEquals(0, client1.getNumOpenConnections());
|
||||
|
||||
|
||||
|
|
|
@ -71,8 +71,10 @@ import org.junit.Test;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
@ -421,14 +423,14 @@ public class ServerManagerTest
|
|||
query,
|
||||
intervals
|
||||
);
|
||||
|
||||
return serverManagerExec.submit(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
Sequence<Result<SearchResultValue>> seq = runner.run(query);
|
||||
Map<String,List> metadata = new HashMap<String, List>();
|
||||
Sequence<Result<SearchResultValue>> seq = runner.run(query, metadata);
|
||||
Sequences.toList(seq, Lists.<Result<SearchResultValue>>newArrayList());
|
||||
Iterator<SegmentForTesting> adaptersIter = factory.getAdapters().iterator();
|
||||
|
||||
|
@ -677,9 +679,9 @@ public class ServerManagerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(Query<T> query)
|
||||
public Sequence<T> run(Query<T> query, Map<String, List> metadata)
|
||||
{
|
||||
return new BlockingSequence<T>(runner.run(query), waitLatch, waitYieldLatch, notifyLatch);
|
||||
return new BlockingSequence<T>(runner.run(query, metadata), waitLatch, waitYieldLatch, notifyLatch);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ import io.druid.guice.annotations.Self;
|
|||
import io.druid.query.MapQueryToolChestWarehouse;
|
||||
import io.druid.query.QuerySegmentWalker;
|
||||
import io.druid.query.QueryToolChestWarehouse;
|
||||
import io.druid.query.RetryQueryRunnerConfig;
|
||||
import io.druid.server.ClientInfoResource;
|
||||
import io.druid.server.ClientQuerySegmentWalker;
|
||||
import io.druid.server.QueryResource;
|
||||
|
@ -88,6 +89,7 @@ public class CliBroker extends ServerRunnable
|
|||
JsonConfigProvider.bind(binder, "druid.broker.select.tier", TierSelectorStrategy.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.select.tier.custom", CustomTierSelectorStrategyConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.retryPolicy", RetryQueryRunnerConfig.class);
|
||||
|
||||
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
|
||||
|
||||
|
|
Loading…
Reference in New Issue