diff --git a/docs/content/querying/timeseriesquery.md b/docs/content/querying/timeseriesquery.md index ca0ed09cbb2..66d38bcb02f 100644 --- a/docs/content/querying/timeseriesquery.md +++ b/docs/content/querying/timeseriesquery.md @@ -13,6 +13,7 @@ An example timeseries query object is shown below: "queryType": "timeseries", "dataSource": "sample_datasource", "granularity": "day", + "descending": "true", "filter": { "type": "and", "fields": [ @@ -49,6 +50,7 @@ There are 7 main parts to a timeseries query: |--------|-----------|---------| |queryType|This String should always be "timeseries"; this is the first thing Druid looks at to figure out how to interpret the query|yes| |dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes| +|descending|Whether to make descending ordered result.|no| |intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes| |granularity|Defines the granularity to bucket query results. See [Granularities](../querying/granularities.html)|yes| |filter|See [Filters](../querying/filters.html)|no| diff --git a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java index fcd740302fa..9f26873760d 100644 --- a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java +++ b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java @@ -49,7 +49,7 @@ public class AsyncQueryRunner implements QueryRunner @Override public Sequence run(final Query query, final Map responseContext) { - final int priority = query.getContextPriority(0); + final int priority = BaseQuery.getContextPriority(query, 0); final ListenableFuture> future = executor.submit(new AbstractPrioritizedCallable>(priority) { @Override diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index e30a14b0f79..0f95e1bdbba 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; import io.druid.query.spec.QuerySegmentSpec; @@ -34,10 +35,66 @@ import java.util.Map; /** */ -public abstract class BaseQuery implements Query +public abstract class BaseQuery> implements Query { + public static int getContextPriority(Query query, int defaultValue) + { + return parseInt(query, "priority", defaultValue); + } + + public static boolean getContextBySegment(Query query, boolean defaultValue) + { + return parseBoolean(query, "bySegment", defaultValue); + } + + public static boolean getContextPopulateCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "populateCache", defaultValue); + } + + public static boolean getContextUseCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "useCache", defaultValue); + } + + public static boolean getContextFinalize(Query query, boolean defaultValue) + { + return parseBoolean(query, "finalize", defaultValue); + } + + private static int parseInt(Query query, String key, int defaultValue) + { + Object val = query.getContextValue(key); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Integer.parseInt((String) val); + } else if (val instanceof Integer) { + return (int) val; + } else { + throw new ISE("Unknown type [%s]", val.getClass()); + } + } + + private static boolean parseBoolean(Query query, String key, boolean defaultValue) + { + Object val = query.getContextValue(key); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Boolean.parseBoolean((String) val); + } else if (val instanceof Boolean) { + return (boolean) val; + } else { + throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass()); + } + } + public static final String QUERYID = "queryId"; private final DataSource dataSource; + private final boolean descending; private final Map context; private final QuerySegmentSpec querySegmentSpec; private volatile Duration duration; @@ -45,6 +102,7 @@ public abstract class BaseQuery implements Query public BaseQuery( DataSource dataSource, QuerySegmentSpec querySegmentSpec, + boolean descending, Map context ) { @@ -54,6 +112,7 @@ public abstract class BaseQuery implements Query this.dataSource = dataSource; this.context = context; this.querySegmentSpec = querySegmentSpec; + this.descending = descending; } @JsonProperty @@ -63,6 +122,13 @@ public abstract class BaseQuery implements Query return dataSource; } + @JsonProperty + @Override + public boolean isDescending() + { + return descending; + } + @JsonProperty("intervals") public QuerySegmentSpec getQuerySegmentSpec() { @@ -122,67 +188,6 @@ public abstract class BaseQuery implements Query return retVal == null ? defaultValue : retVal; } - @Override - public int getContextPriority(int defaultValue) - { - if (context == null) { - return defaultValue; - } - Object val = context.get("priority"); - if (val == null) { - return defaultValue; - } - if (val instanceof String) { - return Integer.parseInt((String) val); - } else if (val instanceof Integer) { - return (int) val; - } else { - throw new ISE("Unknown type [%s]", val.getClass()); - } - } - - @Override - public boolean getContextBySegment(boolean defaultValue) - { - return parseBoolean("bySegment", defaultValue); - } - - @Override - public boolean getContextPopulateCache(boolean defaultValue) - { - return parseBoolean("populateCache", defaultValue); - } - - @Override - public boolean getContextUseCache(boolean defaultValue) - { - return parseBoolean("useCache", defaultValue); - } - - @Override - public boolean getContextFinalize(boolean defaultValue) - { - return parseBoolean("finalize", defaultValue); - } - - private boolean parseBoolean(String key, boolean defaultValue) - { - if (context == null) { - return defaultValue; - } - Object val = context.get(key); - if (val == null) { - return defaultValue; - } - if (val instanceof String) { - return Boolean.parseBoolean((String) val); - } else if (val instanceof Boolean) { - return (boolean) val; - } else { - throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass()); - } - } - protected Map computeOverridenContext(Map overrides) { Map overridden = Maps.newTreeMap(); @@ -195,6 +200,13 @@ public abstract class BaseQuery implements Query return overridden; } + @Override + public Ordering getResultOrdering() + { + Ordering retVal = Ordering.natural(); + return descending ? retVal.reverse() : retVal; + } + @Override public String getId() { @@ -219,6 +231,9 @@ public abstract class BaseQuery implements Query BaseQuery baseQuery = (BaseQuery) o; + if (descending != baseQuery.descending) { + return false; + } if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) { return false; } @@ -241,6 +256,7 @@ public abstract class BaseQuery implements Query public int hashCode() { int result = dataSource != null ? dataSource.hashCode() : 0; + result = 31 * result + (descending ? 1 : 0); result = 31 * result + (context != null ? context.hashCode() : 0); result = 31 * result + (querySegmentSpec != null ? querySegmentSpec.hashCode() : 0); result = 31 * result + (duration != null ? duration.hashCode() : 0); diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java index c23d613107f..f331610e075 100644 --- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java @@ -51,7 +51,7 @@ public class BySegmentQueryRunner implements QueryRunner @SuppressWarnings("unchecked") public Sequence run(final Query query, Map responseContext) { - if (query.getContextBySegment(false)) { + if (BaseQuery.getContextBySegment(query, false)) { final Sequence baseSequence = base.run(query, responseContext); final List results = Sequences.toList(baseSequence, Lists.newArrayList()); return Sequences.simple( diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index 06c666a8c5e..c25687165e1 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -39,7 +39,7 @@ public abstract class BySegmentSkippingQueryRunner implements QueryRunner @Override public Sequence run(Query query, Map responseContext) { - if (query.getContextBySegment(false)) { + if (BaseQuery.getContextBySegment(query, false)) { return baseRunner.run(query, responseContext); } diff --git a/processing/src/main/java/io/druid/query/CacheStrategy.java b/processing/src/main/java/io/druid/query/CacheStrategy.java index 666a82813ec..6b00ef0e577 100644 --- a/processing/src/main/java/io/druid/query/CacheStrategy.java +++ b/processing/src/main/java/io/druid/query/CacheStrategy.java @@ -21,20 +21,39 @@ package io.druid.query; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; -import com.metamx.common.guava.Sequence; /** */ public interface CacheStrategy> { - public byte[] computeCacheKey(QueryType query); + /** + * Computes the cache key for the given query + * + * @param query the query to compute a cache key for + * @return the cache key + */ + byte[] computeCacheKey(QueryType query); - public TypeReference getCacheObjectClazz(); + /** + * Returns the class type of what is used in the cache + * + * @return Returns the class type of what is used in the cache + */ + TypeReference getCacheObjectClazz(); - // Resultant function must be THREAD SAFE - public Function prepareForCache(); + /** + * Returns a function that converts from the QueryType's result type to something cacheable. + * + * The resulting function must be thread-safe. + * + * @return a thread-safe function that converts the QueryType's result type into something cacheable + */ + Function prepareForCache(); - public Function pullFromCache(); - - public Sequence mergeSequences(Sequence> seqOfSequences); + /** + * A function that does the inverse of the operation that the function prepareForCache returns + * + * @return A function that does the inverse of the operation that the function prepareForCache returns + */ + Function pullFromCache(); } diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 332793da45b..a8a143b22d7 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -64,22 +64,19 @@ public class ChainedExecutionQueryRunner implements QueryRunner private final Iterable> queryables; private final ListeningExecutorService exec; - private final Ordering ordering; private final QueryWatcher queryWatcher; public ChainedExecutionQueryRunner( ExecutorService exec, - Ordering ordering, QueryWatcher queryWatcher, QueryRunner... queryables ) { - this(exec, ordering, queryWatcher, Arrays.asList(queryables)); + this(exec, queryWatcher, Arrays.asList(queryables)); } public ChainedExecutionQueryRunner( ExecutorService exec, - Ordering ordering, QueryWatcher queryWatcher, Iterable> queryables ) @@ -87,7 +84,6 @@ public class ChainedExecutionQueryRunner implements QueryRunner // listeningDecorator will leave PrioritizedExecutorService unchanged, // since it already implements ListeningExecutorService this.exec = MoreExecutors.listeningDecorator(exec); - this.ordering = ordering; this.queryables = Iterables.unmodifiableIterable(queryables); this.queryWatcher = queryWatcher; } @@ -95,7 +91,8 @@ public class ChainedExecutionQueryRunner implements QueryRunner @Override public Sequence run(final Query query, final Map responseContext) { - final int priority = query.getContextPriority(0); + final int priority = BaseQuery.getContextPriority(query, 0); + final Ordering ordering = query.getResultOrdering(); return new BaseSequence>( new BaseSequence.IteratorMaker>() diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index 56e0a21b805..24e241ccc4e 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -332,6 +332,8 @@ public class Druids private List postAggregatorSpecs; private Map context; + private boolean descending; + private TimeseriesQueryBuilder() { dataSource = null; @@ -348,6 +350,7 @@ public class Druids return new TimeseriesQuery( dataSource, querySegmentSpec, + descending, dimFilter, granularity, aggregatorSpecs, @@ -362,6 +365,7 @@ public class Druids .dataSource(query.getDataSource()) .intervals(query.getIntervals()) .filters(query.getDimensionsFilter()) + .descending(query.isDescending()) .granularity(query.getGranularity()) .aggregators(query.getAggregatorSpecs()) .postAggregators(query.getPostAggregatorSpecs()) @@ -374,6 +378,7 @@ public class Druids .dataSource(builder.dataSource) .intervals(builder.querySegmentSpec) .filters(builder.dimFilter) + .descending(builder.descending) .granularity(builder.granularity) .aggregators(builder.aggregatorSpecs) .postAggregators(builder.postAggregatorSpecs) @@ -395,6 +400,11 @@ public class Druids return dimFilter; } + public boolean isDescending() + { + return descending; + } + public QueryGranularity getGranularity() { return granularity; @@ -467,6 +477,12 @@ public class Druids return this; } + public TimeseriesQueryBuilder descending(boolean d) + { + descending = d; + return this; + } + public TimeseriesQueryBuilder granularity(String g) { granularity = QueryGranularity.fromString(g); diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 06fb86de1f8..7e28859def6 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -49,8 +49,8 @@ public class FinalizeResultsQueryRunner implements QueryRunner @Override public Sequence run(final Query query, Map responseContext) { - final boolean isBySegment = query.getContextBySegment(false); - final boolean shouldFinalize = query.getContextFinalize(true); + final boolean isBySegment = BaseQuery.getContextBySegment(query, false); + final boolean shouldFinalize = BaseQuery.getContextFinalize(query, true); final Query queryToRun; final Function finalizerFn; diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index a6d8fb5b0fb..81929de10a4 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -88,8 +88,8 @@ public class GroupByParallelQueryRunner implements QueryRunner bufferPool ); final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); - final boolean bySegment = query.getContextBySegment(false); - final int priority = query.getContextPriority(0); + final boolean bySegment = BaseQuery.getContextBySegment(query, false); + final int priority = BaseQuery.getContextPriority(query, 0); ListenableFuture> futures = Futures.allAsList( Lists.newArrayList( @@ -175,7 +175,7 @@ public class GroupByParallelQueryRunner implements QueryRunner return new ResourceClosingSequence( Sequences.simple( Iterables.transform( - indexAccumulatorPair.lhs.iterableWithPostAggregations(null), + indexAccumulatorPair.lhs.iterableWithPostAggregations(null, query.isDescending()), new Function() { @Override diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 386a8e2eca3..3cfbfcd4479 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -21,6 +21,7 @@ package io.druid.query; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.collect.Ordering; import com.metamx.common.guava.Sequence; import io.druid.query.datasourcemetadata.DataSourceMetadataQuery; import io.druid.query.groupby.GroupByQuery; @@ -51,49 +52,46 @@ import java.util.Map; }) public interface Query { - public static final String TIMESERIES = "timeseries"; - public static final String SEARCH = "search"; - public static final String TIME_BOUNDARY = "timeBoundary"; - public static final String GROUP_BY = "groupBy"; - public static final String SEGMENT_METADATA = "segmentMetadata"; - public static final String SELECT = "select"; - public static final String TOPN = "topN"; - public static final String DATASOURCE_METADATA = "dataSourceMetadata"; + String TIMESERIES = "timeseries"; + String SEARCH = "search"; + String TIME_BOUNDARY = "timeBoundary"; + String GROUP_BY = "groupBy"; + String SEGMENT_METADATA = "segmentMetadata"; + String SELECT = "select"; + String TOPN = "topN"; + String DATASOURCE_METADATA = "dataSourceMetadata"; - public DataSource getDataSource(); + DataSource getDataSource(); - public boolean hasFilters(); + boolean hasFilters(); - public String getType(); + String getType(); - public Sequence run(QuerySegmentWalker walker, Map context); + Sequence run(QuerySegmentWalker walker, Map context); - public Sequence run(QueryRunner runner, Map context); + Sequence run(QueryRunner runner, Map context); - public List getIntervals(); + List getIntervals(); - public Duration getDuration(); + Duration getDuration(); - public Map getContext(); + Map getContext(); - public ContextType getContextValue(String key); + ContextType getContextValue(String key); - public ContextType getContextValue(String key, ContextType defaultValue); + ContextType getContextValue(String key, ContextType defaultValue); - // For backwards compatibility - @Deprecated public int getContextPriority(int defaultValue); - @Deprecated public boolean getContextBySegment(boolean defaultValue); - @Deprecated public boolean getContextPopulateCache(boolean defaultValue); - @Deprecated public boolean getContextUseCache(boolean defaultValue); - @Deprecated public boolean getContextFinalize(boolean defaultValue); + boolean isDescending(); - public Query withOverriddenContext(Map contextOverride); + Ordering getResultOrdering(); - public Query withQuerySegmentSpec(QuerySegmentSpec spec); + Query withOverriddenContext(Map contextOverride); - public Query withId(String id); + Query withQuerySegmentSpec(QuerySegmentSpec spec); - public String getId(); + Query withId(String id); + + String getId(); Query withDataSource(DataSource dataSource); } diff --git a/processing/src/main/java/io/druid/query/QueryRunner.java b/processing/src/main/java/io/druid/query/QueryRunner.java index 4dccd47048a..70ea4218802 100644 --- a/processing/src/main/java/io/druid/query/QueryRunner.java +++ b/processing/src/main/java/io/druid/query/QueryRunner.java @@ -27,5 +27,11 @@ import java.util.Map; */ public interface QueryRunner { - public Sequence run(Query query, Map responseContext); + /** + * Runs the given query and returns results in a time-ordered sequence + * @param query + * @param responseContext + * @return + */ + Sequence run(Query query, Map responseContext); } diff --git a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java index 76f6d5954f8..ba773defd85 100644 --- a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java +++ b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java @@ -56,6 +56,7 @@ public class QueryRunnerHelper final StorageAdapter adapter, List queryIntervals, Filter filter, + boolean descending, QueryGranularity granularity, final Function> mapFn ) @@ -66,7 +67,7 @@ public class QueryRunnerHelper return Sequences.filter( Sequences.map( - adapter.makeCursors(filter, queryIntervals.get(0), granularity), + adapter.makeCursors(filter, queryIntervals.get(0), granularity, descending), new Function>() { @Override diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index 0c5697a86a1..e62348228fd 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -21,7 +21,6 @@ package io.druid.query; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; -import com.metamx.common.guava.Sequence; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.timeline.LogicalSegment; @@ -37,54 +36,19 @@ public abstract class QueryToolChest mergeResults(QueryRunner runner); - /** - * This method doesn't belong here, but it's here for now just to make it work. The method needs to - * take a Sequence of Sequences and return a single Sequence of ResultType objects in time-order (ascending) - *

- * This method assumes that its input sequences provide values already in sorted order. - * Even more specifically, it assumes that the individual sequences are also ordered by their first element. - *

- * In the vast majority of cases, this should just be implemented with: - *

- * return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - * - * @param seqOfSequences sequence of sequences to be merged - * - * @return the sequence of merged results - */ - public abstract Sequence mergeSequences(Sequence> seqOfSequences); - - /** - * This method doesn't belong here, but it's here for now just to make it work. The method needs to - * take a Sequence of Sequences and return a single Sequence of ResultType objects in time-order (ascending) - *

- * This method assumes that its input sequences provide values already in sorted order, but, unlike - * mergeSequences, it does *not* assume that the individual sequences are also ordered by their first element. - *

- * In the vast majority if ocases, this hsould just be implemented with: - *

- * return new MergeSequence<>(getOrdering(), seqOfSequences); - * - * @param seqOfSequences sequence of sequences to be merged - * - * @return the sequence of merged results - */ - public abstract Sequence mergeSequencesUnordered(Sequence> seqOfSequences); - - /** * Creates a builder that is used to generate a metric for this specific query type. This exists * to allow for query-specific dimensions on metrics. That is, the ToolChest is expected to set some - * meaningful dimensions for metrics given this query type. Examples might be the topN threshhold for + * meaningful dimensions for metrics given this query type. Examples might be the topN threshold for * a TopN query or the number of dimensions included for a groupBy query. * * @param query The query that is being processed diff --git a/processing/src/main/java/io/druid/query/ResultGranularTimestampComparator.java b/processing/src/main/java/io/druid/query/ResultGranularTimestampComparator.java index 5278c0c0289..7013dfd39d6 100644 --- a/processing/src/main/java/io/druid/query/ResultGranularTimestampComparator.java +++ b/processing/src/main/java/io/druid/query/ResultGranularTimestampComparator.java @@ -19,6 +19,7 @@ package io.druid.query; +import com.google.common.collect.Ordering; import com.google.common.primitives.Longs; import io.druid.granularity.QueryGranularity; @@ -43,4 +44,9 @@ public class ResultGranularTimestampComparator implements Comparator Ordering> create(QueryGranularity granularity, boolean descending) { + Comparator> comparator = new ResultGranularTimestampComparator<>(granularity); + return descending ? Ordering.from(comparator).reverse() : Ordering.from(comparator); + } } diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunner.java b/processing/src/main/java/io/druid/query/RetryQueryRunner.java index 3ccd5b4ae2a..7cdf8fea6ee 100644 --- a/processing/src/main/java/io/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/RetryQueryRunner.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Yielder; @@ -93,7 +94,12 @@ public class RetryQueryRunner implements QueryRunner if (!config.isReturnPartialResults() && !finalMissingSegs.isEmpty()) { throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs); } - return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator); + + return new MergeSequence<>( + query.getResultOrdering(), + Sequences.simple(listOfSequences)).toYielder( + initValue, accumulator + ); } else { return Iterables.getOnlyElement(listOfSequences).toYielder(initValue, accumulator); diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index bfc540b9479..e3fe9f45b06 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -21,6 +21,7 @@ package io.druid.query; import com.google.common.base.Function; import com.google.common.collect.Lists; +import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -29,15 +30,12 @@ import java.util.Map; public class UnionQueryRunner implements QueryRunner { private final QueryRunner baseRunner; - private final QueryToolChest> toolChest; public UnionQueryRunner( - QueryRunner baseRunner, - QueryToolChest> toolChest + QueryRunner baseRunner ) { this.baseRunner = baseRunner; - this.toolChest = toolChest; } @Override @@ -45,7 +43,9 @@ public class UnionQueryRunner implements QueryRunner { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { - return toolChest.mergeSequencesUnordered( + + return new MergeSequence<>( + query.getResultOrdering(), Sequences.simple( Lists.transform( ((UnionDataSource) dataSource).getDataSources(), diff --git a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQuery.java b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQuery.java index 746b4a3d16f..df2c52673fe 100644 --- a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQuery.java +++ b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQuery.java @@ -55,6 +55,7 @@ public class DataSourceMetadataQuery extends BaseQuery( - queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners + queryExecutor, queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java index 0e3cd2333c0..28d8e18a251 100644 --- a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java @@ -25,12 +25,9 @@ import com.google.common.base.Functions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; -import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.collections.OrderedMergeSequence; import io.druid.query.BySegmentSkippingQueryRunner; import io.druid.query.CacheStrategy; import io.druid.query.DataSourceUtil; @@ -104,18 +101,6 @@ public class DataSourceQueryQueryToolChest }; } - @Override - public Sequence> mergeSequences(Sequence>> seqOfSequences) - { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } - - @Override - public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) - { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } - @Override public ServiceMetricEvent.Builder makeMetricBuilder(DataSourceMetadataQuery query) { @@ -143,9 +128,4 @@ public class DataSourceQueryQueryToolChest { return null; } - - public Ordering> getOrdering() - { - return Ordering.natural(); - } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 577cfee3af8..4d4fffec1ac 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -88,7 +88,7 @@ public class GroupByQuery extends BaseQuery @JsonProperty("context") Map context ) { - super(dataSource, querySegmentSpec, context); + super(dataSource, querySegmentSpec, false, context); this.dimFilter = dimFilter; this.granularity = granularity; this.dimensions = dimensions == null ? ImmutableList.of() : dimensions; @@ -152,7 +152,7 @@ public class GroupByQuery extends BaseQuery Map context ) { - super(dataSource, querySegmentSpec, context); + super(dataSource, querySegmentSpec, false, context); this.dimFilter = dimFilter; this.granularity = granularity; diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index bdb8dae2d84..afd2739b566 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -95,7 +95,8 @@ public class GroupByQueryEngine final Sequence cursors = storageAdapter.makeCursors( Filters.convertDimensionFilters(query.getDimFilter()), intervals.get(0), - query.getGranularity() + query.getGranularity(), + false ); final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index c5f8ecbf601..2f0a6bf92d1 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -31,24 +31,22 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; -import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.ResourceClosingSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.collections.OrderedMergeSequence; import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; import io.druid.guice.annotations.Global; +import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.DataSource; import io.druid.query.DruidMetrics; @@ -126,20 +124,20 @@ public class GroupByQueryQueryToolChest extends QueryToolChest() { @Override - public Sequence run(Query input, Map responseContext) + public Sequence run(Query query, Map responseContext) { - if (input.getContextBySegment(false)) { - return runner.run(input, responseContext); + if (BaseQuery.getContextBySegment(query, false)) { + return runner.run(query, responseContext); } - if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) { + if (Boolean.valueOf(query.getContextValue(GROUP_BY_MERGE_KEY, "true"))) { return mergeGroupByResults( - (GroupByQuery) input, + (GroupByQuery) query, runner, responseContext ); } - return runner.run(input, responseContext); + return runner.run(query, responseContext); } }; } @@ -261,7 +259,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest postAggregate(final GroupByQuery query, IncrementalIndex index) { return Sequences.map( - Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())), + Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs(), query.isDescending())), new Function() { @Override @@ -290,23 +288,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) - { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } - - @Override - public Sequence mergeSequencesUnordered(Sequence> seqOfSequences) - { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } - - private Ordering getOrdering() - { - return Ordering.natural().nullsFirst(); - } - @Override public ServiceMetricEvent.Builder makeMetricBuilder(GroupByQuery query) { @@ -586,12 +567,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) - { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } }; } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index 9c34109017f..ebeda93598a 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -36,6 +36,7 @@ import io.druid.collections.StupidPool; import io.druid.data.input.Row; import io.druid.guice.annotations.Global; import io.druid.query.AbstractPrioritizedCallable; +import io.druid.query.BaseQuery; import io.druid.query.ConcatQueryRunner; import io.druid.query.GroupByParallelQueryRunner; import io.druid.query.Query; @@ -119,8 +120,8 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); - final int priority = query.getContextPriority(0); - final boolean bySegment = query.getContextBySegment(false); + final int priority = BaseQuery.getContextPriority(query, 0); + final boolean bySegment = BaseQuery.getContextBySegment(query, false); final ListenableFuture future = queryExecutor.submit( new AbstractPrioritizedCallable(priority) @@ -173,7 +174,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory mergeSequences(Sequence> seqOfSequences) - { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } - - @Override - public Sequence mergeSequencesUnordered(Sequence> seqOfSequences) - { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } - @Override public ServiceMetricEvent.Builder makeMetricBuilder(SegmentMetadataQuery query) { @@ -265,12 +251,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) - { - return new MergeSequence(getOrdering(), seqOfSequences); - } }; } @@ -304,16 +284,4 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest getOrdering() - { - return new Ordering() - { - @Override - public int compare(SegmentAnalysis left, SegmentAnalysis right) - { - return left.getId().compareTo(right.getId()); - } - }.nullsFirst(); - } } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index fd88321d9f5..fa137efa897 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -30,6 +30,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; import io.druid.query.AbstractPrioritizedCallable; +import io.druid.query.BaseQuery; import io.druid.query.ConcatQueryRunner; import io.druid.query.Query; import io.druid.query.QueryContextKeys; @@ -158,7 +159,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory responseContext ) { - final int priority = query.getContextPriority(0); + final int priority = BaseQuery.getContextPriority(query, 0); ListenableFuture> future = queryExecutor.submit( new AbstractPrioritizedCallable>(priority) { diff --git a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentAnalysis.java b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentAnalysis.java index 6fc0f751c7f..1241a9960c5 100644 --- a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentAnalysis.java +++ b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentAnalysis.java @@ -26,7 +26,7 @@ import org.joda.time.Interval; import java.util.List; import java.util.Map; -public class SegmentAnalysis +public class SegmentAnalysis implements Comparable { private final String id; private final List interval; @@ -143,4 +143,14 @@ public class SegmentAnalysis result = 31 * result + (int) (numRows ^ (numRows >>> 32)); return result; } + + @Override + public int compareTo(SegmentAnalysis rhs) + { + // Nulls first + if (rhs == null) { + return 1; + } + return id.compareTo(rhs.getId()); + } } diff --git a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java index 881d52aed64..84e119b4b26 100644 --- a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java +++ b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java @@ -102,6 +102,7 @@ public class SegmentMetadataQuery extends BaseQuery dataSource, (querySegmentSpec == null) ? new MultipleIntervalSegmentSpec(Arrays.asList(DEFAULT_INTERVAL)) : querySegmentSpec, + false, context ); diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 31ee0db2e0f..86a86a3a1bf 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -30,12 +30,11 @@ import com.google.common.primitives.Ints; import com.google.inject.Inject; import com.metamx.common.IAE; import com.metamx.common.ISE; -import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.guava.nary.BinaryFn; import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.collections.OrderedMergeSequence; +import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.DruidMetrics; import io.druid.query.IntervalChunkingQueryRunnerDecorator; @@ -86,15 +85,18 @@ public class SearchQueryQueryToolChest extends QueryToolChest> mergeResults(QueryRunner> runner) + public QueryRunner> mergeResults( + QueryRunner> runner + ) { return new ResultMergeQueryRunner>(runner) { @Override protected Ordering> makeOrdering(Query> query) { - return Ordering.from( - new ResultGranularTimestampComparator(((SearchQuery) query).getGranularity()) + return ResultGranularTimestampComparator.create( + ((SearchQuery) query).getGranularity(), + query.isDescending() ); } @@ -109,18 +111,6 @@ public class SearchQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) - { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } - - @Override - public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) - { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } - @Override public ServiceMetricEvent.Builder makeMetricBuilder(SearchQuery query) { @@ -243,12 +233,6 @@ public class SearchQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) - { - return new MergeSequence>(getOrdering(), seqOfSequences); - } }; } @@ -261,11 +245,6 @@ public class SearchQueryQueryToolChest extends QueryToolChest> getOrdering() - { - return Ordering.natural(); - } - private static class SearchThresholdAdjustingQueryRunner implements QueryRunner> { private final QueryRunner> runner; @@ -295,7 +274,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest> final List dimensions = query.getDimensions(); final SearchQuerySpec searchQuerySpec = query.getQuery(); final int limit = query.getLimit(); + final boolean descending = query.isDescending(); // Closing this will cause segfaults in unit tests. final QueryableIndex index = segment.asQueryableIndex(); @@ -158,7 +159,7 @@ public class SearchQueryRunner implements QueryRunner> dimsToSearch = dimensions; } - final Sequence cursors = adapter.makeCursors(filter, segment.getDataInterval(), QueryGranularity.ALL); + final Sequence cursors = adapter.makeCursors(filter, segment.getDataInterval(), QueryGranularity.ALL, descending); final TreeSet retVal = cursors.accumulate( Sets.newTreeSet(query.getSort().getComparator()), diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java index cf8392049b4..1fde10cc7b8 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java @@ -60,7 +60,7 @@ public class SearchQueryRunnerFactory implements QueryRunnerFactory>( - queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners + queryExecutor, queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/search/search/SearchQuery.java b/processing/src/main/java/io/druid/query/search/search/SearchQuery.java index 29a1eb3e824..71314185d78 100644 --- a/processing/src/main/java/io/druid/query/search/search/SearchQuery.java +++ b/processing/src/main/java/io/druid/query/search/search/SearchQuery.java @@ -59,7 +59,7 @@ public class SearchQuery extends BaseQuery> @JsonProperty("context") Map context ) { - super(dataSource, querySegmentSpec, context); + super(dataSource, querySegmentSpec, false, context); this.dimFilter = dimFilter; this.sortSpec = sortSpec == null ? new LexicographicSearchSortSpec() : sortSpec; this.granularity = granularity == null ? QueryGranularity.ALL : granularity; diff --git a/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java b/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java index 48df1591781..f5702e694c8 100644 --- a/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java +++ b/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java @@ -32,14 +32,17 @@ public class SelectBinaryFn { private final QueryGranularity gran; private final PagingSpec pagingSpec; + private final boolean descending; public SelectBinaryFn( QueryGranularity granularity, - PagingSpec pagingSpec + PagingSpec pagingSpec, + boolean descending ) { this.gran = granularity; this.pagingSpec = pagingSpec; + this.descending = descending; } @Override @@ -59,7 +62,7 @@ public class SelectBinaryFn ? arg1.getTimestamp() : gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis())); - SelectResultValueBuilder builder = new SelectResultValueBuilder(timestamp, pagingSpec.getThreshold()); + SelectResultValueBuilder builder = new SelectResultValueBuilder(timestamp, pagingSpec.getThreshold(), descending); SelectResultValue arg1Val = arg1.getValue(); SelectResultValue arg2Val = arg2.getValue(); diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java index 25ba7b56a6d..4db2d9d4839 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQuery.java +++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java @@ -57,7 +57,7 @@ public class SelectQuery extends BaseQuery> @JsonProperty("context") Map context ) { - super(dataSource, querySegmentSpec, context); + super(dataSource, querySegmentSpec, false, context); this.dimFilter = dimFilter; this.granularity = granularity; this.dimensions = dimensions; diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java index 87d9735ad4e..71da82de6e7 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java @@ -73,6 +73,7 @@ public class SelectQueryEngine adapter, query.getQuerySegmentSpec().getIntervals(), Filters.convertDimensionFilters(query.getDimensionsFilter()), + query.isDescending(), query.getGranularity(), new Function>() { @@ -82,7 +83,8 @@ public class SelectQueryEngine final SelectResultValueBuilder builder = new SelectResultValueBuilder( cursor.getTime(), query.getPagingSpec() - .getThreshold() + .getThreshold(), + query.isDescending() ); final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 481d17f3f13..3814fa10427 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -27,11 +27,8 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.StringUtils; -import com.metamx.common.guava.MergeSequence; -import com.metamx.common.guava.Sequence; import com.metamx.common.guava.nary.BinaryFn; import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.collections.OrderedMergeSequence; import io.druid.granularity.QueryGranularity; import io.druid.query.CacheStrategy; import io.druid.query.DruidMetrics; @@ -80,17 +77,17 @@ public class SelectQueryQueryToolChest extends QueryToolChest> mergeResults(QueryRunner> queryRunner) + public QueryRunner> mergeResults( + QueryRunner> queryRunner + ) { return new ResultMergeQueryRunner>(queryRunner) { @Override protected Ordering> makeOrdering(Query> query) { - return Ordering.from( - new ResultGranularTimestampComparator( - ((SelectQuery) query).getGranularity() - ) + return ResultGranularTimestampComparator.create( + ((SelectQuery) query).getGranularity(), query.isDescending() ); } @@ -102,24 +99,13 @@ public class SelectQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) - { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } - - @Override - public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) - { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } - @Override public ServiceMetricEvent.Builder makeMetricBuilder(SelectQuery query) { @@ -261,12 +247,6 @@ public class SelectQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) - { - return new MergeSequence>(getOrdering(), seqOfSequences); - } }; } @@ -275,9 +255,4 @@ public class SelectQueryQueryToolChest extends QueryToolChest> getOrdering() - { - return Ordering.natural(); - } } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java index b3150bfd4bc..7af108bbd2d 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java @@ -67,7 +67,7 @@ public class SelectQueryRunnerFactory ) { return new ChainedExecutionQueryRunner>( - queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners + queryExecutor, queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java b/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java index cda99aabd7c..8f4f511a004 100644 --- a/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java +++ b/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.primitives.Longs; +import com.metamx.common.guava.Comparators; import io.druid.query.Result; import org.joda.time.DateTime; @@ -59,12 +60,13 @@ public class SelectResultValueBuilder public SelectResultValueBuilder( DateTime timestamp, - int threshold + int threshold, + boolean descending ) { this.timestamp = timestamp; - instantiatePQueue(threshold, comparator); + instantiatePQueue(threshold, descending ? Comparators.inverse(comparator) : comparator); } public void addEntry( diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index 8ebcd1d6cc8..7e93a2aae74 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -66,6 +66,7 @@ public class TimeBoundaryQuery extends BaseQuery dataSource, (querySegmentSpec == null) ? new MultipleIntervalSegmentSpec(Arrays.asList(MY_Y2K_INTERVAL)) : querySegmentSpec, + false, context ); diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index b186ee72a35..727e4174db1 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -25,12 +25,9 @@ import com.google.common.base.Functions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; -import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.collections.OrderedMergeSequence; import io.druid.query.BySegmentSkippingQueryRunner; import io.druid.query.CacheStrategy; import io.druid.query.DataSourceUtil; @@ -109,18 +106,6 @@ public class TimeBoundaryQueryQueryToolChest }; } - @Override - public Sequence> mergeSequences(Sequence>> seqOfSequences) - { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } - - @Override - public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) - { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } - @Override public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query) { @@ -195,17 +180,6 @@ public class TimeBoundaryQueryQueryToolChest } }; } - - @Override - public Sequence> mergeSequences(Sequence>> seqOfSequences) - { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } }; } - - public Ordering> getOrdering() - { - return Ordering.natural(); - } } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index a13e6217605..e7c9cde7ad0 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -63,9 +63,7 @@ public class TimeBoundaryQueryRunnerFactory ExecutorService queryExecutor, Iterable>> queryRunners ) { - return new ChainedExecutionQueryRunner<>( - queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners - ); + return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners); } @Override @@ -85,8 +83,8 @@ public class TimeBoundaryQueryRunnerFactory @Override public Sequence> run( - Query> input, - Map responseContext + final Query> input, + final Map responseContext ) { if (!(input instanceof TimeBoundaryQuery)) { diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java index bf72c659b6d..17ac51aa9cd 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java @@ -51,6 +51,7 @@ public class TimeseriesQuery extends BaseQuery> public TimeseriesQuery( @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, + @JsonProperty("descending") boolean descending, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("granularity") QueryGranularity granularity, @JsonProperty("aggregations") List aggregatorSpecs, @@ -58,7 +59,7 @@ public class TimeseriesQuery extends BaseQuery> @JsonProperty("context") Map context ) { - super(dataSource, querySegmentSpec, context); + super(dataSource, querySegmentSpec, descending, context); this.dimFilter = dimFilter; this.granularity = granularity; this.aggregatorSpecs = aggregatorSpecs; @@ -113,6 +114,7 @@ public class TimeseriesQuery extends BaseQuery> return new TimeseriesQuery( getDataSource(), querySegmentSpec, + isDescending(), dimFilter, granularity, aggregatorSpecs, @@ -127,6 +129,7 @@ public class TimeseriesQuery extends BaseQuery> return new TimeseriesQuery( dataSource, getQuerySegmentSpec(), + isDescending(), dimFilter, granularity, aggregatorSpecs, @@ -140,6 +143,7 @@ public class TimeseriesQuery extends BaseQuery> return new TimeseriesQuery( getDataSource(), getQuerySegmentSpec(), + isDescending(), dimFilter, granularity, aggregatorSpecs, @@ -154,6 +158,7 @@ public class TimeseriesQuery extends BaseQuery> return "TimeseriesQuery{" + "dataSource='" + getDataSource() + '\'' + ", querySegmentSpec=" + getQuerySegmentSpec() + + ", descending=" + isDescending() + ", dimFilter=" + dimFilter + ", granularity='" + granularity + '\'' + ", aggregatorSpecs=" + aggregatorSpecs + diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java index ade888f4a95..0a4c84759bc 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java @@ -48,6 +48,7 @@ public class TimeseriesQueryEngine adapter, query.getQuerySegmentSpec().getIntervals(), Filters.convertDimensionFilters(query.getDimensionsFilter()), + query.isDescending(), query.getGranularity(), new Function>() { diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index aa5e686825f..80256b89f5e 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -25,11 +25,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.inject.Inject; -import com.metamx.common.guava.MergeSequence; -import com.metamx.common.guava.Sequence; import com.metamx.common.guava.nary.BinaryFn; import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.collections.OrderedMergeSequence; import io.druid.granularity.QueryGranularity; import io.druid.query.CacheStrategy; import io.druid.query.DruidMetrics; @@ -76,17 +73,17 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> mergeResults(QueryRunner> queryRunner) + public QueryRunner> mergeResults( + QueryRunner> queryRunner + ) { return new ResultMergeQueryRunner>(queryRunner) { @Override protected Ordering> makeOrdering(Query> query) { - return Ordering.from( - new ResultGranularTimestampComparator( - ((TimeseriesQuery) query).getGranularity() - ) + return ResultGranularTimestampComparator.create( + ((TimeseriesQuery) query).getGranularity(), query.isDescending() ); } @@ -104,18 +101,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) - { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } - - @Override - public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) - { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } - @Override public ServiceMetricEvent.Builder makeMetricBuilder(TimeseriesQuery query) { @@ -150,10 +135,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) - { - return new MergeSequence>(getOrdering(), seqOfSequences); - } }; } @@ -232,11 +213,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> getOrdering() - { - return Ordering.natural(); - } - @Override public Function, Result> makePreComputeManipulatorFn( final TimeseriesQuery query, final MetricManipulationFn fn diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index 96bcb641c7b..a042d14e0a4 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -68,7 +68,7 @@ public class TimeseriesQueryRunnerFactory ) { return new ChainedExecutionQueryRunner>( - queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners + queryExecutor, queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQuery.java b/processing/src/main/java/io/druid/query/topn/TopNQuery.java index 2e941ea4301..86046748fc6 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQuery.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQuery.java @@ -66,7 +66,7 @@ public class TopNQuery extends BaseQuery> @JsonProperty("context") Map context ) { - super(dataSource, querySegmentSpec, context); + super(dataSource, querySegmentSpec, false, context); this.dimensionSpec = dimensionSpec; this.topNMetricSpec = topNMetricSpec; this.threshold = threshold; diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java index d02fdeadfce..0d5a7c6fe1d 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java @@ -74,7 +74,7 @@ public class TopNQueryEngine return Sequences.filter( Sequences.map( - adapter.makeCursors(filter, queryIntervals.get(0), granularity), + adapter.makeCursors(filter, queryIntervals.get(0), granularity, query.isDescending()), new Function>() { @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 4cd3e4ca9fd..af48e34f29c 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -28,13 +28,12 @@ import com.google.common.collect.Ordering; import com.google.common.primitives.Ints; import com.google.inject.Inject; import com.metamx.common.ISE; -import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.guava.nary.BinaryFn; import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.collections.OrderedMergeSequence; import io.druid.granularity.QueryGranularity; +import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValue; import io.druid.query.CacheStrategy; import io.druid.query.DruidMetrics; @@ -110,17 +109,17 @@ public class TopNQueryQueryToolChest extends QueryToolChest> mergeResults(QueryRunner> runner) + public QueryRunner> mergeResults( + QueryRunner> runner + ) { return new ResultMergeQueryRunner>(runner) { @Override protected Ordering> makeOrdering(Query> query) { - return Ordering.from( - new ResultGranularTimestampComparator( - ((TopNQuery) query).getGranularity() - ) + return ResultGranularTimestampComparator.create( + ((TopNQuery) query).getGranularity(), query.isDescending() ); } @@ -143,18 +142,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) - { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } - - @Override - public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) - { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } - @Override public ServiceMetricEvent.Builder makeMetricBuilder(TopNQuery query) { @@ -330,7 +317,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) - { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } }; } @@ -527,11 +508,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest> getOrdering() - { - return Ordering.natural(); - } - static class ThresholdAdjustingQueryRunner implements QueryRunner> { private final QueryRunner> runner; @@ -562,7 +538,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest( - queryExecutor, toolchest.getOrdering(), queryWatcher, queryRunners + queryExecutor, queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/segment/BitmapOffset.java b/processing/src/main/java/io/druid/segment/BitmapOffset.java index a72941d9266..fcf2a7298d2 100644 --- a/processing/src/main/java/io/druid/segment/BitmapOffset.java +++ b/processing/src/main/java/io/druid/segment/BitmapOffset.java @@ -33,21 +33,69 @@ public class BitmapOffset implements Offset private final IntIterator itr; private final BitmapFactory bitmapFactory; private final ImmutableBitmap bitmapIndex; + private final boolean descending; private volatile int val; - public BitmapOffset(BitmapFactory bitmapFactory, ImmutableBitmap bitmapIndex) + public BitmapOffset(BitmapFactory bitmapFactory, ImmutableBitmap bitmapIndex, boolean descending) { this.bitmapFactory = bitmapFactory; this.bitmapIndex = bitmapIndex; - this.itr = bitmapIndex.iterator(); + this.descending = descending; + this.itr = newIterator(); increment(); } + private IntIterator newIterator() + { + if (!descending) { + return bitmapIndex.iterator(); + } + // ImmutableRoaringReverseIntIterator is not cloneable.. looks like a bug + // update : it's fixed in 0.5.13 + int i = bitmapIndex.size(); + int[] back = new int[bitmapIndex.size()]; + IntIterator iterator = bitmapIndex.iterator(); + while (iterator.hasNext()) { + back[--i] = iterator.next(); + } + return new ArrayIntIterator(back, 0); + } + + private static class ArrayIntIterator implements IntIterator { + + private final int[] array; + private int index; + + private ArrayIntIterator(int[] array, int index) { + this.array = array; + this.index = index; + } + + @Override + public boolean hasNext() + { + return index < array.length; + } + + @Override + public int next() + { + return array[index++]; + } + + @Override + public IntIterator clone() + { + return new ArrayIntIterator(array, index); + } + } + private BitmapOffset(BitmapOffset otherOffset) { this.bitmapFactory = otherOffset.bitmapFactory; this.bitmapIndex = otherOffset.bitmapIndex; + this.descending = otherOffset.descending; this.itr = otherOffset.itr.clone(); this.val = otherOffset.val; } @@ -72,7 +120,7 @@ public class BitmapOffset implements Offset public Offset clone() { if (bitmapIndex == null || bitmapIndex.size() == 0) { - return new BitmapOffset(bitmapFactory, bitmapFactory.makeEmptyImmutableBitmap()); + return new BitmapOffset(bitmapFactory, bitmapFactory.makeEmptyImmutableBitmap(), descending); } return new BitmapOffset(this); diff --git a/processing/src/main/java/io/druid/segment/CursorFactory.java b/processing/src/main/java/io/druid/segment/CursorFactory.java index d124f9d5d78..e898316bc22 100644 --- a/processing/src/main/java/io/druid/segment/CursorFactory.java +++ b/processing/src/main/java/io/druid/segment/CursorFactory.java @@ -28,5 +28,5 @@ import org.joda.time.Interval; */ public interface CursorFactory { - public Sequence makeCursors(Filter filter, Interval interval, QueryGranularity gran); + public Sequence makeCursors(Filter filter, Interval interval, QueryGranularity gran, boolean descending); } diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index e67cf05df40..c2940f1f110 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -22,7 +22,9 @@ package io.druid.segment; import com.google.common.base.Function; import com.google.common.base.Predicates; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.metamx.common.guava.CloseQuietly; @@ -158,7 +160,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } @Override - public Sequence makeCursors(Filter filter, Interval interval, QueryGranularity gran) + public Sequence makeCursors(Filter filter, Interval interval, QueryGranularity gran, boolean descending) { Interval actualInterval = interval; @@ -182,18 +184,26 @@ public class QueryableIndexStorageAdapter implements StorageAdapter final Offset offset; if (filter == null) { - offset = new NoFilterOffset(0, index.getNumRows()); + offset = new NoFilterOffset(0, index.getNumRows(), descending); } else { final ColumnSelectorBitmapIndexSelector selector = new ColumnSelectorBitmapIndexSelector( index.getBitmapFactoryForDimensions(), index ); - offset = new BitmapOffset(selector.getBitmapFactory(), filter.getBitmapIndex(selector)); + offset = new BitmapOffset(selector.getBitmapFactory(), filter.getBitmapIndex(selector), descending); } return Sequences.filter( - new CursorSequenceBuilder(index, actualInterval, gran, offset, maxDataTimestamp).build(), + new CursorSequenceBuilder( + index, + actualInterval, + gran, + offset, + minDataTimestamp, + maxDataTimestamp, + descending + ).build(), Predicates.notNull() ); } @@ -204,21 +214,27 @@ public class QueryableIndexStorageAdapter implements StorageAdapter private final Interval interval; private final QueryGranularity gran; private final Offset offset; + private final long minDataTimestamp; private final long maxDataTimestamp; + private final boolean descending; public CursorSequenceBuilder( ColumnSelector index, Interval interval, QueryGranularity gran, Offset offset, - long maxDataTimestamp + long minDataTimestamp, + long maxDataTimestamp, + boolean descending ) { this.index = index; this.interval = interval; this.gran = gran; this.offset = offset; + this.minDataTimestamp = minDataTimestamp; this.maxDataTimestamp = maxDataTimestamp; + this.descending = descending; } public Sequence build() @@ -232,24 +248,49 @@ public class QueryableIndexStorageAdapter implements StorageAdapter final GenericColumn timestamps = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn(); + Iterable iterable = gran.iterable(interval.getStartMillis(), interval.getEndMillis()); + if (descending) { + iterable = Lists.reverse(ImmutableList.copyOf(iterable)); + } + return Sequences.withBaggage( Sequences.map( - Sequences.simple(gran.iterable(interval.getStartMillis(), interval.getEndMillis())), + Sequences.simple(iterable), new Function() { @Override public Cursor apply(final Long input) { final long timeStart = Math.max(interval.getStartMillis(), input); - while (baseOffset.withinBounds() - && timestamps.getLongSingleValueRow(baseOffset.getOffset()) < timeStart) { - baseOffset.increment(); + final long timeEnd = Math.min(interval.getEndMillis(), gran.next(input)); + + if (descending) { + for (; baseOffset.withinBounds(); baseOffset.increment()) { + if (timestamps.getLongSingleValueRow(baseOffset.getOffset()) < timeEnd) { + break; + } + } + } else { + for (; baseOffset.withinBounds(); baseOffset.increment()) { + if (timestamps.getLongSingleValueRow(baseOffset.getOffset()) >= timeStart) { + break; + } + } } - long threshold = Math.min(interval.getEndMillis(), gran.next(input)); - final Offset offset = new TimestampCheckingOffset( - baseOffset, timestamps, threshold, maxDataTimestamp < threshold - ); + final Offset offset = descending ? + new DescendingTimestampCheckingOffset( + baseOffset, + timestamps, + timeStart, + minDataTimestamp >= timeStart + ) : + new AscendingTimestampCheckingOffset( + baseOffset, + timestamps, + timeEnd, + maxDataTimestamp < timeEnd + ); return new Cursor() { @@ -315,7 +356,11 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } if (dimension.equals(Column.TIME_COLUMN_NAME)) { - return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn); + return new SingleScanTimeDimSelector( + makeLongColumnSelector(dimension), + extractionFn, + descending + ); } DictionaryEncodedColumn cachedColumn = dictionaryColumnCache.get(dimension); @@ -684,23 +729,23 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } } - private static class TimestampCheckingOffset implements Offset + private abstract static class TimestampCheckingOffset implements Offset { - private final Offset baseOffset; - private final GenericColumn timestamps; - private final long threshold; - private final boolean allWithinThreshold; + protected final Offset baseOffset; + protected final GenericColumn timestamps; + protected final long timeLimit; + protected final boolean allWithinThreshold; public TimestampCheckingOffset( Offset baseOffset, GenericColumn timestamps, - long threshold, + long timeLimit, boolean allWithinThreshold ) { this.baseOffset = baseOffset; this.timestamps = timestamps; - this.threshold = threshold; + this.timeLimit = timeLimit; // checks if all the values are within the Threshold specified, skips timestamp lookups and checks if all values are within threshold. this.allWithinThreshold = allWithinThreshold; } @@ -711,35 +756,108 @@ public class QueryableIndexStorageAdapter implements StorageAdapter return baseOffset.getOffset(); } - @Override - public Offset clone() - { - return new TimestampCheckingOffset(baseOffset.clone(), timestamps, threshold, allWithinThreshold); - } - @Override public boolean withinBounds() { - return baseOffset.withinBounds() && (allWithinThreshold - || timestamps.getLongSingleValueRow(baseOffset.getOffset()) < threshold); + if (!baseOffset.withinBounds()) { + return false; + } + if (allWithinThreshold) { + return true; + } + return timeInRange(timestamps.getLongSingleValueRow(baseOffset.getOffset())); } + protected abstract boolean timeInRange(long current); + @Override public void increment() { baseOffset.increment(); } + + @Override + public Offset clone() { + throw new IllegalStateException("clone"); + } + } + + private static class AscendingTimestampCheckingOffset extends TimestampCheckingOffset + { + public AscendingTimestampCheckingOffset( + Offset baseOffset, + GenericColumn timestamps, + long timeLimit, + boolean allWithinThreshold + ) + { + super(baseOffset, timestamps, timeLimit, allWithinThreshold); + } + + @Override + protected final boolean timeInRange(long current) + { + return current < timeLimit; + } + + @Override + public String toString() + { + return (baseOffset.withinBounds() ? timestamps.getLongSingleValueRow(baseOffset.getOffset()) : "OOB") + + "<" + timeLimit + "::" + baseOffset; + } + + @Override + public Offset clone() + { + return new AscendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit, allWithinThreshold); + } + } + + private static class DescendingTimestampCheckingOffset extends TimestampCheckingOffset + { + public DescendingTimestampCheckingOffset( + Offset baseOffset, + GenericColumn timestamps, + long timeLimit, + boolean allWithinThreshold + ) + { + super(baseOffset, timestamps, timeLimit, allWithinThreshold); + } + + @Override + protected final boolean timeInRange(long current) + { + return current >= timeLimit; + } + + @Override + public String toString() + { + return timeLimit + ">=" + + (baseOffset.withinBounds() ? timestamps.getLongSingleValueRow(baseOffset.getOffset()) : "OOB") + + "::" + baseOffset; + } + + @Override + public Offset clone() + { + return new DescendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit, allWithinThreshold); + } } private static class NoFilterOffset implements Offset { private final int rowCount; + private final boolean descending; private volatile int currentOffset; - NoFilterOffset(int currentOffset, int rowCount) + NoFilterOffset(int currentOffset, int rowCount, boolean descending) { this.currentOffset = currentOffset; this.rowCount = rowCount; + this.descending = descending; } @Override @@ -757,13 +875,19 @@ public class QueryableIndexStorageAdapter implements StorageAdapter @Override public Offset clone() { - return new NoFilterOffset(currentOffset, rowCount); + return new NoFilterOffset(currentOffset, rowCount, descending); } @Override public int getOffset() { - return currentOffset; + return descending ? rowCount - currentOffset - 1 : currentOffset; + } + + @Override + public String toString() + { + return currentOffset + "/" + rowCount + (descending ? "(DSC)" : ""); } } diff --git a/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java b/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java index 9f9de38476c..2b5f085e2cb 100644 --- a/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java +++ b/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java @@ -32,6 +32,7 @@ public class SingleScanTimeDimSelector implements DimensionSelector { private final ExtractionFn extractionFn; private final LongColumnSelector selector; + private final boolean descending; private final Map timeValues = Maps.newHashMap(); private String currentValue = null; @@ -43,7 +44,7 @@ public class SingleScanTimeDimSelector implements DimensionSelector // - it assumes time values are scanned once and values are grouped together // (i.e. we never revisit a timestamp we have seen before, unless it is the same as the last accessed one) // - it also applies and caches extraction function values at the DimSelector level to speed things up - public SingleScanTimeDimSelector(LongColumnSelector selector, ExtractionFn extractionFn) + public SingleScanTimeDimSelector(LongColumnSelector selector, ExtractionFn extractionFn, boolean descending) { if (extractionFn == null) { throw new UnsupportedOperationException("time dimension must provide an extraction function"); @@ -51,6 +52,7 @@ public class SingleScanTimeDimSelector implements DimensionSelector this.extractionFn = extractionFn; this.selector = selector; + this.descending = descending; } @Override @@ -72,7 +74,7 @@ public class SingleScanTimeDimSelector implements DimensionSelector // we can also avoid creating a dimension value and corresponding index // and use the current one else if (timestamp != currentTimestamp) { - if(timestamp < currentTimestamp) { + if (descending ? timestamp > currentTimestamp : timestamp < currentTimestamp) { // re-using this selector for multiple scans would cause the same rows to return different IDs // we might want to re-visit if we ever need to do multiple scans with this dimension selector throw new IllegalStateException("cannot re-use time dimension selector for multiple scans"); diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 53ae2753ce6..98a25919238 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -623,19 +623,20 @@ public abstract class IncrementalIndex implements Iterable, @Override public Iterator iterator() { - return iterableWithPostAggregations(null).iterator(); + return iterableWithPostAggregations(null, false).iterator(); } - public Iterable iterableWithPostAggregations(final List postAggs) + public Iterable iterableWithPostAggregations(final List postAggs, final boolean descending) { final List dimensions = getDimensionNames(); + final ConcurrentNavigableMap facts = descending ? getFacts().descendingMap() : getFacts(); return new Iterable() { @Override public Iterator iterator() { return Iterators.transform( - getFacts().entrySet().iterator(), + facts.entrySet().iterator(), new Function, Row>() { @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 07143ee5139..8a1df70598a 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Splitter; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.metamx.collections.spatial.search.Bound; @@ -153,7 +154,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } @Override - public Sequence makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran) + public Sequence makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran, final boolean descending) { if (index.isEmpty()) { return Sequences.empty(); @@ -179,8 +180,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter final Interval actualInterval = actualIntervalTmp; + Iterable iterable = gran.iterable(actualInterval.getStartMillis(), actualInterval.getEndMillis()); + if (descending) { + // might be better to be included in granularity#iterable + iterable = Lists.reverse(ImmutableList.copyOf(iterable)); + } return Sequences.map( - Sequences.simple(gran.iterable(actualInterval.getStartMillis(), actualInterval.getEndMillis())), + Sequences.simple(iterable), new Function() { EntryHolder currEntry = new EntryHolder(); @@ -212,6 +218,9 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter Math.min(actualInterval.getEndMillis(), gran.next(input)), new String[][]{} ) ); + if (descending) { + cursorMap = cursorMap.descendingMap(); + } time = gran.toDateTime(input); reset(); @@ -309,7 +318,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); if (dimension.equals(Column.TIME_COLUMN_NAME)) { - return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn); + return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn, descending); } final IncrementalIndex.DimDim dimValLookup = index.getDimensionValues(dimension); diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index be3b81b0fd4..6b658fa6701 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -21,7 +21,6 @@ package io.druid.query; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequence; @@ -114,7 +113,6 @@ public class ChainedExecutionQueryRunnerTest ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>( exec, - Ordering.natural(), watcher, Lists.>newArrayList( runners @@ -242,7 +240,6 @@ public class ChainedExecutionQueryRunnerTest ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>( exec, - Ordering.natural(), watcher, Lists.>newArrayList( runners diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index df66e3d8877..d685e41e260 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -54,6 +54,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -87,15 +88,26 @@ public class QueryRunnerTestHelper } ) ); + + public static final DateTime minTime = new DateTime("2011-01-12T00:00:00.000Z"); + public static final QueryGranularity dayGran = QueryGranularity.DAY; public static final QueryGranularity allGran = QueryGranularity.ALL; public static final String marketDimension = "market"; public static final String qualityDimension = "quality"; public static final String placementDimension = "placement"; public static final String placementishDimension = "placementish"; + public static final List dimensions = Lists.newArrayList( + marketDimension, + qualityDimension, + placementDimension, + placementishDimension + ); public static final String indexMetric = "index"; public static final String uniqueMetric = "uniques"; public static final String addRowsIndexConstantMetric = "addRowsIndexConstant"; + public static final List metrics = Lists.newArrayList(indexMetric, uniqueMetric, addRowsIndexConstantMetric); + public static String dependentPostAggMetric = "dependentPostAgg"; public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); @@ -192,6 +204,17 @@ public class QueryRunnerTestHelper "5506.567192077637", "4743.144546508789", "4913.282669067383", "4723.869743347168" }; + public static final String[] expectedFullOnIndexValuesDesc; + + static { + List list = new ArrayList(Arrays.asList(expectedFullOnIndexValues)); + Collections.reverse(list); + expectedFullOnIndexValuesDesc = list.toArray(new String[list.size()]); + } + + public static final DateTime earliest = new DateTime("2011-01-12"); + public static final DateTime last = new DateTime("2011-04-15"); + public static final DateTime skippedDay = new DateTime("2011-01-21T00:00:00.000Z"); public static final QuerySegmentSpec firstToThird = new MultipleIntervalSegmentSpec( @@ -222,6 +245,66 @@ public class QueryRunnerTestHelper ); } + // simple cartesian iterable + public static Iterable cartesian(final Iterable... iterables) + { + return new Iterable() + { + + @Override + public Iterator iterator() + { + return new Iterator() + { + private final Iterator[] iterators = new Iterator[iterables.length]; + private final Object[] cached = new Object[iterables.length]; + + @Override + public boolean hasNext() + { + return hasNext(0); + } + + private boolean hasNext(int index) + { + if (iterators[index] == null) { + iterators[index] = iterables[index].iterator(); + } + for (; hasMore(index); cached[index] = null) { + if (index == iterables.length - 1 || hasNext(index + 1)) { + return true; + } + } + iterators[index] = null; + return false; + } + + private boolean hasMore(int index) + { + if (cached[index] == null && iterators[index].hasNext()) { + cached[index] = iterators[index].next(); + } + return cached[index] != null; + } + + @Override + public Object[] next() + { + Object[] result = Arrays.copyOf(cached, cached.length); + cached[cached.length - 1] = null; + return result; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("remove"); + } + }; + } + }; + } + public static > List> makeQueryRunners( QueryRunnerFactory factory ) @@ -252,23 +335,13 @@ public class QueryRunnerTestHelper final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true); return Arrays.asList( - new Object[][]{ - { - makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)) - }, - { - makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)) - }, - { - makeUnionQueryRunner( - factory, - new QueryableIndexSegment(segmentId, mergedRealtimeIndex) - ) - }, - { - makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId)) - } - } + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)), + makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)), + makeUnionQueryRunner( + factory, + new QueryableIndexSegment(segmentId, mergedRealtimeIndex) + ), + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId)) ); } /** @@ -361,8 +434,7 @@ public class QueryRunnerTestHelper new BySegmentQueryRunner( segmentId, adapter.getDataInterval().getStart(), factory.createRunner(adapter) - ), - factory.getToolchest() + ) ) ) ), diff --git a/processing/src/test/java/io/druid/query/ResultGranularTimestampComparatorTest.java b/processing/src/test/java/io/druid/query/ResultGranularTimestampComparatorTest.java index bddbe57fedf..d7e7d8d8aca 100644 --- a/processing/src/test/java/io/druid/query/ResultGranularTimestampComparatorTest.java +++ b/processing/src/test/java/io/druid/query/ResultGranularTimestampComparatorTest.java @@ -23,11 +23,30 @@ import io.druid.granularity.QueryGranularity; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; /** */ +@RunWith(Parameterized.class) public class ResultGranularTimestampComparatorTest { + @Parameterized.Parameters(name = "descending={0}") + public static Iterable constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.transformToConstructionFeeder(Arrays.asList(false, true)); + } + + private final boolean descending; + + public ResultGranularTimestampComparatorTest(boolean descending) + { + this.descending = descending; + } + private final DateTime time = new DateTime("2011-11-11"); @Test @@ -36,7 +55,7 @@ public class ResultGranularTimestampComparatorTest Result r1 = new Result(time, null); Result r2 = new Result(time.plusYears(5), null); - Assert.assertEquals(new ResultGranularTimestampComparator(QueryGranularity.ALL).compare(r1, r2), 0); + Assert.assertEquals(ResultGranularTimestampComparator.create(QueryGranularity.ALL, descending).compare(r1, r2), 0); } @Test @@ -48,9 +67,9 @@ public class ResultGranularTimestampComparatorTest Result less = new Result(time.minusHours(1), null); QueryGranularity day = QueryGranularity.DAY; - Assert.assertEquals(new ResultGranularTimestampComparator(day).compare(res, same), 0); - Assert.assertEquals(new ResultGranularTimestampComparator(day).compare(res, greater), -1); - Assert.assertEquals(new ResultGranularTimestampComparator(day).compare(res, less), 1); + Assert.assertEquals(ResultGranularTimestampComparator.create(day, descending).compare(res, same), 0); + Assert.assertEquals(ResultGranularTimestampComparator.create(day, descending).compare(res, greater), descending ? 1 : -1); + Assert.assertEquals(ResultGranularTimestampComparator.create(day, descending).compare(res, less), descending ? -1 : 1); } @Test @@ -62,8 +81,8 @@ public class ResultGranularTimestampComparatorTest Result less = new Result(time.minusHours(1), null); QueryGranularity hour = QueryGranularity.HOUR; - Assert.assertEquals(new ResultGranularTimestampComparator(hour).compare(res, same), 0); - Assert.assertEquals(new ResultGranularTimestampComparator(hour).compare(res, greater), -1); - Assert.assertEquals(new ResultGranularTimestampComparator(hour).compare(res, less), 1); + Assert.assertEquals(ResultGranularTimestampComparator.create(hour, descending).compare(res, same), 0); + Assert.assertEquals(ResultGranularTimestampComparator.create(hour, descending).compare(res, greater), descending ? 1 : -1); + Assert.assertEquals(ResultGranularTimestampComparator.create(hour, descending).compare(res, less), descending ? -1 : 1); } } diff --git a/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java index f73bb6fced5..d424da96d52 100644 --- a/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; -import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import junit.framework.Assert; import org.junit.Test; @@ -56,12 +55,7 @@ public class UnionQueryRunnerTest } } }; - UnionQueryRunner runner = new UnionQueryRunner( - baseRunner, - new TimeseriesQueryQueryToolChest( - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() - ) - ); + UnionQueryRunner runner = new UnionQueryRunner(baseRunner); // Make a dummy query with Union datasource Query q = Druids.newTimeseriesQueryBuilder() .dataSource( diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index fe0198b1550..d1655c2a977 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -134,7 +134,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest public GroupByTimeseriesQueryRunnerTest(QueryRunner runner) { - super(runner); + super(runner, false); } @Override diff --git a/processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java b/processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java index 6e0a56cd494..0cff751c857 100644 --- a/processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java +++ b/processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java @@ -44,7 +44,7 @@ public class SelectBinaryFnTest @Test public void testApply() throws Exception { - SelectBinaryFn binaryFn = new SelectBinaryFn(QueryGranularity.ALL, new PagingSpec(null, 5)); + SelectBinaryFn binaryFn = new SelectBinaryFn(QueryGranularity.ALL, new PagingSpec(null, 5), false); Result res1 = new Result<>( new DateTime("2013-01-01"), diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index e41ab3160c3..20f2b7e3b47 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -43,7 +43,6 @@ import org.junit.runners.Parameterized; import java.io.IOException; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -52,27 +51,41 @@ import java.util.Map; public class TimeSeriesUnionQueryRunnerTest { private final QueryRunner runner; + private final boolean descending; public TimeSeriesUnionQueryRunnerTest( - QueryRunner runner + QueryRunner runner, boolean descending ) { this.runner = runner; + this.descending = descending; } - @Parameterized.Parameters - public static Collection constructorFeeder() throws IOException + @Parameterized.Parameters(name="{0}:descending={1}") + public static Iterable constructorFeeder() throws IOException { - return QueryRunnerTestHelper.makeUnionQueryRunners( - new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), - new TimeseriesQueryEngine(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER + return QueryRunnerTestHelper.cartesian( + QueryRunnerTestHelper.makeUnionQueryRunners( + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + QueryRunnerTestHelper.unionDataSource ), - QueryRunnerTestHelper.unionDataSource + // descending? + Arrays.asList(false, true) ); } + private void assertExpectedResults(Iterable> expectedResults, Iterable> results) + { + if (descending) { + expectedResults = TestHelper.revert(expectedResults); + } + TestHelper.assertExpectedResults(expectedResults, results); + } + @Test public void testUnionTimeseries() { @@ -90,29 +103,30 @@ public class TimeSeriesUnionQueryRunnerTest QueryRunnerTestHelper.qualityUniques ) ) + .descending(descending) .build(); List> expectedResults = Arrays.asList( - new Result( + new Result<>( new DateTime("2011-04-01"), new TimeseriesResultValue( ImmutableMap.of("rows", 52L, "idx", 26476L, "uniques", QueryRunnerTestHelper.UNIQUES_9) ) ), - new Result( + new Result<>( new DateTime("2011-04-02"), new TimeseriesResultValue( ImmutableMap.of("rows", 52L, "idx", 23308L, "uniques", QueryRunnerTestHelper.UNIQUES_9) ) ) ); - HashMap context = new HashMap(); + HashMap context = new HashMap<>(); Iterable> results = Sequences.toList( runner.run(query, context), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -138,86 +152,51 @@ public class TimeSeriesUnionQueryRunnerTest ) ) ) + .descending(descending) .build(); QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); + final List> ds1 = Lists.newArrayList( + new Result<>( + new DateTime("2011-04-02"), + new TimeseriesResultValue(ImmutableMap.of("rows", 1L, "idx", 2L)) + ), + new Result<>( + new DateTime("2011-04-03"), + new TimeseriesResultValue(ImmutableMap.of("rows", 3L, "idx", 4L)) + ) + ); + final List> ds2 = Lists.newArrayList( + new Result<>( + new DateTime("2011-04-01"), + new TimeseriesResultValue(ImmutableMap.of("rows", 5L, "idx", 6L)) + ), + new Result<>( + new DateTime("2011-04-02"), + new TimeseriesResultValue(ImmutableMap.of("rows", 7L, "idx", 8L)) + ), + new Result<>( + new DateTime("2011-04-04"), + new TimeseriesResultValue(ImmutableMap.of("rows", 9L, "idx", 10L)) + ) + ); + QueryRunner mergingrunner = toolChest.mergeResults( - new UnionQueryRunner>( + new UnionQueryRunner<>( new QueryRunner>() { @Override - public Sequence> run(Query> query, - Map responseContext + public Sequence> run( + Query> query, + Map responseContext ) { if (query.getDataSource().equals(new TableDataSource("ds1"))) { - return Sequences.simple( - Lists.newArrayList( - new Result( - new DateTime("2011-04-02"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 1L, - "idx", - 2L - ) - ) - ), - new Result( - new DateTime("2011-04-03"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 3L, - "idx", - 4L - ) - ) - ) - ) - ); + return Sequences.simple(descending ? Lists.reverse(ds1) : ds1); } else { - return Sequences.simple( - Lists.newArrayList( - new Result( - new DateTime("2011-04-01"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 5L, - "idx", - 6L - ) - ) - ), - new Result( - new DateTime("2011-04-02"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 7L, - "idx", - 8L - ) - ) - ), - new Result( - new DateTime("2011-04-04"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 9L, - "idx", - 10L - ) - ) - ) - ) - ); + return Sequences.simple(descending ? Lists.reverse(ds2) : ds2); } } - }, - toolChest + } ) ); @@ -253,8 +232,7 @@ public class TimeSeriesUnionQueryRunnerTest Lists.>newArrayList() ); - System.out.println(results); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java index 9608cad6100..5abb0ebdc83 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.CacheStrategy; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; import io.druid.query.TableDataSource; import io.druid.query.aggregation.AggregatorFactory; @@ -34,9 +35,27 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.io.IOException; +import java.util.Arrays; + +@RunWith(Parameterized.class) public class TimeseriesQueryQueryToolChestTest { + @Parameterized.Parameters(name = "descending={0}") + public static Iterable constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.transformToConstructionFeeder(Arrays.asList(false, true)); + } + + private final boolean descending; + + public TimeseriesQueryQueryToolChestTest(boolean descending) + { + this.descending = descending; + } @Test public void testCacheStrategy() throws Exception @@ -53,6 +72,7 @@ public class TimeseriesQueryQueryToolChestTest ) ) ), + descending, null, QueryGranularity.ALL, ImmutableList.of(new CountAggregatorFactory("metric1")), diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java index 4b9d1cae210..bb6cfe7b329 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java @@ -42,12 +42,30 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; +@RunWith(Parameterized.class) public class TimeseriesQueryRunnerBonusTest { + @Parameterized.Parameters(name = "descending={0}") + public static Iterable constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.transformToConstructionFeeder(Arrays.asList(false, true)); + } + + private final boolean descending; + + public TimeseriesQueryRunnerBonusTest(boolean descending) + { + this.descending = descending; + } + @Test public void testOneRowAtATime() throws Exception { @@ -88,7 +106,7 @@ public class TimeseriesQueryRunnerBonusTest Assert.assertEquals("result count metric", 2, (long) results.get(0).getValue().getLongMetric("rows")); } - private static List> runTimeseriesCount(IncrementalIndex index) + private List> runTimeseriesCount(IncrementalIndex index) { final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( new TimeseriesQueryQueryToolChest( @@ -111,6 +129,7 @@ public class TimeseriesQueryRunnerBonusTest new CountAggregatorFactory("rows") ) ) + .descending(descending) .build(); HashMap context = new HashMap(); return Sequences.toList( diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java index 7ec34b473a5..c3821630647 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -69,10 +69,11 @@ public class TimeseriesQueryRunnerTest public static final Map CONTEXT = ImmutableMap.of(); - @Parameterized.Parameters + @Parameterized.Parameters(name="{0}:descending={1}") public static Iterable constructorFeeder() throws IOException { - return QueryRunnerTestHelper.transformToConstructionFeeder( + return QueryRunnerTestHelper.cartesian( + // runners QueryRunnerTestHelper.makeQueryRunners( new TimeseriesQueryRunnerFactory( new TimeseriesQueryQueryToolChest( @@ -81,17 +82,29 @@ public class TimeseriesQueryRunnerTest new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ) - ) + ), + // descending? + Arrays.asList(false, true) ); } + private void assertExpectedResults(Iterable> expectedResults, Iterable> results) + { + if (descending) { + expectedResults = TestHelper.revert(expectedResults); + } + TestHelper.assertExpectedResults(expectedResults, results); + } + private final QueryRunner runner; + private final boolean descending; public TimeseriesQueryRunnerTest( - QueryRunner runner + QueryRunner runner, boolean descending ) { this.runner = runner; + this.descending = descending; } @Test @@ -110,52 +123,57 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); - DateTime expectedEarliest = new DateTime("2011-01-12"); - DateTime expectedLast = new DateTime("2011-04-15"); - Iterable> results = Sequences.toList( runner.run(query, CONTEXT), Lists.>newArrayList() ); + final String[] expectedIndex = descending ? + QueryRunnerTestHelper.expectedFullOnIndexValuesDesc : + QueryRunnerTestHelper.expectedFullOnIndexValues; + + final DateTime expectedLast = descending ? + QueryRunnerTestHelper.earliest : + QueryRunnerTestHelper.last; + int count = 0; Result lastResult = null; for (Result result : results) { - lastResult = result; - Assert.assertEquals(expectedEarliest, result.getTimestamp()); + DateTime current = result.getTimestamp(); Assert.assertFalse( - String.format("Timestamp[%s] > expectedLast[%s]", result.getTimestamp(), expectedLast), - result.getTimestamp().isAfter(expectedLast) + String.format("Timestamp[%s] > expectedLast[%s]", current, expectedLast), + descending ? current.isBefore(expectedLast) : current.isAfter(expectedLast) ); final TimeseriesResultValue value = result.getValue(); Assert.assertEquals( result.toString(), - QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0L : 13L, + QueryRunnerTestHelper.skippedDay.equals(current) ? 0L : 13L, value.getLongMetric("rows").longValue() ); Assert.assertEquals( result.toString(), - QueryRunnerTestHelper.expectedFullOnIndexValues[count], + expectedIndex[count], String.valueOf(value.getDoubleMetric("index")) ); Assert.assertEquals( result.toString(), - new Double(QueryRunnerTestHelper.expectedFullOnIndexValues[count]) + - (QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0L : 13L) + 1L, + new Double(expectedIndex[count]) + + (QueryRunnerTestHelper.skippedDay.equals(current) ? 0L : 13L) + 1L, value.getDoubleMetric("addRowsIndexConstant"), 0.0 ); Assert.assertEquals( value.getDoubleMetric("uniques"), - QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0.0d : 9.0d, + QueryRunnerTestHelper.skippedDay.equals(current) ? 0.0d : 9.0d, 0.02 ); - expectedEarliest = gran.toDateTime(gran.next(expectedEarliest.getMillis())); + lastResult = result; ++count; } @@ -175,6 +193,7 @@ public class TimeseriesQueryRunnerTest new DoubleMinAggregatorFactory("minIndex", "index") ) ) + .descending(descending) .build(); DateTime expectedEarliest = new DateTime("2011-01-12"); @@ -202,7 +221,6 @@ public class TimeseriesQueryRunnerTest public void testFullOnTimeseriesWithFilter() { - QueryGranularity gran = QueryGranularity.DAY; TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.dataSource) .granularity(QueryRunnerTestHelper.dayGran) @@ -214,6 +232,7 @@ public class TimeseriesQueryRunnerTest QueryRunnerTestHelper.qualityUniques ) ) + .descending(descending) .build(); Assert.assertEquals( @@ -224,8 +243,9 @@ public class TimeseriesQueryRunnerTest query.getDimensionsFilter() ); - DateTime expectedEarliest = new DateTime("2011-01-12"); - DateTime expectedLast = new DateTime("2011-04-15"); + final DateTime expectedLast = descending ? + QueryRunnerTestHelper.earliest : + QueryRunnerTestHelper.last; Iterable> results = Sequences.toList( runner.run(query, CONTEXT), @@ -233,10 +253,10 @@ public class TimeseriesQueryRunnerTest ); for (Result result : results) { - Assert.assertEquals(result.toString(), expectedEarliest, result.getTimestamp()); + DateTime current = result.getTimestamp(); Assert.assertFalse( - String.format("Timestamp[%s] > expectedLast[%s]", result.getTimestamp(), expectedLast), - result.getTimestamp().isAfter(expectedLast) + String.format("Timestamp[%s] > expectedLast[%s]", current, expectedLast), + descending ? current.isBefore(expectedLast) : current.isAfter(expectedLast) ); final TimeseriesResultValue value = result.getValue(); @@ -254,8 +274,6 @@ public class TimeseriesQueryRunnerTest ), 0.01 ); - - expectedEarliest = gran.toDateTime(gran.next(expectedEarliest.getMillis())); } } @@ -276,6 +294,7 @@ public class TimeseriesQueryRunnerTest QueryRunnerTestHelper.qualityUniques ) ) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -298,7 +317,7 @@ public class TimeseriesQueryRunnerTest Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -323,6 +342,7 @@ public class TimeseriesQueryRunnerTest DateTimeZone.forID("America/Los_Angeles") ) ) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -345,7 +365,7 @@ public class TimeseriesQueryRunnerTest Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -371,6 +391,7 @@ public class TimeseriesQueryRunnerTest QueryRunnerTestHelper.qualityUniques ) ) + .descending(descending) .build(); List> expectedResults1 = Arrays.asList( @@ -386,7 +407,7 @@ public class TimeseriesQueryRunnerTest runner.run(query1, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults1, results1); + assertExpectedResults(expectedResults1, results1); TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.dataSource) @@ -423,7 +444,7 @@ public class TimeseriesQueryRunnerTest runner.run(query2, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults2, results2); + assertExpectedResults(expectedResults2, results2); } @Test @@ -455,6 +476,7 @@ public class TimeseriesQueryRunnerTest ) ) ) + .descending(descending) .build(); List> expectedResults1 = Arrays.asList( @@ -476,7 +498,7 @@ public class TimeseriesQueryRunnerTest runner.run(query1, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults1, results1); + assertExpectedResults(expectedResults1, results1); } @Test @@ -502,6 +524,7 @@ public class TimeseriesQueryRunnerTest ) ) ) + .descending(descending) .build(); List> lotsOfZeroes = Lists.newArrayList(); @@ -544,7 +567,7 @@ public class TimeseriesQueryRunnerTest runner.run(query1, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults1, results1); + assertExpectedResults(expectedResults1, results1); } @Test @@ -576,6 +599,7 @@ public class TimeseriesQueryRunnerTest ) ) ) + .descending(descending) .build(); List> expectedResults1 = Arrays.asList( @@ -591,7 +615,7 @@ public class TimeseriesQueryRunnerTest runner.run(query1, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults1, results1); + assertExpectedResults(expectedResults1, results1); } @Test @@ -618,6 +642,7 @@ public class TimeseriesQueryRunnerTest QueryRunnerTestHelper.qualityUniques ) ) + .descending(descending) .build(); List> expectedResults1 = Arrays.asList( @@ -632,7 +657,7 @@ public class TimeseriesQueryRunnerTest runner.run(query1, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults1, results1); + assertExpectedResults(expectedResults1, results1); TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.dataSource) @@ -670,7 +695,7 @@ public class TimeseriesQueryRunnerTest runner.run(query2, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults2, results2); + assertExpectedResults(expectedResults2, results2); } @Test @@ -697,6 +722,7 @@ public class TimeseriesQueryRunnerTest ) ) ) + .descending(descending) .build(); List> expectedResults = Arrays.asList(); @@ -705,7 +731,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -724,6 +750,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -755,7 +782,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -774,6 +801,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -805,7 +833,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -824,6 +852,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -855,7 +884,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -874,6 +903,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -905,7 +935,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -924,6 +954,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -955,7 +986,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -982,6 +1013,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -1013,7 +1045,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -1040,6 +1072,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -1071,7 +1104,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -1098,6 +1131,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -1129,7 +1163,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -1154,6 +1188,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -1185,7 +1220,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @@ -1214,6 +1249,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -1245,7 +1281,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -1276,6 +1312,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -1307,7 +1344,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -1320,6 +1357,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -1351,7 +1389,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -1365,6 +1403,7 @@ public class TimeseriesQueryRunnerTest .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) .context(ImmutableMap.of("skipEmptyBuckets", "true")) + .descending(descending) .build(); List> expectedResults = Arrays.asList(); @@ -1373,7 +1412,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, new HashMap()), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -1386,6 +1425,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -1417,7 +1457,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, new HashMap()), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -1430,6 +1470,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -1461,7 +1502,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, new HashMap()), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -1474,6 +1515,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -1505,7 +1547,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -1532,6 +1574,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -1563,7 +1606,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, results); + assertExpectedResults(expectedResults, results); } @Test @@ -1580,6 +1623,7 @@ public class TimeseriesQueryRunnerTest QueryRunnerTestHelper.jsPlacementishCount ) ) + .descending(descending) .build(); Iterable> expectedResults = ImmutableList.of( @@ -1603,7 +1647,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, actualResults); + assertExpectedResults(expectedResults, actualResults); } @Test @@ -1621,9 +1665,10 @@ public class TimeseriesQueryRunnerTest QueryRunnerTestHelper.jsPlacementishCount ) ) + .descending(descending) .build(); - Iterable> expectedResults = ImmutableList.of( + List> expectedResults = ImmutableList.of( new Result<>( new DateTime( QueryRunnerTestHelper.firstToThird.getIntervals() @@ -1644,7 +1689,7 @@ public class TimeseriesQueryRunnerTest runner.run(query, CONTEXT), Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, actualResults); + assertExpectedResults(expectedResults, actualResults); } @Test @@ -1657,6 +1702,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); Iterable> expectedResults = Sequences.toList( @@ -1667,6 +1713,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(), CONTEXT ), @@ -1689,6 +1736,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); Iterable> expectedResults = Sequences.toList( @@ -1700,6 +1748,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(), CONTEXT ), @@ -1736,6 +1785,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); AndDimFilter andDimFilter2 = Druids.newAndDimFilterBuilder() @@ -1762,6 +1812,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(), CONTEXT ), @@ -1797,6 +1848,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); AndDimFilter andDimFilter2 = Druids.newAndDimFilterBuilder() @@ -1826,6 +1878,7 @@ public class TimeseriesQueryRunnerTest .intervals(QueryRunnerTestHelper.firstToThird) .aggregators(QueryRunnerTestHelper.commonAggregators) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(), CONTEXT ), @@ -1862,6 +1915,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); Iterable> actualResults = Sequences.toList( @@ -1883,7 +1937,7 @@ public class TimeseriesQueryRunnerTest ) ); - TestHelper.assertExpectedResults(expectedResults, actualResults); + assertExpectedResults(expectedResults, actualResults); } @Test @@ -1910,6 +1964,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); Iterable> actualResults = Sequences.toList( @@ -1932,7 +1987,7 @@ public class TimeseriesQueryRunnerTest ) ); - TestHelper.assertExpectedResults(expectedResults, actualResults); + assertExpectedResults(expectedResults, actualResults); } @Test @@ -1959,6 +2014,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); Iterable> actualResults = Sequences.toList( @@ -1981,7 +2037,7 @@ public class TimeseriesQueryRunnerTest ) ); - TestHelper.assertExpectedResults(expectedResults, actualResults); + assertExpectedResults(expectedResults, actualResults); } @Test @@ -2010,6 +2066,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); Iterable> actualResults = Sequences.toList( @@ -2031,7 +2088,7 @@ public class TimeseriesQueryRunnerTest ) ); - TestHelper.assertExpectedResults(expectedResults, actualResults); + assertExpectedResults(expectedResults, actualResults); } @Test @@ -2060,6 +2117,7 @@ public class TimeseriesQueryRunnerTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); Iterable> actualResults = Sequences.toList( @@ -2081,7 +2139,7 @@ public class TimeseriesQueryRunnerTest ) ); - TestHelper.assertExpectedResults(expectedResults, actualResults); + assertExpectedResults(expectedResults, actualResults); } @Test @@ -2098,6 +2156,7 @@ public class TimeseriesQueryRunnerTest ) ) .granularity(QueryRunnerTestHelper.allGran) + .descending(descending) .build(); List> expectedResults = Arrays.asList( @@ -2121,7 +2180,7 @@ public class TimeseriesQueryRunnerTest Lists.>newArrayList() ); - TestHelper.assertExpectedResults(expectedResults, actualResults); + assertExpectedResults(expectedResults, actualResults); } @Test diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryTest.java index 31a23165f16..3a93a703c17 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryTest.java @@ -27,14 +27,30 @@ import io.druid.query.QueryRunnerTestHelper; import io.druid.query.aggregation.PostAggregator; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.util.Arrays; +@RunWith(Parameterized.class) public class TimeseriesQueryTest { private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + @Parameterized.Parameters(name="descending={0}") + public static Iterable constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.cartesian(Arrays.asList(false, true)); + } + + private final boolean descending; + + public TimeseriesQueryTest(boolean descending) + { + this.descending = descending; + } + @Test public void testQuerySerialization() throws IOException { @@ -49,6 +65,7 @@ public class TimeseriesQueryTest ) ) .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(descending) .build(); String json = jsonMapper.writeValueAsString(query); diff --git a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java index 97af958da14..a7b27c0f0f3 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java @@ -41,7 +41,6 @@ import org.junit.runners.Parameterized; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,40 +58,43 @@ public class TopNUnionQueryTest } @Parameterized.Parameters - public static Collection constructorFeeder() throws IOException + public static Iterable constructorFeeder() throws IOException { - List retVal = Lists.newArrayList(); - retVal.addAll( - QueryRunnerTestHelper.makeUnionQueryRunners( - new TopNQueryRunnerFactory( - TestQueryRunners.getPool(), - new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), - QueryRunnerTestHelper.NOOP_QUERYWATCHER - ), - QueryRunnerTestHelper.unionDataSource - ) - ); - retVal.addAll( - QueryRunnerTestHelper.makeUnionQueryRunners( - new TopNQueryRunnerFactory( - new StupidPool( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(2000); - } - } + return QueryRunnerTestHelper.cartesian( + Iterables.concat( + QueryRunnerTestHelper.makeUnionQueryRunners( + new TopNQueryRunnerFactory( + TestQueryRunners.getPool(), + new TopNQueryQueryToolChest( + new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ), + QueryRunnerTestHelper.NOOP_QUERYWATCHER ), - new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), - QueryRunnerTestHelper.NOOP_QUERYWATCHER + QueryRunnerTestHelper.unionDataSource ), - QueryRunnerTestHelper.unionDataSource + QueryRunnerTestHelper.makeUnionQueryRunners( + new TopNQueryRunnerFactory( + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(2000); + } + } + ), + new TopNQueryQueryToolChest( + new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + QueryRunnerTestHelper.unionDataSource + ) ) ); - - return retVal; } @Test diff --git a/processing/src/test/java/io/druid/segment/BitmapOffsetTest.java b/processing/src/test/java/io/druid/segment/BitmapOffsetTest.java index b694d90fd7f..7ae48a7671c 100644 --- a/processing/src/test/java/io/druid/segment/BitmapOffsetTest.java +++ b/processing/src/test/java/io/druid/segment/BitmapOffsetTest.java @@ -42,7 +42,7 @@ public class BitmapOffsetTest ImmutableConciseSet set = ImmutableConciseSet.newImmutableFromMutable(mutableSet); - BitmapOffset offset = new BitmapOffset(new ConciseBitmapFactory(), new WrappedImmutableConciseBitmap(set)); + BitmapOffset offset = new BitmapOffset(new ConciseBitmapFactory(), new WrappedImmutableConciseBitmap(set), false); int count = 0; while (offset.withinBounds()) { diff --git a/processing/src/test/java/io/druid/segment/TestHelper.java b/processing/src/test/java/io/druid/segment/TestHelper.java index 6875caf9076..b6fc04f27fb 100644 --- a/processing/src/test/java/io/druid/segment/TestHelper.java +++ b/processing/src/test/java/io/druid/segment/TestHelper.java @@ -69,6 +69,10 @@ public class TestHelper return JSON_MAPPER; } + public static Iterable revert(Iterable input) { + return Lists.reverse(Lists.newArrayList(input)); + } + public static void assertExpectedResults(Iterable> expectedResults, Sequence> results) { assertResults(expectedResults, Sequences.toList(results, Lists.>newArrayList()), ""); @@ -150,17 +154,19 @@ public class TestHelper } } - private static void assertObjects(Iterable expectedResults, Iterable actualResults, String failMsg) + private static void assertObjects(Iterable expectedResults, Iterable actualResults, String msg) { Iterator resultsIter = actualResults.iterator(); Iterator resultsIter2 = actualResults.iterator(); Iterator expectedResultsIter = expectedResults.iterator(); + int index = 0; while (resultsIter.hasNext() && resultsIter2.hasNext() && expectedResultsIter.hasNext()) { Object expectedNext = expectedResultsIter.next(); final Object next = resultsIter.next(); final Object next2 = resultsIter2.next(); + String failMsg = msg + "-" + index++; Assert.assertEquals(failMsg, expectedNext, next); Assert.assertEquals( String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg), @@ -171,13 +177,13 @@ public class TestHelper if (resultsIter.hasNext()) { Assert.fail( - String.format("%s: Expected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next()) + String.format("%s: Expected resultsIter to be exhausted, next element was %s", msg, resultsIter.next()) ); } if (resultsIter2.hasNext()) { Assert.fail( - String.format("%s: Expected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next()) + String.format("%s: Expected resultsIter2 to be exhausted, next element was %s", msg, resultsIter.next()) ); } @@ -185,7 +191,7 @@ public class TestHelper Assert.fail( String.format( "%s: Expected expectedResultsIter to be exhausted, next element was %s", - failMsg, + msg, expectedResultsIter.next() ) ); diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 6816f40ed99..31130d83bf3 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -252,31 +252,35 @@ public class IncrementalIndexStorageAdapterTest ); IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(index); - Sequence cursorSequence = adapter.makeCursors( - new SelectorFilter("sally", "bo"), - interval, - QueryGranularity.NONE - ); - Cursor cursor = Sequences.toList(Sequences.limit(cursorSequence, 1), Lists.newArrayList()).get(0); - DimensionSelector dimSelector; + for (boolean descending : Arrays.asList(false, true)) { + Sequence cursorSequence = adapter.makeCursors( + new SelectorFilter("sally", "bo"), + interval, + QueryGranularity.NONE, + descending + ); - dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally")); - Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0))); + Cursor cursor = Sequences.toList(Sequences.limit(cursorSequence, 1), Lists.newArrayList()).get(0); + DimensionSelector dimSelector; - index.add( - new MapBasedInputRow( - t.minus(1).getMillis(), - Lists.newArrayList("sally"), - ImmutableMap.of("sally", "ah") - ) - ); + dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally")); + Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0))); - // Cursor reset should not be affected by out of order values - cursor.reset(); + index.add( + new MapBasedInputRow( + t.minus(1).getMillis(), + Lists.newArrayList("sally"), + ImmutableMap.of("sally", "ah") + ) + ); - dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally")); - Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0))); + // Cursor reset should not be affected by out of order values + cursor.reset(); + + dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally")); + Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0))); + } } @Test diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 4d007218dc6..fddc482dfe1 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -39,6 +39,7 @@ import com.google.inject.Inject; import com.metamx.common.Pair; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.LazySequence; +import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.EmittingLogger; @@ -49,6 +50,7 @@ import io.druid.client.selector.ServerSelector; import io.druid.concurrent.Execs; import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Smile; +import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValueClass; import io.druid.query.CacheStrategy; import io.druid.query.Query; @@ -131,20 +133,20 @@ public class CachingClusteredClient implements QueryRunner final List> cachedResults = Lists.newArrayList(); final Map cachePopulatorMap = Maps.newHashMap(); - final boolean useCache = query.getContextUseCache(true) + final boolean useCache = BaseQuery.getContextUseCache(query, true) && strategy != null && cacheConfig.isUseCache() && cacheConfig.isQueryCacheable(query); - final boolean populateCache = query.getContextPopulateCache(true) + final boolean populateCache = BaseQuery.getContextPopulateCache(query, true) && strategy != null && cacheConfig.isPopulateCache() && cacheConfig.isQueryCacheable(query); - final boolean isBySegment = query.getContextBySegment(false); + final boolean isBySegment = BaseQuery.getContextBySegment(query, false); final ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>(); - final int priority = query.getContextPriority(0); + final int priority = BaseQuery.getContextPriority(query, 0); contextBuilder.put("priority", priority); if (populateCache) { @@ -292,7 +294,7 @@ public class CachingClusteredClient implements QueryRunner addSequencesFromCache(sequencesByInterval); addSequencesFromServer(sequencesByInterval); - return mergeCachedAndUncachedSequences(sequencesByInterval, toolChest); + return mergeCachedAndUncachedSequences(query, sequencesByInterval); } private void addSequencesFromCache(ArrayList> listOfSequences) @@ -340,8 +342,7 @@ public class CachingClusteredClient implements QueryRunner { listOfSequences.ensureCapacity(listOfSequences.size() + serverSegments.size()); - final Query>> rewrittenQuery = (Query>>) query - .withOverriddenContext(contextBuilder.build()); + final Query rewrittenQuery = query.withOverriddenContext(contextBuilder.build()); // Loop through each server, setting up the query and initiating it. // The data gets handled as a Future and parsed in the long Sequence chain in the resultSeqToAdd setter. @@ -366,7 +367,8 @@ public class CachingClusteredClient implements QueryRunner // bySegment queries need to be de-serialized, see DirectDruidClient.run() @SuppressWarnings("unchecked") - final Query>> bySegmentQuery = (Query>>) query; + final Query>> bySegmentQuery = + (Query>>) ((Query) query); @SuppressWarnings("unchecked") final Sequence>> resultSequence = clientQueryable.run( @@ -406,7 +408,8 @@ public class CachingClusteredClient implements QueryRunner rewrittenQuery.withQuerySegmentSpec(segmentSpec), responseContext ); - resultSeqToAdd = toolChest.mergeSequencesUnordered( + resultSeqToAdd = new MergeSequence( + query.getResultOrdering(), Sequences.>, Sequence>map( runningSequence, new Function>, Sequence>() @@ -504,18 +507,17 @@ public class CachingClusteredClient implements QueryRunner } protected Sequence mergeCachedAndUncachedSequences( - List> sequencesByInterval, - QueryToolChest> toolChest + Query query, + List> sequencesByInterval ) { if (sequencesByInterval.isEmpty()) { return Sequences.empty(); } - return toolChest.mergeSequencesUnordered( - Sequences.simple( - sequencesByInterval - ) + return new MergeSequence<>( + query.getResultOrdering(), + Sequences.simple(sequencesByInterval) ); } diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index cc8d6adb493..cf422b7aaf4 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -35,6 +35,7 @@ import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -87,12 +88,12 @@ public class CachingQueryRunner implements QueryRunner { final CacheStrategy strategy = toolChest.getCacheStrategy(query); - final boolean populateCache = query.getContextPopulateCache(true) + final boolean populateCache = BaseQuery.getContextPopulateCache(query, true) && strategy != null && cacheConfig.isPopulateCache() && cacheConfig.isQueryCacheable(query); - final boolean useCache = query.getContextUseCache(true) + final boolean useCache = BaseQuery.getContextUseCache(query, true) && strategy != null && cacheConfig.isUseCache() && cacheConfig.isQueryCacheable(query); diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 92cbefc5487..83cb2c983a0 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -52,6 +52,7 @@ import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValueClass; import io.druid.query.DruidMetrics; import io.druid.query.Query; @@ -134,7 +135,7 @@ public class DirectDruidClient implements QueryRunner public Sequence run(final Query query, final Map context) { QueryToolChest> toolChest = warehouse.getToolChest(query); - boolean isBySegment = query.getContextBySegment(false); + boolean isBySegment = BaseQuery.getContextBySegment(query, false); Pair types = typesMap.get(query.getClass()); if (types == null) { diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java index 3b885b0b8cb..14a49077693 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java @@ -75,7 +75,8 @@ public class IngestSegmentFirehose implements Firehose adapter.getAdapter().makeCursors( Filters.convertDimensionFilters(dimFilter), adapter.getInterval(), - granularity + granularity, + false ), new Function>() { @Nullable diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index 6493e5ec4a0..c8325ca07dd 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -97,8 +97,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker retryConfig, objectMapper ) - ), - toolChest + ) ) ) ), diff --git a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java index 17c7f842821..66164f91c70 100644 --- a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java +++ b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.collect.Iterables; +import io.druid.query.BaseQuery; import io.druid.query.Query; /** @@ -45,7 +46,7 @@ public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelecto @Override public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) { - final int priority = query.getContextPriority(0); + final int priority = BaseQuery.getContextPriority(query, 0); if (priority < minPriority) { return Optional.of( diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index fca21d4a469..4f09b05cfcf 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -926,11 +926,15 @@ public class CachingClusteredClientTest new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 ), client.mergeCachedAndUncachedSequences( - sequences, - new TopNQueryQueryToolChest( - new TopNQueryConfig(), - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() - ) + new TopNQueryBuilder() + .dataSource("test") + .intervals("2011-01-06/2011-01-10") + .dimension("a") + .metric("b") + .threshold(3) + .aggregators(Arrays.asList(new CountAggregatorFactory("b"))) + .build(), + sequences ) ); } diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index 3da0bf9e7e8..16375b1669c 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -37,15 +37,19 @@ import io.druid.client.cache.MapCache; import io.druid.granularity.AllGranularity; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.CacheStrategy; +import io.druid.query.Druids; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; -import io.druid.query.topn.TopNQuery; +import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.query.topn.TopNQueryBuilder; import io.druid.query.topn.TopNQueryConfig; import io.druid.query.topn.TopNQueryQueryToolChest; @@ -84,8 +88,10 @@ public class CachingQueryRunnerTest @Test public void testCloseAndPopulate() throws Exception { - Iterable> expectedRes = makeTopNResults(false, objects); - final TopNQueryBuilder builder = new TopNQueryBuilder() + List expectedRes = makeTopNResults(false, objects); + List expectedCacheRes = makeTopNResults(true, objects); + + TopNQueryBuilder builder = new TopNQueryBuilder() .dataSource("ds") .dimension("top_dim") .metric("imps") @@ -94,6 +100,72 @@ public class CachingQueryRunnerTest .aggregators(AGGS) .granularity(AllGranularity.ALL); + QueryToolChest toolchest = new TopNQueryQueryToolChest( + new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ); + + testCloseAndPopulate(expectedRes, expectedCacheRes, builder.build(), toolchest); + testUseCache(expectedCacheRes, builder.build(), toolchest); + } + + @Test + public void testTimeseries() throws Exception + { + for (boolean descending : new boolean[]{false, true}) { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.dayGran) + .intervals(QueryRunnerTestHelper.firstToThird) + .aggregators( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory( + "idx", + "index" + ), + QueryRunnerTestHelper.qualityUniques + ) + ) + .descending(descending) + .build(); + + Result row1 = new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 13L, "idx", 6619L, "uniques", QueryRunnerTestHelper.UNIQUES_9) + ) + ); + Result row2 = new Result<>( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9) + ) + ); + List expectedResults; + if (descending) { + expectedResults = Lists.newArrayList(row2, row1); + } else { + expectedResults = Lists.newArrayList(row1, row2); + } + + QueryToolChest toolChest = new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ); + + testCloseAndPopulate(expectedResults, expectedResults, query, toolChest); + testUseCache(expectedResults, query, toolChest); + } + } + + private void testCloseAndPopulate( + List expectedRes, + List expectedCacheRes, + Query query, + QueryToolChest toolchest + ) + throws Exception + { final AssertingClosable closable = new AssertingClosable(); final Sequence resultSeq = new ResourceClosingSequence( Sequences.simple(expectedRes), closable @@ -115,8 +187,6 @@ public class CachingQueryRunnerTest String segmentIdentifier = "segment"; SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0); - TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig(), - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); DefaultObjectMapper objectMapper = new DefaultObjectMapper(); CachingQueryRunner runner = new CachingQueryRunner( segmentIdentifier, @@ -149,8 +219,7 @@ public class CachingQueryRunnerTest } ); - TopNQuery query = builder.build(); - CacheStrategy, Object, TopNQuery> cacheStrategy = toolchest.getCacheStrategy(query); + CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query); Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey( segmentIdentifier, segmentDescriptor, @@ -165,15 +234,13 @@ public class CachingQueryRunnerTest ArrayList results = Sequences.toList(res, new ArrayList()); Assert.assertTrue(closable.isClosed()); - Assert.assertEquals(expectedRes, results); - - Iterable> expectedCacheRes = makeTopNResults(true, objects); + Assert.assertEquals(expectedRes.toString(), results.toString()); byte[] cacheValue = cache.get(cacheKey); Assert.assertNotNull(cacheValue); - Function> fn = cacheStrategy.pullFromCache(); - List> cacheResults = Lists.newArrayList( + Function fn = cacheStrategy.pullFromCache(); + List cacheResults = Lists.newArrayList( Iterators.transform( objectMapper.readValues( objectMapper.getFactory().createParser(cacheValue), @@ -182,31 +249,20 @@ public class CachingQueryRunnerTest fn ) ); - Assert.assertEquals(expectedCacheRes, cacheResults); + Assert.assertEquals(expectedCacheRes.toString(), cacheResults.toString()); } - @Test - public void testUseCache() throws Exception + private void testUseCache( + List expectedResults, + Query query, + QueryToolChest toolchest + ) throws Exception { DefaultObjectMapper objectMapper = new DefaultObjectMapper(); - Iterable> expectedResults = makeTopNResults(true, objects); String segmentIdentifier = "segment"; SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0); - TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig(), - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); - final TopNQueryBuilder builder = new TopNQueryBuilder() - .dataSource("ds") - .dimension("top_dim") - .metric("imps") - .threshold(3) - .intervals("2011-01-05/2011-01-10") - .aggregators(AGGS) - .granularity(AllGranularity.ALL); - - final TopNQuery query = builder.build(); - - CacheStrategy, Object, TopNQuery> cacheStrategy = toolchest.getCacheStrategy(query); + CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query); Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey( segmentIdentifier, segmentDescriptor, @@ -254,14 +310,14 @@ public class CachingQueryRunnerTest ); HashMap context = new HashMap(); - List results = Sequences.toList(runner.run(query, context), new ArrayList()); - Assert.assertEquals(expectedResults, results); + List results = Sequences.toList(runner.run(query, context), new ArrayList()); + Assert.assertEquals(expectedResults.toString(), results.toString()); } - private Iterable> makeTopNResults + private List makeTopNResults (boolean cachedResults, Object... objects) { - List> retVal = Lists.newArrayList(); + List retVal = Lists.newArrayList(); int index = 0; while (index < objects.length) { DateTime timestamp = (DateTime) objects[index++]; diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index ca85ddc2709..29ce37c14be 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -30,7 +30,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.IAE; import com.metamx.common.MapUtils; import com.metamx.common.Pair; -import com.metamx.common.guava.ConcatSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Yielder; @@ -571,18 +570,6 @@ public class ServerManagerTest return runner; } - @Override - public Sequence mergeSequences(Sequence> seqOfSequences) - { - return new ConcatSequence(seqOfSequences); - } - - @Override - public Sequence mergeSequencesUnordered(Sequence> seqOfSequences) - { - return new ConcatSequence(seqOfSequences); - } - @Override public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query) {