Response context refactoring (#8110)

* Response context refactoring

* Serialization/Deserialization of ResponseContext

* Added java doc comments

* Renamed vars related to ResponseContext

* Renamed empty() methods to createEmpty()

* Fixed ResponseContext usage

* Renamed multiple ResponseContext static fields

* Added PublicApi annotations

* Renamed QueryResponseContext class to ResourceIOReaderWriter

* Moved the protected method below public static constants

* Added createEmpty method to ResponseContext with DefaultResponseContext creation

* Fixed inspection error

* Added comments to the ResponseContext length limit and ResponseContext
http header name

* Added a comment of possible future refactoring

* Removed .gitignore file of indexing-service

* Removed a never-used method

* VisibleForTesting method reducing boilerplate

Co-Authored-By: Clint Wylie <cjwylie@gmail.com>

* Reduced boilerplate

* Renamed the method serialize to serializeWith

* Removed unused import

* Fixed incorrectly refactored test method

* Added comments for ResponseContext keys

* Fixed incorrectly refactored test method

* Fixed IntervalChunkingQueryRunnerTest mocks
This commit is contained in:
Eugene Sevastianov 2019-07-24 18:29:03 +03:00 committed by Roman Leventov
parent 0388581493
commit 799d20249f
145 changed files with 901 additions and 840 deletions

View File

@ -44,6 +44,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.InDimFilter;
@ -89,7 +90,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -240,7 +240,7 @@ public class FilteredAggregatorBenchmark
final QueryPlus<T> queryToRun = QueryPlus.wrap(
query.withOverriddenContext(ImmutableMap.of("vectorize", vectorize))
);
Sequence<T> queryResult = theRunner.run(queryToRun, new HashMap<>());
Sequence<T> queryResult = theRunner.run(queryToRun, ResponseContext.createEmpty());
return queryResult.toList();
}

View File

@ -51,6 +51,7 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
@ -92,7 +93,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -450,7 +450,7 @@ public class GroupByTypeInterfaceBenchmark
toolChest
);
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
return queryResult.toList();
}

View File

@ -46,6 +46,7 @@ import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.extraction.IdentityExtractionFn;
import org.apache.druid.query.ordering.StringComparators;
@ -85,7 +86,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -322,7 +322,7 @@ public class TopNTypeInterfaceBenchmark
toolChest
);
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
return queryResult.toList();
}

View File

@ -74,6 +74,7 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.groupby.GroupByQuery;
@ -432,7 +433,7 @@ public class CachingClusteredClientBenchmark
.applyPostMergeDecoration();
//noinspection unchecked
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
return queryResult.toList();
}
@ -528,7 +529,7 @@ public class CachingClusteredClientBenchmark
}
@Override
public Sequence<Object> run(QueryPlus<Object> queryPlus, Map<String, Object> responseContext)
public Sequence<Object> run(QueryPlus<Object> queryPlus, ResponseContext responseContext)
{
final QueryRunnerFactory factory = conglomerate.findFactory(queryPlus.getQuery());
//noinspection unchecked

View File

@ -56,6 +56,7 @@ import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
@ -101,7 +102,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -554,7 +554,7 @@ public class GroupByBenchmark
toolChest
);
return theRunner.run(QueryPlus.wrap(query), new HashMap<>());
return theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
}
@Benchmark
@ -610,7 +610,7 @@ public class GroupByBenchmark
(QueryToolChest) toolChest
);
Sequence<Row> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<Row> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
List<Row> results = queryResult.toList();
blackhole.consume(results);
}
@ -631,7 +631,7 @@ public class GroupByBenchmark
final GroupByQuery spillingQuery = query.withOverriddenContext(
ImmutableMap.of("bufferGrouperMaxSize", 4000)
);
Sequence<Row> queryResult = theRunner.run(QueryPlus.wrap(spillingQuery), new HashMap<>());
Sequence<Row> queryResult = theRunner.run(QueryPlus.wrap(spillingQuery), ResponseContext.createEmpty());
List<Row> results = queryResult.toList();
blackhole.consume(results);
}
@ -655,7 +655,7 @@ public class GroupByBenchmark
(QueryToolChest) toolChest
);
Sequence<Row> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<Row> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
List<Row> results = queryResult.toList();
blackhole.consume(results);
}

View File

@ -30,11 +30,10 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.SegmentId;
import java.util.Map;
public class QueryBenchmarkUtil
{
public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
@ -57,7 +56,7 @@ public class QueryBenchmarkUtil
{
return new QueryRunner<T>() {
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return delegate.run(queryPlus, responseContext);
}

View File

@ -46,6 +46,7 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.extraction.StrlenExtractionFn;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
@ -88,7 +89,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -330,7 +330,7 @@ public class ScanBenchmark
toolChest
);
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
return queryResult.toList();
}
@ -441,7 +441,7 @@ public class ScanBenchmark
Sequence<Result<ScanResultValue>> queryResult = theRunner.run(
QueryPlus.wrap(effectiveQuery),
new HashMap<>()
ResponseContext.createEmpty()
);
List<Result<ScanResultValue>> results = queryResult.toList();
blackhole.consume(results);

View File

@ -46,6 +46,7 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.extraction.DimExtractionFn;
import org.apache.druid.query.extraction.IdentityExtractionFn;
import org.apache.druid.query.extraction.LowerExtractionFn;
@ -95,7 +96,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -400,7 +400,7 @@ public class SearchBenchmark
toolChest
);
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
return queryResult.toList();
}
@ -461,7 +461,7 @@ public class SearchBenchmark
Sequence<Result<SearchResultValue>> queryResult = theRunner.run(
QueryPlus.wrap(query),
new HashMap<>()
ResponseContext.createEmpty()
);
List<Result<SearchResultValue>> results = queryResult.toList();
blackhole.consume(results);

View File

@ -44,6 +44,7 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.select.EventHolder;
import org.apache.druid.query.select.PagingSpec;
@ -261,7 +262,7 @@ public class SelectBenchmark
toolChest
);
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
return queryResult.toList();
}
@ -368,7 +369,7 @@ public class SelectBenchmark
boolean done = false;
while (!done) {
Sequence<Result<SelectResultValue>> queryResult = theRunner.run(QueryPlus.wrap(queryCopy), new HashMap<>());
Sequence<Result<SelectResultValue>> queryResult = theRunner.run(QueryPlus.wrap(queryCopy), ResponseContext.createEmpty());
List<Result<SelectResultValue>> results = queryResult.toList();
SelectResultValue result = results.get(0).getValue();

View File

@ -25,8 +25,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import java.util.Map;
import org.apache.druid.query.context.ResponseContext;
public class SerializingQueryRunner<T> implements QueryRunner<T>
{
@ -48,7 +47,7 @@ public class SerializingQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(
final QueryPlus<T> queryPlus,
final Map<String, Object> responseContext
final ResponseContext responseContext
)
{
return Sequences.map(

View File

@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.segment.QueryableIndex;
@ -63,7 +64,6 @@ import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
/**
@ -151,7 +151,7 @@ public class SqlVsNativeBenchmark
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryNative(Blackhole blackhole)
{
final Sequence<Row> resultSequence = QueryPlus.wrap(groupByQuery).run(walker, new HashMap<>());
final Sequence<Row> resultSequence = QueryPlus.wrap(groupByQuery).run(walker, ResponseContext.createEmpty());
final Row lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);
blackhole.consume(lastRow);
}

View File

@ -48,6 +48,7 @@ import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
@ -89,7 +90,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -323,7 +323,7 @@ public class TimeseriesBenchmark
toolChest
);
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
return queryResult.toList();
}
@ -401,7 +401,7 @@ public class TimeseriesBenchmark
Sequence<Result<TimeseriesResultValue>> queryResult = theRunner.run(
QueryPlus.wrap(query),
new HashMap<>()
ResponseContext.createEmpty()
);
List<Result<TimeseriesResultValue>> results = queryResult.toList();

View File

@ -47,6 +47,7 @@ import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
@ -86,7 +87,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -307,7 +307,7 @@ public class TopNBenchmark
toolChest
);
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
return queryResult.toList();
}
@ -367,7 +367,7 @@ public class TopNBenchmark
Sequence<Result<TopNResultValue>> queryResult = theRunner.run(
QueryPlus.wrap(query),
new HashMap<>()
ResponseContext.createEmpty()
);
List<Result<TopNResultValue>> results = queryResult.toList();
blackhole.consume(results);

View File

@ -52,6 +52,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.IntervalDimFilter;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
@ -417,7 +418,7 @@ public class TimeCompareBenchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryMultiQueryableIndexTopN(Blackhole blackhole)
{
Sequence<Result<TopNResultValue>> queryResult = topNRunner.run(QueryPlus.wrap(topNQuery), new HashMap<>());
Sequence<Result<TopNResultValue>> queryResult = topNRunner.run(QueryPlus.wrap(topNQuery), ResponseContext.createEmpty());
List<Result<TopNResultValue>> results = queryResult.toList();
blackhole.consume(results);
}
@ -430,7 +431,7 @@ public class TimeCompareBenchmark
{
Sequence<Result<TimeseriesResultValue>> queryResult = timeseriesRunner.run(
QueryPlus.wrap(timeseriesQuery),
new HashMap<>()
ResponseContext.createEmpty()
);
List<Result<TimeseriesResultValue>> results = queryResult.toList();
blackhole.consume(results);

View File

@ -30,8 +30,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import java.util.Map;
import org.apache.druid.query.context.ResponseContext;
public class MaterializedViewQueryQueryToolChest extends QueryToolChest
{
@ -51,7 +50,7 @@ public class MaterializedViewQueryQueryToolChest extends QueryToolChest
{
return new QueryRunner() {
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
Query realQuery = getRealQuery(queryPlus.getQuery());
return warehouse.getToolChest(realQuery).mergeResults(runner).run(queryPlus.withQuery(realQuery), responseContext);
@ -91,7 +90,7 @@ public class MaterializedViewQueryQueryToolChest extends QueryToolChest
{
return new QueryRunner() {
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
Query realQuery = getRealQuery(queryPlus.getQuery());
QueryToolChest realQueryToolChest = warehouse.getToolChest(realQuery);

View File

@ -27,8 +27,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import java.util.Map;
import org.apache.druid.query.context.ResponseContext;
public class MaterializedViewQueryRunner<T> implements QueryRunner<T>
@ -43,7 +42,7 @@ public class MaterializedViewQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
Query query = queryPlus.getQuery();
return new MergeSequence<>(

View File

@ -38,6 +38,7 @@ import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.Result;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@ -50,7 +51,6 @@ import org.joda.time.Interval;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -84,7 +84,7 @@ public class MovingAverageQueryRunner implements QueryRunner<Row>
}
@Override
public Sequence<Row> run(QueryPlus<Row> query, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> query, ResponseContext responseContext)
{
MovingAverageQuery maq = (MovingAverageQuery) query.getQuery();
@ -125,11 +125,11 @@ public class MovingAverageQueryRunner implements QueryRunner<Row>
.setContext(maq.getContext());
GroupByQuery gbq = builder.build();
HashMap<String, Object> gbqResponse = new HashMap<>();
gbqResponse.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(gbq));
gbqResponse.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
ResponseContext gbqResponseContext = ResponseContext.createEmpty();
gbqResponseContext.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(gbq));
gbqResponseContext.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
Sequence<Row> results = gbq.getRunner(walker).run(QueryPlus.wrap(gbq), gbqResponse);
Sequence<Row> results = gbq.getRunner(walker).run(QueryPlus.wrap(gbq), gbqResponseContext);
try {
// use localhost for remote address
requestLogger.logNativeQuery(RequestLogLine.forNative(
@ -163,11 +163,11 @@ public class MovingAverageQueryRunner implements QueryRunner<Row>
0,
maq.getContext()
);
HashMap<String, Object> tsqResponse = new HashMap<>();
tsqResponse.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(tsq));
tsqResponse.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
ResponseContext tsqResponseContext = ResponseContext.createEmpty();
tsqResponseContext.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(tsq));
tsqResponseContext.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
Sequence<Result<TimeseriesResultValue>> results = tsq.getRunner(walker).run(QueryPlus.wrap(tsq), tsqResponse);
Sequence<Result<TimeseriesResultValue>> results = tsq.getRunner(walker).run(QueryPlus.wrap(tsq), tsqResponseContext);
try {
// use localhost for remote address
requestLogger.logNativeQuery(RequestLogLine.forNative(

View File

@ -80,7 +80,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@ -364,12 +363,11 @@ public class MovingAverageQueryTest
},
baseClient, warehouse, retryConfig, jsonMapper, serverConfig, null, new CacheConfig()
);
final Map<String, Object> responseContext = new HashMap<>();
defineMocks();
QueryPlus queryPlus = QueryPlus.wrap(query);
final Sequence<?> res = query.getRunner(walker).run(queryPlus, responseContext);
final Sequence<?> res = query.getRunner(walker).run(queryPlus);
List actualResults = new ArrayList();
actualResults = (List<MapBasedRow>) res.accumulate(actualResults, Accumulators.list());

View File

@ -52,7 +52,6 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
public class MapVirtualColumnGroupByTest
@ -141,7 +140,7 @@ public class MapVirtualColumnGroupByTest
expectedException.expect(UnsupportedOperationException.class);
expectedException.expectMessage("Map column doesn't support getRow()");
runner.run(QueryPlus.wrap(query), new HashMap<>()).toList();
runner.run(QueryPlus.wrap(query)).toList();
}
@Test
@ -162,7 +161,7 @@ public class MapVirtualColumnGroupByTest
null
);
final List<Row> result = runner.run(QueryPlus.wrap(query), new HashMap<>()).toList();
final List<Row> result = runner.run(QueryPlus.wrap(query)).toList();
final List<Row> expected = ImmutableList.of(
new MapBasedRow(
DateTimes.of("2011-01-12T00:00:00.000Z"),

View File

@ -22,7 +22,6 @@ package org.apache.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharSource;
import org.apache.druid.data.input.impl.DelimitedParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -190,7 +189,7 @@ public class MapVirtualColumnSelectTest
private void checkSelectQuery(SelectQuery searchQuery, List<Map> expected)
{
List<Result<SelectResultValue>> results = runner.run(QueryPlus.wrap(searchQuery), ImmutableMap.of()).toList();
List<Result<SelectResultValue>> results = runner.run(QueryPlus.wrap(searchQuery)).toList();
Assert.assertEquals(1, results.size());
List<EventHolder> events = results.get(0).getValue().getEvents();

View File

@ -50,7 +50,6 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
public class MapVirtualColumnTopNTest
@ -105,7 +104,7 @@ public class MapVirtualColumnTopNTest
expectedException.expect(UnsupportedOperationException.class);
expectedException.expectMessage("Map column doesn't support getRow()");
runner.run(QueryPlus.wrap(query), new HashMap<>()).toList();
runner.run(QueryPlus.wrap(query)).toList();
}
@Test
@ -129,7 +128,7 @@ public class MapVirtualColumnTopNTest
null
);
final List<Result<TopNResultValue>> result = runner.run(QueryPlus.wrap(query), new HashMap<>()).toList();
final List<Result<TopNResultValue>> result = runner.run(QueryPlus.wrap(query)).toList();
final List<Result<TopNResultValue>> expected = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),

View File

@ -48,7 +48,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -250,8 +249,6 @@ public class ApproximateHistogramTopNQueryTest
)
)
);
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query), context));
TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query)));
}
}

View File

@ -48,7 +48,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -238,9 +237,8 @@ public class FixedBucketsHistogramTopNQueryTest
)
)
);
HashMap<String, Object> context = new HashMap<String, Object>();
List<Result<TopNResultValue>> results = runner.run(QueryPlus.wrap(query), context).toList();
List<Result<TopNResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
TestHelper.assertExpectedResults(expectedResults, results);
}
}

View File

@ -2359,7 +2359,7 @@ public class KafkaIndexTaskTest
{
ScanQuery query = new Druids.ScanQueryBuilder().dataSource(
DATA_SCHEMA.getDataSource()).intervals(spec).build();
return task.getQueryRunner(query).run(QueryPlus.wrap(query), new HashMap<>()).toList();
return task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();
}
private void insertData() throws ExecutionException, InterruptedException
@ -2772,8 +2772,7 @@ public class KafkaIndexTaskTest
.intervals("0000/3000")
.build();
List<Result<TimeseriesResultValue>> results =
task.getQueryRunner(query).run(QueryPlus.wrap(query), ImmutableMap.of()).toList();
List<Result<TimeseriesResultValue>> results = task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();
return results.isEmpty() ? 0L : DimensionHandlerUtils.nullToZero(results.get(0).getValue().getLongMetric("rows"));
}

View File

@ -2960,8 +2960,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
.intervals("0000/3000")
.build();
List<Result<TimeseriesResultValue>> results =
task.getQueryRunner(query).run(QueryPlus.wrap(query), ImmutableMap.of()).toList();
List<Result<TimeseriesResultValue>> results = task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();
return results.isEmpty() ? 0L : DimensionHandlerUtils.nullToZero(results.get(0).getValue().getLongMetric("rows"));
}

View File

@ -35,7 +35,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -112,7 +111,7 @@ public class VarianceTimeseriesQueryTest
)
);
Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query), new HashMap<>()).toList();
Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
assertExpectedResults(expectedResults, results);
}

View File

@ -140,10 +140,7 @@ public class VarianceTopNQueryTest
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
final QueryRunner<Result<TopNResultValue>> mergeRunner = chest.mergeResults(runner);
final Sequence<Result<TopNResultValue>> retval = mergeRunner.run(
QueryPlus.wrap(query),
ImmutableMap.of()
);
final Sequence<Result<TopNResultValue>> retval = mergeRunner.run(QueryPlus.wrap(query));
TestHelper.assertExpectedResults(expectedResults, retval);
return retval;
}

View File

@ -1641,7 +1641,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
.build();
List<Result<TimeseriesResultValue>> results =
task.getQueryRunner(query).run(QueryPlus.wrap(query), ImmutableMap.of()).toList();
task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();
if (results.isEmpty()) {
return 0L;

View File

@ -1020,8 +1020,7 @@ public class RealtimeIndexTaskTest
.intervals("2000/3000")
.build();
List<Result<TimeseriesResultValue>> results =
task.getQueryRunner(query).run(QueryPlus.wrap(query), ImmutableMap.of()).toList();
List<Result<TimeseriesResultValue>> results = task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();
if (results.isEmpty()) {
return 0L;
} else {

View File

@ -25,8 +25,8 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.context.ResponseContext;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@ -47,7 +47,7 @@ public class AsyncQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
final Query<T> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);

View File

@ -21,12 +21,12 @@ package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
*/
@ -45,7 +45,7 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@Override
@SuppressWarnings("unchecked")
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, ResponseContext responseContext)
{
if (QueryContexts.isBySegment(queryPlus.getQuery())) {
final Sequence<T> baseSequence = base.run(queryPlus, responseContext);

View File

@ -21,8 +21,7 @@ package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence;
import java.util.Map;
import org.apache.druid.query.context.ResponseContext;
/**
*/
@ -38,7 +37,7 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
if (QueryContexts.isBySegment(queryPlus.getQuery())) {
return baseRunner.run(queryPlus, responseContext);
@ -47,5 +46,5 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
return doRun(baseRunner, queryPlus, responseContext);
}
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, QueryPlus<T> queryPlus, Map<String, Object> context);
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, QueryPlus<T> queryPlus, ResponseContext context);
}

View File

@ -25,9 +25,9 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.utils.JvmUtils;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
@ -58,7 +58,7 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
final QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
final Sequence<T> baseSequence = delegate.run(queryWithMetrics, responseContext);

View File

@ -32,11 +32,11 @@ import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.MergeIterable;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.context.ResponseContext;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -87,7 +87,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
Query<T> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);

View File

@ -22,8 +22,7 @@ package org.apache.druid.query;
import com.google.common.base.Function;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import java.util.Map;
import org.apache.druid.query.context.ResponseContext;
/**
*/
@ -37,7 +36,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
return Sequences.concat(
Sequences.map(

View File

@ -27,8 +27,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import java.util.Map;
import org.apache.druid.query.context.ResponseContext;
/**
*/
@ -47,7 +46,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, ResponseContext responseContext)
{
final Query<T> query = queryPlus.getQuery();
final boolean isBySegment = QueryContexts.isBySegment(query);

View File

@ -21,8 +21,8 @@ package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.context.ResponseContext;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public class FluentQueryRunnerBuilder<T>
@ -49,7 +49,7 @@ public class FluentQueryRunnerBuilder<T>
}
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return baseRunner.run(queryPlus, responseContext);
}

View File

@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryHelper;
@ -44,7 +45,6 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@ -78,7 +78,7 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
final GroupByQuery query = (GroupByQuery) queryPlus.getQuery();
final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);

View File

@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -36,7 +37,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
@ -69,7 +69,7 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
final Period chunkPeriod = getChunkPeriod(queryPlus.getQuery());

View File

@ -25,8 +25,8 @@ import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.context.ResponseContext;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.ObjLongConsumer;
@ -83,7 +83,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();

View File

@ -21,15 +21,14 @@ package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import java.util.Map;
import org.apache.druid.query.context.ResponseContext;
/**
*/
public class NoopQueryRunner<T> implements QueryRunner<T>
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return Sequences.empty();
}

View File

@ -20,8 +20,7 @@
package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence;
import java.util.Map;
import org.apache.druid.query.context.ResponseContext;
/**
* This runner optimizes queries made on a single segment, using per-segment information,
@ -49,7 +48,7 @@ public class PerSegmentOptimizingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> input, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> input, final ResponseContext responseContext)
{
return base.run(
input.optimizeForSegment(optimizationContext),

View File

@ -23,10 +23,10 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.QuerySegmentSpec;
import javax.annotation.Nullable;
import java.util.Map;
/**
* An immutable composite object of {@link Query} + extra stuff needed in {@link QueryRunner}s.
@ -153,7 +153,7 @@ public final class QueryPlus<T>
return new QueryPlus<>(replacementQuery, queryMetrics, identity);
}
public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context)
public Sequence<T> run(QuerySegmentWalker walker, ResponseContext context)
{
return query.getRunner(walker).run(this, context);
}

View File

@ -19,10 +19,10 @@
package org.apache.druid.query;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.guava.Sequence;
import java.util.Map;
import org.apache.druid.query.context.ResponseContext;
@ExtensionPoint
public interface QueryRunner<T>
@ -30,5 +30,11 @@ public interface QueryRunner<T>
/**
* Runs the given query and returns results in a time-ordered sequence.
*/
Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext);
Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext);
@VisibleForTesting
default Sequence<T> run(QueryPlus<T> queryPlus)
{
return this.run(queryPlus, ResponseContext.createEmpty());
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.StorageAdapter;
@ -32,7 +33,6 @@ import org.joda.time.Interval;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
@ -67,7 +67,7 @@ public class QueryRunnerHelper
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return Sequences.withBaggage(runner.run(queryPlus, responseContext), closeable);
}

View File

@ -21,10 +21,9 @@ package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.ReferenceCountingSegment;
import java.util.Map;
/**
*/
public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
@ -45,7 +44,7 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, ResponseContext responseContext)
{
if (adapter.increment()) {
try {

View File

@ -21,10 +21,10 @@ package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
*/
@ -38,13 +38,13 @@ public class ReportTimelineMissingSegmentQueryRunner<T> implements QueryRunner<T
}
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
List<SegmentDescriptor> missingSegments =
(List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);
(List<SegmentDescriptor>) responseContext.get(ResponseContext.CTX_MISSING_SEGMENTS);
if (missingSegments == null) {
missingSegments = new ArrayList<>();
responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
responseContext.put(ResponseContext.CTX_MISSING_SEGMENTS, missingSegments);
}
missingSegments.add(descriptor);
return Sequences.empty();

View File

@ -34,9 +34,6 @@ import java.util.function.Function;
@PublicApi
public class Result<T> implements Comparable<Result<T>>
{
public static final String MISSING_SEGMENTS_KEY = "missingSegments";
@Nullable
private final DateTime timestamp;
private final T value;

View File

@ -22,9 +22,9 @@ package org.apache.druid.query;
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.context.ResponseContext;
import java.util.Comparator;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
@ -48,7 +48,7 @@ public class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRunner<T>
}
@Override
public Sequence<T> doRun(QueryRunner<T> baseRunner, QueryPlus<T> queryPlus, Map<String, Object> context)
public Sequence<T> doRun(QueryRunner<T> baseRunner, QueryPlus<T> queryPlus, ResponseContext context)
{
Query<T> query = queryPlus.getQuery();
return CombiningSequence.create(

View File

@ -29,12 +29,12 @@ import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.SegmentMissingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class RetryQueryRunner<T> implements QueryRunner<T>
{
@ -56,7 +56,7 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> context)
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext context)
{
final List<Sequence<T>> listOfSequences = new ArrayList<>();
listOfSequences.add(baseRunner.run(queryPlus, context));
@ -72,7 +72,7 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
for (int i = 0; i < config.getNumTries(); i++) {
log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i);
context.put(Result.MISSING_SEGMENTS_KEY, new ArrayList<>());
context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>());
final QueryPlus<T> retryQueryPlus = queryPlus.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec(
missingSegments
@ -100,9 +100,9 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
};
}
private List<SegmentDescriptor> getMissingSegments(final Map<String, Object> context)
private List<SegmentDescriptor> getMissingSegments(final ResponseContext context)
{
final Object maybeMissingSegments = context.get(Result.MISSING_SEGMENTS_KEY);
final Object maybeMissingSegments = context.get(ResponseContext.CTX_MISSING_SEGMENTS);
if (maybeMissingSegments == null) {
return new ArrayList<>();
}

View File

@ -21,10 +21,9 @@ package org.apache.druid.query;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import java.util.Map;
/**
* If there's a subquery, run it instead of the outer query
*/
@ -38,7 +37,7 @@ public class SubqueryQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, ResponseContext responseContext)
{
DataSource dataSource = queryPlus.getQuery().getDataSource();
boolean forcePushDownNestedQuery = queryPlus.getQuery()

View File

@ -26,6 +26,7 @@ import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryResultValue;
@ -35,7 +36,6 @@ import org.joda.time.Interval;
import org.joda.time.Period;
import java.util.Collections;
import java.util.Map;
/**
* TimewarpOperator is an example post-processing operator that maps current time
@ -79,7 +79,7 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
final DateTimeZone tz = queryPlus.getQuery().getTimezone();
final long offset = computeOffset(now, tz);

View File

@ -24,8 +24,7 @@ import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import java.util.Map;
import org.apache.druid.query.context.ResponseContext;
public class UnionQueryRunner<T> implements QueryRunner<T>
{
@ -39,7 +38,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
Query<T> query = queryPlus.getQuery();
DataSource dataSource = query.getDataSource();

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.context;
import org.apache.druid.guice.annotations.PublicApi;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* The implementation of {@link ResponseContext} with a {@link ConcurrentHashMap} as a delegate
*/
@PublicApi
public class ConcurrentResponseContext extends ResponseContext
{
public static ConcurrentResponseContext createEmpty()
{
return new ConcurrentResponseContext();
}
private final ConcurrentHashMap<String, Object> delegate = new ConcurrentHashMap<>();
@Override
protected Map<String, Object> getDelegate()
{
return delegate;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.context;
import org.apache.druid.guice.annotations.PublicApi;
import java.util.HashMap;
import java.util.Map;
/**
* The implementation of {@link ResponseContext} with a HashMap as a delegate
*/
@PublicApi
public class DefaultResponseContext extends ResponseContext
{
public static DefaultResponseContext createEmpty()
{
return new DefaultResponseContext();
}
private final HashMap<String, Object> delegate = new HashMap<>();
@Override
protected Map<String, Object> getDelegate()
{
return delegate;
}
}

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.context;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import java.io.IOException;
import java.util.Map;
/**
* The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
* The context is also transferred between Druid nodes with all the data it contains.
* All the keys associated with data inside the context should be stored here.
* CTX_* keys might be aggregated into an enum. Consider refactoring that.
*/
@PublicApi
public abstract class ResponseContext
{
/**
* Lists intervals for which NO segment is present.
*/
public static final String CTX_UNCOVERED_INTERVALS = "uncoveredIntervals";
/**
* Indicates if the number of uncovered intervals exceeded the limit (true/false).
*/
public static final String CTX_UNCOVERED_INTERVALS_OVERFLOWED = "uncoveredIntervalsOverflowed";
/**
* Lists missing segments.
*/
public static final String CTX_MISSING_SEGMENTS = "missingSegments";
/**
* Entity tag. A part of HTTP cache validation mechanism.
* Is being removed from the context before sending and used as a separate HTTP header.
*/
public static final String CTX_ETAG = "ETag";
/**
* Query total bytes gathered.
*/
public static final String CTX_QUERY_TOTAL_BYTES_GATHERED = "queryTotalBytesGathered";
/**
* This variable indicates when a running query should be expired,
* and is effective only when 'timeout' of queryContext has a positive value.
*/
public static final String CTX_TIMEOUT_AT = "timeoutAt";
/**
* The number of scanned rows.
*/
public static final String CTX_COUNT = "count";
/**
* Create an empty DefaultResponseContext instance
* @return empty DefaultResponseContext instance
*/
public static ResponseContext createEmpty()
{
return DefaultResponseContext.createEmpty();
}
protected abstract Map<String, Object> getDelegate();
public Object put(String key, Object value)
{
return getDelegate().put(key, value);
}
public Object get(String key)
{
return getDelegate().get(key);
}
public Object remove(String key)
{
return getDelegate().remove(key);
}
public void putAll(Map<? extends String, ?> m)
{
getDelegate().putAll(m);
}
public void putAll(ResponseContext responseContext)
{
getDelegate().putAll(responseContext.getDelegate());
}
public int size()
{
return getDelegate().size();
}
public String serializeWith(ObjectMapper objectMapper) throws JsonProcessingException
{
return objectMapper.writeValueAsString(getDelegate());
}
public static ResponseContext deserialize(String responseContext, ObjectMapper objectMapper) throws IOException
{
final Map<String, Object> delegate = objectMapper.readValue(
responseContext,
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
return new ResponseContext()
{
@Override
protected Map<String, Object> getDelegate()
{
return delegate;
}
};
}
}

View File

@ -31,11 +31,11 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
@ -89,7 +89,7 @@ public class DataSourceMetadataQueryRunnerFactory
@Override
public Sequence<Result<DataSourceMetadataResultValue>> run(
QueryPlus<Result<DataSourceMetadataResultValue>> input,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
Query<Result<DataSourceMetadataResultValue>> query = input.getQuery();

View File

@ -35,10 +35,10 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.timeline.LogicalSegment;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
@ -82,7 +82,7 @@ public class DataSourceQueryQueryToolChest
protected Sequence<Result<DataSourceMetadataResultValue>> doRun(
QueryRunner<Result<DataSourceMetadataResultValue>> baseRunner,
QueryPlus<Result<DataSourceMetadataResultValue>> input,
Map<String, Object> context
ResponseContext context
)
{
DataSourceMetadataQuery query = (DataSourceMetadataQuery) input.getQuery();

View File

@ -53,6 +53,7 @@ import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.extraction.ExtractionFn;
@ -145,7 +146,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private Sequence<Row> initAndMergeGroupByResults(
final GroupByQuery query,
QueryRunner<Row> runner,
Map<String, Object> context
ResponseContext context
)
{
final GroupByStrategy groupByStrategy = strategySelector.strategize(query);
@ -159,7 +160,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
final GroupByQuery query,
GroupByQueryResource resource,
QueryRunner<Row> runner,
Map<String, Object> context
ResponseContext context
)
{
if (isNestedQueryPushDown(query, groupByStrategy)) {
@ -173,7 +174,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
GroupByQuery query,
GroupByQueryResource resource,
QueryRunner<Row> runner,
Map<String, Object> context
ResponseContext context
)
{
// If there's a subquery, merge subquery results and then apply the aggregator
@ -254,7 +255,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
GroupByQuery query,
GroupByQueryResource resource,
QueryRunner<Row> runner,
Map<String, Object> context
ResponseContext context
)
{
Sequence<Row> pushDownQueryResults = groupByStrategy.mergeResults(runner, query, context);
@ -417,7 +418,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
GroupByQuery groupByQuery = (GroupByQuery) queryPlus.getQuery();
if (groupByQuery.getDimFilter() != null) {

View File

@ -31,11 +31,11 @@ import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
@ -70,7 +70,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
return new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
QueryRunner<Row> rowQueryRunner = strategySelector.strategize((GroupByQuery) queryPlus.getQuery()).mergeRunners(
queryExecutor,
@ -99,7 +99,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
}
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
Query<Row> query = queryPlus.getQuery();
if (!(query instanceof GroupByQuery)) {

View File

@ -52,6 +52,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
@ -61,7 +62,6 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@ -108,7 +108,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
}
@Override
public Sequence<Row> run(final QueryPlus<Row> queryPlus, final Map<String, Object> responseContext)
public Sequence<Row> run(final QueryPlus<Row> queryPlus, final ResponseContext responseContext)
{
final GroupByQuery query = (GroupByQuery) queryPlus.getQuery();
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);

View File

@ -27,6 +27,7 @@ import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.resource.GroupByQueryResource;
@ -34,7 +35,6 @@ import org.apache.druid.segment.StorageAdapter;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.BinaryOperator;
@ -72,7 +72,7 @@ public interface GroupByStrategy
GroupByQueryQueryToolChest toolChest
);
Sequence<Row> mergeResults(QueryRunner<Row> baseRunner, GroupByQuery query, Map<String, Object> responseContext);
Sequence<Row> mergeResults(QueryRunner<Row> baseRunner, GroupByQuery query, ResponseContext responseContext);
/**
* See {@link org.apache.druid.query.QueryToolChest#createMergeFn(Query)} for details, allows

View File

@ -39,6 +39,7 @@ import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
@ -56,7 +57,6 @@ import org.joda.time.Interval;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class GroupByStrategyV1 implements GroupByStrategy
@ -112,7 +112,7 @@ public class GroupByStrategyV1 implements GroupByStrategy
public Sequence<Row> mergeResults(
final QueryRunner<Row> baseRunner,
final GroupByQuery query,
final Map<String, Object> responseContext
final ResponseContext responseContext
)
{
final IncrementalIndex index = GroupByQueryHelper.makeIncrementalIndex(

View File

@ -55,6 +55,7 @@ import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.ResultMergeQueryRunner;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
@ -229,7 +230,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
public Sequence<Row> mergeResults(
final QueryRunner<Row> baseRunner,
final GroupByQuery query,
final Map<String, Object> responseContext
final ResponseContext responseContext
)
{
// Merge streams using ResultMergeQueryRunner, then apply postaggregators, then apply limit (which may
@ -363,7 +364,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
mergeResults(new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
return GroupByRowProcessor.getRowsFromGrouper(
query,
@ -440,7 +441,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
mergeResults(new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
return GroupByRowProcessor.getRowsFromGrouper(
queryWithoutSubtotalsSpec,

View File

@ -50,6 +50,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
@ -112,7 +113,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
public Sequence<SegmentAnalysis> doRun(
QueryRunner<SegmentAnalysis> baseRunner,
QueryPlus<SegmentAnalysis> queryPlus,
Map<String, Object> context
ResponseContext context
)
{
SegmentMetadataQuery updatedQuery = ((SegmentMetadataQuery) queryPlus.getQuery()).withFinalizedAnalysisTypes(config);

View File

@ -40,6 +40,7 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.ColumnIncluderator;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
@ -83,7 +84,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
return new QueryRunner<SegmentAnalysis>()
{
@Override
public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, Map<String, Object> responseContext)
public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, ResponseContext responseContext)
{
SegmentMetadataQuery updatedQuery = ((SegmentMetadataQuery) inQ.getQuery())
.withFinalizedAnalysisTypes(toolChest.getConfig());
@ -201,7 +202,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
@Override
public Sequence<SegmentAnalysis> run(
final QueryPlus<SegmentAnalysis> queryPlus,
final Map<String, Object> responseContext
final ResponseContext responseContext
)
{
final Query<SegmentAnalysis> query = queryPlus.getQuery();

View File

@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.Segment;
@ -60,20 +61,20 @@ public class ScanQueryEngine
public Sequence<ScanResultValue> process(
final ScanQuery query,
final Segment segment,
final Map<String, Object> responseContext
final ResponseContext responseContext
)
{
// "legacy" should be non-null due to toolChest.mergeResults
final boolean legacy = Preconditions.checkNotNull(query.isLegacy(), "WTF?! Expected non-null legacy");
if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) {
long count = (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
if (responseContext.get(ResponseContext.CTX_COUNT) != null) {
long count = (long) responseContext.get(ResponseContext.CTX_COUNT);
if (count >= query.getLimit() && query.getOrder().equals(ScanQuery.Order.NONE)) {
return Sequences.empty();
}
}
final boolean hasTimeout = QueryContexts.hasTimeout(query);
final long timeoutAt = (long) responseContext.get(ScanQueryRunnerFactory.CTX_TIMEOUT_AT);
final long timeoutAt = (long) responseContext.get(ResponseContext.CTX_TIMEOUT_AT);
final long start = System.currentTimeMillis();
final StorageAdapter adapter = segment.asStorageAdapter();
@ -120,8 +121,8 @@ public class ScanQueryEngine
final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter()));
if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) {
responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0L);
if (responseContext.get(ResponseContext.CTX_COUNT) == null) {
responseContext.put(ResponseContext.CTX_COUNT, 0L);
}
final long limit = calculateLimit(query, responseContext);
return Sequences.concat(
@ -187,12 +188,12 @@ public class ScanQueryEngine
throw new UOE("resultFormat[%s] is not supported", resultFormat.toString());
}
responseContext.put(
ScanQueryRunnerFactory.CTX_COUNT,
(long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset)
ResponseContext.CTX_COUNT,
(long) responseContext.get(ResponseContext.CTX_COUNT) + (offset - lastOffset)
);
if (hasTimeout) {
responseContext.put(
ScanQueryRunnerFactory.CTX_TIMEOUT_AT,
ResponseContext.CTX_TIMEOUT_AT,
timeoutAt - (System.currentTimeMillis() - start)
);
}
@ -262,10 +263,10 @@ public class ScanQueryEngine
* If we're performing time-ordering, we want to scan through the first `limit` rows in each segment ignoring the number
* of rows already counted on other segments.
*/
private long calculateLimit(ScanQuery query, Map<String, Object> responseContext)
private long calculateLimit(ScanQuery query, ResponseContext responseContext)
{
if (query.getOrder().equals(ScanQuery.Order.NONE)) {
return query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
return query.getLimit() - (long) responseContext.get(ResponseContext.CTX_COUNT);
}
return query.getLimit();
}

View File

@ -29,11 +29,11 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.context.ResponseContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* This iterator supports iteration through a Sequence returned by a ScanResultValue QueryRunner. Its behaviour
@ -60,7 +60,7 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
public ScanQueryLimitRowIterator(
QueryRunner<ScanResultValue> baseRunner,
QueryPlus<ScanResultValue> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
this.query = (ScanQuery) queryPlus.getQuery();

View File

@ -39,6 +39,7 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
@ -52,17 +53,12 @@ import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
{
// This variable indicates when a running query should be expired,
// and is effective only when 'timeout' of queryContext has a positive value.
public static final String CTX_TIMEOUT_AT = "timeoutAt";
public static final String CTX_COUNT = "count";
private final ScanQueryQueryToolChest toolChest;
private final ScanQueryEngine engine;
private final ScanQueryConfig scanQueryConfig;
@ -98,7 +94,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// Note: this variable is effective only when queryContext has a timeout.
// See the comment of CTX_TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
responseContext.put(ResponseContext.CTX_TIMEOUT_AT, timeoutAt);
if (query.getOrder().equals(ScanQuery.Order.NONE)) {
// Use normal strategy
@ -311,7 +307,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
Sequence<ScanResultValue> nWayMergeAndLimit(
List<List<QueryRunner<ScanResultValue>>> groupedRunners,
QueryPlus<ScanResultValue> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
// Starting from the innermost Sequences.map:
@ -366,7 +362,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
}
@Override
public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, Map<String, Object> responseContext)
public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, ResponseContext responseContext)
{
Query<ScanResultValue> query = queryPlus.getQuery();
if (!(query instanceof ScanQuery)) {
@ -374,9 +370,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
}
// it happens in unit tests
final Number timeoutAt = (Number) responseContext.get(CTX_TIMEOUT_AT);
final Number timeoutAt = (Number) responseContext.get(ResponseContext.CTX_TIMEOUT_AT);
if (timeoutAt == null || timeoutAt.longValue() == 0L) {
responseContext.put(CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT);
responseContext.put(ResponseContext.CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT);
}
return engine.process((ScanQuery) query, segment, responseContext);
}

View File

@ -43,6 +43,7 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.ResultGranularTimestampComparator;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DimFilter;
@ -329,7 +330,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
@Override
public Sequence<Result<SearchResultValue>> run(
QueryPlus<Result<SearchResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
SearchQuery searchQuery = (SearchQuery) queryPlus.getQuery();
@ -363,7 +364,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
@Override
public Sequence<Result<SearchResultValue>> run(
QueryPlus<Result<SearchResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
Query<Result<SearchResultValue>> input = queryPlus.getQuery();

View File

@ -34,6 +34,7 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.Result;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.ColumnSelectorStrategy;
import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
@ -47,7 +48,6 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import java.util.List;
import java.util.Map;
/**
*/
@ -207,7 +207,7 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
@Override
public Sequence<Result<SearchResultValue>> run(
final QueryPlus<Result<SearchResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
Query<Result<SearchResultValue>> input = queryPlus.getQuery();

View File

@ -40,6 +40,7 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.ResultGranularTimestampComparator;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.timeline.LogicalSegment;
@ -322,7 +323,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
@Override
public Sequence<Result<SelectResultValue>> run(
QueryPlus<Result<SelectResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
SelectQuery selectQuery = (SelectQuery) queryPlus.getQuery();

View File

@ -30,9 +30,9 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.Segment;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
@ -91,7 +91,7 @@ public class SelectQueryRunnerFactory
@Override
public Sequence<Result<SelectResultValue>> run(
QueryPlus<Result<SelectResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
Query<Result<SelectResultValue>> input = queryPlus.getQuery();

View File

@ -31,14 +31,13 @@ import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.SegmentMissingException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
*/
@ -57,7 +56,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final QueryPlus<T> input, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> input, final ResponseContext responseContext)
{
final QueryPlus<T> queryPlus = input.withQuerySegmentSpec(specificSpec);
final Query<T> query = queryPlus.getQuery();
@ -151,12 +150,13 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
);
}
private void appendMissingSegment(Map<String, Object> responseContext)
private void appendMissingSegment(ResponseContext responseContext)
{
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);
List<SegmentDescriptor> missingSegments =
(List<SegmentDescriptor>) responseContext.get(ResponseContext.CTX_MISSING_SEGMENTS);
if (missingSegments == null) {
missingSegments = new ArrayList<>();
responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
responseContext.put(ResponseContext.CTX_MISSING_SEGMENTS, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
}

View File

@ -39,11 +39,11 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.timeline.LogicalSegment;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
@ -101,7 +101,7 @@ public class TimeBoundaryQueryQueryToolChest
protected Sequence<Result<TimeBoundaryResultValue>> doRun(
QueryRunner<Result<TimeBoundaryResultValue>> baseRunner,
QueryPlus<Result<TimeBoundaryResultValue>> input,
Map<String, Object> context
ResponseContext context
)
{
TimeBoundaryQuery query = (TimeBoundaryQuery) input.getQuery();

View File

@ -35,6 +35,7 @@ import org.apache.druid.query.QueryRunnerHelper;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.Segment;
@ -46,7 +47,6 @@ import org.joda.time.DateTime;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
@ -131,7 +131,7 @@ public class TimeBoundaryQueryRunnerFactory
@Override
public Sequence<Result<TimeBoundaryResultValue>> run(
final QueryPlus<Result<TimeBoundaryResultValue>> queryPlus,
final Map<String, Object> responseContext
final ResponseContext responseContext
)
{
Query<Result<TimeBoundaryResultValue>> input = queryPlus.getQuery();

View File

@ -48,6 +48,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.RowBasedColumnSelectorFactory;
import org.joda.time.DateTime;
@ -108,7 +109,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
public Sequence<Result<TimeseriesResultValue>> doRun(
QueryRunner<Result<TimeseriesResultValue>> baseRunner,
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> context
ResponseContext context
)
{
int limit = ((TimeseriesQuery) queryPlus.getQuery()).getLimit();

View File

@ -30,10 +30,10 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
@ -92,7 +92,7 @@ public class TimeseriesQueryRunnerFactory
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
Query<Result<TimeseriesResultValue>> input = queryPlus.getQuery();

View File

@ -45,6 +45,7 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.DimensionHandlerUtils;
@ -435,7 +436,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override
public Sequence<Result<TopNResultValue>> run(
QueryPlus<Result<TopNResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
TopNQuery topNQuery = (TopNQuery) queryPlus.getQuery();
@ -475,7 +476,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override
public Sequence<Result<TopNResultValue>> run(
final QueryPlus<Result<TopNResultValue>> queryPlus, final Map<String, Object> responseContext
final QueryPlus<Result<TopNResultValue>> queryPlus, final ResponseContext responseContext
)
{
// thresholdRunner.run throws ISE if query is not TopNQuery
@ -543,7 +544,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override
public Sequence<Result<TopNResultValue>> run(
QueryPlus<Result<TopNResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
Query<Result<TopNResultValue>> input = queryPlus.getQuery();

View File

@ -31,10 +31,10 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.Segment;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
@ -66,7 +66,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
@Override
public Sequence<Result<TopNResultValue>> run(
QueryPlus<Result<TopNResultValue>> input,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
if (!(input.getQuery() instanceof TopNQuery)) {

View File

@ -24,12 +24,12 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -60,7 +60,7 @@ public class AsyncQueryRunnerTest
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
try {
latch.await();
@ -78,7 +78,7 @@ public class AsyncQueryRunnerTest
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
Sequence lazy = asyncRunner.run(QueryPlus.wrap(query), Collections.EMPTY_MAP);
Sequence lazy = asyncRunner.run(QueryPlus.wrap(query));
latch.countDown();
Assert.assertEquals(Collections.singletonList(1), lazy.toList());
}
@ -89,7 +89,7 @@ public class AsyncQueryRunnerTest
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
try {
Thread.sleep(Long.MAX_VALUE);
@ -107,10 +107,8 @@ public class AsyncQueryRunnerTest
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
Sequence lazy = asyncRunner.run(
QueryPlus.wrap(query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1))),
Collections.EMPTY_MAP
);
Sequence lazy =
asyncRunner.run(QueryPlus.wrap(query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1))));
try {
lazy.toList();
@ -133,7 +131,7 @@ public class AsyncQueryRunnerTest
AsyncQueryRunner asyncRunner = new AsyncQueryRunner<>(baseRunner, executor, mock);
asyncRunner.run(QueryPlus.wrap(query), Collections.EMPTY_MAP);
asyncRunner.run(QueryPlus.wrap(query));
EasyMock.verify(mock);
}
}

View File

@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.easymock.Capture;
import org.easymock.EasyMock;
@ -36,8 +37,6 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@ -119,13 +118,12 @@ public class ChainedExecutionQueryRunnerTest
runners
)
);
Map<String, Object> context = ImmutableMap.of();
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2014/2015")
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.build();
final Sequence seq = chainedRunner.run(QueryPlus.wrap(query), context);
final Sequence seq = chainedRunner.run(QueryPlus.wrap(query));
Future resultFuture = Executors.newFixedThreadPool(1).submit(
new Runnable()
@ -244,14 +242,13 @@ public class ChainedExecutionQueryRunnerTest
runners
)
);
HashMap<String, Object> context = new HashMap<String, Object>();
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2014/2015")
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 100, "queryId", "test"))
.build();
final Sequence seq = chainedRunner.run(QueryPlus.wrap(query), context);
final Sequence seq = chainedRunner.run(QueryPlus.wrap(query));
Future resultFuture = Executors.newFixedThreadPool(1).submit(
new Runnable()
@ -327,7 +324,7 @@ public class ChainedExecutionQueryRunnerTest
}
@Override
public Sequence<Integer> run(QueryPlus<Integer> queryPlus, Map<String, Object> responseContext)
public Sequence<Integer> run(QueryPlus<Integer> queryPlus, ResponseContext responseContext)
{
// do a lot of work
synchronized (this) {

View File

@ -70,7 +70,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -271,7 +270,7 @@ public class DoubleStorageTest
)
.merge(true)
.build();
List<SegmentAnalysis> results = runner.run(QueryPlus.wrap(segmentMetadataQuery), new HashMap<>()).toList();
List<SegmentAnalysis> results = runner.run(QueryPlus.wrap(segmentMetadataQuery)).toList();
Assert.assertEquals(Collections.singletonList(expectedSegmentAnalysis), results);
@ -292,8 +291,7 @@ public class DoubleStorageTest
.virtualColumns()
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query), context).toList();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query)).toList();
ScanResultValue expectedScanResult = new ScanResultValue(
SEGMENT_ID.toString(),

View File

@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Druids.TimeseriesQueryBuilder;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
@ -62,12 +63,13 @@ public class IntervalChunkingQueryRunnerTest
public void testDefaultNoChunking()
{
QueryPlus queryPlus = QueryPlus.wrap(queryBuilder.intervals("2014/2016").build());
final ResponseContext context = ResponseContext.createEmpty();
EasyMock.expect(baseRunner.run(queryPlus, Collections.EMPTY_MAP)).andReturn(Sequences.empty());
EasyMock.expect(baseRunner.run(queryPlus, context)).andReturn(Sequences.empty());
EasyMock.replay(baseRunner);
QueryRunner runner = decorator.decorate(baseRunner, toolChest);
runner.run(queryPlus, Collections.EMPTY_MAP);
runner.run(queryPlus, context);
EasyMock.verify(baseRunner);
}
@ -84,7 +86,7 @@ public class IntervalChunkingQueryRunnerTest
EasyMock.replay(toolChest);
QueryRunner runner = decorator.decorate(baseRunner, toolChest);
runner.run(QueryPlus.wrap(query), Collections.EMPTY_MAP);
runner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
EasyMock.verify(executors);
}
@ -101,7 +103,7 @@ public class IntervalChunkingQueryRunnerTest
EasyMock.replay(toolChest);
QueryRunner runner = decorator.decorate(baseRunner, toolChest);
runner.run(QueryPlus.wrap(query), Collections.EMPTY_MAP);
runner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
EasyMock.verify(executors);
}

View File

@ -84,7 +84,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -1035,8 +1034,7 @@ public class MultiValuedDimensionTest
new QueryableIndexSegment(queryableIndex, SegmentId.dummy("sid1")),
null
);
Map<String, Object> context = new HashMap<>();
Sequence<Result<TopNResultValue>> result = runner.run(QueryPlus.wrap(query), context);
Sequence<Result<TopNResultValue>> result = runner.run(QueryPlus.wrap(query));
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<TopNResultValue>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
@ -1089,8 +1087,7 @@ public class MultiValuedDimensionTest
new QueryableIndexSegment(queryableIndex, SegmentId.dummy("sid1")),
null
);
Map<String, Object> context = new HashMap<>();
Sequence<Result<TopNResultValue>> result = runner.run(QueryPlus.wrap(query), context);
Sequence<Result<TopNResultValue>> result = runner.run(QueryPlus.wrap(query));
List<Map<String, Object>> expected =
ImmutableList.<Map<String, Object>>builder()
.add(ImmutableMap.of("texpr", "t3foo", "count", 2L))
@ -1150,8 +1147,7 @@ public class MultiValuedDimensionTest
new QueryableIndexSegment(queryableIndex, SegmentId.dummy("sid1")),
null
);
Map<String, Object> context = new HashMap<>();
Sequence<Result<TopNResultValue>> result = runner.run(QueryPlus.wrap(query), context);
Sequence<Result<TopNResultValue>> result = runner.run(QueryPlus.wrap(query));
List<Map<String, Object>> expected =
ImmutableList.<Map<String, Object>>builder()

View File

@ -47,6 +47,7 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFact
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
@ -430,7 +431,7 @@ public class QueryRunnerTestHelper
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return runner.run(queryPlus, responseContext);
}
@ -454,7 +455,7 @@ public class QueryRunnerTestHelper
new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
Query<T> query = queryPlus.getQuery();
List<TimelineObjectHolder> segments = new ArrayList<>();
@ -497,7 +498,7 @@ public class QueryRunnerTestHelper
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return delegate.run(queryPlus, responseContext);
}

View File

@ -25,6 +25,8 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@ -38,9 +40,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class RetryQueryRunnerTest
{
@ -90,15 +89,15 @@ public class RetryQueryRunnerTest
@Test
public void testRunWithMissingSegments()
{
ConcurrentMap<String, Object> context = new ConcurrentHashMap<>();
context.put(Result.MISSING_SEGMENTS_KEY, new ArrayList<>());
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(QueryPlus queryPlus, Map context)
public Sequence<Result<TimeseriesResultValue>> run(QueryPlus queryPlus, ResponseContext context)
{
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add(
new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)
);
return Sequences.empty();
@ -125,7 +124,7 @@ public class RetryQueryRunnerTest
Assert.assertTrue(
"Should have one entry in the list of missing segments",
((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).size() == 1
);
Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0);
}
@ -134,20 +133,20 @@ public class RetryQueryRunnerTest
@Test
public void testRetry()
{
ConcurrentMap<String, Object> context = new ConcurrentHashMap<>();
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.put("count", 0);
context.put(Result.MISSING_SEGMENTS_KEY, new ArrayList<>());
context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> context
ResponseContext context
)
{
if ((int) context.get("count") == 0) {
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add(
new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)
);
context.put("count", 1);
@ -175,27 +174,27 @@ public class RetryQueryRunnerTest
Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1);
Assert.assertTrue(
"Should have nothing in missingSegment list",
((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 0
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).size() == 0
);
}
@Test
public void testRetryMultiple()
{
ConcurrentMap<String, Object> context = new ConcurrentHashMap<>();
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.put("count", 0);
context.put(Result.MISSING_SEGMENTS_KEY, new ArrayList<>());
context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> context
ResponseContext context
)
{
if ((int) context.get("count") < 3) {
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add(
new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)
);
context.put("count", (int) context.get("count") + 1);
@ -223,25 +222,25 @@ public class RetryQueryRunnerTest
Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1);
Assert.assertTrue(
"Should have nothing in missingSegment list",
((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 0
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).size() == 0
);
}
@Test(expected = SegmentMissingException.class)
public void testException()
{
ConcurrentMap<String, Object> context = new ConcurrentHashMap<>();
context.put(Result.MISSING_SEGMENTS_KEY, new ArrayList<>());
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> context
ResponseContext context
)
{
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add(
new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)
);
return Sequences.empty();
@ -255,32 +254,32 @@ public class RetryQueryRunnerTest
Assert.assertTrue(
"Should have one entry in the list of missing segments",
((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).size() == 1
);
}
@Test
public void testNoDuplicateRetry()
{
ConcurrentMap<String, Object> context = new ConcurrentHashMap<>();
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.put("count", 0);
context.put(Result.MISSING_SEGMENTS_KEY, new ArrayList<>());
context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> context
ResponseContext context
)
{
final Query<Result<TimeseriesResultValue>> query = queryPlus.getQuery();
if ((int) context.get("count") == 0) {
// assume 2 missing segments at first run
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add(
new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)
);
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add(
new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2)
);
context.put("count", 1);
@ -298,7 +297,7 @@ public class RetryQueryRunnerTest
// this is first retry
Assert.assertTrue("Should retry with 2 missing segments", ((MultipleSpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptors().size() == 2);
// assume only left 1 missing at first retry
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add(
new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2)
);
context.put("count", 2);
@ -339,7 +338,7 @@ public class RetryQueryRunnerTest
Assert.assertTrue("Should return a list with 3 elements", ((List) actualResults).size() == 3);
Assert.assertTrue(
"Should have nothing in missingSegment list",
((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 0
((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).size() == 0
);
}
}

View File

@ -115,7 +115,7 @@ public class SchemaEvolutionTest
)
),
(QueryToolChest<T, Query<T>>) factory.getToolchest()
).run(QueryPlus.wrap(query), new HashMap<>());
).run(QueryPlus.wrap(query));
return results.toList();
}

View File

@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.timeboundary.TimeBoundaryResultValue;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.joda.time.DateTime;
@ -37,14 +38,10 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class TimewarpOperatorTest
{
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of();
TimewarpOperator<Result<TimeseriesResultValue>> testOperator = new TimewarpOperator<>(
new Interval(DateTimes.of("2014-01-01"), DateTimes.of("2014-01-15")),
new Period("P1W"),
@ -88,7 +85,7 @@ public class TimewarpOperatorTest
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
return Sequences.simple(
@ -134,7 +131,7 @@ public class TimewarpOperatorTest
new TimeseriesResultValue(ImmutableMap.of("metric", 5))
)
),
queryRunner.run(QueryPlus.wrap(query), CONTEXT).toList()
queryRunner.run(QueryPlus.wrap(query)).toList()
);
@ -150,7 +147,7 @@ public class TimewarpOperatorTest
@Override
public Sequence<Result<TimeBoundaryResultValue>> run(
QueryPlus<Result<TimeBoundaryResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
return Sequences.simple(
@ -183,7 +180,7 @@ public class TimewarpOperatorTest
new TimeBoundaryResultValue(ImmutableMap.<String, Object>of("maxTime", DateTimes.of("2014-08-02")))
)
),
timeBoundaryRunner.run(QueryPlus.wrap(timeBoundaryQuery), CONTEXT).toList()
timeBoundaryRunner.run(QueryPlus.wrap(timeBoundaryQuery)).toList()
);
}
@ -197,7 +194,7 @@ public class TimewarpOperatorTest
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
return Sequences.simple(
@ -244,7 +241,7 @@ public class TimewarpOperatorTest
new TimeseriesResultValue(ImmutableMap.of("metric", 5))
)
),
queryRunner.run(QueryPlus.wrap(query), CONTEXT).toList()
queryRunner.run(QueryPlus.wrap(query)).toList()
);
}
@ -257,7 +254,7 @@ public class TimewarpOperatorTest
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
return Sequences.simple(
@ -304,7 +301,7 @@ public class TimewarpOperatorTest
new TimeseriesResultValue(ImmutableMap.of("metric", 5))
)
),
queryRunner.run(QueryPlus.wrap(query), CONTEXT).toList()
queryRunner.run(QueryPlus.wrap(query)).toList()
);
}
@ -317,7 +314,7 @@ public class TimewarpOperatorTest
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> responseContext
ResponseContext responseContext
)
{
final Query<Result<TimeseriesResultValue>> query = queryPlus.getQuery();
@ -356,7 +353,7 @@ public class TimewarpOperatorTest
new TimeseriesResultValue(ImmutableMap.of("metric", 3))
)
),
queryRunner.run(QueryPlus.wrap(query), new HashMap<>()).toList()
queryRunner.run(QueryPlus.wrap(query)).toList()
);
}
}

View File

@ -23,12 +23,11 @@ import com.google.common.collect.Iterables;
import junit.framework.Assert;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class UnionQueryRunnerTest
{
@ -38,7 +37,7 @@ public class UnionQueryRunnerTest
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
// verify that table datasource is passed to baseQueryRunner
Assert.assertTrue(queryPlus.getQuery().getDataSource() instanceof TableDataSource);
@ -68,7 +67,7 @@ public class UnionQueryRunnerTest
.intervals("2014-01-01T00:00:00Z/2015-01-01T00:00:00Z")
.aggregators(QueryRunnerTestHelper.commonDoubleAggregators)
.build();
Map<String, Object> responseContext = new HashMap<>();
ResponseContext responseContext = ResponseContext.createEmpty();
Sequence<?> result = runner.run(QueryPlus.wrap(q), responseContext);
List res = result.toList();
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), res);

View File

@ -55,6 +55,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
@ -93,10 +94,8 @@ import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* This class provides general utility to test any druid aggregation implementation given raw data,
@ -633,7 +632,7 @@ public class AggregationTestHelper implements Closeable
toolChest
);
return baseRunner.run(QueryPlus.wrap(query), new HashMap<>());
return baseRunner.run(QueryPlus.wrap(query));
}
public QueryRunner<Row> makeStringSerdeQueryRunner(
@ -645,10 +644,10 @@ public class AggregationTestHelper implements Closeable
return new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> map)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext map)
{
try {
Sequence<Row> resultSeq = baseRunner.run(queryPlus, new HashMap<>());
Sequence<Row> resultSeq = baseRunner.run(queryPlus);
final Yielder yielder = resultSeq.toYielder(
null,
new YieldingAccumulator()

View File

@ -37,6 +37,8 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.timeline.LogicalSegment;
@ -51,8 +53,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class DataSourceMetadataQueryTest
{
@ -138,8 +138,8 @@ public class DataSourceMetadataQueryTest
DataSourceMetadataQuery dataSourceMetadataQuery = Druids.newDataSourceMetadataQueryBuilder()
.dataSource("testing")
.build();
ConcurrentMap<String, Object> context = new ConcurrentHashMap<>();
context.put(Result.MISSING_SEGMENTS_KEY, new ArrayList<>());
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>());
Iterable<Result<DataSourceMetadataResultValue>> results =
runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList();
DataSourceMetadataResultValue val = results.iterator().next().getValue();

View File

@ -57,6 +57,7 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
@ -450,7 +451,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
return Sequences
.simple(
@ -485,7 +486,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
.setGranularity(Granularities.ALL)
.build();
Sequence<Row> queryResult = theRunner3.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<Row> queryResult = theRunner3.run(QueryPlus.wrap(query));
List<Row> results = queryResult.toList();
Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
@ -536,7 +537,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
return Sequences
.simple(
@ -579,7 +580,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
)
.build();
Sequence<Row> queryResult = theRunner3.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<Row> queryResult = theRunner3.run(QueryPlus.wrap(query));
List<Row> results = queryResult.toList();
Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
@ -689,7 +690,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return delegate.run(queryPlus, responseContext);
}

View File

@ -59,6 +59,7 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
@ -480,7 +481,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
return Sequences
.simple(
@ -549,7 +550,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
.setGranularity(Granularities.ALL)
.build();
Sequence<Row> queryResult = finalRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<Row> queryResult = finalRunner.run(QueryPlus.wrap(query));
List<Row> results = queryResult.toList();
Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
@ -613,7 +614,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
return Sequences
.simple(
@ -670,7 +671,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
.setGranularity(Granularities.ALL)
.build();
Sequence<Row> queryResult = finalRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<Row> queryResult = finalRunner.run(QueryPlus.wrap(query));
List<Row> results = queryResult.toList();
Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
@ -790,7 +791,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return delegate.run(queryPlus, responseContext);
}

View File

@ -54,6 +54,7 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.having.GreaterThanHavingSpec;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
@ -342,7 +343,7 @@ public class GroupByMultiSegmentTest
.setGranularity(Granularities.ALL)
.build();
Sequence<Row> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<Row> queryResult = theRunner.run(QueryPlus.wrap(query));
List<Row> results = queryResult.toList();
Row expectedRow = GroupByQueryRunnerTestHelper.createExpectedRow(
@ -427,7 +428,7 @@ public class GroupByMultiSegmentTest
{
return new QueryRunner<T>() {
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return delegate.run(queryPlus, responseContext);
}

View File

@ -35,6 +35,7 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.segment.CloserRule;
@ -50,9 +51,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*/
@ -97,13 +96,13 @@ public class GroupByQueryRunnerFactoryTest
new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
return factory.getToolchest().mergeResults(
new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
final Query query = queryPlus.getQuery();
try {
@ -127,7 +126,7 @@ public class GroupByQueryRunnerFactoryTest
}
);
Sequence<Row> result = mergedRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<Row> result = mergedRunner.run(QueryPlus.wrap(query));
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t1", "count", 2L),

View File

@ -81,6 +81,7 @@ import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.dimension.ListFilteredDimensionSpec;
@ -2922,7 +2923,7 @@ public class GroupByQueryRunnerTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
// simulate two daily segments
final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec(
@ -2953,7 +2954,7 @@ public class GroupByQueryRunnerTest
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L)
);
Map<String, Object> context = new HashMap<>();
ResponseContext context = ResponseContext.createEmpty();
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(QueryPlus.wrap(fullQuery), context), "merged");
List<Row> allGranExpectedResults = Arrays.asList(
@ -3009,10 +3010,9 @@ public class GroupByQueryRunnerTest
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
Map<String, Object> context = new HashMap<>();
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, limit),
mergeRunner.run(QueryPlus.wrap(fullQuery), context),
mergeRunner.run(QueryPlus.wrap(fullQuery)),
StringUtils.format("limit: %d", limit)
);
}
@ -3052,10 +3052,9 @@ public class GroupByQueryRunnerTest
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
Map<String, Object> context = new HashMap<>();
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, limit),
mergeRunner.run(QueryPlus.wrap(fullQuery), context),
mergeRunner.run(QueryPlus.wrap(fullQuery)),
StringUtils.format("limit: %d", limit)
);
}
@ -3106,10 +3105,9 @@ public class GroupByQueryRunnerTest
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
Map<String, Object> context = new HashMap<>();
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, limit),
mergeRunner.run(QueryPlus.wrap(fullQuery), context),
mergeRunner.run(QueryPlus.wrap(fullQuery)),
StringUtils.format("limit: %d", limit)
);
}
@ -3204,7 +3202,7 @@ public class GroupByQueryRunnerTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
// simulate two daily segments
final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec(
@ -3223,8 +3221,7 @@ public class GroupByQueryRunnerTest
}
);
Map<String, Object> context = new HashMap<>();
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(QueryPlus.wrap(fullQuery), context), "merged");
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(QueryPlus.wrap(fullQuery)), "merged");
}
@Test
@ -3256,7 +3253,7 @@ public class GroupByQueryRunnerTest
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L)
);
Map<String, Object> context = new HashMap<>();
ResponseContext context = ResponseContext.createEmpty();
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(QueryPlus.wrap(query), context), "no-limit");
@ -3348,7 +3345,7 @@ public class GroupByQueryRunnerTest
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L)
);
Map<String, Object> context = new HashMap<>();
ResponseContext context = ResponseContext.createEmpty();
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(QueryPlus.wrap(query), context), "no-limit");
TestHelper.assertExpectedObjects(
@ -3385,7 +3382,7 @@ public class GroupByQueryRunnerTest
new Object[]{"2011-04-01", "technology", 2L, 178.24917602539062D}
);
Map<String, Object> context = new HashMap<>();
ResponseContext context = ResponseContext.createEmpty();
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(QueryPlus.wrap(query), context), "no-limit");
TestHelper.assertExpectedObjects(
@ -3429,7 +3426,7 @@ public class GroupByQueryRunnerTest
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L)
);
Map<String, Object> context = new HashMap<>();
ResponseContext context = ResponseContext.createEmpty();
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(QueryPlus.wrap(query), context), "no-limit");
TestHelper.assertExpectedObjects(
@ -3950,7 +3947,7 @@ public class GroupByQueryRunnerTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
// simulate two daily segments
final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec(
@ -3969,8 +3966,7 @@ public class GroupByQueryRunnerTest
}
);
Map<String, Object> context = new HashMap<>();
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(QueryPlus.wrap(fullQuery), context), "merged");
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(QueryPlus.wrap(fullQuery)), "merged");
}
@Test
@ -4251,7 +4247,7 @@ public class GroupByQueryRunnerTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
// simulate two daily segments
final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec(
@ -4270,8 +4266,7 @@ public class GroupByQueryRunnerTest
}
);
Map<String, Object> context = new HashMap<>();
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(QueryPlus.wrap(fullQuery), context), "merged");
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(QueryPlus.wrap(fullQuery)), "merged");
}
@Test
@ -4352,7 +4347,7 @@ public class GroupByQueryRunnerTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
// simulate two daily segments
final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec(
@ -4371,7 +4366,7 @@ public class GroupByQueryRunnerTest
}
);
Map<String, Object> context = new HashMap<>();
ResponseContext context = ResponseContext.createEmpty();
// add an extra layer of merging, simulate broker forwarding query to historical
TestHelper.assertExpectedObjects(
expectedResults,
@ -4523,8 +4518,7 @@ public class GroupByQueryRunnerTest
);
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
Map<String, Object> context = new HashMap<>();
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(QueryPlus.wrap(query), context), "no-limit");
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(QueryPlus.wrap(query)), "no-limit");
}
@Test
@ -4575,9 +4569,8 @@ public class GroupByQueryRunnerTest
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "billy", null, "quality", "travel", "rows", 2L)
);
Map<String, Object> context = new HashMap<>();
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(QueryPlus.wrap(query), context), "no-limit");
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(QueryPlus.wrap(query)), "no-limit");
}
// A subquery identical to the query should yield identical results
@ -7960,7 +7953,7 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(
bySegmentResults,
theRunner.run(QueryPlus.wrap(fullQuery), new HashMap<>()),
theRunner.run(QueryPlus.wrap(fullQuery)),
"bySegment"
);
exec.shutdownNow();
@ -8029,7 +8022,7 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(
bySegmentResults,
theRunner.run(QueryPlus.wrap(fullQuery), new HashMap<>()),
theRunner.run(QueryPlus.wrap(fullQuery)),
"bySegment"
);
exec.shutdownNow();
@ -8094,7 +8087,7 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(
bySegmentResults,
theRunner.run(QueryPlus.wrap(fullQuery), new HashMap<>()),
theRunner.run(QueryPlus.wrap(fullQuery)),
"bySegment-dim-extraction"
);
exec.shutdownNow();
@ -8601,7 +8594,7 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(
bySegmentResults,
theRunner.run(QueryPlus.wrap(fullQuery), new HashMap<>()),
theRunner.run(QueryPlus.wrap(fullQuery)),
"bySegment-filter"
);
exec.shutdownNow();
@ -10017,7 +10010,7 @@ public class GroupByQueryRunnerTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
// simulate two daily segments
final QueryPlus<Row> queryPlus1 = queryPlus.withQuerySegmentSpec(
@ -10041,7 +10034,6 @@ public class GroupByQueryRunnerTest
}
}
);
Map<String, Object> context = new HashMap<>();
List<Row> allGranExpectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 2L, "idx", 177L),
@ -10052,7 +10044,7 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(
allGranExpectedResults,
mergedRunner.run(QueryPlus.wrap(allGranQuery), context),
mergedRunner.run(QueryPlus.wrap(allGranQuery)),
"merged"
);
}
@ -10083,7 +10075,7 @@ public class GroupByQueryRunnerTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
// simulate two daily segments
final QueryPlus<Row> queryPlus1 = queryPlus.withQuerySegmentSpec(
@ -10107,7 +10099,6 @@ public class GroupByQueryRunnerTest
}
}
);
Map<String, Object> context = new HashMap<>();
List<Row> allGranExpectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
@ -10117,7 +10108,7 @@ public class GroupByQueryRunnerTest
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L)
);
Iterable<Row> results = mergedRunner.run(QueryPlus.wrap(allGranQuery), context).toList();
Iterable<Row> results = mergedRunner.run(QueryPlus.wrap(allGranQuery)).toList();
TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged");
}
@ -10152,7 +10143,7 @@ public class GroupByQueryRunnerTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
// simulate two daily segments
final QueryPlus<Row> queryPlus1 = queryPlus.withQuerySegmentSpec(
@ -10176,7 +10167,6 @@ public class GroupByQueryRunnerTest
}
}
);
Map<String, Object> context = new HashMap<>();
List<Row> allGranExpectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
@ -10236,7 +10226,7 @@ public class GroupByQueryRunnerTest
)
);
Iterable<Row> results = mergedRunner.run(QueryPlus.wrap(allGranQuery), context).toList();
Iterable<Row> results = mergedRunner.run(QueryPlus.wrap(allGranQuery)).toList();
TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged");
}
@ -10284,7 +10274,7 @@ public class GroupByQueryRunnerTest
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, ResponseContext responseContext)
{
// simulate two daily segments
final QueryPlus<Row> queryPlus1 = queryPlus.withQuerySegmentSpec(
@ -10308,7 +10298,6 @@ public class GroupByQueryRunnerTest
}
}
);
Map<String, Object> context = new HashMap<>();
List<Row> allGranExpectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
@ -10368,7 +10357,7 @@ public class GroupByQueryRunnerTest
)
);
Iterable<Row> results = mergedRunner.run(QueryPlus.wrap(allGranQuery), context).toList();
Iterable<Row> results = mergedRunner.run(QueryPlus.wrap(allGranQuery)).toList();
TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged");
}

View File

@ -54,7 +54,7 @@ public class GroupByQueryRunnerTestHelper
toolChest
);
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query));
return queryResult.toList();
}

View File

@ -40,6 +40,7 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerTest;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@ -53,7 +54,6 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
*
@ -87,7 +87,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
final QueryRunner modifiedRunner = new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
TimeseriesQuery tsQuery = (TimeseriesQuery) queryPlus.getQuery();
QueryRunner<Row> newRunner = factory.mergeRunners(
@ -171,7 +171,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
DateTime expectedEarliest = DateTimes.of("1970-01-01");
DateTime expectedLast = DateTimes.of("2011-04-15");
Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query), CONTEXT).toList();
Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
Result<TimeseriesResultValue> result = results.iterator().next();
Assert.assertEquals(expectedEarliest, result.getTimestamp());

View File

@ -61,6 +61,7 @@ import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.extraction.RegexDimExtractionFn;
@ -417,7 +418,7 @@ public class NestedQueryPushDownTest
.setGranularity(Granularities.ALL)
.build();
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, new HashMap<>());
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, ResponseContext.createEmpty());
List<Row> results = queryResult.toList();
Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
@ -470,7 +471,7 @@ public class NestedQueryPushDownTest
.setGranularity(Granularities.ALL)
.build();
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, new HashMap<>());
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, ResponseContext.createEmpty());
List<Row> results = queryResult.toList();
Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
@ -532,7 +533,7 @@ public class NestedQueryPushDownTest
.setQuerySegmentSpec(intervalSpec)
.build();
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, new HashMap<>());
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, ResponseContext.createEmpty());
List<Row> results = queryResult.toList();
Assert.assertEquals(0, results.size());
@ -581,7 +582,7 @@ public class NestedQueryPushDownTest
"finalSum", 4000L,
"newDimA", "mango"
);
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, new HashMap<>());
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, ResponseContext.createEmpty());
List<Row> results = queryResult.toList();
Assert.assertEquals(1, results.size());
@ -631,7 +632,7 @@ public class NestedQueryPushDownTest
"finalSum", 4000L,
"newDimA", "mango"
);
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, new HashMap<>());
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, ResponseContext.createEmpty());
List<Row> results = queryResult.toList();
Assert.assertEquals(1, results.size());
@ -682,7 +683,7 @@ public class NestedQueryPushDownTest
"finalSum", 4000L,
"extractedDimA", "replacement"
);
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, new HashMap<>());
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, ResponseContext.createEmpty());
List<Row> results = queryResult.toList();
Assert.assertEquals(2, results.size());
@ -728,14 +729,14 @@ public class NestedQueryPushDownTest
"dimB", "sweet",
"finalSum", 90L
);
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, new HashMap<>());
Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, ResponseContext.createEmpty());
List<Row> results = queryResult.toList();
Assert.assertEquals(1, results.size());
Assert.assertEquals(expectedRow0, results.get(0));
}
private Sequence<Row> runNestedQueryWithForcePushDown(GroupByQuery nestedQuery, Map<String, Object> context)
private Sequence<Row> runNestedQueryWithForcePushDown(GroupByQuery nestedQuery, ResponseContext context)
{
QueryToolChest<Row, GroupByQuery> toolChest = groupByFactory.getToolchest();
GroupByQuery pushDownQuery = nestedQuery;

Some files were not shown because too many files have changed in this diff Show More