time-descending result of timeseries queries

This commit is contained in:
navis.ryu 2015-12-14 10:29:05 +09:00
parent d7ad93debc
commit 18479bb757
79 changed files with 1068 additions and 783 deletions

View File

@ -13,6 +13,7 @@ An example timeseries query object is shown below:
"queryType": "timeseries",
"dataSource": "sample_datasource",
"granularity": "day",
"descending": "true",
"filter": {
"type": "and",
"fields": [
@ -49,6 +50,7 @@ There are 7 main parts to a timeseries query:
|--------|-----------|---------|
|queryType|This String should always be "timeseries"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes|
|descending|Whether to make descending ordered result.|no|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|granularity|Defines the granularity to bucket query results. See [Granularities](../querying/granularities.html)|yes|
|filter|See [Filters](../querying/filters.html)|no|

View File

@ -49,7 +49,7 @@ public class AsyncQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final int priority = query.getContextPriority(0);
final int priority = BaseQuery.getContextPriority(query, 0);
final ListenableFuture<Sequence<T>> future = executor.submit(new AbstractPrioritizedCallable<Sequence<T>>(priority)
{
@Override

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
import io.druid.query.spec.QuerySegmentSpec;
@ -34,10 +35,66 @@ import java.util.Map;
/**
*/
public abstract class BaseQuery<T> implements Query<T>
public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
{
public static <T> int getContextPriority(Query<T> query, int defaultValue)
{
return parseInt(query, "priority", defaultValue);
}
public static <T> boolean getContextBySegment(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "bySegment", defaultValue);
}
public static <T> boolean getContextPopulateCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "populateCache", defaultValue);
}
public static <T> boolean getContextUseCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "useCache", defaultValue);
}
public static <T> boolean getContextFinalize(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "finalize", defaultValue);
}
private static <T> int parseInt(Query<T> query, String key, int defaultValue)
{
Object val = query.getContextValue(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Integer.parseInt((String) val);
} else if (val instanceof Integer) {
return (int) val;
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
}
private static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
{
Object val = query.getContextValue(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Boolean.parseBoolean((String) val);
} else if (val instanceof Boolean) {
return (boolean) val;
} else {
throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass());
}
}
public static final String QUERYID = "queryId";
private final DataSource dataSource;
private final boolean descending;
private final Map<String, Object> context;
private final QuerySegmentSpec querySegmentSpec;
private volatile Duration duration;
@ -45,6 +102,7 @@ public abstract class BaseQuery<T> implements Query<T>
public BaseQuery(
DataSource dataSource,
QuerySegmentSpec querySegmentSpec,
boolean descending,
Map<String, Object> context
)
{
@ -54,6 +112,7 @@ public abstract class BaseQuery<T> implements Query<T>
this.dataSource = dataSource;
this.context = context;
this.querySegmentSpec = querySegmentSpec;
this.descending = descending;
}
@JsonProperty
@ -63,6 +122,13 @@ public abstract class BaseQuery<T> implements Query<T>
return dataSource;
}
@JsonProperty
@Override
public boolean isDescending()
{
return descending;
}
@JsonProperty("intervals")
public QuerySegmentSpec getQuerySegmentSpec()
{
@ -122,67 +188,6 @@ public abstract class BaseQuery<T> implements Query<T>
return retVal == null ? defaultValue : retVal;
}
@Override
public int getContextPriority(int defaultValue)
{
if (context == null) {
return defaultValue;
}
Object val = context.get("priority");
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Integer.parseInt((String) val);
} else if (val instanceof Integer) {
return (int) val;
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
}
@Override
public boolean getContextBySegment(boolean defaultValue)
{
return parseBoolean("bySegment", defaultValue);
}
@Override
public boolean getContextPopulateCache(boolean defaultValue)
{
return parseBoolean("populateCache", defaultValue);
}
@Override
public boolean getContextUseCache(boolean defaultValue)
{
return parseBoolean("useCache", defaultValue);
}
@Override
public boolean getContextFinalize(boolean defaultValue)
{
return parseBoolean("finalize", defaultValue);
}
private boolean parseBoolean(String key, boolean defaultValue)
{
if (context == null) {
return defaultValue;
}
Object val = context.get(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Boolean.parseBoolean((String) val);
} else if (val instanceof Boolean) {
return (boolean) val;
} else {
throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass());
}
}
protected Map<String, Object> computeOverridenContext(Map<String, Object> overrides)
{
Map<String, Object> overridden = Maps.newTreeMap();
@ -195,6 +200,13 @@ public abstract class BaseQuery<T> implements Query<T>
return overridden;
}
@Override
public Ordering<T> getResultOrdering()
{
Ordering<T> retVal = Ordering.natural();
return descending ? retVal.reverse() : retVal;
}
@Override
public String getId()
{
@ -219,6 +231,9 @@ public abstract class BaseQuery<T> implements Query<T>
BaseQuery baseQuery = (BaseQuery) o;
if (descending != baseQuery.descending) {
return false;
}
if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) {
return false;
}
@ -241,6 +256,7 @@ public abstract class BaseQuery<T> implements Query<T>
public int hashCode()
{
int result = dataSource != null ? dataSource.hashCode() : 0;
result = 31 * result + (descending ? 1 : 0);
result = 31 * result + (context != null ? context.hashCode() : 0);
result = 31 * result + (querySegmentSpec != null ? querySegmentSpec.hashCode() : 0);
result = 31 * result + (duration != null ? duration.hashCode() : 0);

View File

@ -51,7 +51,7 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
if (query.getContextBySegment(false)) {
if (BaseQuery.getContextBySegment(query, false)) {
final Sequence<T> baseSequence = base.run(query, responseContext);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple(

View File

@ -39,7 +39,7 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
if (query.getContextBySegment(false)) {
if (BaseQuery.getContextBySegment(query, false)) {
return baseRunner.run(query, responseContext);
}

View File

@ -21,20 +21,39 @@ package io.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
/**
*/
public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{
public byte[] computeCacheKey(QueryType query);
/**
* Computes the cache key for the given query
*
* @param query the query to compute a cache key for
* @return the cache key
*/
byte[] computeCacheKey(QueryType query);
public TypeReference<CacheType> getCacheObjectClazz();
/**
* Returns the class type of what is used in the cache
*
* @return Returns the class type of what is used in the cache
*/
TypeReference<CacheType> getCacheObjectClazz();
// Resultant function must be THREAD SAFE
public Function<T, CacheType> prepareForCache();
/**
* Returns a function that converts from the QueryType's result type to something cacheable.
*
* The resulting function must be thread-safe.
*
* @return a thread-safe function that converts the QueryType's result type into something cacheable
*/
Function<T, CacheType> prepareForCache();
public Function<CacheType, T> pullFromCache();
public Sequence<T> mergeSequences(Sequence<Sequence<T>> seqOfSequences);
/**
* A function that does the inverse of the operation that the function prepareForCache returns
*
* @return A function that does the inverse of the operation that the function prepareForCache returns
*/
Function<CacheType, T> pullFromCache();
}

View File

@ -64,22 +64,19 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
private final Iterable<QueryRunner<T>> queryables;
private final ListeningExecutorService exec;
private final Ordering<T> ordering;
private final QueryWatcher queryWatcher;
public ChainedExecutionQueryRunner(
ExecutorService exec,
Ordering<T> ordering,
QueryWatcher queryWatcher,
QueryRunner<T>... queryables
)
{
this(exec, ordering, queryWatcher, Arrays.asList(queryables));
this(exec, queryWatcher, Arrays.asList(queryables));
}
public ChainedExecutionQueryRunner(
ExecutorService exec,
Ordering<T> ordering,
QueryWatcher queryWatcher,
Iterable<QueryRunner<T>> queryables
)
@ -87,7 +84,6 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
// listeningDecorator will leave PrioritizedExecutorService unchanged,
// since it already implements ListeningExecutorService
this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering;
this.queryables = Iterables.unmodifiableIterable(queryables);
this.queryWatcher = queryWatcher;
}
@ -95,7 +91,8 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final int priority = query.getContextPriority(0);
final int priority = BaseQuery.getContextPriority(query, 0);
final Ordering ordering = query.getResultOrdering();
return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()

View File

@ -332,6 +332,8 @@ public class Druids
private List<PostAggregator> postAggregatorSpecs;
private Map<String, Object> context;
private boolean descending;
private TimeseriesQueryBuilder()
{
dataSource = null;
@ -348,6 +350,7 @@ public class Druids
return new TimeseriesQuery(
dataSource,
querySegmentSpec,
descending,
dimFilter,
granularity,
aggregatorSpecs,
@ -362,6 +365,7 @@ public class Druids
.dataSource(query.getDataSource())
.intervals(query.getIntervals())
.filters(query.getDimensionsFilter())
.descending(query.isDescending())
.granularity(query.getGranularity())
.aggregators(query.getAggregatorSpecs())
.postAggregators(query.getPostAggregatorSpecs())
@ -374,6 +378,7 @@ public class Druids
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.filters(builder.dimFilter)
.descending(builder.descending)
.granularity(builder.granularity)
.aggregators(builder.aggregatorSpecs)
.postAggregators(builder.postAggregatorSpecs)
@ -395,6 +400,11 @@ public class Druids
return dimFilter;
}
public boolean isDescending()
{
return descending;
}
public QueryGranularity getGranularity()
{
return granularity;
@ -467,6 +477,12 @@ public class Druids
return this;
}
public TimeseriesQueryBuilder descending(boolean d)
{
descending = d;
return this;
}
public TimeseriesQueryBuilder granularity(String g)
{
granularity = QueryGranularity.fromString(g);

View File

@ -49,8 +49,8 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
final boolean isBySegment = query.getContextBySegment(false);
final boolean shouldFinalize = query.getContextFinalize(true);
final boolean isBySegment = BaseQuery.getContextBySegment(query, false);
final boolean shouldFinalize = BaseQuery.getContextFinalize(query, true);
final Query<T> queryToRun;
final Function<T, T> finalizerFn;

View File

@ -88,8 +88,8 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
bufferPool
);
final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = query.getContextBySegment(false);
final int priority = query.getContextPriority(0);
final boolean bySegment = BaseQuery.getContextBySegment(query, false);
final int priority = BaseQuery.getContextPriority(query, 0);
ListenableFuture<List<Void>> futures = Futures.allAsList(
Lists.newArrayList(
@ -175,7 +175,7 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
return new ResourceClosingSequence<T>(
Sequences.simple(
Iterables.transform(
indexAccumulatorPair.lhs.iterableWithPostAggregations(null),
indexAccumulatorPair.lhs.iterableWithPostAggregations(null, query.isDescending()),
new Function<Row, T>()
{
@Override

View File

@ -21,6 +21,7 @@ package io.druid.query;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.Sequence;
import io.druid.query.datasourcemetadata.DataSourceMetadataQuery;
import io.druid.query.groupby.GroupByQuery;
@ -51,49 +52,46 @@ import java.util.Map;
})
public interface Query<T>
{
public static final String TIMESERIES = "timeseries";
public static final String SEARCH = "search";
public static final String TIME_BOUNDARY = "timeBoundary";
public static final String GROUP_BY = "groupBy";
public static final String SEGMENT_METADATA = "segmentMetadata";
public static final String SELECT = "select";
public static final String TOPN = "topN";
public static final String DATASOURCE_METADATA = "dataSourceMetadata";
String TIMESERIES = "timeseries";
String SEARCH = "search";
String TIME_BOUNDARY = "timeBoundary";
String GROUP_BY = "groupBy";
String SEGMENT_METADATA = "segmentMetadata";
String SELECT = "select";
String TOPN = "topN";
String DATASOURCE_METADATA = "dataSourceMetadata";
public DataSource getDataSource();
DataSource getDataSource();
public boolean hasFilters();
boolean hasFilters();
public String getType();
String getType();
public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context);
Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context);
public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context);
Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context);
public List<Interval> getIntervals();
List<Interval> getIntervals();
public Duration getDuration();
Duration getDuration();
public Map<String, Object> getContext();
Map<String, Object> getContext();
public <ContextType> ContextType getContextValue(String key);
<ContextType> ContextType getContextValue(String key);
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue);
<ContextType> ContextType getContextValue(String key, ContextType defaultValue);
// For backwards compatibility
@Deprecated public int getContextPriority(int defaultValue);
@Deprecated public boolean getContextBySegment(boolean defaultValue);
@Deprecated public boolean getContextPopulateCache(boolean defaultValue);
@Deprecated public boolean getContextUseCache(boolean defaultValue);
@Deprecated public boolean getContextFinalize(boolean defaultValue);
boolean isDescending();
public Query<T> withOverriddenContext(Map<String, Object> contextOverride);
Ordering<T> getResultOrdering();
public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);
Query<T> withOverriddenContext(Map<String, Object> contextOverride);
public Query<T> withId(String id);
Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);
public String getId();
Query<T> withId(String id);
String getId();
Query<T> withDataSource(DataSource dataSource);
}

View File

@ -27,5 +27,11 @@ import java.util.Map;
*/
public interface QueryRunner<T>
{
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext);
/**
* Runs the given query and returns results in a time-ordered sequence
* @param query
* @param responseContext
* @return
*/
Sequence<T> run(Query<T> query, Map<String, Object> responseContext);
}

View File

@ -56,6 +56,7 @@ public class QueryRunnerHelper
final StorageAdapter adapter,
List<Interval> queryIntervals,
Filter filter,
boolean descending,
QueryGranularity granularity,
final Function<Cursor, Result<T>> mapFn
)
@ -66,7 +67,7 @@ public class QueryRunnerHelper
return Sequences.filter(
Sequences.map(
adapter.makeCursors(filter, queryIntervals.get(0), granularity),
adapter.makeCursors(filter, queryIntervals.get(0), granularity, descending),
new Function<Cursor, Result<T>>()
{
@Override

View File

@ -21,7 +21,6 @@ package io.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.timeline.LogicalSegment;
@ -37,54 +36,19 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
{
/**
* This method wraps a QueryRunner. The input QueryRunner, by contract, will provide a series of
* ResultType objects in time order (ascending). This method should return a new QueryRunner that
* ResultType objects in time order (ascending or descending). This method should return a new QueryRunner that
* potentially merges the stream of ordered ResultType objects.
*
* @param runner A QueryRunner that provides a series of ResultType objects in time order (ascending)
* @param runner A QueryRunner that provides a series of ResultType objects in time order (ascending or descending)
*
* @return a QueryRunner that potentialy merges the stream of ordered ResultType objects
* @return a QueryRunner that potentially merges the stream of ordered ResultType objects
*/
public abstract QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner);
/**
* This method doesn't belong here, but it's here for now just to make it work. The method needs to
* take a Sequence of Sequences and return a single Sequence of ResultType objects in time-order (ascending)
* <p>
* This method assumes that its input sequences provide values already in sorted order.
* Even more specifically, it assumes that the individual sequences are also ordered by their first element.
* <p>
* In the vast majority of cases, this should just be implemented with:
* <p>
* return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
*
* @param seqOfSequences sequence of sequences to be merged
*
* @return the sequence of merged results
*/
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
/**
* This method doesn't belong here, but it's here for now just to make it work. The method needs to
* take a Sequence of Sequences and return a single Sequence of ResultType objects in time-order (ascending)
* <p>
* This method assumes that its input sequences provide values already in sorted order, but, unlike
* mergeSequences, it does *not* assume that the individual sequences are also ordered by their first element.
* <p>
* In the vast majority if ocases, this hsould just be implemented with:
* <p>
* return new MergeSequence<>(getOrdering(), seqOfSequences);
*
* @param seqOfSequences sequence of sequences to be merged
*
* @return the sequence of merged results
*/
public abstract Sequence<ResultType> mergeSequencesUnordered(Sequence<Sequence<ResultType>> seqOfSequences);
/**
* Creates a builder that is used to generate a metric for this specific query type. This exists
* to allow for query-specific dimensions on metrics. That is, the ToolChest is expected to set some
* meaningful dimensions for metrics given this query type. Examples might be the topN threshhold for
* meaningful dimensions for metrics given this query type. Examples might be the topN threshold for
* a TopN query or the number of dimensions included for a groupBy query.
*
* @param query The query that is being processed

View File

@ -19,6 +19,7 @@
package io.druid.query;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import io.druid.granularity.QueryGranularity;
@ -43,4 +44,9 @@ public class ResultGranularTimestampComparator<T> implements Comparator<Result<T
gran.truncate(r2.getTimestamp().getMillis())
);
}
public static <T> Ordering<Result<T>> create(QueryGranularity granularity, boolean descending) {
Comparator<Result<T>> comparator = new ResultGranularTimestampComparator<>(granularity);
return descending ? Ordering.from(comparator).reverse() : Ordering.from(comparator);
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
@ -93,7 +94,12 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
if (!config.isReturnPartialResults() && !finalMissingSegs.isEmpty()) {
throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs);
}
return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator);
return new MergeSequence<>(
query.getResultOrdering(),
Sequences.simple(listOfSequences)).toYielder(
initValue, accumulator
);
}
else {
return Iterables.getOnlyElement(listOfSequences).toYielder(initValue, accumulator);

View File

@ -21,6 +21,7 @@ package io.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
@ -29,15 +30,12 @@ import java.util.Map;
public class UnionQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
private final QueryToolChest<T, Query<T>> toolChest;
public UnionQueryRunner(
QueryRunner<T> baseRunner,
QueryToolChest<T, Query<T>> toolChest
QueryRunner<T> baseRunner
)
{
this.baseRunner = baseRunner;
this.toolChest = toolChest;
}
@Override
@ -45,7 +43,9 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return toolChest.mergeSequencesUnordered(
return new MergeSequence<>(
query.getResultOrdering(),
Sequences.simple(
Lists.transform(
((UnionDataSource) dataSource).getDataSources(),

View File

@ -55,6 +55,7 @@ public class DataSourceMetadataQuery extends BaseQuery<Result<DataSourceMetadata
dataSource,
(querySegmentSpec == null) ? new MultipleIntervalSegmentSpec(Arrays.asList(MY_Y2K_INTERVAL))
: querySegmentSpec,
false,
context
);
}

View File

@ -63,7 +63,7 @@ public class DataSourceMetadataQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<>(
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
queryExecutor, queryWatcher, queryRunners
);
}

View File

@ -25,12 +25,9 @@ import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.query.BySegmentSkippingQueryRunner;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
@ -104,18 +101,6 @@ public class DataSourceQueryQueryToolChest
};
}
@Override
public Sequence<Result<DataSourceMetadataResultValue>> mergeSequences(Sequence<Sequence<Result<DataSourceMetadataResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public Sequence<Result<DataSourceMetadataResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<DataSourceMetadataResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(DataSourceMetadataQuery query)
{
@ -143,9 +128,4 @@ public class DataSourceQueryQueryToolChest
{
return null;
}
public Ordering<Result<DataSourceMetadataResultValue>> getOrdering()
{
return Ordering.natural();
}
}

View File

@ -88,7 +88,7 @@ public class GroupByQuery extends BaseQuery<Row>
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
super(dataSource, querySegmentSpec, false, context);
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions == null ? ImmutableList.<DimensionSpec>of() : dimensions;
@ -152,7 +152,7 @@ public class GroupByQuery extends BaseQuery<Row>
Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
super(dataSource, querySegmentSpec, false, context);
this.dimFilter = dimFilter;
this.granularity = granularity;

View File

@ -95,7 +95,8 @@ public class GroupByQueryEngine
final Sequence<Cursor> cursors = storageAdapter.makeCursors(
Filters.convertDimensionFilters(query.getDimFilter()),
intervals.get(0),
query.getGranularity()
query.getGranularity(),
false
);
final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();

View File

@ -31,24 +31,22 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.annotations.Global;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSource;
import io.druid.query.DruidMetrics;
@ -126,20 +124,20 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
return new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(Query<Row> input, Map<String, Object> responseContext)
public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
{
if (input.getContextBySegment(false)) {
return runner.run(input, responseContext);
if (BaseQuery.getContextBySegment(query, false)) {
return runner.run(query, responseContext);
}
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
if (Boolean.valueOf(query.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
return mergeGroupByResults(
(GroupByQuery) input,
(GroupByQuery) query,
runner,
responseContext
);
}
return runner.run(input, responseContext);
return runner.run(query, responseContext);
}
};
}
@ -261,7 +259,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
{
return Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs(), query.isDescending())),
new Function<Row, Row>()
{
@Override
@ -290,23 +288,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public Sequence<Row> mergeSequencesUnordered(Sequence<Sequence<Row>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
private Ordering<Row> getOrdering()
{
return Ordering.<Row>natural().nullsFirst();
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(GroupByQuery query)
{
@ -586,12 +567,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
};
}
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
};
}

View File

@ -36,6 +36,7 @@ import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.guice.annotations.Global;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.BaseQuery;
import io.druid.query.ConcatQueryRunner;
import io.druid.query.GroupByParallelQueryRunner;
import io.druid.query.Query;
@ -119,8 +120,8 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
computationBufferPool
);
final Pair<Queue, Accumulator<Queue, Row>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final int priority = query.getContextPriority(0);
final boolean bySegment = query.getContextBySegment(false);
final int priority = BaseQuery.getContextPriority(query, 0);
final boolean bySegment = BaseQuery.getContextBySegment(query, false);
final ListenableFuture<Void> future = queryExecutor.submit(
new AbstractPrioritizedCallable<Void>(priority)
@ -173,7 +174,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
return Sequences.simple(bySegmentAccumulatorPair.lhs);
}
return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null));
return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null, query.isDescending()));
}
};
}

View File

@ -30,11 +30,9 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.guava.MappedSequence;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.common.guava.CombiningSequence;
import io.druid.common.utils.JodaUtils;
import io.druid.query.CacheStrategy;
@ -128,7 +126,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
};
}
return getOrdering(); // No two elements should be equal, so it should never merge
return query.getResultOrdering(); // No two elements should be equal, so it should never merge
}
@Override
@ -185,18 +183,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
};
}
@Override
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public Sequence<SegmentAnalysis> mergeSequencesUnordered(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(SegmentMetadataQuery query)
{
@ -265,12 +251,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
}
};
}
@Override
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
{
return new MergeSequence<SegmentAnalysis>(getOrdering(), seqOfSequences);
}
};
}
@ -304,16 +284,4 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
)
);
}
private Ordering<SegmentAnalysis> getOrdering()
{
return new Ordering<SegmentAnalysis>()
{
@Override
public int compare(SegmentAnalysis left, SegmentAnalysis right)
{
return left.getId().compareTo(right.getId());
}
}.nullsFirst();
}
}

View File

@ -30,6 +30,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.BaseQuery;
import io.druid.query.ConcatQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
@ -158,7 +159,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
final Map<String, Object> responseContext
)
{
final int priority = query.getContextPriority(0);
final int priority = BaseQuery.getContextPriority(query, 0);
ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new AbstractPrioritizedCallable<Sequence<SegmentAnalysis>>(priority)
{

View File

@ -26,7 +26,7 @@ import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
public class SegmentAnalysis
public class SegmentAnalysis implements Comparable<SegmentAnalysis>
{
private final String id;
private final List<Interval> interval;
@ -143,4 +143,14 @@ public class SegmentAnalysis
result = 31 * result + (int) (numRows ^ (numRows >>> 32));
return result;
}
@Override
public int compareTo(SegmentAnalysis rhs)
{
// Nulls first
if (rhs == null) {
return 1;
}
return id.compareTo(rhs.getId());
}
}

View File

@ -102,6 +102,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
dataSource,
(querySegmentSpec == null) ? new MultipleIntervalSegmentSpec(Arrays.asList(DEFAULT_INTERVAL))
: querySegmentSpec,
false,
context
);

View File

@ -30,12 +30,11 @@ import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
@ -86,15 +85,18 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
@Override
public QueryRunner<Result<SearchResultValue>> mergeResults(QueryRunner<Result<SearchResultValue>> runner)
public QueryRunner<Result<SearchResultValue>> mergeResults(
QueryRunner<Result<SearchResultValue>> runner
)
{
return new ResultMergeQueryRunner<Result<SearchResultValue>>(runner)
{
@Override
protected Ordering<Result<SearchResultValue>> makeOrdering(Query<Result<SearchResultValue>> query)
{
return Ordering.from(
new ResultGranularTimestampComparator<SearchResultValue>(((SearchQuery) query).getGranularity())
return ResultGranularTimestampComparator.create(
((SearchQuery) query).getGranularity(),
query.isDescending()
);
}
@ -109,18 +111,6 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
};
}
@Override
public Sequence<Result<SearchResultValue>> mergeSequences(Sequence<Sequence<Result<SearchResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public Sequence<Result<SearchResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<SearchResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(SearchQuery query)
{
@ -243,12 +233,6 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
};
}
@Override
public Sequence<Result<SearchResultValue>> mergeSequences(Sequence<Sequence<Result<SearchResultValue>>> seqOfSequences)
{
return new MergeSequence<Result<SearchResultValue>>(getOrdering(), seqOfSequences);
}
};
}
@ -261,11 +245,6 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
);
}
public Ordering<Result<SearchResultValue>> getOrdering()
{
return Ordering.natural();
}
private static class SearchThresholdAdjustingQueryRunner implements QueryRunner<Result<SearchResultValue>>
{
private final QueryRunner<Result<SearchResultValue>> runner;
@ -295,7 +274,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
return runner.run(query, responseContext);
}
final boolean isBySegment = query.getContextBySegment(false);
final boolean isBySegment = BaseQuery.getContextBySegment(query, false);
return Sequences.map(
runner.run(query.withLimit(config.getMaxSearchLimit()), responseContext),

View File

@ -88,6 +88,7 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
final List<DimensionSpec> dimensions = query.getDimensions();
final SearchQuerySpec searchQuerySpec = query.getQuery();
final int limit = query.getLimit();
final boolean descending = query.isDescending();
// Closing this will cause segfaults in unit tests.
final QueryableIndex index = segment.asQueryableIndex();
@ -158,7 +159,7 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
dimsToSearch = dimensions;
}
final Sequence<Cursor> cursors = adapter.makeCursors(filter, segment.getDataInterval(), QueryGranularity.ALL);
final Sequence<Cursor> cursors = adapter.makeCursors(filter, segment.getDataInterval(), QueryGranularity.ALL, descending);
final TreeSet<SearchHit> retVal = cursors.accumulate(
Sets.newTreeSet(query.getSort().getComparator()),

View File

@ -60,7 +60,7 @@ public class SearchQueryRunnerFactory implements QueryRunnerFactory<Result<Searc
)
{
return new ChainedExecutionQueryRunner<Result<SearchResultValue>>(
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
queryExecutor, queryWatcher, queryRunners
);
}

View File

@ -59,7 +59,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
super(dataSource, querySegmentSpec, false, context);
this.dimFilter = dimFilter;
this.sortSpec = sortSpec == null ? new LexicographicSearchSortSpec() : sortSpec;
this.granularity = granularity == null ? QueryGranularity.ALL : granularity;

View File

@ -32,14 +32,17 @@ public class SelectBinaryFn
{
private final QueryGranularity gran;
private final PagingSpec pagingSpec;
private final boolean descending;
public SelectBinaryFn(
QueryGranularity granularity,
PagingSpec pagingSpec
PagingSpec pagingSpec,
boolean descending
)
{
this.gran = granularity;
this.pagingSpec = pagingSpec;
this.descending = descending;
}
@Override
@ -59,7 +62,7 @@ public class SelectBinaryFn
? arg1.getTimestamp()
: gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis()));
SelectResultValueBuilder builder = new SelectResultValueBuilder(timestamp, pagingSpec.getThreshold());
SelectResultValueBuilder builder = new SelectResultValueBuilder(timestamp, pagingSpec.getThreshold(), descending);
SelectResultValue arg1Val = arg1.getValue();
SelectResultValue arg2Val = arg2.getValue();

View File

@ -57,7 +57,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
super(dataSource, querySegmentSpec, false, context);
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions;

View File

@ -73,6 +73,7 @@ public class SelectQueryEngine
adapter,
query.getQuerySegmentSpec().getIntervals(),
Filters.convertDimensionFilters(query.getDimensionsFilter()),
query.isDescending(),
query.getGranularity(),
new Function<Cursor, Result<SelectResultValue>>()
{
@ -82,7 +83,8 @@ public class SelectQueryEngine
final SelectResultValueBuilder builder = new SelectResultValueBuilder(
cursor.getTime(),
query.getPagingSpec()
.getThreshold()
.getThreshold(),
query.isDescending()
);
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);

View File

@ -27,11 +27,8 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.StringUtils;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
@ -80,17 +77,17 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
}
@Override
public QueryRunner<Result<SelectResultValue>> mergeResults(QueryRunner<Result<SelectResultValue>> queryRunner)
public QueryRunner<Result<SelectResultValue>> mergeResults(
QueryRunner<Result<SelectResultValue>> queryRunner
)
{
return new ResultMergeQueryRunner<Result<SelectResultValue>>(queryRunner)
{
@Override
protected Ordering<Result<SelectResultValue>> makeOrdering(Query<Result<SelectResultValue>> query)
{
return Ordering.from(
new ResultGranularTimestampComparator<SelectResultValue>(
((SelectQuery) query).getGranularity()
)
return ResultGranularTimestampComparator.create(
((SelectQuery) query).getGranularity(), query.isDescending()
);
}
@ -102,24 +99,13 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
SelectQuery query = (SelectQuery) input;
return new SelectBinaryFn(
query.getGranularity(),
query.getPagingSpec()
query.getPagingSpec(),
query.isDescending()
);
}
};
}
@Override
public Sequence<Result<SelectResultValue>> mergeSequences(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public Sequence<Result<SelectResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(SelectQuery query)
{
@ -261,12 +247,6 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
}
};
}
@Override
public Sequence<Result<SelectResultValue>> mergeSequences(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
{
return new MergeSequence<Result<SelectResultValue>>(getOrdering(), seqOfSequences);
}
};
}
@ -275,9 +255,4 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
{
return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
}
public Ordering<Result<SelectResultValue>> getOrdering()
{
return Ordering.natural();
}
}

View File

@ -67,7 +67,7 @@ public class SelectQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<Result<SelectResultValue>>(
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
queryExecutor, queryWatcher, queryRunners
);
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.primitives.Longs;
import com.metamx.common.guava.Comparators;
import io.druid.query.Result;
import org.joda.time.DateTime;
@ -59,12 +60,13 @@ public class SelectResultValueBuilder
public SelectResultValueBuilder(
DateTime timestamp,
int threshold
int threshold,
boolean descending
)
{
this.timestamp = timestamp;
instantiatePQueue(threshold, comparator);
instantiatePQueue(threshold, descending ? Comparators.inverse(comparator) : comparator);
}
public void addEntry(

View File

@ -66,6 +66,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
dataSource,
(querySegmentSpec == null) ? new MultipleIntervalSegmentSpec(Arrays.asList(MY_Y2K_INTERVAL))
: querySegmentSpec,
false,
context
);

View File

@ -25,12 +25,9 @@ import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.query.BySegmentSkippingQueryRunner;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
@ -109,18 +106,6 @@ public class TimeBoundaryQueryQueryToolChest
};
}
@Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
{
@ -195,17 +180,6 @@ public class TimeBoundaryQueryQueryToolChest
}
};
}
@Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
};
}
public Ordering<Result<TimeBoundaryResultValue>> getOrdering()
{
return Ordering.natural();
}
}

View File

@ -63,9 +63,7 @@ public class TimeBoundaryQueryRunnerFactory
ExecutorService queryExecutor, Iterable<QueryRunner<Result<TimeBoundaryResultValue>>> queryRunners
)
{
return new ChainedExecutionQueryRunner<>(
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners);
}
@Override
@ -85,8 +83,8 @@ public class TimeBoundaryQueryRunnerFactory
@Override
public Sequence<Result<TimeBoundaryResultValue>> run(
Query<Result<TimeBoundaryResultValue>> input,
Map<String, Object> responseContext
final Query<Result<TimeBoundaryResultValue>> input,
final Map<String, Object> responseContext
)
{
if (!(input instanceof TimeBoundaryQuery)) {

View File

@ -51,6 +51,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
public TimeseriesQuery(
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("descending") boolean descending,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
@ -58,7 +59,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
super(dataSource, querySegmentSpec, descending, context);
this.dimFilter = dimFilter;
this.granularity = granularity;
this.aggregatorSpecs = aggregatorSpecs;
@ -113,6 +114,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
return new TimeseriesQuery(
getDataSource(),
querySegmentSpec,
isDescending(),
dimFilter,
granularity,
aggregatorSpecs,
@ -127,6 +129,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
return new TimeseriesQuery(
dataSource,
getQuerySegmentSpec(),
isDescending(),
dimFilter,
granularity,
aggregatorSpecs,
@ -140,6 +143,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
return new TimeseriesQuery(
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
dimFilter,
granularity,
aggregatorSpecs,
@ -154,6 +158,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
return "TimeseriesQuery{" +
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", descending=" + isDescending() +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +

View File

@ -48,6 +48,7 @@ public class TimeseriesQueryEngine
adapter,
query.getQuerySegmentSpec().getIntervals(),
Filters.convertDimensionFilters(query.getDimensionsFilter()),
query.isDescending(),
query.getGranularity(),
new Function<Cursor, Result<TimeseriesResultValue>>()
{

View File

@ -25,11 +25,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
@ -76,17 +73,17 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
}
@Override
public QueryRunner<Result<TimeseriesResultValue>> mergeResults(QueryRunner<Result<TimeseriesResultValue>> queryRunner)
public QueryRunner<Result<TimeseriesResultValue>> mergeResults(
QueryRunner<Result<TimeseriesResultValue>> queryRunner
)
{
return new ResultMergeQueryRunner<Result<TimeseriesResultValue>>(queryRunner)
{
@Override
protected Ordering<Result<TimeseriesResultValue>> makeOrdering(Query<Result<TimeseriesResultValue>> query)
{
return Ordering.from(
new ResultGranularTimestampComparator<TimeseriesResultValue>(
((TimeseriesQuery) query).getGranularity()
)
return ResultGranularTimestampComparator.create(
((TimeseriesQuery) query).getGranularity(), query.isDescending()
);
}
@ -104,18 +101,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
};
}
@Override
public Sequence<Result<TimeseriesResultValue>> mergeSequences(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public Sequence<Result<TimeseriesResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TimeseriesQuery query)
{
@ -150,10 +135,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
final byte[] aggregatorBytes = QueryCacheHelper.computeAggregatorBytes(query.getAggregatorSpecs());
final byte[] granularityBytes = query.getGranularity().cacheKey();
final byte descending = query.isDescending() ? (byte)1 : 0;
return ByteBuffer
.allocate(1 + granularityBytes.length + filterBytes.length + aggregatorBytes.length)
.allocate(2 + granularityBytes.length + filterBytes.length + aggregatorBytes.length)
.put(TIMESERIES_QUERY)
.put(descending)
.put(granularityBytes)
.put(filterBytes)
.put(aggregatorBytes)
@ -217,12 +204,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
}
};
}
@Override
public Sequence<Result<TimeseriesResultValue>> mergeSequences(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
{
return new MergeSequence<Result<TimeseriesResultValue>>(getOrdering(), seqOfSequences);
}
};
}
@ -232,11 +213,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
}
public Ordering<Result<TimeseriesResultValue>> getOrdering()
{
return Ordering.natural();
}
@Override
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makePreComputeManipulatorFn(
final TimeseriesQuery query, final MetricManipulationFn fn

View File

@ -68,7 +68,7 @@ public class TimeseriesQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<Result<TimeseriesResultValue>>(
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
queryExecutor, queryWatcher, queryRunners
);
}

View File

@ -66,7 +66,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
super(dataSource, querySegmentSpec, false, context);
this.dimensionSpec = dimensionSpec;
this.topNMetricSpec = topNMetricSpec;
this.threshold = threshold;

View File

@ -74,7 +74,7 @@ public class TopNQueryEngine
return Sequences.filter(
Sequences.map(
adapter.makeCursors(filter, queryIntervals.get(0), granularity),
adapter.makeCursors(filter, queryIntervals.get(0), granularity, query.isDescending()),
new Function<Cursor, Result<TopNResultValue>>()
{
@Override

View File

@ -28,13 +28,12 @@ import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.BySegmentResultValue;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
@ -110,17 +109,17 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
@Override
public QueryRunner<Result<TopNResultValue>> mergeResults(QueryRunner<Result<TopNResultValue>> runner)
public QueryRunner<Result<TopNResultValue>> mergeResults(
QueryRunner<Result<TopNResultValue>> runner
)
{
return new ResultMergeQueryRunner<Result<TopNResultValue>>(runner)
{
@Override
protected Ordering<Result<TopNResultValue>> makeOrdering(Query<Result<TopNResultValue>> query)
{
return Ordering.from(
new ResultGranularTimestampComparator<TopNResultValue>(
((TopNQuery) query).getGranularity()
)
return ResultGranularTimestampComparator.create(
((TopNQuery) query).getGranularity(), query.isDescending()
);
}
@ -143,18 +142,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
};
}
@Override
public Sequence<Result<TopNResultValue>> mergeSequences(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public Sequence<Result<TopNResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TopNQuery query)
{
@ -330,7 +317,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return ByteBuffer
.allocate(
1 + dimensionSpecBytes.length + metricSpecBytes.length + 4 +
granularityBytes.length + filterBytes.length + aggregatorBytes.length
granularityBytes.length + filterBytes.length + aggregatorBytes.length
)
.put(TOPN_QUERY)
.put(dimensionSpecBytes)
@ -417,12 +404,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
};
}
@Override
public Sequence<Result<TopNResultValue>> mergeSequences(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
};
}
@ -527,11 +508,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
};
}
public Ordering<Result<TopNResultValue>> getOrdering()
{
return Ordering.natural();
}
static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
{
private final QueryRunner<Result<TopNResultValue>> runner;
@ -562,7 +538,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return runner.run(query, responseContext);
}
final boolean isBySegment = query.getContextBySegment(false);
final boolean isBySegment = BaseQuery.getContextBySegment(query, false);
return Sequences.map(
runner.run(query.withThreshold(minTopNThreshold), responseContext),

View File

@ -85,7 +85,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
)
{
return new ChainedExecutionQueryRunner<>(
queryExecutor, toolchest.getOrdering(), queryWatcher, queryRunners
queryExecutor, queryWatcher, queryRunners
);
}

View File

@ -33,21 +33,69 @@ public class BitmapOffset implements Offset
private final IntIterator itr;
private final BitmapFactory bitmapFactory;
private final ImmutableBitmap bitmapIndex;
private final boolean descending;
private volatile int val;
public BitmapOffset(BitmapFactory bitmapFactory, ImmutableBitmap bitmapIndex)
public BitmapOffset(BitmapFactory bitmapFactory, ImmutableBitmap bitmapIndex, boolean descending)
{
this.bitmapFactory = bitmapFactory;
this.bitmapIndex = bitmapIndex;
this.itr = bitmapIndex.iterator();
this.descending = descending;
this.itr = newIterator();
increment();
}
private IntIterator newIterator()
{
if (!descending) {
return bitmapIndex.iterator();
}
// ImmutableRoaringReverseIntIterator is not cloneable.. looks like a bug
// update : it's fixed in 0.5.13
int i = bitmapIndex.size();
int[] back = new int[bitmapIndex.size()];
IntIterator iterator = bitmapIndex.iterator();
while (iterator.hasNext()) {
back[--i] = iterator.next();
}
return new ArrayIntIterator(back, 0);
}
private static class ArrayIntIterator implements IntIterator {
private final int[] array;
private int index;
private ArrayIntIterator(int[] array, int index) {
this.array = array;
this.index = index;
}
@Override
public boolean hasNext()
{
return index < array.length;
}
@Override
public int next()
{
return array[index++];
}
@Override
public IntIterator clone()
{
return new ArrayIntIterator(array, index);
}
}
private BitmapOffset(BitmapOffset otherOffset)
{
this.bitmapFactory = otherOffset.bitmapFactory;
this.bitmapIndex = otherOffset.bitmapIndex;
this.descending = otherOffset.descending;
this.itr = otherOffset.itr.clone();
this.val = otherOffset.val;
}
@ -72,7 +120,7 @@ public class BitmapOffset implements Offset
public Offset clone()
{
if (bitmapIndex == null || bitmapIndex.size() == 0) {
return new BitmapOffset(bitmapFactory, bitmapFactory.makeEmptyImmutableBitmap());
return new BitmapOffset(bitmapFactory, bitmapFactory.makeEmptyImmutableBitmap(), descending);
}
return new BitmapOffset(this);

View File

@ -28,5 +28,5 @@ import org.joda.time.Interval;
*/
public interface CursorFactory
{
public Sequence<Cursor> makeCursors(Filter filter, Interval interval, QueryGranularity gran);
public Sequence<Cursor> makeCursors(Filter filter, Interval interval, QueryGranularity gran, boolean descending);
}

View File

@ -22,7 +22,9 @@ package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.common.guava.CloseQuietly;
@ -158,7 +160,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
@Override
public Sequence<Cursor> makeCursors(Filter filter, Interval interval, QueryGranularity gran)
public Sequence<Cursor> makeCursors(Filter filter, Interval interval, QueryGranularity gran, boolean descending)
{
Interval actualInterval = interval;
@ -182,18 +184,26 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
final Offset offset;
if (filter == null) {
offset = new NoFilterOffset(0, index.getNumRows());
offset = new NoFilterOffset(0, index.getNumRows(), descending);
} else {
final ColumnSelectorBitmapIndexSelector selector = new ColumnSelectorBitmapIndexSelector(
index.getBitmapFactoryForDimensions(),
index
);
offset = new BitmapOffset(selector.getBitmapFactory(), filter.getBitmapIndex(selector));
offset = new BitmapOffset(selector.getBitmapFactory(), filter.getBitmapIndex(selector), descending);
}
return Sequences.filter(
new CursorSequenceBuilder(index, actualInterval, gran, offset, maxDataTimestamp).build(),
new CursorSequenceBuilder(
index,
actualInterval,
gran,
offset,
minDataTimestamp,
maxDataTimestamp,
descending
).build(),
Predicates.<Cursor>notNull()
);
}
@ -204,21 +214,27 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
private final Interval interval;
private final QueryGranularity gran;
private final Offset offset;
private final long minDataTimestamp;
private final long maxDataTimestamp;
private final boolean descending;
public CursorSequenceBuilder(
ColumnSelector index,
Interval interval,
QueryGranularity gran,
Offset offset,
long maxDataTimestamp
long minDataTimestamp,
long maxDataTimestamp,
boolean descending
)
{
this.index = index;
this.interval = interval;
this.gran = gran;
this.offset = offset;
this.minDataTimestamp = minDataTimestamp;
this.maxDataTimestamp = maxDataTimestamp;
this.descending = descending;
}
public Sequence<Cursor> build()
@ -232,24 +248,49 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
final GenericColumn timestamps = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
Iterable<Long> iterable = gran.iterable(interval.getStartMillis(), interval.getEndMillis());
if (descending) {
iterable = Lists.reverse(ImmutableList.copyOf(iterable));
}
return Sequences.withBaggage(
Sequences.map(
Sequences.simple(gran.iterable(interval.getStartMillis(), interval.getEndMillis())),
Sequences.simple(iterable),
new Function<Long, Cursor>()
{
@Override
public Cursor apply(final Long input)
{
final long timeStart = Math.max(interval.getStartMillis(), input);
while (baseOffset.withinBounds()
&& timestamps.getLongSingleValueRow(baseOffset.getOffset()) < timeStart) {
baseOffset.increment();
final long timeEnd = Math.min(interval.getEndMillis(), gran.next(input));
if (descending) {
for (; baseOffset.withinBounds(); baseOffset.increment()) {
if (timestamps.getLongSingleValueRow(baseOffset.getOffset()) < timeEnd) {
break;
}
}
} else {
for (; baseOffset.withinBounds(); baseOffset.increment()) {
if (timestamps.getLongSingleValueRow(baseOffset.getOffset()) >= timeStart) {
break;
}
}
}
long threshold = Math.min(interval.getEndMillis(), gran.next(input));
final Offset offset = new TimestampCheckingOffset(
baseOffset, timestamps, threshold, maxDataTimestamp < threshold
);
final Offset offset = descending ?
new DescendingTimestampCheckingOffset(
baseOffset,
timestamps,
timeStart,
minDataTimestamp >= timeStart
) :
new AscendingTimestampCheckingOffset(
baseOffset,
timestamps,
timeEnd,
maxDataTimestamp < timeEnd
);
return new Cursor()
{
@ -315,7 +356,11 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn);
return new SingleScanTimeDimSelector(
makeLongColumnSelector(dimension),
extractionFn,
descending
);
}
DictionaryEncodedColumn cachedColumn = dictionaryColumnCache.get(dimension);
@ -684,23 +729,23 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
}
private static class TimestampCheckingOffset implements Offset
private abstract static class TimestampCheckingOffset implements Offset
{
private final Offset baseOffset;
private final GenericColumn timestamps;
private final long threshold;
private final boolean allWithinThreshold;
protected final Offset baseOffset;
protected final GenericColumn timestamps;
protected final long timeLimit;
protected final boolean allWithinThreshold;
public TimestampCheckingOffset(
Offset baseOffset,
GenericColumn timestamps,
long threshold,
long timeLimit,
boolean allWithinThreshold
)
{
this.baseOffset = baseOffset;
this.timestamps = timestamps;
this.threshold = threshold;
this.timeLimit = timeLimit;
// checks if all the values are within the Threshold specified, skips timestamp lookups and checks if all values are within threshold.
this.allWithinThreshold = allWithinThreshold;
}
@ -711,35 +756,108 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return baseOffset.getOffset();
}
@Override
public Offset clone()
{
return new TimestampCheckingOffset(baseOffset.clone(), timestamps, threshold, allWithinThreshold);
}
@Override
public boolean withinBounds()
{
return baseOffset.withinBounds() && (allWithinThreshold
|| timestamps.getLongSingleValueRow(baseOffset.getOffset()) < threshold);
if (!baseOffset.withinBounds()) {
return false;
}
if (allWithinThreshold) {
return true;
}
return timeInRange(timestamps.getLongSingleValueRow(baseOffset.getOffset()));
}
protected abstract boolean timeInRange(long current);
@Override
public void increment()
{
baseOffset.increment();
}
@Override
public Offset clone() {
throw new IllegalStateException("clone");
}
}
private static class AscendingTimestampCheckingOffset extends TimestampCheckingOffset
{
public AscendingTimestampCheckingOffset(
Offset baseOffset,
GenericColumn timestamps,
long timeLimit,
boolean allWithinThreshold
)
{
super(baseOffset, timestamps, timeLimit, allWithinThreshold);
}
@Override
protected final boolean timeInRange(long current)
{
return current < timeLimit;
}
@Override
public String toString()
{
return (baseOffset.withinBounds() ? timestamps.getLongSingleValueRow(baseOffset.getOffset()) : "OOB") +
"<" + timeLimit + "::" + baseOffset;
}
@Override
public Offset clone()
{
return new AscendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit, allWithinThreshold);
}
}
private static class DescendingTimestampCheckingOffset extends TimestampCheckingOffset
{
public DescendingTimestampCheckingOffset(
Offset baseOffset,
GenericColumn timestamps,
long timeLimit,
boolean allWithinThreshold
)
{
super(baseOffset, timestamps, timeLimit, allWithinThreshold);
}
@Override
protected final boolean timeInRange(long current)
{
return current >= timeLimit;
}
@Override
public String toString()
{
return timeLimit + ">=" +
(baseOffset.withinBounds() ? timestamps.getLongSingleValueRow(baseOffset.getOffset()) : "OOB") +
"::" + baseOffset;
}
@Override
public Offset clone()
{
return new DescendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit, allWithinThreshold);
}
}
private static class NoFilterOffset implements Offset
{
private final int rowCount;
private final boolean descending;
private volatile int currentOffset;
NoFilterOffset(int currentOffset, int rowCount)
NoFilterOffset(int currentOffset, int rowCount, boolean descending)
{
this.currentOffset = currentOffset;
this.rowCount = rowCount;
this.descending = descending;
}
@Override
@ -757,13 +875,19 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public Offset clone()
{
return new NoFilterOffset(currentOffset, rowCount);
return new NoFilterOffset(currentOffset, rowCount, descending);
}
@Override
public int getOffset()
{
return currentOffset;
return descending ? rowCount - currentOffset - 1 : currentOffset;
}
@Override
public String toString()
{
return currentOffset + "/" + rowCount + (descending ? "(DSC)" : "");
}
}

View File

@ -32,6 +32,7 @@ public class SingleScanTimeDimSelector implements DimensionSelector
{
private final ExtractionFn extractionFn;
private final LongColumnSelector selector;
private final boolean descending;
private final Map<Integer, String> timeValues = Maps.newHashMap();
private String currentValue = null;
@ -43,7 +44,7 @@ public class SingleScanTimeDimSelector implements DimensionSelector
// - it assumes time values are scanned once and values are grouped together
// (i.e. we never revisit a timestamp we have seen before, unless it is the same as the last accessed one)
// - it also applies and caches extraction function values at the DimSelector level to speed things up
public SingleScanTimeDimSelector(LongColumnSelector selector, ExtractionFn extractionFn)
public SingleScanTimeDimSelector(LongColumnSelector selector, ExtractionFn extractionFn, boolean descending)
{
if (extractionFn == null) {
throw new UnsupportedOperationException("time dimension must provide an extraction function");
@ -51,6 +52,7 @@ public class SingleScanTimeDimSelector implements DimensionSelector
this.extractionFn = extractionFn;
this.selector = selector;
this.descending = descending;
}
@Override
@ -72,7 +74,7 @@ public class SingleScanTimeDimSelector implements DimensionSelector
// we can also avoid creating a dimension value and corresponding index
// and use the current one
else if (timestamp != currentTimestamp) {
if(timestamp < currentTimestamp) {
if (descending ? timestamp > currentTimestamp : timestamp < currentTimestamp) {
// re-using this selector for multiple scans would cause the same rows to return different IDs
// we might want to re-visit if we ever need to do multiple scans with this dimension selector
throw new IllegalStateException("cannot re-use time dimension selector for multiple scans");

View File

@ -623,19 +623,20 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
@Override
public Iterator<Row> iterator()
{
return iterableWithPostAggregations(null).iterator();
return iterableWithPostAggregations(null, false).iterator();
}
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs)
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs, final boolean descending)
{
final List<String> dimensions = getDimensionNames();
final ConcurrentNavigableMap<TimeAndDims, Integer> facts = descending ? getFacts().descendingMap() : getFacts();
return new Iterable<Row>()
{
@Override
public Iterator<Row> iterator()
{
return Iterators.transform(
getFacts().entrySet().iterator(),
facts.entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
{
@Override

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.metamx.collections.spatial.search.Bound;
@ -153,7 +154,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
@Override
public Sequence<Cursor> makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran)
public Sequence<Cursor> makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran, final boolean descending)
{
if (index.isEmpty()) {
return Sequences.empty();
@ -179,8 +180,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
final Interval actualInterval = actualIntervalTmp;
Iterable<Long> iterable = gran.iterable(actualInterval.getStartMillis(), actualInterval.getEndMillis());
if (descending) {
// might be better to be included in granularity#iterable
iterable = Lists.reverse(ImmutableList.copyOf(iterable));
}
return Sequences.map(
Sequences.simple(gran.iterable(actualInterval.getStartMillis(), actualInterval.getEndMillis())),
Sequences.simple(iterable),
new Function<Long, Cursor>()
{
EntryHolder currEntry = new EntryHolder();
@ -212,6 +218,9 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
Math.min(actualInterval.getEndMillis(), gran.next(input)), new String[][]{}
)
);
if (descending) {
cursorMap = cursorMap.descendingMap();
}
time = gran.toDateTime(input);
reset();
@ -309,7 +318,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
final ExtractionFn extractionFn = dimensionSpec.getExtractionFn();
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn);
return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn, descending);
}
final IncrementalIndex.DimDim dimValLookup = index.getDimensionValues(dimension);

View File

@ -21,7 +21,6 @@ package io.druid.query;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequence;
@ -114,7 +113,6 @@ public class ChainedExecutionQueryRunnerTest
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
Ordering.<Integer>natural(),
watcher,
Lists.<QueryRunner<Integer>>newArrayList(
runners
@ -242,7 +240,6 @@ public class ChainedExecutionQueryRunnerTest
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
Ordering.<Integer>natural(),
watcher,
Lists.<QueryRunner<Integer>>newArrayList(
runners

View File

@ -54,6 +54,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -87,15 +88,26 @@ public class QueryRunnerTestHelper
}
)
);
public static final DateTime minTime = new DateTime("2011-01-12T00:00:00.000Z");
public static final QueryGranularity dayGran = QueryGranularity.DAY;
public static final QueryGranularity allGran = QueryGranularity.ALL;
public static final String marketDimension = "market";
public static final String qualityDimension = "quality";
public static final String placementDimension = "placement";
public static final String placementishDimension = "placementish";
public static final List<String> dimensions = Lists.newArrayList(
marketDimension,
qualityDimension,
placementDimension,
placementishDimension
);
public static final String indexMetric = "index";
public static final String uniqueMetric = "uniques";
public static final String addRowsIndexConstantMetric = "addRowsIndexConstant";
public static final List<String> metrics = Lists.newArrayList(indexMetric, uniqueMetric, addRowsIndexConstantMetric);
public static String dependentPostAggMetric = "dependentPostAgg";
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
@ -192,6 +204,17 @@ public class QueryRunnerTestHelper
"5506.567192077637", "4743.144546508789", "4913.282669067383", "4723.869743347168"
};
public static final String[] expectedFullOnIndexValuesDesc;
static {
List<String> list = new ArrayList(Arrays.asList(expectedFullOnIndexValues));
Collections.reverse(list);
expectedFullOnIndexValuesDesc = list.toArray(new String[list.size()]);
}
public static final DateTime earliest = new DateTime("2011-01-12");
public static final DateTime last = new DateTime("2011-04-15");
public static final DateTime skippedDay = new DateTime("2011-01-21T00:00:00.000Z");
public static final QuerySegmentSpec firstToThird = new MultipleIntervalSegmentSpec(
@ -222,6 +245,66 @@ public class QueryRunnerTestHelper
);
}
// simple cartesian iterable
public static Iterable<Object[]> cartesian(final Iterable... iterables)
{
return new Iterable<Object[]>()
{
@Override
public Iterator<Object[]> iterator()
{
return new Iterator<Object[]>()
{
private final Iterator[] iterators = new Iterator[iterables.length];
private final Object[] cached = new Object[iterables.length];
@Override
public boolean hasNext()
{
return hasNext(0);
}
private boolean hasNext(int index)
{
if (iterators[index] == null) {
iterators[index] = iterables[index].iterator();
}
for (; hasMore(index); cached[index] = null) {
if (index == iterables.length - 1 || hasNext(index + 1)) {
return true;
}
}
iterators[index] = null;
return false;
}
private boolean hasMore(int index)
{
if (cached[index] == null && iterators[index].hasNext()) {
cached[index] = iterators[index].next();
}
return cached[index] != null;
}
@Override
public Object[] next()
{
Object[] result = Arrays.copyOf(cached, cached.length);
cached[cached.length - 1] = null;
return result;
}
@Override
public void remove()
{
throw new UnsupportedOperationException("remove");
}
};
}
};
}
public static <T, QueryType extends Query<T>> List<QueryRunner<T>> makeQueryRunners(
QueryRunnerFactory<T, QueryType> factory
)
@ -252,23 +335,13 @@ public class QueryRunnerTestHelper
final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true);
return Arrays.asList(
new Object[][]{
{
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId))
},
{
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex))
},
{
makeUnionQueryRunner(
factory,
new QueryableIndexSegment(segmentId, mergedRealtimeIndex)
)
},
{
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
}
}
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)),
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)),
makeUnionQueryRunner(
factory,
new QueryableIndexSegment(segmentId, mergedRealtimeIndex)
),
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
);
}
/**
@ -361,8 +434,7 @@ public class QueryRunnerTestHelper
new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
),
factory.getToolchest()
)
)
)
),

View File

@ -23,11 +23,30 @@ import io.druid.granularity.QueryGranularity;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
/**
*/
@RunWith(Parameterized.class)
public class ResultGranularTimestampComparatorTest
{
@Parameterized.Parameters(name = "descending={0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.transformToConstructionFeeder(Arrays.asList(false, true));
}
private final boolean descending;
public ResultGranularTimestampComparatorTest(boolean descending)
{
this.descending = descending;
}
private final DateTime time = new DateTime("2011-11-11");
@Test
@ -36,7 +55,7 @@ public class ResultGranularTimestampComparatorTest
Result<Object> r1 = new Result<Object>(time, null);
Result<Object> r2 = new Result<Object>(time.plusYears(5), null);
Assert.assertEquals(new ResultGranularTimestampComparator<Object>(QueryGranularity.ALL).compare(r1, r2), 0);
Assert.assertEquals(ResultGranularTimestampComparator.create(QueryGranularity.ALL, descending).compare(r1, r2), 0);
}
@Test
@ -48,9 +67,9 @@ public class ResultGranularTimestampComparatorTest
Result<Object> less = new Result<Object>(time.minusHours(1), null);
QueryGranularity day = QueryGranularity.DAY;
Assert.assertEquals(new ResultGranularTimestampComparator<Object>(day).compare(res, same), 0);
Assert.assertEquals(new ResultGranularTimestampComparator<Object>(day).compare(res, greater), -1);
Assert.assertEquals(new ResultGranularTimestampComparator<Object>(day).compare(res, less), 1);
Assert.assertEquals(ResultGranularTimestampComparator.create(day, descending).compare(res, same), 0);
Assert.assertEquals(ResultGranularTimestampComparator.create(day, descending).compare(res, greater), descending ? 1 : -1);
Assert.assertEquals(ResultGranularTimestampComparator.create(day, descending).compare(res, less), descending ? -1 : 1);
}
@Test
@ -62,8 +81,8 @@ public class ResultGranularTimestampComparatorTest
Result<Object> less = new Result<Object>(time.minusHours(1), null);
QueryGranularity hour = QueryGranularity.HOUR;
Assert.assertEquals(new ResultGranularTimestampComparator<Object>(hour).compare(res, same), 0);
Assert.assertEquals(new ResultGranularTimestampComparator<Object>(hour).compare(res, greater), -1);
Assert.assertEquals(new ResultGranularTimestampComparator<Object>(hour).compare(res, less), 1);
Assert.assertEquals(ResultGranularTimestampComparator.create(hour, descending).compare(res, same), 0);
Assert.assertEquals(ResultGranularTimestampComparator.create(hour, descending).compare(res, greater), descending ? 1 : -1);
Assert.assertEquals(ResultGranularTimestampComparator.create(hour, descending).compare(res, less), descending ? -1 : 1);
}
}

View File

@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import junit.framework.Assert;
import org.junit.Test;
@ -56,12 +55,7 @@ public class UnionQueryRunnerTest
}
}
};
UnionQueryRunner runner = new UnionQueryRunner(
baseRunner,
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
UnionQueryRunner runner = new UnionQueryRunner(baseRunner);
// Make a dummy query with Union datasource
Query q = Druids.newTimeseriesQueryBuilder()
.dataSource(

View File

@ -134,7 +134,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
public GroupByTimeseriesQueryRunnerTest(QueryRunner runner)
{
super(runner);
super(runner, false);
}
@Override

View File

@ -44,7 +44,7 @@ public class SelectBinaryFnTest
@Test
public void testApply() throws Exception
{
SelectBinaryFn binaryFn = new SelectBinaryFn(QueryGranularity.ALL, new PagingSpec(null, 5));
SelectBinaryFn binaryFn = new SelectBinaryFn(QueryGranularity.ALL, new PagingSpec(null, 5), false);
Result<SelectResultValue> res1 = new Result<>(
new DateTime("2013-01-01"),

View File

@ -43,7 +43,6 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -52,27 +51,41 @@ import java.util.Map;
public class TimeSeriesUnionQueryRunnerTest
{
private final QueryRunner runner;
private final boolean descending;
public TimeSeriesUnionQueryRunnerTest(
QueryRunner runner
QueryRunner runner, boolean descending
)
{
this.runner = runner;
this.descending = descending;
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
@Parameterized.Parameters(name="{0}:descending={1}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeUnionQueryRunners(
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
return QueryRunnerTestHelper.cartesian(
QueryRunnerTestHelper.makeUnionQueryRunners(
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
QueryRunnerTestHelper.unionDataSource
),
QueryRunnerTestHelper.unionDataSource
// descending?
Arrays.asList(false, true)
);
}
private <T> void assertExpectedResults(Iterable<Result<T>> expectedResults, Iterable<Result<T>> results)
{
if (descending) {
expectedResults = TestHelper.revert(expectedResults);
}
TestHelper.assertExpectedResults(expectedResults, results);
}
@Test
public void testUnionTimeseries()
{
@ -90,29 +103,30 @@ public class TimeSeriesUnionQueryRunnerTest
QueryRunnerTestHelper.qualityUniques
)
)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new Result<>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 52L, "idx", 26476L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
),
new Result<TimeseriesResultValue>(
new Result<>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 52L, "idx", 23308L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
)
);
HashMap<String, Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<>();
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
runner.run(query, context),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -138,86 +152,51 @@ public class TimeSeriesUnionQueryRunnerTest
)
)
)
.descending(descending)
.build();
QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator());
final List<Result<TimeseriesResultValue>> ds1 = Lists.newArrayList(
new Result<>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("rows", 1L, "idx", 2L))
),
new Result<>(
new DateTime("2011-04-03"),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("rows", 3L, "idx", 4L))
)
);
final List<Result<TimeseriesResultValue>> ds2 = Lists.newArrayList(
new Result<>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("rows", 5L, "idx", 6L))
),
new Result<>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("rows", 7L, "idx", 8L))
),
new Result<>(
new DateTime("2011-04-04"),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("rows", 9L, "idx", 10L))
)
);
QueryRunner mergingrunner = toolChest.mergeResults(
new UnionQueryRunner<Result<TimeseriesResultValue>>(
new UnionQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query,
Map<String, Object> responseContext
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
Map<String, Object> responseContext
)
{
if (query.getDataSource().equals(new TableDataSource("ds1"))) {
return Sequences.simple(
Lists.newArrayList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
1L,
"idx",
2L
)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-03"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
3L,
"idx",
4L
)
)
)
)
);
return Sequences.simple(descending ? Lists.reverse(ds1) : ds1);
} else {
return Sequences.simple(
Lists.newArrayList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
5L,
"idx",
6L
)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
7L,
"idx",
8L
)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-04"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
9L,
"idx",
10L
)
)
)
)
);
return Sequences.simple(descending ? Lists.reverse(ds2) : ds2);
}
}
},
toolChest
}
)
);
@ -253,8 +232,7 @@ public class TimeSeriesUnionQueryRunnerTest
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
System.out.println(results);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.CacheStrategy;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
@ -34,9 +35,27 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
@RunWith(Parameterized.class)
public class TimeseriesQueryQueryToolChestTest
{
@Parameterized.Parameters(name = "descending={0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.transformToConstructionFeeder(Arrays.asList(false, true));
}
private final boolean descending;
public TimeseriesQueryQueryToolChestTest(boolean descending)
{
this.descending = descending;
}
@Test
public void testCacheStrategy() throws Exception
@ -53,6 +72,7 @@ public class TimeseriesQueryQueryToolChestTest
)
)
),
descending,
null,
QueryGranularity.ALL,
ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("metric1")),

View File

@ -42,12 +42,30 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@RunWith(Parameterized.class)
public class TimeseriesQueryRunnerBonusTest
{
@Parameterized.Parameters(name = "descending={0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.transformToConstructionFeeder(Arrays.asList(false, true));
}
private final boolean descending;
public TimeseriesQueryRunnerBonusTest(boolean descending)
{
this.descending = descending;
}
@Test
public void testOneRowAtATime() throws Exception
{
@ -88,7 +106,7 @@ public class TimeseriesQueryRunnerBonusTest
Assert.assertEquals("result count metric", 2, (long) results.get(0).getValue().getLongMetric("rows"));
}
private static List<Result<TimeseriesResultValue>> runTimeseriesCount(IncrementalIndex index)
private List<Result<TimeseriesResultValue>> runTimeseriesCount(IncrementalIndex index)
{
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
@ -111,6 +129,7 @@ public class TimeseriesQueryRunnerBonusTest
new CountAggregatorFactory("rows")
)
)
.descending(descending)
.build();
HashMap<String,Object> context = new HashMap<String, Object>();
return Sequences.toList(

View File

@ -69,10 +69,11 @@ public class TimeseriesQueryRunnerTest
public static final Map<String, Object> CONTEXT = ImmutableMap.of();
@Parameterized.Parameters
@Parameterized.Parameters(name="{0}:descending={1}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.transformToConstructionFeeder(
return QueryRunnerTestHelper.cartesian(
// runners
QueryRunnerTestHelper.makeQueryRunners(
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
@ -81,17 +82,29 @@ public class TimeseriesQueryRunnerTest
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
),
// descending?
Arrays.asList(false, true)
);
}
private <T> void assertExpectedResults(Iterable<Result<T>> expectedResults, Iterable<Result<T>> results)
{
if (descending) {
expectedResults = TestHelper.revert(expectedResults);
}
TestHelper.assertExpectedResults(expectedResults, results);
}
private final QueryRunner runner;
private final boolean descending;
public TimeseriesQueryRunnerTest(
QueryRunner runner
QueryRunner runner, boolean descending
)
{
this.runner = runner;
this.descending = descending;
}
@Test
@ -110,52 +123,57 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
DateTime expectedEarliest = new DateTime("2011-01-12");
DateTime expectedLast = new DateTime("2011-04-15");
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
final String[] expectedIndex = descending ?
QueryRunnerTestHelper.expectedFullOnIndexValuesDesc :
QueryRunnerTestHelper.expectedFullOnIndexValues;
final DateTime expectedLast = descending ?
QueryRunnerTestHelper.earliest :
QueryRunnerTestHelper.last;
int count = 0;
Result lastResult = null;
for (Result<TimeseriesResultValue> result : results) {
lastResult = result;
Assert.assertEquals(expectedEarliest, result.getTimestamp());
DateTime current = result.getTimestamp();
Assert.assertFalse(
String.format("Timestamp[%s] > expectedLast[%s]", result.getTimestamp(), expectedLast),
result.getTimestamp().isAfter(expectedLast)
String.format("Timestamp[%s] > expectedLast[%s]", current, expectedLast),
descending ? current.isBefore(expectedLast) : current.isAfter(expectedLast)
);
final TimeseriesResultValue value = result.getValue();
Assert.assertEquals(
result.toString(),
QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0L : 13L,
QueryRunnerTestHelper.skippedDay.equals(current) ? 0L : 13L,
value.getLongMetric("rows").longValue()
);
Assert.assertEquals(
result.toString(),
QueryRunnerTestHelper.expectedFullOnIndexValues[count],
expectedIndex[count],
String.valueOf(value.getDoubleMetric("index"))
);
Assert.assertEquals(
result.toString(),
new Double(QueryRunnerTestHelper.expectedFullOnIndexValues[count]) +
(QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0L : 13L) + 1L,
new Double(expectedIndex[count]) +
(QueryRunnerTestHelper.skippedDay.equals(current) ? 0L : 13L) + 1L,
value.getDoubleMetric("addRowsIndexConstant"),
0.0
);
Assert.assertEquals(
value.getDoubleMetric("uniques"),
QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0.0d : 9.0d,
QueryRunnerTestHelper.skippedDay.equals(current) ? 0.0d : 9.0d,
0.02
);
expectedEarliest = gran.toDateTime(gran.next(expectedEarliest.getMillis()));
lastResult = result;
++count;
}
@ -175,6 +193,7 @@ public class TimeseriesQueryRunnerTest
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
.descending(descending)
.build();
DateTime expectedEarliest = new DateTime("2011-01-12");
@ -202,7 +221,6 @@ public class TimeseriesQueryRunnerTest
public void testFullOnTimeseriesWithFilter()
{
QueryGranularity gran = QueryGranularity.DAY;
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.dayGran)
@ -214,6 +232,7 @@ public class TimeseriesQueryRunnerTest
QueryRunnerTestHelper.qualityUniques
)
)
.descending(descending)
.build();
Assert.assertEquals(
@ -224,8 +243,9 @@ public class TimeseriesQueryRunnerTest
query.getDimensionsFilter()
);
DateTime expectedEarliest = new DateTime("2011-01-12");
DateTime expectedLast = new DateTime("2011-04-15");
final DateTime expectedLast = descending ?
QueryRunnerTestHelper.earliest :
QueryRunnerTestHelper.last;
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
runner.run(query, CONTEXT),
@ -233,10 +253,10 @@ public class TimeseriesQueryRunnerTest
);
for (Result<TimeseriesResultValue> result : results) {
Assert.assertEquals(result.toString(), expectedEarliest, result.getTimestamp());
DateTime current = result.getTimestamp();
Assert.assertFalse(
String.format("Timestamp[%s] > expectedLast[%s]", result.getTimestamp(), expectedLast),
result.getTimestamp().isAfter(expectedLast)
String.format("Timestamp[%s] > expectedLast[%s]", current, expectedLast),
descending ? current.isBefore(expectedLast) : current.isAfter(expectedLast)
);
final TimeseriesResultValue value = result.getValue();
@ -254,8 +274,6 @@ public class TimeseriesQueryRunnerTest
),
0.01
);
expectedEarliest = gran.toDateTime(gran.next(expectedEarliest.getMillis()));
}
}
@ -276,6 +294,7 @@ public class TimeseriesQueryRunnerTest
QueryRunnerTestHelper.qualityUniques
)
)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -298,7 +317,7 @@ public class TimeseriesQueryRunnerTest
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -323,6 +342,7 @@ public class TimeseriesQueryRunnerTest
DateTimeZone.forID("America/Los_Angeles")
)
)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -345,7 +365,7 @@ public class TimeseriesQueryRunnerTest
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -371,6 +391,7 @@ public class TimeseriesQueryRunnerTest
QueryRunnerTestHelper.qualityUniques
)
)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults1 = Arrays.asList(
@ -386,7 +407,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query1, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults1, results1);
assertExpectedResults(expectedResults1, results1);
TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
@ -423,7 +444,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query2, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults2, results2);
assertExpectedResults(expectedResults2, results2);
}
@Test
@ -455,6 +476,7 @@ public class TimeseriesQueryRunnerTest
)
)
)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults1 = Arrays.asList(
@ -476,7 +498,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query1, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults1, results1);
assertExpectedResults(expectedResults1, results1);
}
@Test
@ -502,6 +524,7 @@ public class TimeseriesQueryRunnerTest
)
)
)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> lotsOfZeroes = Lists.newArrayList();
@ -544,7 +567,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query1, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults1, results1);
assertExpectedResults(expectedResults1, results1);
}
@Test
@ -576,6 +599,7 @@ public class TimeseriesQueryRunnerTest
)
)
)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults1 = Arrays.asList(
@ -591,7 +615,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query1, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults1, results1);
assertExpectedResults(expectedResults1, results1);
}
@Test
@ -618,6 +642,7 @@ public class TimeseriesQueryRunnerTest
QueryRunnerTestHelper.qualityUniques
)
)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults1 = Arrays.asList(
@ -632,7 +657,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query1, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults1, results1);
assertExpectedResults(expectedResults1, results1);
TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
@ -670,7 +695,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query2, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults2, results2);
assertExpectedResults(expectedResults2, results2);
}
@Test
@ -697,6 +722,7 @@ public class TimeseriesQueryRunnerTest
)
)
)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList();
@ -705,7 +731,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -724,6 +750,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -755,7 +782,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -774,6 +801,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -805,7 +833,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -824,6 +852,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -855,7 +884,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -874,6 +903,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -905,7 +935,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -924,6 +954,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -955,7 +986,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -982,6 +1013,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -1013,7 +1045,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -1040,6 +1072,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -1071,7 +1104,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -1098,6 +1131,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -1129,7 +1163,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -1154,6 +1188,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -1185,7 +1220,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@ -1214,6 +1249,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -1245,7 +1281,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -1276,6 +1312,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -1307,7 +1344,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -1320,6 +1357,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -1351,7 +1389,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -1365,6 +1403,7 @@ public class TimeseriesQueryRunnerTest
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.context(ImmutableMap.<String, Object>of("skipEmptyBuckets", "true"))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList();
@ -1373,7 +1412,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, new HashMap<String, Object>()),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -1386,6 +1425,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -1417,7 +1457,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, new HashMap<String, Object>()),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -1430,6 +1470,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -1461,7 +1502,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, new HashMap<String, Object>()),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -1474,6 +1515,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -1505,7 +1547,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -1532,6 +1574,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -1563,7 +1606,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
assertExpectedResults(expectedResults, results);
}
@Test
@ -1580,6 +1623,7 @@ public class TimeseriesQueryRunnerTest
QueryRunnerTestHelper.jsPlacementishCount
)
)
.descending(descending)
.build();
Iterable<Result<TimeseriesResultValue>> expectedResults = ImmutableList.of(
@ -1603,7 +1647,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
assertExpectedResults(expectedResults, actualResults);
}
@Test
@ -1621,9 +1665,10 @@ public class TimeseriesQueryRunnerTest
QueryRunnerTestHelper.jsPlacementishCount
)
)
.descending(descending)
.build();
Iterable<Result<TimeseriesResultValue>> expectedResults = ImmutableList.of(
List<Result<TimeseriesResultValue>> expectedResults = ImmutableList.of(
new Result<>(
new DateTime(
QueryRunnerTestHelper.firstToThird.getIntervals()
@ -1644,7 +1689,7 @@ public class TimeseriesQueryRunnerTest
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
assertExpectedResults(expectedResults, actualResults);
}
@Test
@ -1657,6 +1702,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
Iterable<Result<TimeseriesResultValue>> expectedResults = Sequences.toList(
@ -1667,6 +1713,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build(),
CONTEXT
),
@ -1689,6 +1736,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
Iterable<Result<TimeseriesResultValue>> expectedResults = Sequences.toList(
@ -1700,6 +1748,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build(),
CONTEXT
),
@ -1736,6 +1785,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
AndDimFilter andDimFilter2 = Druids.newAndDimFilterBuilder()
@ -1762,6 +1812,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build(),
CONTEXT
),
@ -1797,6 +1848,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
AndDimFilter andDimFilter2 = Druids.newAndDimFilterBuilder()
@ -1826,6 +1878,7 @@ public class TimeseriesQueryRunnerTest
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build(),
CONTEXT
),
@ -1862,6 +1915,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
@ -1883,7 +1937,7 @@ public class TimeseriesQueryRunnerTest
)
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
assertExpectedResults(expectedResults, actualResults);
}
@Test
@ -1910,6 +1964,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
@ -1932,7 +1987,7 @@ public class TimeseriesQueryRunnerTest
)
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
assertExpectedResults(expectedResults, actualResults);
}
@Test
@ -1959,6 +2014,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
@ -1981,7 +2037,7 @@ public class TimeseriesQueryRunnerTest
)
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
assertExpectedResults(expectedResults, actualResults);
}
@Test
@ -2010,6 +2066,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
@ -2031,7 +2088,7 @@ public class TimeseriesQueryRunnerTest
)
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
assertExpectedResults(expectedResults, actualResults);
}
@Test
@ -2060,6 +2117,7 @@ public class TimeseriesQueryRunnerTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
@ -2081,7 +2139,7 @@ public class TimeseriesQueryRunnerTest
)
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
assertExpectedResults(expectedResults, actualResults);
}
@Test
@ -2098,6 +2156,7 @@ public class TimeseriesQueryRunnerTest
)
)
.granularity(QueryRunnerTestHelper.allGran)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -2121,7 +2180,7 @@ public class TimeseriesQueryRunnerTest
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
assertExpectedResults(expectedResults, actualResults);
}
@Test

View File

@ -27,14 +27,30 @@ import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.PostAggregator;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
@RunWith(Parameterized.class)
public class TimeseriesQueryTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Parameterized.Parameters(name="descending={0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.cartesian(Arrays.asList(false, true));
}
private final boolean descending;
public TimeseriesQueryTest(boolean descending)
{
this.descending = descending;
}
@Test
public void testQuerySerialization() throws IOException
{
@ -49,6 +65,7 @@ public class TimeseriesQueryTest
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.descending(descending)
.build();
String json = jsonMapper.writeValueAsString(query);

View File

@ -41,7 +41,6 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -59,40 +58,43 @@ public class TopNUnionQueryTest
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
public static Iterable<Object[]> constructorFeeder() throws IOException
{
List<Object> retVal = Lists.newArrayList();
retVal.addAll(
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
QueryRunnerTestHelper.unionDataSource
)
);
retVal.addAll(
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(2000);
}
}
return QueryRunnerTestHelper.cartesian(
Iterables.concat(
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
QueryRunnerTestHelper.unionDataSource
),
QueryRunnerTestHelper.unionDataSource
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(2000);
}
}
),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
QueryRunnerTestHelper.unionDataSource
)
)
);
return retVal;
}
@Test

View File

@ -42,7 +42,7 @@ public class BitmapOffsetTest
ImmutableConciseSet set = ImmutableConciseSet.newImmutableFromMutable(mutableSet);
BitmapOffset offset = new BitmapOffset(new ConciseBitmapFactory(), new WrappedImmutableConciseBitmap(set));
BitmapOffset offset = new BitmapOffset(new ConciseBitmapFactory(), new WrappedImmutableConciseBitmap(set), false);
int count = 0;
while (offset.withinBounds()) {

View File

@ -69,6 +69,10 @@ public class TestHelper
return JSON_MAPPER;
}
public static <T> Iterable<T> revert(Iterable<T> input) {
return Lists.reverse(Lists.newArrayList(input));
}
public static <T> void assertExpectedResults(Iterable<Result<T>> expectedResults, Sequence<Result<T>> results)
{
assertResults(expectedResults, Sequences.toList(results, Lists.<Result<T>>newArrayList()), "");
@ -150,17 +154,19 @@ public class TestHelper
}
}
private static <T> void assertObjects(Iterable<T> expectedResults, Iterable<T> actualResults, String failMsg)
private static <T> void assertObjects(Iterable<T> expectedResults, Iterable<T> actualResults, String msg)
{
Iterator resultsIter = actualResults.iterator();
Iterator resultsIter2 = actualResults.iterator();
Iterator expectedResultsIter = expectedResults.iterator();
int index = 0;
while (resultsIter.hasNext() && resultsIter2.hasNext() && expectedResultsIter.hasNext()) {
Object expectedNext = expectedResultsIter.next();
final Object next = resultsIter.next();
final Object next2 = resultsIter2.next();
String failMsg = msg + "-" + index++;
Assert.assertEquals(failMsg, expectedNext, next);
Assert.assertEquals(
String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg),
@ -171,13 +177,13 @@ public class TestHelper
if (resultsIter.hasNext()) {
Assert.fail(
String.format("%s: Expected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next())
String.format("%s: Expected resultsIter to be exhausted, next element was %s", msg, resultsIter.next())
);
}
if (resultsIter2.hasNext()) {
Assert.fail(
String.format("%s: Expected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next())
String.format("%s: Expected resultsIter2 to be exhausted, next element was %s", msg, resultsIter.next())
);
}
@ -185,7 +191,7 @@ public class TestHelper
Assert.fail(
String.format(
"%s: Expected expectedResultsIter to be exhausted, next element was %s",
failMsg,
msg,
expectedResultsIter.next()
)
);

View File

@ -252,31 +252,35 @@ public class IncrementalIndexStorageAdapterTest
);
IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(index);
Sequence<Cursor> cursorSequence = adapter.makeCursors(
new SelectorFilter("sally", "bo"),
interval,
QueryGranularity.NONE
);
Cursor cursor = Sequences.toList(Sequences.limit(cursorSequence, 1), Lists.<Cursor>newArrayList()).get(0);
DimensionSelector dimSelector;
for (boolean descending : Arrays.asList(false, true)) {
Sequence<Cursor> cursorSequence = adapter.makeCursors(
new SelectorFilter("sally", "bo"),
interval,
QueryGranularity.NONE,
descending
);
dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally"));
Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0)));
Cursor cursor = Sequences.toList(Sequences.limit(cursorSequence, 1), Lists.<Cursor>newArrayList()).get(0);
DimensionSelector dimSelector;
index.add(
new MapBasedInputRow(
t.minus(1).getMillis(),
Lists.newArrayList("sally"),
ImmutableMap.<String, Object>of("sally", "ah")
)
);
dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally"));
Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0)));
// Cursor reset should not be affected by out of order values
cursor.reset();
index.add(
new MapBasedInputRow(
t.minus(1).getMillis(),
Lists.newArrayList("sally"),
ImmutableMap.<String, Object>of("sally", "ah")
)
);
dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally"));
Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0)));
// Cursor reset should not be affected by out of order values
cursor.reset();
dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally"));
Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0)));
}
}
@Test

View File

@ -39,6 +39,7 @@ import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.LazySequence;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger;
@ -49,6 +50,7 @@ import io.druid.client.selector.ServerSelector;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Smile;
import io.druid.query.BaseQuery;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
@ -131,20 +133,20 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final List<Pair<Interval, byte[]>> cachedResults = Lists.newArrayList();
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
final boolean useCache = query.getContextUseCache(true)
final boolean useCache = BaseQuery.getContextUseCache(query, true)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
final boolean populateCache = query.getContextPopulateCache(true)
final boolean populateCache = BaseQuery.getContextPopulateCache(query, true)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);
final boolean isBySegment = query.getContextBySegment(false);
final boolean isBySegment = BaseQuery.getContextBySegment(query, false);
final ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
final int priority = query.getContextPriority(0);
final int priority = BaseQuery.getContextPriority(query, 0);
contextBuilder.put("priority", priority);
if (populateCache) {
@ -292,7 +294,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
addSequencesFromCache(sequencesByInterval);
addSequencesFromServer(sequencesByInterval);
return mergeCachedAndUncachedSequences(sequencesByInterval, toolChest);
return mergeCachedAndUncachedSequences(query, sequencesByInterval);
}
private void addSequencesFromCache(ArrayList<Sequence<T>> listOfSequences)
@ -340,8 +342,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
{
listOfSequences.ensureCapacity(listOfSequences.size() + serverSegments.size());
final Query<Result<BySegmentResultValueClass<T>>> rewrittenQuery = (Query<Result<BySegmentResultValueClass<T>>>) query
.withOverriddenContext(contextBuilder.build());
final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
// Loop through each server, setting up the query and initiating it.
// The data gets handled as a Future and parsed in the long Sequence chain in the resultSeqToAdd setter.
@ -366,7 +367,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// bySegment queries need to be de-serialized, see DirectDruidClient.run()
@SuppressWarnings("unchecked")
final Query<Result<BySegmentResultValueClass<T>>> bySegmentQuery = (Query<Result<BySegmentResultValueClass<T>>>) query;
final Query<Result<BySegmentResultValueClass<T>>> bySegmentQuery =
(Query<Result<BySegmentResultValueClass<T>>>) ((Query) query);
@SuppressWarnings("unchecked")
final Sequence<Result<BySegmentResultValueClass<T>>> resultSequence = clientQueryable.run(
@ -406,7 +408,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
rewrittenQuery.withQuerySegmentSpec(segmentSpec),
responseContext
);
resultSeqToAdd = toolChest.mergeSequencesUnordered(
resultSeqToAdd = new MergeSequence(
query.getResultOrdering(),
Sequences.<Result<BySegmentResultValueClass<T>>, Sequence<T>>map(
runningSequence,
new Function<Result<BySegmentResultValueClass<T>>, Sequence<T>>()
@ -504,18 +507,17 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
protected Sequence<T> mergeCachedAndUncachedSequences(
List<Sequence<T>> sequencesByInterval,
QueryToolChest<T, Query<T>> toolChest
Query<T> query,
List<Sequence<T>> sequencesByInterval
)
{
if (sequencesByInterval.isEmpty()) {
return Sequences.empty();
}
return toolChest.mergeSequencesUnordered(
Sequences.simple(
sequencesByInterval
)
return new MergeSequence<>(
query.getResultOrdering(),
Sequences.simple(sequencesByInterval)
);
}

View File

@ -35,6 +35,7 @@ import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -87,12 +88,12 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
{
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
final boolean populateCache = query.getContextPopulateCache(true)
final boolean populateCache = BaseQuery.getContextPopulateCache(query, true)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);
final boolean useCache = query.getContextUseCache(true)
final boolean useCache = BaseQuery.getContextUseCache(query, true)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);

View File

@ -52,6 +52,7 @@ import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.query.BaseQuery;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
@ -134,7 +135,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
boolean isBySegment = query.getContextBySegment(false);
boolean isBySegment = BaseQuery.getContextBySegment(query, false);
Pair<JavaType, JavaType> types = typesMap.get(query.getClass());
if (types == null) {

View File

@ -75,7 +75,8 @@ public class IngestSegmentFirehose implements Firehose
adapter.getAdapter().makeCursors(
Filters.convertDimensionFilters(dimFilter),
adapter.getInterval(),
granularity
granularity,
false
), new Function<Cursor, Sequence<InputRow>>()
{
@Nullable

View File

@ -97,8 +97,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
retryConfig,
objectMapper
)
),
toolChest
)
)
)
),

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import io.druid.query.BaseQuery;
import io.druid.query.Query;
/**
@ -45,7 +46,7 @@ public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelecto
@Override
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
{
final int priority = query.getContextPriority(0);
final int priority = BaseQuery.getContextPriority(query, 0);
if (priority < minPriority) {
return Optional.of(

View File

@ -926,11 +926,15 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
),
client.mergeCachedAndUncachedSequences(
sequences,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
new TopNQueryBuilder()
.dataSource("test")
.intervals("2011-01-06/2011-01-10")
.dimension("a")
.metric("b")
.threshold(3)
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("b")))
.build(),
sequences
)
);
}

View File

@ -37,15 +37,19 @@ import io.druid.client.cache.MapCache;
import io.druid.granularity.AllGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.CacheStrategy;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.topn.TopNQuery;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
@ -84,8 +88,10 @@ public class CachingQueryRunnerTest
@Test
public void testCloseAndPopulate() throws Exception
{
Iterable<Result<TopNResultValue>> expectedRes = makeTopNResults(false, objects);
final TopNQueryBuilder builder = new TopNQueryBuilder()
List<Result> expectedRes = makeTopNResults(false, objects);
List<Result> expectedCacheRes = makeTopNResults(true, objects);
TopNQueryBuilder builder = new TopNQueryBuilder()
.dataSource("ds")
.dimension("top_dim")
.metric("imps")
@ -94,6 +100,72 @@ public class CachingQueryRunnerTest
.aggregators(AGGS)
.granularity(AllGranularity.ALL);
QueryToolChest toolchest = new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
testCloseAndPopulate(expectedRes, expectedCacheRes, builder.build(), toolchest);
testUseCache(expectedCacheRes, builder.build(), toolchest);
}
@Test
public void testTimeseries() throws Exception
{
for (boolean descending : new boolean[]{false, true}) {
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.dayGran)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory(
"idx",
"index"
),
QueryRunnerTestHelper.qualityUniques
)
)
.descending(descending)
.build();
Result row1 = new Result(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 6619L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
);
Result row2 = new Result<>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
);
List<Result> expectedResults;
if (descending) {
expectedResults = Lists.newArrayList(row2, row1);
} else {
expectedResults = Lists.newArrayList(row1, row2);
}
QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
testCloseAndPopulate(expectedResults, expectedResults, query, toolChest);
testUseCache(expectedResults, query, toolChest);
}
}
private void testCloseAndPopulate(
List<Result> expectedRes,
List<Result> expectedCacheRes,
Query query,
QueryToolChest toolchest
)
throws Exception
{
final AssertingClosable closable = new AssertingClosable();
final Sequence resultSeq = new ResourceClosingSequence(
Sequences.simple(expectedRes), closable
@ -115,8 +187,6 @@ public class CachingQueryRunnerTest
String segmentIdentifier = "segment";
SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0);
TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator());
DefaultObjectMapper objectMapper = new DefaultObjectMapper();
CachingQueryRunner runner = new CachingQueryRunner(
segmentIdentifier,
@ -149,8 +219,7 @@ public class CachingQueryRunnerTest
}
);
TopNQuery query = builder.build();
CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> cacheStrategy = toolchest.getCacheStrategy(query);
CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query);
Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
segmentIdentifier,
segmentDescriptor,
@ -165,15 +234,13 @@ public class CachingQueryRunnerTest
ArrayList results = Sequences.toList(res, new ArrayList());
Assert.assertTrue(closable.isClosed());
Assert.assertEquals(expectedRes, results);
Iterable<Result<TopNResultValue>> expectedCacheRes = makeTopNResults(true, objects);
Assert.assertEquals(expectedRes.toString(), results.toString());
byte[] cacheValue = cache.get(cacheKey);
Assert.assertNotNull(cacheValue);
Function<Object, Result<TopNResultValue>> fn = cacheStrategy.pullFromCache();
List<Result<TopNResultValue>> cacheResults = Lists.newArrayList(
Function<Object, Result> fn = cacheStrategy.pullFromCache();
List<Result> cacheResults = Lists.newArrayList(
Iterators.transform(
objectMapper.readValues(
objectMapper.getFactory().createParser(cacheValue),
@ -182,31 +249,20 @@ public class CachingQueryRunnerTest
fn
)
);
Assert.assertEquals(expectedCacheRes, cacheResults);
Assert.assertEquals(expectedCacheRes.toString(), cacheResults.toString());
}
@Test
public void testUseCache() throws Exception
private void testUseCache(
List<Result> expectedResults,
Query query,
QueryToolChest toolchest
) throws Exception
{
DefaultObjectMapper objectMapper = new DefaultObjectMapper();
Iterable<Result<TopNResultValue>> expectedResults = makeTopNResults(true, objects);
String segmentIdentifier = "segment";
SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0);
TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator());
final TopNQueryBuilder builder = new TopNQueryBuilder()
.dataSource("ds")
.dimension("top_dim")
.metric("imps")
.threshold(3)
.intervals("2011-01-05/2011-01-10")
.aggregators(AGGS)
.granularity(AllGranularity.ALL);
final TopNQuery query = builder.build();
CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> cacheStrategy = toolchest.getCacheStrategy(query);
CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query);
Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
segmentIdentifier,
segmentDescriptor,
@ -254,14 +310,14 @@ public class CachingQueryRunnerTest
);
HashMap<String, Object> context = new HashMap<String, Object>();
List<Object> results = Sequences.toList(runner.run(query, context), new ArrayList());
Assert.assertEquals(expectedResults, results);
List<Result> results = Sequences.toList(runner.run(query, context), new ArrayList());
Assert.assertEquals(expectedResults.toString(), results.toString());
}
private Iterable<Result<TopNResultValue>> makeTopNResults
private List<Result> makeTopNResults
(boolean cachedResults, Object... objects)
{
List<Result<TopNResultValue>> retVal = Lists.newArrayList();
List<Result> retVal = Lists.newArrayList();
int index = 0;
while (index < objects.length) {
DateTime timestamp = (DateTime) objects[index++];

View File

@ -30,7 +30,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.IAE;
import com.metamx.common.MapUtils;
import com.metamx.common.Pair;
import com.metamx.common.guava.ConcatSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
@ -571,18 +570,6 @@ public class ServerManagerTest
return runner;
}
@Override
public Sequence<T> mergeSequences(Sequence<Sequence<T>> seqOfSequences)
{
return new ConcatSequence<T>(seqOfSequences);
}
@Override
public Sequence<T> mergeSequencesUnordered(Sequence<Sequence<T>> seqOfSequences)
{
return new ConcatSequence<T>(seqOfSequences);
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query)
{