diff --git a/docs/content/Tasks.md b/docs/content/Tasks.md index 845ac707fbc..868e75efe88 100644 --- a/docs/content/Tasks.md +++ b/docs/content/Tasks.md @@ -220,7 +220,7 @@ Kill tasks delete all information about a segment and removes it from deep stora "type": "kill", "id": , "dataSource": , - "segments": + "interval" : } ``` diff --git a/pom.xml b/pom.xml index 7d270176674..7bde87c2e24 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ UTF-8 0.26.5 2.5.0 - 0.2.5 + 0.2.6 diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index 932af432ad1..ff03e8db7f7 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -35,6 +35,8 @@ import io.druid.query.search.SearchResultValue; import io.druid.query.search.search.InsensitiveContainsSearchQuerySpec; import io.druid.query.search.search.SearchQuery; import io.druid.query.search.search.SearchQuerySpec; +import io.druid.query.select.PagingSpec; +import io.druid.query.select.SelectQuery; import io.druid.query.spec.LegacySegmentSpec; import io.druid.query.spec.QuerySegmentSpec; import io.druid.query.timeboundary.TimeBoundaryQuery; @@ -943,4 +945,158 @@ public class Druids { return new SegmentMetadataQueryBuilder(); } + + /** + * A Builder for SelectQuery. + *

+   * Required: dataSource(), intervals() must be called before build()
+   * <p/>
+   * Usage example:
+   * <pre><code>
+   *   SelectQuery query = new SelectQueryBuilder()
+   *                                  .dataSource("Example")
+   *                                  .intervals("2010/2013")
+   *                                  .build();
+   * </code></pre>
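+   * <p/>
+   * A fuller, hypothetical example (added for illustration, not part of the original
+   * patch) showing the optional filter, granularity, dimension and paging setters that
+   * this builder defines; the PagingSpec constructor arguments shown are assumptions:
+   * <pre><code>
+   *   SelectQuery query = Druids.newSelectQueryBuilder()
+   *                             .dataSource("Example")
+   *                             .intervals("2010/2013")
+   *                             .filters("dim", "value")
+   *                             .granularity("day")
+   *                             .dimensions(Arrays.asList("dim"))
+   *                             .pagingSpec(new PagingSpec(null, 5))
+   *                             .build();
+   * </code></pre>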
+ * + * @see io.druid.query.select.SelectQuery + */ + public static class SelectQueryBuilder + { + private DataSource dataSource; + private QuerySegmentSpec querySegmentSpec; + private Map context; + private DimFilter dimFilter; + private QueryGranularity granularity; + private List dimensions; + private List metrics; + private PagingSpec pagingSpec; + + public SelectQueryBuilder() + { + dataSource = null; + querySegmentSpec = null; + context = null; + dimFilter = null; + granularity = QueryGranularity.ALL; + dimensions = Lists.newArrayList(); + metrics = Lists.newArrayList(); + pagingSpec = null; + } + + public SelectQuery build() + { + return new SelectQuery( + dataSource, + querySegmentSpec, + dimFilter, + granularity, + dimensions, + metrics, + pagingSpec, + context + ); + } + + public SelectQueryBuilder copy(SelectQueryBuilder builder) + { + return new SelectQueryBuilder() + .dataSource(builder.dataSource) + .intervals(builder.querySegmentSpec) + .context(builder.context); + } + + public SelectQueryBuilder dataSource(String ds) + { + dataSource = new TableDataSource(ds); + return this; + } + + public SelectQueryBuilder dataSource(DataSource ds) + { + dataSource = ds; + return this; + } + + public SelectQueryBuilder intervals(QuerySegmentSpec q) + { + querySegmentSpec = q; + return this; + } + + public SelectQueryBuilder intervals(String s) + { + querySegmentSpec = new LegacySegmentSpec(s); + return this; + } + + public SelectQueryBuilder intervals(List l) + { + querySegmentSpec = new LegacySegmentSpec(l); + return this; + } + + public SelectQueryBuilder context(Map c) + { + context = c; + return this; + } + + public SelectQueryBuilder filters(String dimensionName, String value) + { + dimFilter = new SelectorDimFilter(dimensionName, value); + return this; + } + + public SelectQueryBuilder filters(String dimensionName, String value, String... 
values) + { + List fields = Lists.newArrayList(new SelectorDimFilter(dimensionName, value)); + for (String val : values) { + fields.add(new SelectorDimFilter(dimensionName, val)); + } + dimFilter = new OrDimFilter(fields); + return this; + } + + public SelectQueryBuilder filters(DimFilter f) + { + dimFilter = f; + return this; + } + + public SelectQueryBuilder granularity(String g) + { + granularity = QueryGranularity.fromString(g); + return this; + } + + public SelectQueryBuilder granularity(QueryGranularity g) + { + granularity = g; + return this; + } + + public SelectQueryBuilder dimensions(List d) + { + dimensions = d; + return this; + } + + public SelectQueryBuilder metrics(List m) + { + metrics = m; + return this; + } + + public SelectQueryBuilder pagingSpec(PagingSpec p) + { + pagingSpec = p; + return this; + } + } + + public static SelectQueryBuilder newSelectQueryBuilder() + { + return new SelectQueryBuilder(); + } } diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index cb3402eb2ce..1edf07b94d4 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -25,7 +25,6 @@ import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -41,7 +40,6 @@ import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryHelper; import io.druid.segment.incremental.IncrementalIndex; -import java.util.Arrays; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -50,72 +48,66 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public class GroupByParallelQueryRunner implements QueryRunner +public class GroupByParallelQueryRunner implements QueryRunner { private static final Logger log = new Logger(GroupByParallelQueryRunner.class); - private final Iterable> queryables; + private final Iterable> queryables; private final ListeningExecutorService exec; - private final Ordering ordering; private final Supplier configSupplier; private final QueryWatcher queryWatcher; - public GroupByParallelQueryRunner( ExecutorService exec, - Ordering ordering, Supplier configSupplier, QueryWatcher queryWatcher, - QueryRunner... 
queryables - ) - { - this(exec, ordering, configSupplier, queryWatcher, Arrays.asList(queryables)); - } - - public GroupByParallelQueryRunner( - ExecutorService exec, - Ordering ordering, Supplier configSupplier, - QueryWatcher queryWatcher, - Iterable> queryables + Iterable> queryables ) { this.exec = MoreExecutors.listeningDecorator(exec); - this.ordering = ordering; this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.configSupplier = configSupplier; } @Override - public Sequence run(final Query queryParam) + public Sequence run(final Query queryParam) { final GroupByQuery query = (GroupByQuery) queryParam; - final Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( + final Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( query, configSupplier.get() ); + final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); + final boolean bySegment = query.getContextBySegment(false); final int priority = query.getContextPriority(0); if (Iterables.isEmpty(queryables)) { log.warn("No queryables found."); } - ListenableFuture> futures = Futures.allAsList( + ListenableFuture> futures = Futures.allAsList( Lists.newArrayList( Iterables.transform( queryables, - new Function, ListenableFuture>() + new Function, ListenableFuture>() { @Override - public ListenableFuture apply(final QueryRunner input) + public ListenableFuture apply(final QueryRunner input) { return exec.submit( - new AbstractPrioritizedCallable(priority) + new AbstractPrioritizedCallable(priority) { @Override - public Boolean call() throws Exception + public Void call() throws Exception { try { - input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); - return true; + if (bySegment) { + input.run(queryParam) + .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs); + } else { + input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); + } + + return null; } catch (QueryInterruptedException e) { throw Throwables.propagate(e); @@ -137,7 +129,7 @@ public class GroupByParallelQueryRunner implements QueryRunner try { queryWatcher.registerQuery(query, futures); final Number timeout = query.getContextValue("timeout", (Number) null); - if(timeout == null) { + if (timeout == null) { futures.get(); } else { futures.get(timeout.longValue(), TimeUnit.MILLISECONDS); @@ -148,10 +140,10 @@ public class GroupByParallelQueryRunner implements QueryRunner futures.cancel(true); throw new QueryInterruptedException("Query interrupted"); } - catch(CancellationException e) { + catch (CancellationException e) { throw new QueryInterruptedException("Query cancelled"); } - catch(TimeoutException e) { + catch (TimeoutException e) { log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); futures.cancel(true); throw new QueryInterruptedException("Query timeout"); @@ -160,7 +152,22 @@ public class GroupByParallelQueryRunner implements QueryRunner throw Throwables.propagate(e.getCause()); } - return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null)); - } + if (bySegment) { + return Sequences.simple(bySegmentAccumulatorPair.lhs); + } + return Sequences.simple( + Iterables.transform( + indexAccumulatorPair.lhs.iterableWithPostAggregations(null), + new Function() + { + @Override + public T apply(Row input) + { + return (T) input; + } + } + ) + ); + } } diff --git 
a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index 8299ecaad0a..d2722c622be 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -41,13 +41,17 @@ public abstract class QueryToolChest mergeSequences(Sequence> seqOfSequences); public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query); - public abstract Function makePreComputeManipulatorFn(QueryType query, MetricManipulationFn fn); + public abstract Function makePreComputeManipulatorFn( + QueryType query, + MetricManipulationFn fn + ); public Function makePostComputeManipulatorFn(QueryType query, MetricManipulationFn fn) { @@ -56,19 +60,23 @@ public abstract class QueryToolChest getResultTypeReference(); - public CacheStrategy getCacheStrategy(QueryType query) { + public CacheStrategy getCacheStrategy(QueryType query) + { return null; } - public QueryRunner preMergeQueryDecoration(QueryRunner runner) { + public QueryRunner preMergeQueryDecoration(QueryRunner runner) + { return runner; } - public QueryRunner postMergeQueryDecoration(QueryRunner runner) { + public QueryRunner postMergeQueryDecoration(QueryRunner runner) + { return runner; } - public List filterSegments(QueryType query, List segments) { + public List filterSegments(QueryType query, List segments) + { return segments; } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 98ac83e32cd..a0fab78da07 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -51,7 +51,6 @@ import io.druid.query.groupby.orderby.OrderByColumnSpec; import io.druid.query.spec.LegacySegmentSpec; import io.druid.query.spec.QuerySegmentSpec; -import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -111,14 +110,14 @@ public class GroupByQuery extends BaseQuery new Function, Sequence>() { @Override - public Sequence apply(@Nullable Sequence input) + public Sequence apply(Sequence input) { return Sequences.filter( input, new Predicate() { @Override - public boolean apply(@Nullable Row input) + public boolean apply(Row input) { return GroupByQuery.this.havingSpec.eval(input); } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index 8599c6bdd16..a3cc06c0b9a 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -81,7 +81,7 @@ public class GroupByQueryEngine this.intermediateResultsBufferPool = intermediateResultsBufferPool; } - public Sequence process(final GroupByQuery query, StorageAdapter storageAdapter) + public Sequence process(final GroupByQuery query, final StorageAdapter storageAdapter) { if (storageAdapter == null) { throw new ISE( @@ -104,41 +104,41 @@ public class GroupByQueryEngine return Sequences.concat( Sequences.withBaggage( - Sequences.map( - cursors, - new Function>() - { - @Override - public Sequence apply(@Nullable final Cursor cursor) - { - return new BaseSequence( - new BaseSequence.IteratorMaker() + Sequences.map( + cursors, + new Function>() + { + @Override + public Sequence apply(final Cursor cursor) { - @Override - public RowIterator make() - { - return new 
RowIterator(query, cursor, bufferHolder.get(), config.get()); - } + return new BaseSequence<>( + new BaseSequence.IteratorMaker() + { + @Override + public RowIterator make() + { + return new RowIterator(query, cursor, bufferHolder.get(), config.get()); + } - @Override - public void cleanup(RowIterator iterFromMake) - { - CloseQuietly.close(iterFromMake); - } + @Override + public void cleanup(RowIterator iterFromMake) + { + CloseQuietly.close(iterFromMake); + } + } + ); } - ); + } + ), + new Closeable() + { + @Override + public void close() throws IOException + { + CloseQuietly.close(bufferHolder); + } } - } - ), - new Closeable() - { - @Override - public void close() throws IOException - { - CloseQuietly.close(bufferHolder); - } - } - ) + ) ); } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index b7183b12354..093b2ee3c7a 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -24,21 +24,18 @@ import com.google.common.collect.Lists; import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; -import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.data.input.Rows; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexSchema; -import javax.annotation.Nullable; import java.util.List; public class GroupByQueryHelper { - public static Pair> createIndexAccumulatorPair( + public static Pair> createIndexAccumulatorPair( final GroupByQuery query, final GroupByQueryConfig config ) @@ -80,13 +77,18 @@ public class GroupByQueryHelper aggs.toArray(new AggregatorFactory[aggs.size()]) ); - Accumulator accumulator = new Accumulator() + Accumulator accumulator = new Accumulator() { @Override - public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in) + public IncrementalIndex accumulate(IncrementalIndex accumulated, T in) { - if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions), false) > config.getMaxResults()) { - throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults()); + if (in instanceof Row) { + if (accumulated.add(Rows.toCaseInsensitiveInputRow((Row) in, dimensions), false) + > config.getMaxResults()) { + throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults()); + } + } else { + throw new ISE("Unable to accumulate something of type [%s]", in.getClass()); } return accumulated; @@ -95,4 +97,18 @@ public class GroupByQueryHelper return new Pair<>(index, accumulator); } + public static Pair> createBySegmentAccumulatorPair() + { + List init = Lists.newArrayList(); + Accumulator accumulator = new Accumulator() + { + @Override + public List accumulate(List accumulated, T in) + { + accumulated.add(in); + return accumulated; + } + }; + return new Pair<>(init, accumulator); + } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 025af21a2d7..88216281c4e 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ 
b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -20,25 +20,32 @@ package io.druid.query.groupby; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import com.google.inject.Inject; +import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; -import com.metamx.common.guava.ConcatSequence; +import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.collections.OrderedMergeSequence; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; +import io.druid.granularity.QueryGranularity; +import io.druid.query.CacheStrategy; import io.druid.query.DataSource; import io.druid.query.DataSourceUtil; import io.druid.query.IntervalChunkingQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryCacheHelper; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; @@ -46,11 +53,16 @@ import io.druid.query.SubqueryQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.PostAggregator; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.filter.DimFilter; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexStorageAdapter; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Minutes; +import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -58,6 +70,11 @@ import java.util.Map; */ public class GroupByQueryQueryToolChest extends QueryToolChest { + private static final byte GROUPBY_QUERY = 0x14; + private static final TypeReference OBJECT_TYPE_REFERENCE = + new TypeReference() + { + }; private static final TypeReference TYPE_REFERENCE = new TypeReference() { }; @@ -66,16 +83,20 @@ public class GroupByQueryQueryToolChest extends QueryToolChest configSupplier; + private final ObjectMapper jsonMapper; private GroupByQueryEngine engine; // For running the outer query around a subquery @Inject public GroupByQueryQueryToolChest( Supplier configSupplier, + ObjectMapper jsonMapper, GroupByQueryEngine engine ) { this.configSupplier = configSupplier; + this.jsonMapper = jsonMapper; this.engine = engine; } @@ -87,11 +108,14 @@ public class GroupByQueryQueryToolChest extends QueryToolChest run(Query input) { - if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) { - return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner); - } else { + if (input.getContextBySegment(false)) { return runner.run(input); } + + if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) { + return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner); + } + return runner.run(input); } }; } @@ -169,7 +193,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) { - return new 
ConcatSequence<>(seqOfSequences); + return new OrderedMergeSequence<>(Ordering.natural().nullsFirst(), seqOfSequences); } @Override @@ -191,7 +215,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest makePreComputeManipulatorFn(final GroupByQuery query, final MetricManipulationFn fn) + public Function makePreComputeManipulatorFn( + final GroupByQuery query, + final MetricManipulationFn fn + ) { return new Function() { @@ -200,7 +227,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest values = Maps.newHashMap(((MapBasedRow) input).getEvent()); + final Map values = Maps.newHashMap(inputRow.getEvent()); for (AggregatorFactory agg : query.getAggregatorSpecs()) { values.put(agg.getName(), fn.manipulate(agg, inputRow.getEvent().get(agg.getName()))); } @@ -220,8 +247,119 @@ public class GroupByQueryQueryToolChest extends QueryToolChest preMergeQueryDecoration(QueryRunner runner) { - return new SubqueryQueryRunner( - new IntervalChunkingQueryRunner(runner, configSupplier.get().getChunkPeriod()) + return new SubqueryQueryRunner<>( + new IntervalChunkingQueryRunner<>(runner, configSupplier.get().getChunkPeriod()) ); } + + @Override + public CacheStrategy getCacheStrategy(final GroupByQuery query) + { + return new CacheStrategy() + { + @Override + public byte[] computeCacheKey(GroupByQuery query) + { + final DimFilter dimFilter = query.getDimFilter(); + final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey(); + final byte[] aggregatorBytes = QueryCacheHelper.computeAggregatorBytes(query.getAggregatorSpecs()); + final byte[] granularityBytes = query.getGranularity().cacheKey(); + final byte[][] dimensionsBytes = new byte[query.getDimensions().size()][]; + int dimensionsBytesSize = 0; + int index = 0; + for (DimensionSpec dimension : query.getDimensions()) { + dimensionsBytes[index] = dimension.getCacheKey(); + dimensionsBytesSize += dimensionsBytes[index].length; + ++index; + } + final byte[] havingBytes = query.getHavingSpec() == null ? 
new byte[]{} : query.getHavingSpec().getCacheKey(); + final byte[] limitBytes = query.getLimitSpec().getCacheKey(); + + ByteBuffer buffer = ByteBuffer + .allocate( + 1 + + granularityBytes.length + + filterBytes.length + + aggregatorBytes.length + + dimensionsBytesSize + + havingBytes.length + + limitBytes.length + ) + .put(GROUPBY_QUERY) + .put(granularityBytes) + .put(filterBytes) + .put(aggregatorBytes); + + for (byte[] dimensionsByte : dimensionsBytes) { + buffer.put(dimensionsByte); + } + + return buffer + .put(havingBytes) + .put(limitBytes) + .array(); + } + + @Override + public TypeReference getCacheObjectClazz() + { + return OBJECT_TYPE_REFERENCE; + } + + @Override + public Function prepareForCache() + { + return new Function() + { + @Override + public Object apply(Row input) + { + if (input instanceof MapBasedRow) { + final MapBasedRow row = (MapBasedRow) input; + final List retVal = Lists.newArrayListWithCapacity(2); + retVal.add(row.getTimestamp().getMillis()); + retVal.add(row.getEvent()); + + return retVal; + } + + throw new ISE("Don't know how to cache input rows of type[%s]", input.getClass()); + } + }; + } + + @Override + public Function pullFromCache() + { + return new Function() + { + private final QueryGranularity granularity = query.getGranularity(); + + @Override + public Row apply(Object input) + { + Iterator results = ((List) input).iterator(); + + DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue()); + + return new MapBasedRow( + timestamp, + (Map) jsonMapper.convertValue( + results.next(), + new TypeReference>() + { + } + ) + ); + } + }; + } + + @Override + public Sequence mergeSequences(Sequence> seqOfSequences) + { + return new MergeSequence<>(Ordering.natural().nullsFirst(), seqOfSequences); + } + }; + } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index e8634089c2f..1cc5600c3b6 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -22,18 +22,20 @@ package io.druid.query.groupby; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.collect.Ordering; -import com.google.common.primitives.Longs; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.ISE; +import com.metamx.common.Pair; +import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.ExecutorExecutingSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; import io.druid.data.input.Row; +import io.druid.query.AbstractPrioritizedCallable; import io.druid.query.ConcatQueryRunner; import io.druid.query.GroupByParallelQueryRunner; import io.druid.query.Query; @@ -44,7 +46,9 @@ import io.druid.query.QueryToolChest; import io.druid.query.QueryWatcher; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; +import io.druid.segment.incremental.IncrementalIndex; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import 
java.util.concurrent.ExecutionException; @@ -88,8 +92,9 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory( + return new ConcatQueryRunner<>( Sequences.map( Sequences.simple(queryRunners), new Function, QueryRunner>() @@ -102,34 +107,54 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory run(final Query query) { + final GroupByQuery queryParam = (GroupByQuery) query; + final Pair> indexAccumulatorPair = GroupByQueryHelper + .createIndexAccumulatorPair( + queryParam, + config.get() + ); + final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); + final int priority = query.getContextPriority(0); + final boolean bySegment = query.getContextBySegment(false); - ListenableFuture> future = queryExecutor.submit( - new Callable>() + final ListenableFuture future = queryExecutor.submit( + new AbstractPrioritizedCallable(priority) { @Override - public Sequence call() throws Exception + public Void call() throws Exception { - return new ExecutorExecutingSequence( - input.run(query), - queryExecutor - ); + if (bySegment) { + input.run(queryParam) + .accumulate( + bySegmentAccumulatorPair.lhs, + bySegmentAccumulatorPair.rhs + ); + } else { + input.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); + } + + return null; } } ); try { queryWatcher.registerQuery(query, future); - final Number timeout = query.getContextValue("timeout", (Number)null); - return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + final Number timeout = query.getContextValue("timeout", (Number) null); + if (timeout == null) { + future.get(); + } else { + future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + } } catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); future.cancel(true); throw new QueryInterruptedException("Query interrupted"); } - catch(CancellationException e) { + catch (CancellationException e) { throw new QueryInterruptedException("Query cancelled"); } - catch(TimeoutException e) { + catch (TimeoutException e) { log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); future.cancel(true); throw new QueryInterruptedException("Query timeout"); @@ -137,6 +162,12 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory - { - @Override - public int compare(Row left, Row right) - { - return Longs.compare(left.getTimestampFromEpoch(), right.getTimestampFromEpoch()); - } - } } diff --git a/processing/src/main/java/io/druid/query/groupby/having/AlwaysHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/AlwaysHavingSpec.java index 51b938a9891..3868378d7a2 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/AlwaysHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/AlwaysHavingSpec.java @@ -22,12 +22,21 @@ package io.druid.query.groupby.having; import io.druid.data.input.Row; /** + * A "having" spec that always evaluates to true */ public class AlwaysHavingSpec implements HavingSpec { + private static final byte CACHE_KEY = 0x0; + @Override public boolean eval(Row row) { return true; } + + @Override + public byte[] getCacheKey() + { + return new byte[]{CACHE_KEY}; + } } diff --git a/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java index 8e0fdb6a91f..4b0a0ce30fd 100644 --- 
a/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.druid.data.input.Row; +import java.nio.ByteBuffer; import java.util.List; /** @@ -31,6 +32,8 @@ import java.util.List; */ public class AndHavingSpec implements HavingSpec { + private static final byte CACHE_KEY = 0x2; + private List havingSpecs; @JsonCreator @@ -57,6 +60,25 @@ public class AndHavingSpec implements HavingSpec return true; } + @Override + public byte[] getCacheKey() + { + final byte[][] havingBytes = new byte[havingSpecs.size()][]; + int havingBytesSize = 0; + int index = 0; + for (HavingSpec havingSpec : havingSpecs) { + havingBytes[index] = havingSpec.getCacheKey(); + havingBytesSize += havingBytes[index].length; + ++index; + } + + ByteBuffer buffer = ByteBuffer.allocate(1 + havingBytesSize).put(CACHE_KEY); + for (byte[] havingByte : havingBytes) { + buffer.put(havingByte); + } + return buffer.array(); + } + @Override public boolean equals(Object o) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java index 34bd7b887f2..bc26524e60b 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java @@ -21,14 +21,20 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.primitives.Bytes; import io.druid.data.input.Row; +import java.nio.ByteBuffer; +import java.util.Arrays; + /** * The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value", * except that in SQL an aggregation is an expression instead of an aggregation name as in Druid. */ public class EqualToHavingSpec implements HavingSpec { + private static final byte CACHE_KEY = 0x3; + private String aggregationName; private Number value; @@ -62,6 +68,18 @@ public class EqualToHavingSpec implements HavingSpec return Float.compare(value.floatValue(), metricValue) == 0; } + @Override + public byte[] getCacheKey() + { + final byte[] aggBytes = aggregationName.getBytes(); + final byte[] valBytes = Bytes.toArray(Arrays.asList(value)); + return ByteBuffer.allocate(1 + aggBytes.length + valBytes.length) + .put(CACHE_KEY) + .put(aggBytes) + .put(valBytes) + .array(); + } + /** * This method treats internal value as double mainly for ease of test. */ diff --git a/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java index 0c2c9e7810b..6d4c2c78071 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java @@ -21,7 +21,12 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.primitives.Bytes; import io.druid.data.input.Row; +import io.druid.query.Result; + +import java.nio.ByteBuffer; +import java.util.Arrays; /** * The ">" operator in a "having" clause. 
This is similar to SQL's "having aggregation > value", @@ -29,6 +34,8 @@ import io.druid.data.input.Row; */ public class GreaterThanHavingSpec implements HavingSpec { + private static final byte CACHE_KEY = 0x4; + private String aggregationName; private Number value; @@ -62,6 +69,18 @@ public class GreaterThanHavingSpec implements HavingSpec return Float.compare(metricValue, value.floatValue()) > 0; } + @Override + public byte[] getCacheKey() + { + final byte[] aggBytes = aggregationName.getBytes(); + final byte[] valBytes = Bytes.toArray(Arrays.asList(value)); + return ByteBuffer.allocate(1 + aggBytes.length + valBytes.length) + .put(CACHE_KEY) + .put(aggBytes) + .put(valBytes) + .array(); + } + /** * This method treats internal value as double mainly for ease of test. */ diff --git a/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java index 00a8389cff0..37ad9e1b8df 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java @@ -38,6 +38,11 @@ import io.druid.data.input.Row; }) public interface HavingSpec { + // Atoms for easy combination, but for now they are mostly useful + // for testing. + public static final HavingSpec NEVER = new NeverHavingSpec(); + public static final HavingSpec ALWAYS = new AlwaysHavingSpec(); + /** * Evaluates if a given row satisfies the having spec. * @@ -49,22 +54,5 @@ public interface HavingSpec */ public boolean eval(Row row); - // Atoms for easy combination, but for now they are mostly useful - // for testing. - /** - * A "having" spec that always evaluates to false - */ - public static final HavingSpec NEVER = new HavingSpec() - { - @Override - public boolean eval(Row row) - { - return false; - } - }; - - /** - * A "having" spec that always evaluates to true - */ - public static final HavingSpec ALWAYS = new AlwaysHavingSpec(); + public byte[] getCacheKey(); } diff --git a/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java index ce8dd20b661..1f1a27bd9a7 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java @@ -20,7 +20,12 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.primitives.Bytes; import io.druid.data.input.Row; +import io.druid.query.Result; + +import java.nio.ByteBuffer; +import java.util.Arrays; /** * The "<" operator in a "having" clause. 
This is similar to SQL's "having aggregation < value", @@ -28,14 +33,15 @@ import io.druid.data.input.Row; */ public class LessThanHavingSpec implements HavingSpec { + private static final byte CACHE_KEY = 0x5; + private String aggregationName; private Number value; - public LessThanHavingSpec - ( - @JsonProperty("aggregation") String aggName, - @JsonProperty("value") Number value - ) + public LessThanHavingSpec( + @JsonProperty("aggregation") String aggName, + @JsonProperty("value") Number value + ) { this.aggregationName = aggName; this.value = value; @@ -61,6 +67,18 @@ public class LessThanHavingSpec implements HavingSpec return Float.compare(metricValue, value.floatValue()) < 0; } + @Override + public byte[] getCacheKey() + { + final byte[] aggBytes = aggregationName.getBytes(); + final byte[] valBytes = Bytes.toArray(Arrays.asList(value)); + return ByteBuffer.allocate(1 + aggBytes.length + valBytes.length) + .put(CACHE_KEY) + .put(aggBytes) + .put(valBytes) + .array(); + } + /** * This method treats internal value as double mainly for ease of test. */ diff --git a/processing/src/main/java/io/druid/query/groupby/having/NeverHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/NeverHavingSpec.java new file mode 100644 index 00000000000..bad1cd03f37 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/having/NeverHavingSpec.java @@ -0,0 +1,42 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.groupby.having; + +import io.druid.data.input.Row; + +/** + * A "having" spec that always evaluates to false + */ +public class NeverHavingSpec implements HavingSpec +{ + private static final byte CACHE_KEY = 0x1; + + @Override + public boolean eval(Row row) + { + return false; + } + + @Override + public byte[] getCacheKey() + { + return new byte[]{CACHE_KEY}; + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java index e963e0154e5..590e47d3d0b 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java @@ -22,12 +22,17 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.Result; + +import java.nio.ByteBuffer; /** * The logical "not" operator for the "having" clause. 
*/ public class NotHavingSpec implements HavingSpec { + private static final byte CACHE_KEY = 0x6; + private HavingSpec havingSpec; @JsonCreator @@ -48,6 +53,15 @@ public class NotHavingSpec implements HavingSpec return !havingSpec.eval(row); } + @Override + public byte[] getCacheKey() + { + return ByteBuffer.allocate(1 + havingSpec.getCacheKey().length) + .put(CACHE_KEY) + .put(havingSpec.getCacheKey()) + .array(); + } + @Override public String toString() { diff --git a/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java index 73f46dfaa24..c413a9cf2f0 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java @@ -23,7 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.druid.data.input.Row; +import io.druid.query.Result; +import java.nio.ByteBuffer; import java.util.List; /** @@ -31,23 +33,27 @@ import java.util.List; */ public class OrHavingSpec implements HavingSpec { + private static final byte CACHE_KEY = 0x7; + private List havingSpecs; @JsonCreator - public OrHavingSpec(@JsonProperty("havingSpecs") List havingSpecs) { + public OrHavingSpec(@JsonProperty("havingSpecs") List havingSpecs) + { this.havingSpecs = havingSpecs == null ? ImmutableList.of() : havingSpecs; } @JsonProperty("havingSpecs") - public List getHavingSpecs(){ + public List getHavingSpecs() + { return havingSpecs; } @Override public boolean eval(Row row) { - for(HavingSpec havingSpec: havingSpecs) { - if(havingSpec.eval(row)){ + for (HavingSpec havingSpec : havingSpecs) { + if (havingSpec.eval(row)) { return true; } } @@ -55,15 +61,40 @@ public class OrHavingSpec implements HavingSpec return false; } + @Override + public byte[] getCacheKey() + { + final byte[][] havingBytes = new byte[havingSpecs.size()][]; + int havingBytesSize = 0; + int index = 0; + for (HavingSpec havingSpec : havingSpecs) { + havingBytes[index] = havingSpec.getCacheKey(); + havingBytesSize += havingBytes[index].length; + ++index; + } + + ByteBuffer buffer = ByteBuffer.allocate(1 + havingBytesSize).put(CACHE_KEY); + for (byte[] havingByte : havingBytes) { + buffer.put(havingByte); + } + return buffer.array(); + } + @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } OrHavingSpec that = (OrHavingSpec) o; - if (havingSpecs != null ? !havingSpecs.equals(that.havingSpecs) : that.havingSpecs != null) return false; + if (havingSpecs != null ? 
!havingSpecs.equals(that.havingSpecs) : that.havingSpecs != null) { + return false; + } return true; } diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java index 3d78e112cb5..bf6fc138ad4 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; @@ -37,6 +38,7 @@ import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; import javax.annotation.Nullable; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -46,6 +48,8 @@ import java.util.Map; */ public class DefaultLimitSpec implements LimitSpec { + private static final byte CACHE_KEY = 0x1; + private final List columns; private final int limit; @@ -196,7 +200,7 @@ public class DefaultLimitSpec implements LimitSpec @Override public Sequence apply( - @Nullable Sequence input + Sequence input ) { return Sequences.limit(input, limit); @@ -275,12 +279,12 @@ public class DefaultLimitSpec implements LimitSpec { this.limit = limit; - this.sorter = new TopNSorter(ordering); + this.sorter = new TopNSorter<>(ordering); } @Override public Sequence apply( - @Nullable Sequence input + Sequence input ) { final ArrayList materializedList = Sequences.toList(input, Lists.newArrayList()); @@ -347,4 +351,25 @@ public class DefaultLimitSpec implements LimitSpec result = 31 * result + limit; return result; } + + @Override + public byte[] getCacheKey() + { + final byte[][] columnBytes = new byte[columns.size()][]; + int columnsBytesSize = 0; + int index = 0; + for (OrderByColumnSpec column : columns) { + columnBytes[index] = column.getCacheKey(); + columnsBytesSize += columnBytes[index].length; + ++index; + } + + ByteBuffer buffer = ByteBuffer.allocate(1 + columnsBytesSize + 4) + .put(CACHE_KEY); + for (byte[] columnByte : columnBytes) { + buffer.put(columnByte); + } + buffer.put(Ints.toByteArray(limit)); + return buffer.array(); + } } diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java index fa50d62016c..3e8153355e0 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Function; import com.metamx.common.guava.Sequence; import io.druid.data.input.Row; +import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; @@ -45,4 +46,6 @@ public interface LimitSpec ); public LimitSpec merge(LimitSpec other); + + public byte[] getCacheKey(); } diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java index e71038d4918..bc05f1cb352 100644 --- 
a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.base.Functions; import com.metamx.common.guava.Sequence; import io.druid.data.input.Row; +import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; @@ -33,6 +34,8 @@ import java.util.List; */ public class NoopLimitSpec implements LimitSpec { + private static final byte CACHE_KEY = 0x0; + @Override public Function, Sequence> build( List dimensions, List aggs, List postAggs @@ -63,4 +66,10 @@ public class NoopLimitSpec implements LimitSpec public int hashCode() { return 0; } + + @Override + public byte[] getCacheKey() + { + return new byte[]{CACHE_KEY}; + } } diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java index 147f1911816..bf0432ad5c9 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java @@ -29,6 +29,7 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import javax.annotation.Nullable; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -37,14 +38,19 @@ import java.util.Map; */ public class OrderByColumnSpec { + public static enum Direction + { + ASCENDING, + DESCENDING + } + /** * Maintain a map of the enum values so that we can just do a lookup and get a null if it doesn't exist instead * of an exception thrown. 
*/ private static final Map stupidEnumMap; - static - { + static { final ImmutableMap.Builder bob = ImmutableMap.builder(); for (Direction direction : Direction.values()) { bob.put(direction.toString(), direction); @@ -62,16 +68,14 @@ public class OrderByColumnSpec if (obj instanceof String) { return new OrderByColumnSpec(obj.toString(), null); - } - else if (obj instanceof Map) { + } else if (obj instanceof Map) { final Map map = (Map) obj; final String dimension = map.get("dimension").toString(); final Direction direction = determineDirection(map.get("direction")); return new OrderByColumnSpec(dimension, direction); - } - else { + } else { throw new ISE("Cannot build an OrderByColumnSpec from a %s", obj.getClass()); } } @@ -176,9 +180,14 @@ public class OrderByColumnSpec '}'; } - public static enum Direction + public byte[] getCacheKey() { - ASCENDING, - DESCENDING + final byte[] dimensionBytes = dimension.getBytes(); + final byte[] directionBytes = direction.name().getBytes(); + + return ByteBuffer.allocate(dimensionBytes.length + dimensionBytes.length) + .put(dimensionBytes) + .put(directionBytes) + .array(); } } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index a1f649fed06..f21665b1a38 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -27,6 +27,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; @@ -37,6 +38,7 @@ import io.druid.common.utils.JodaUtils; import io.druid.query.CacheStrategy; import io.druid.query.DataSourceUtil; import io.druid.query.Query; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.ResultMergeQueryRunner; @@ -60,6 +62,14 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest mergeResults(final QueryRunner runner) { diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index c8e7208638c..00909ea6cbd 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -55,16 +55,19 @@ import java.util.concurrent.TimeoutException; public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory { private static final SegmentAnalyzer analyzer = new SegmentAnalyzer(); - private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest(); private static final Logger log = new Logger(SegmentMetadataQueryRunnerFactory.class); + + private final SegmentMetadataQueryQueryToolChest toolChest; private final QueryWatcher queryWatcher; @Inject public SegmentMetadataQueryRunnerFactory( + SegmentMetadataQueryQueryToolChest toolChest, QueryWatcher queryWatcher ) { + this.toolChest = toolChest; this.queryWatcher = queryWatcher; } diff --git 
a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index a0a2487cca4..e24239f8896 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -73,6 +73,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest OBJECT_TYPE_REFERENCE = new TypeReference() { }; + private final SearchQueryConfig config; @Inject diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index f12c5c18544..95480302336 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -70,6 +70,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest>() { }; + private final QueryConfig config; private final ObjectMapper jsonMapper; diff --git a/processing/src/main/java/io/druid/query/select/SelectResultValue.java b/processing/src/main/java/io/druid/query/select/SelectResultValue.java index d3ce5d6ef68..5493723702f 100644 --- a/processing/src/main/java/io/druid/query/select/SelectResultValue.java +++ b/processing/src/main/java/io/druid/query/select/SelectResultValue.java @@ -21,12 +21,7 @@ package io.druid.query.select; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonValue; -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import com.metamx.common.ISE; -import javax.annotation.Nullable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -41,7 +36,8 @@ public class SelectResultValue implements Iterable @JsonCreator public SelectResultValue( @JsonProperty("pagingIdentifiers") Map pagingIdentifiers, - @JsonProperty("events") List events) + @JsonProperty("events") List events + ) { this.pagingIdentifiers = pagingIdentifiers; this.events = events; diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 7d0a5781ea4..e7c7e0849b5 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -71,6 +71,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest>() { }; + private final QueryConfig config; @Inject @@ -173,7 +174,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest, Object>() { @Override - public Object apply(@Nullable final Result input) + public Object apply(final Result input) { TimeseriesResultValue results = input.getValue(); final List retVal = Lists.newArrayListWithCapacity(1 + aggs.size()); diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index 724d4818226..c46363ac389 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -19,13 +19,11 @@ package io.druid.query.timeseries; -import 
com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; -import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index f275651383f..ab1faebf67e 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -72,6 +72,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest OBJECT_TYPE_REFERENCE = new TypeReference() { }; + private final TopNQueryConfig config; @Inject diff --git a/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowFormatter.java b/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowFormatter.java index 82311045d27..a9bec20d29c 100644 --- a/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowFormatter.java +++ b/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowFormatter.java @@ -32,7 +32,9 @@ import com.google.common.primitives.Floats; import com.metamx.common.ISE; import com.metamx.common.parsers.ParseException; import io.druid.data.input.InputRow; +import io.druid.data.input.Row; import io.druid.data.input.impl.SpatialDimensionSchema; +import org.joda.time.DateTime; import java.util.Arrays; import java.util.List; @@ -128,6 +130,12 @@ public class SpatialDimensionRowFormatter return row.getTimestampFromEpoch(); } + @Override + public DateTime getTimestamp() + { + return row.getTimestamp(); + } + @Override public List getDimension(String dimension) { @@ -157,6 +165,12 @@ public class SpatialDimensionRowFormatter { return row.toString(); } + + @Override + public int compareTo(Row o) + { + return getTimestamp().compareTo(o.getTimestamp()); + } }; if (!spatialPartialDimNames.isEmpty()) { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index d718c0775d9..afe13f31184 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -19,6 +19,7 @@ package io.druid.query.groupby; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; @@ -35,6 +36,7 @@ import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.PeriodGranularity; import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -99,41 +101,71 @@ public class GroupByQueryRunnerTest @Parameterized.Parameters public static Collection constructorFeeder() throws IOException { - GroupByQueryConfig config = new GroupByQueryConfig(); + final ObjectMapper mapper = new DefaultObjectMapper(); + final StupidPool pool = new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ); + + final 
GroupByQueryConfig config = new GroupByQueryConfig(); config.setMaxIntermediateRows(10000); final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByQueryEngine engine = new GroupByQueryEngine( - configSupplier, - new StupidPool( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(1024 * 1024); - } - } - ) - ); + final GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool); final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( engine, QueryRunnerTestHelper.NOOP_QUERYWATCHER, configSupplier, - new GroupByQueryQueryToolChest(configSupplier, engine) + new GroupByQueryQueryToolChest(configSupplier, mapper, engine) ); + GroupByQueryConfig singleThreadedConfig = new GroupByQueryConfig() + { + @Override + public boolean isSingleThreaded() + { + return true; + } + }; + singleThreadedConfig.setMaxIntermediateRows(10000); + + final Supplier singleThreadedConfigSupplier = Suppliers.ofInstance(singleThreadedConfig); + final GroupByQueryEngine singleThreadEngine = new GroupByQueryEngine(singleThreadedConfigSupplier, pool); + + final GroupByQueryRunnerFactory singleThreadFactory = new GroupByQueryRunnerFactory( + singleThreadEngine, + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + singleThreadedConfigSupplier, + new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine) + ); + + + Function function = new Function() + { + @Override + public Object apply(@Nullable Object input) + { + return new Object[]{factory, ((Object[]) input)[0]}; + } + }; + return Lists.newArrayList( - Iterables.transform( - QueryRunnerTestHelper.makeQueryRunners(factory), new Function() - { - @Override - public Object apply(@Nullable Object input) - { - return new Object[]{factory, ((Object[]) input)[0]}; - } - } + Iterables.concat( + Iterables.transform( + QueryRunnerTestHelper.makeQueryRunners(factory), + function + ), + Iterables.transform( + QueryRunnerTestHelper.makeQueryRunners(singleThreadFactory), + function + ) ) ); } @@ -795,7 +827,11 @@ public class GroupByQueryRunnerTest ) ); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); + QueryRunner mergeRunner = new GroupByQueryQueryToolChest( + configSupplier, + new DefaultObjectMapper(), + engine + ).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } @@ -844,7 +880,11 @@ public class GroupByQueryRunnerTest ) ); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); + QueryRunner mergeRunner = new GroupByQueryQueryToolChest( + configSupplier, + new DefaultObjectMapper(), + engine + ).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } @@ -893,7 +933,11 @@ public class GroupByQueryRunnerTest ) ); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); + QueryRunner mergeRunner = new GroupByQueryQueryToolChest( + configSupplier, + new DefaultObjectMapper(), + engine + ).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } @@ -1182,6 +1226,12 @@ public class GroupByQueryRunnerTest { return (row.getFloatMetric("idx_subpostagg") < 3800); } + + @Override + public byte[] getCacheKey() + { + return new byte[0]; + } } ) .addOrderByColumn("alias") @@ -1281,6 +1331,12 @@ public class GroupByQueryRunnerTest { return 
(row.getFloatMetric("idx_subpostagg") < 3800); } + + @Override + public byte[] getCacheKey() + { + return new byte[0]; + } } ) .addOrderByColumn("alias") @@ -1325,11 +1381,61 @@ public class GroupByQueryRunnerTest .build(); List expectedResults = Arrays.asList( - createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 11119.0, "js_outer_agg", 123.92274475097656), - createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 11078.0, "js_outer_agg", 82.62254333496094), - createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 11121.0, "js_outer_agg", 125.58358001708984), - createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 11120.0, "js_outer_agg", 124.13470458984375), - createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 11158.0, "js_outer_agg", 162.74722290039062) + createExpectedRow( + "2011-04-01", + "alias", + "travel", + "rows", + 1L, + "idx", + 11119.0, + "js_outer_agg", + 123.92274475097656 + ), + createExpectedRow( + "2011-04-01", + "alias", + "technology", + "rows", + 1L, + "idx", + 11078.0, + "js_outer_agg", + 82.62254333496094 + ), + createExpectedRow( + "2011-04-01", + "alias", + "news", + "rows", + 1L, + "idx", + 11121.0, + "js_outer_agg", + 125.58358001708984 + ), + createExpectedRow( + "2011-04-01", + "alias", + "health", + "rows", + 1L, + "idx", + 11120.0, + "js_outer_agg", + 124.13470458984375 + ), + createExpectedRow( + "2011-04-01", + "alias", + "entertainment", + "rows", + 1L, + "idx", + 11158.0, + "js_outer_agg", + 162.74722290039062 + ) ); // Subqueries are handled by the ToolChest @@ -1350,7 +1456,6 @@ public class GroupByQueryRunnerTest return Sequences.toList(queryResult, Lists.newArrayList()); } - private Row createExpectedRow(final String timestamp, Object... 
vals) { return createExpectedRow(new DateTime(timestamp), vals); @@ -1365,6 +1470,7 @@ public class GroupByQueryRunnerTest theVals.put(vals[i].toString(), vals[i + 1]); } - return new MapBasedRow(new DateTime(timestamp), theVals); + DateTime ts = new DateTime(timestamp); + return new MapBasedRow(ts, theVals); } } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index a9fb506ca0b..dc2eb6657e3 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -28,6 +28,7 @@ import com.metamx.common.guava.Sequences; import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; +import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; @@ -74,7 +75,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest engine, QueryRunnerTestHelper.NOOP_QUERYWATCHER, configSupplier, - new GroupByQueryQueryToolChest(configSupplier, engine) + new GroupByQueryQueryToolChest(configSupplier, new DefaultObjectMapper(), engine) ); final Collection objects = QueryRunnerTestHelper.makeQueryRunners(factory); @@ -98,13 +99,13 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest return Sequences.map( groupByRunner.run( GroupByQuery.builder() - .setDataSource(tsQuery.getDataSource()) - .setQuerySegmentSpec(tsQuery.getQuerySegmentSpec()) - .setGranularity(tsQuery.getGranularity()) - .setDimFilter(tsQuery.getDimensionsFilter()) - .setAggregatorSpecs(tsQuery.getAggregatorSpecs()) - .setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs()) - .build() + .setDataSource(tsQuery.getDataSource()) + .setQuerySegmentSpec(tsQuery.getQuerySegmentSpec()) + .setGranularity(tsQuery.getGranularity()) + .setDimFilter(tsQuery.getDimensionsFilter()) + .setAggregatorSpecs(tsQuery.getAggregatorSpecs()) + .setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs()) + .build() ), new Function>() { diff --git a/processing/src/test/java/io/druid/query/groupby/having/HavingSpecTest.java b/processing/src/test/java/io/druid/query/groupby/having/HavingSpecTest.java index 460ace869b7..b476ffcd608 100644 --- a/processing/src/test/java/io/druid/query/groupby/having/HavingSpecTest.java +++ b/processing/src/test/java/io/druid/query/groupby/having/HavingSpecTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.Row; import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Result; import org.junit.Test; import java.util.ArrayList; @@ -148,6 +149,12 @@ public class HavingSpecTest counter.incrementAndGet(); return value; } + + @Override + public byte[] getCacheKey() + { + return new byte[0]; + } } @Test diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java index 70c65f8da88..b9834c106cc 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java @@ -22,10 +22,10 @@ package io.druid.query.metadata; import com.google.common.collect.Lists; import com.metamx.common.guava.Sequences; import 
io.druid.query.LegacyDataSource; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; -import io.druid.query.QueryWatcher; import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.SegmentAnalysis; import io.druid.query.metadata.metadata.SegmentMetadataQuery; @@ -67,7 +67,7 @@ public class SegmentAnalyzerTest final SegmentAnalysis analysis = results.get(0); Assert.assertEquals("test_1", analysis.getId()); - final Map columns = analysis.getColumns(); + final Map columns = analysis.getColumns(); Assert.assertEquals(TestIndex.COLUMNS.length, columns.size()); // All columns including time for (String dimension : TestIndex.DIMENSIONS) { @@ -91,12 +91,16 @@ public class SegmentAnalyzerTest * *Awesome* method name auto-generated by IntelliJ! I love IntelliJ! * * @param index + * * @return */ private List getSegmentAnalysises(Segment index) { final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner( - (QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER), index + (QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory( + new SegmentMetadataQueryQueryToolChest(new QueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), index ); final SegmentMetadataQuery query = new SegmentMetadataQuery( diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java index ed1740460f8..0c58576c1d4 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java @@ -26,6 +26,7 @@ import com.metamx.common.guava.Sequences; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Druids; import io.druid.query.Query; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -45,7 +46,9 @@ public class SegmentMetadataQueryTest { @SuppressWarnings("unchecked") private final QueryRunner runner = makeQueryRunner( - new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER) + new SegmentMetadataQueryRunnerFactory( + new SegmentMetadataQueryQueryToolChest(new QueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER) ); private ObjectMapper mapper = new DefaultObjectMapper(); diff --git a/processing/src/test/java/io/druid/segment/TestHelper.java b/processing/src/test/java/io/druid/segment/TestHelper.java index a1fab7efecb..68f4a908049 100644 --- a/processing/src/test/java/io/druid/segment/TestHelper.java +++ b/processing/src/test/java/io/druid/segment/TestHelper.java @@ -22,6 +22,7 @@ package io.druid.segment; import com.google.common.collect.Lists; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import io.druid.data.input.Row; import io.druid.query.Result; import org.junit.Assert; @@ -41,7 +42,11 @@ public class TestHelper assertResults(expectedResults, results, ""); } - public static void assertExpectedResults(Iterable> expectedResults, Iterable> results, String failMsg) + public static void assertExpectedResults( + Iterable> expectedResults, + Iterable> results, + String failMsg + ) { assertResults(expectedResults, results, failMsg); } @@ -56,23 +61,33 @@ public class TestHelper assertObjects(expectedResults, Sequences.toList(results, 
Lists.newArrayList()), failMsg); } - private static void assertResults(Iterable> expectedResults, Iterable> actualResults, String failMsg) + private static void assertResults( + Iterable> expectedResults, + Iterable> actualResults, + String failMsg + ) { Iterator resultsIter = actualResults.iterator(); Iterator resultsIter2 = actualResults.iterator(); Iterator expectedResultsIter = expectedResults.iterator(); while (resultsIter.hasNext() && resultsIter2.hasNext() && expectedResultsIter.hasNext()) { - Result expectedNext = expectedResultsIter.next(); - final Result next = resultsIter.next(); - final Result next2 = resultsIter2.next(); + Object expectedNext = expectedResultsIter.next(); + final Object next = resultsIter.next(); + final Object next2 = resultsIter2.next(); - assertResult(failMsg, expectedNext, next); - assertResult( - String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg), - expectedNext, - next2 - ); + if (expectedNext instanceof Row) { + // HACK! Special casing for groupBy + Assert.assertEquals(failMsg, expectedNext, next); + Assert.assertEquals(failMsg, expectedNext, next2); + } else { + assertResult(failMsg, (Result) expectedNext, (Result) next); + assertResult( + String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg), + (Result) expectedNext, + (Result) next2 + ); + } } if (resultsIter.hasNext()) { @@ -90,7 +105,9 @@ public class TestHelper if (expectedResultsIter.hasNext()) { Assert.fail( String.format( - "%s: Expected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next() + "%s: Expected expectedResultsIter to be exhausted, next element was %s", + failMsg, + expectedResultsIter.next() ) ); } @@ -130,7 +147,9 @@ public class TestHelper if (expectedResultsIter.hasNext()) { Assert.fail( String.format( - "%s: Expected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next() + "%s: Expected expectedResultsIter to be exhausted, next element was %s", + failMsg, + expectedResultsIter.next() ) ); } diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index fe38811f332..1e296f0a4d1 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -127,9 +127,12 @@ public class CachingClusteredClient implements QueryRunner final boolean useCache = query.getContextUseCache(true) && strategy != null - && cacheConfig.isUseCache(); + && cacheConfig.isUseCache() + && cacheConfig.isQueryCacheable(query); final boolean populateCache = query.getContextPopulateCache(true) - && strategy != null && cacheConfig.isPopulateCache(); + && strategy != null + && cacheConfig.isPopulateCache() + && cacheConfig.isQueryCacheable(query); final boolean isBySegment = query.getContextBySegment(false); @@ -239,6 +242,15 @@ public class CachingClusteredClient implements QueryRunner } descriptors.add(segment.rhs); + System.out.println( + String.format( + "Server %s has %s_%s_%s", + server.getHost(), + segment.rhs.getInterval(), + segment.rhs.getPartitionNumber(), + segment.rhs.getVersion() + ) + ); } } diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index ad2718e6a45..41d3813ada7 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ 
b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -78,14 +78,16 @@ public class CachingQueryRunner implements QueryRunner final boolean populateCache = query.getContextPopulateCache(true) && strategy != null - && cacheConfig.isPopulateCache(); + && cacheConfig.isPopulateCache() + && cacheConfig.isQueryCacheable(query); final boolean useCache = query.getContextUseCache(true) - && strategy != null - && cacheConfig.isUseCache(); + && strategy != null + && cacheConfig.isUseCache() + && cacheConfig.isQueryCacheable(query); final Cache.NamedKey key; - if(strategy != null && (useCache || populateCache)) { + if (strategy != null && (useCache || populateCache)) { key = CacheUtil.computeSegmentCacheKey( segmentIdentifier, segmentDescriptor, @@ -95,10 +97,10 @@ public class CachingQueryRunner implements QueryRunner key = null; } - if(useCache) { + if (useCache) { final Function cacheFn = strategy.pullFromCache(); final byte[] cachedResult = cache.get(key); - if(cachedResult != null) { + if (cachedResult != null) { final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz(); return Sequences.map( diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java index 817cdb4070a..26c5f383f2d 100644 --- a/server/src/main/java/io/druid/client/cache/CacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java @@ -20,16 +20,25 @@ package io.druid.client.cache; import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.query.Query; + +import java.util.Arrays; +import java.util.List; public class CacheConfig { public static String USE_CACHE = "useCache"; public static String POPULATE_CACHE = "populateCache"; + @JsonProperty private boolean useCache = true; + @JsonProperty private boolean populateCache = true; + @JsonProperty + private List unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT); + public boolean isPopulateCache() { return populateCache; @@ -39,4 +48,10 @@ public class CacheConfig { return useCache; } + + public boolean isQueryCacheable(Query query) + { + // O(n) impl, but I don't think we'll ever have a million query types here + return !unCacheable.contains(query.getType()); + } } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/WikipediaIrcDecoder.java b/server/src/main/java/io/druid/segment/realtime/firehose/WikipediaIrcDecoder.java index d2c671930e6..4695521089f 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/WikipediaIrcDecoder.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/WikipediaIrcDecoder.java @@ -29,6 +29,7 @@ import com.maxmind.geoip2.exception.GeoIp2Exception; import com.maxmind.geoip2.model.Omni; import com.metamx.common.logger.Logger; import io.druid.data.input.InputRow; +import io.druid.data.input.Row; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; @@ -211,6 +212,12 @@ class WikipediaIrcDecoder implements IrcDecoder return timestamp.getMillis(); } + @Override + public DateTime getTimestamp() + { + return timestamp; + } + @Override public List getDimension(String dimension) { @@ -234,6 +241,12 @@ class WikipediaIrcDecoder implements IrcDecoder return metrics.get(metric); } + @Override + public int compareTo(Row o) + { + return timestamp.compareTo(o.getTimestamp()); + } + @Override public String toString() { diff --git a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java 
index 6f9dd6258e0..d3ab608a328 100644 --- a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java @@ -25,7 +25,6 @@ import com.google.common.base.Preconditions; import io.druid.data.input.InputRow; import java.util.List; -import java.util.Set; public class LinearShardSpec implements ShardSpec { diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 90ac6f24e03..40036ea0e7a 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -42,6 +44,9 @@ import io.druid.client.selector.HighestPriorityTierSelectorStrategy; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.RandomServerSelectorStrategy; import io.druid.client.selector.ServerSelector; +import io.druid.collections.StupidPool; +import io.druid.data.input.MapBasedRow; +import io.druid.data.input.Row; import io.druid.granularity.PeriodGranularity; import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; @@ -56,7 +61,6 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.SegmentDescriptor; -import io.druid.query.TableDataSource; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -64,13 +68,23 @@ import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.post.ArithmeticPostAggregator; import io.druid.query.aggregation.post.ConstantPostAggregator; import io.druid.query.aggregation.post.FieldAccessPostAggregator; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; import io.druid.query.filter.DimFilter; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.groupby.GroupByQueryConfig; +import io.druid.query.groupby.GroupByQueryEngine; +import io.druid.query.groupby.GroupByQueryQueryToolChest; import io.druid.query.search.SearchQueryQueryToolChest; import io.druid.query.search.SearchResultValue; -import io.druid.query.search.search.InsensitiveContainsSearchQuerySpec; import io.druid.query.search.search.SearchHit; import io.druid.query.search.search.SearchQuery; import io.druid.query.search.search.SearchQueryConfig; +import io.druid.query.select.EventHolder; +import io.druid.query.select.PagingSpec; +import io.druid.query.select.SelectQuery; +import io.druid.query.select.SelectQueryQueryToolChest; +import io.druid.query.select.SelectResultValue; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.query.timeboundary.TimeBoundaryQueryQueryToolChest; @@ -105,6 +119,7 @@ import org.junit.runners.Parameterized; import javax.annotation.Nullable; import java.io.IOException; +import 
java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -657,19 +672,19 @@ public class CachingClusteredClientTest @Test public void testSearchCaching() throws Exception { + final Druids.SearchQueryBuilder builder = Druids.newSearchQueryBuilder() + .dataSource(DATA_SOURCE) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .limit(1000) + .intervals(SEG_SPEC) + .dimensions(Arrays.asList("a_dim")) + .query("how") + .context(CONTEXT); + testQueryCaching( client, - new SearchQuery( - new TableDataSource(DATA_SOURCE), - DIM_FILTER, - GRANULARITY, - 1000, - SEG_SPEC, - Arrays.asList("a_dim"), - new InsensitiveContainsSearchQuerySpec("how"), - null, - CONTEXT - ), + builder.build(), new Interval("2011-01-01/2011-01-02"), makeSearchResults(new DateTime("2011-01-01"), "how", "howdy", "howwwwww", "howwy"), @@ -694,6 +709,188 @@ public class CachingClusteredClientTest new DateTime("2011-01-09T01"), "how6", "howdy6", "howwwwww6", "howww6" ) ); + + QueryRunner runner = new FinalizeResultsQueryRunner(client, new SearchQueryQueryToolChest(new SearchQueryConfig())); + TestHelper.assertExpectedResults( + makeSearchResults( + new DateTime("2011-01-01"), "how", "howdy", "howwwwww", "howwy", + new DateTime("2011-01-02"), "how1", "howdy1", "howwwwww1", "howwy1", + new DateTime("2011-01-05"), "how2", "howdy2", "howwwwww2", "howww2", + new DateTime("2011-01-05T01"), "how2", "howdy2", "howwwwww2", "howww2", + new DateTime("2011-01-06"), "how3", "howdy3", "howwwwww3", "howww3", + new DateTime("2011-01-06T01"), "how3", "howdy3", "howwwwww3", "howww3", + new DateTime("2011-01-07"), "how4", "howdy4", "howwwwww4", "howww4", + new DateTime("2011-01-07T01"), "how4", "howdy4", "howwwwww4", "howww4", + new DateTime("2011-01-08"), "how5", "howdy5", "howwwwww5", "howww5", + new DateTime("2011-01-08T01"), "how5", "howdy5", "howwwwww5", "howww5", + new DateTime("2011-01-09"), "how6", "howdy6", "howwwwww6", "howww6", + new DateTime("2011-01-09T01"), "how6", "howdy6", "howwwwww6", "howww6" + ), + runner.run( + builder.intervals("2011-01-01/2011-01-10") + .build() + ) + ); + } + + @Test + public void testSelectCaching() throws Exception + { + Druids.SelectQueryBuilder builder = Druids.newSelectQueryBuilder() + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .dimensions(Arrays.asList("a")) + .metrics(Arrays.asList("rows")) + .pagingSpec(new PagingSpec(null, 3)) + .context(CONTEXT); + + testQueryCaching( + client, + builder.build(), + new Interval("2011-01-01/2011-01-02"), + makeSelectResults(new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1)), + + new Interval("2011-01-02/2011-01-03"), + makeSelectResults(new DateTime("2011-01-02"), ImmutableMap.of("a", "c", "rows", 5)), + + new Interval("2011-01-05/2011-01-10"), + makeSelectResults( + new DateTime("2011-01-05"), ImmutableMap.of("a", "d", "rows", 5), + new DateTime("2011-01-06"), ImmutableMap.of("a", "e", "rows", 6), + new DateTime("2011-01-07"), ImmutableMap.of("a", "f", "rows", 7), + new DateTime("2011-01-08"), ImmutableMap.of("a", "g", "rows", 8), + new DateTime("2011-01-09"), ImmutableMap.of("a", "h", "rows", 9) + ), + + new Interval("2011-01-05/2011-01-10"), + makeSelectResults( + new DateTime("2011-01-05T01"), ImmutableMap.of("a", "d", "rows", 5), + new DateTime("2011-01-06T01"), ImmutableMap.of("a", "e", "rows", 6), + new DateTime("2011-01-07T01"), ImmutableMap.of("a", "f", "rows", 7), + new DateTime("2011-01-08T01"), ImmutableMap.of("a", "g", "rows", 8), + 
new DateTime("2011-01-09T01"), ImmutableMap.of("a", "h", "rows", 9) + ) + ); + + QueryRunner runner = new FinalizeResultsQueryRunner( + client, + new SelectQueryQueryToolChest( + new QueryConfig(), + jsonMapper + ) + ); + TestHelper.assertExpectedResults( + makeSelectResults( + new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1), + new DateTime("2011-01-02"), ImmutableMap.of("a", "c", "rows", 5), + new DateTime("2011-01-05"), ImmutableMap.of("a", "d", "rows", 5), + new DateTime("2011-01-05T01"), ImmutableMap.of("a", "d", "rows", 5), + new DateTime("2011-01-06"), ImmutableMap.of("a", "e", "rows", 6), + new DateTime("2011-01-06T01"), ImmutableMap.of("a", "e", "rows", 6), + new DateTime("2011-01-07"), ImmutableMap.of("a", "f", "rows", 7), + new DateTime("2011-01-07T01"), ImmutableMap.of("a", "f", "rows", 7), + new DateTime("2011-01-08"), ImmutableMap.of("a", "g", "rows", 8), + new DateTime("2011-01-08T01"), ImmutableMap.of("a", "g", "rows", 8), + new DateTime("2011-01-09"), ImmutableMap.of("a", "h", "rows", 9), + new DateTime("2011-01-09T01"), ImmutableMap.of("a", "h", "rows", 9) + ), + runner.run( + builder.intervals("2011-01-01/2011-01-10") + .build() + ) + ); + } + + @Test + public void testGroupByCaching() throws Exception + { + GroupByQuery.Builder builder = new GroupByQuery.Builder() + .setDataSource(DATA_SOURCE) + .setQuerySegmentSpec(SEG_SPEC) + .setDimFilter(DIM_FILTER) + .setGranularity(GRANULARITY) + .setDimensions(Arrays.asList(new DefaultDimensionSpec("a", "a"))) + .setAggregatorSpecs(AGGS) + .setPostAggregatorSpecs(POST_AGGS) + .setContext(CONTEXT); + + testQueryCaching( + client, + builder.build(), + new Interval("2011-01-01/2011-01-02"), + makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("a", "a", "rows", 1, "imps", 1, "impers", 1)), + + new Interval("2011-01-02/2011-01-03"), + makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("a", "b", "rows", 2, "imps", 2, "impers", 2)), + + new Interval("2011-01-05/2011-01-10"), + makeGroupByResults( + new DateTime("2011-01-05"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3), + new DateTime("2011-01-06"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4), + new DateTime("2011-01-07"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5), + new DateTime("2011-01-08"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6), + new DateTime("2011-01-09"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7) + ), + + new Interval("2011-01-05/2011-01-10"), + makeGroupByResults( + new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3), + new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4), + new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5), + new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6), + new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7) + ) + ); + + Supplier configSupplier = new Supplier() + { + @Override + public GroupByQueryConfig get() + { + return new GroupByQueryConfig(); + } + }; + QueryRunner runner = new FinalizeResultsQueryRunner( + client, + new GroupByQueryQueryToolChest( + configSupplier, + jsonMapper, + new GroupByQueryEngine( + configSupplier, + new StupidPool<>( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ) + ) + ); + TestHelper.assertExpectedObjects( + 
makeGroupByResults( + new DateTime("2011-01-05T"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3), + new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3), + new DateTime("2011-01-06T"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4), + new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4), + new DateTime("2011-01-07T"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5), + new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5), + new DateTime("2011-01-08T"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6), + new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6), + new DateTime("2011-01-09T"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7), + new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7) + ), + runner.run( + builder.setInterval("2011-01-05/2011-01-10") + .build() + ), + "" + ); } @Test @@ -751,7 +948,8 @@ public class CachingClusteredClientTest final QueryRunner runner, final int numTimesToQuery, boolean expectBySegment, - final Query query, Object... args // does this assume query intervals must be ordered? + final Query query, + Object... args // does this assume query intervals must be ordered? ) { if (args.length % 2 != 0) { @@ -843,6 +1041,30 @@ public class CachingClusteredClientTest EasyMock.expect(queryable.run(EasyMock.capture(capture))) .andReturn(toQueryableSearchResults(segmentIds, intervals, results)) .once(); + } else if (query instanceof SelectQuery) { + List segmentIds = Lists.newArrayList(); + List intervals = Lists.newArrayList(); + List>> results = Lists.newArrayList(); + for (ServerExpectation expectation : expectations) { + segmentIds.add(expectation.getSegmentId()); + intervals.add(expectation.getInterval()); + results.add(expectation.getResults()); + } + EasyMock.expect(queryable.run(EasyMock.capture(capture))) + .andReturn(toQueryableSelectResults(segmentIds, intervals, results)) + .once(); + } else if (query instanceof GroupByQuery) { + List segmentIds = Lists.newArrayList(); + List intervals = Lists.newArrayList(); + List> results = Lists.newArrayList(); + for (ServerExpectation expectation : expectations) { + segmentIds.add(expectation.getSegmentId()); + intervals.add(expectation.getInterval()); + results.add(expectation.getResults()); + } + EasyMock.expect(queryable.run(EasyMock.capture(capture))) + .andReturn(toQueryableGroupByResults(segmentIds, intervals, results)) + .once(); } else if (query instanceof TimeBoundaryQuery) { List segmentIds = Lists.newArrayList(); List intervals = Lists.newArrayList(); @@ -1097,6 +1319,74 @@ public class CachingClusteredClientTest ); } + private Sequence> toQueryableSelectResults( + Iterable segmentIds, Iterable intervals, Iterable>> results + ) + { + return Sequences.simple( + FunctionalIterable + .create(segmentIds) + .trinaryTransform( + intervals, + results, + new TrinaryFn>, Result>() + { + @Override + @SuppressWarnings("unchecked") + public Result apply( + final String segmentId, + final Interval interval, + final Iterable> results + ) + { + return new Result( + results.iterator().next().getTimestamp(), + new BySegmentResultValueClass( + Lists.newArrayList(results), + segmentId, + interval + ) + ); + } + } + ) + ); + } + + private Sequence toQueryableGroupByResults( + Iterable segmentIds, Iterable intervals, Iterable> results + ) + { + return Sequences.simple( + 
FunctionalIterable + .create(segmentIds) + .trinaryTransform( + intervals, + results, + new TrinaryFn, Result>() + { + @Override + @SuppressWarnings("unchecked") + public Result apply( + final String segmentId, + final Interval interval, + final Iterable results + ) + { + return new Result( + results.iterator().next().getTimestamp(), + new BySegmentResultValueClass( + Lists.newArrayList(results), + segmentId, + interval + ) + ); + } + } + ) + ); + } + private Sequence> toQueryableTimeBoundaryResults( Iterable segmentIds, Iterable intervals, @@ -1305,6 +1595,34 @@ public class CachingClusteredClientTest return retVal; } + private Iterable> makeSelectResults(Object... objects) + { + List> retVal = Lists.newArrayList(); + int index = 0; + while (index < objects.length) { + DateTime timestamp = (DateTime) objects[index++]; + + List values = Lists.newArrayList(); + while (index < objects.length && !(objects[index] instanceof DateTime)) { + values.add(new EventHolder(null, 0, (Map) objects[index++])); + } + + retVal.add(new Result<>(timestamp, new SelectResultValue(null, values))); + } + return retVal; + } + + private Iterable makeGroupByResults(Object... objects) + { + List retVal = Lists.newArrayList(); + int index = 0; + while (index < objects.length) { + DateTime timestamp = (DateTime) objects[index++]; + retVal.add(new MapBasedRow(timestamp, (Map) objects[index++])); + } + return retVal; + } + private T makeMock(List mocks, Class clazz) { T obj = EasyMock.createMock(clazz); @@ -1324,6 +1642,8 @@ public class CachingClusteredClientTest protected CachingClusteredClient makeClient() { + final Supplier groupByQueryConfigSupplier = Suppliers.ofInstance(new GroupByQueryConfig()); + return new CachingClusteredClient( new MapQueryToolChestWarehouse( ImmutableMap., QueryToolChest>builder() @@ -1333,6 +1653,30 @@ public class CachingClusteredClientTest ) .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig())) .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig())) + .put( + SelectQuery.class, + new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper) + ) + .put( + GroupByQuery.class, + new GroupByQueryQueryToolChest( + groupByQueryConfigSupplier, + jsonMapper, + new GroupByQueryEngine( + groupByQueryConfigSupplier, + new StupidPool<>( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ) + ) + ) .put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest()) .build() ), @@ -1364,6 +1708,13 @@ public class CachingClusteredClientTest cache, jsonMapper, new CacheConfig() + { + @Override + public boolean isQueryCacheable(Query query) + { + return true; + } + } ); } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index d1d6e1e6f67..ea231c2611e 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -27,6 +27,7 @@ import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; +import io.druid.data.input.Row; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; import io.druid.query.Query; @@ -168,6 +169,12 @@ public class RealtimeManagerTest return timestamp; } + @Override + public DateTime getTimestamp() + { + return new 
DateTime(timestamp); + } + @Override public List getDimension(String dimension) { @@ -185,6 +192,12 @@ public class RealtimeManagerTest { return null; } + + @Override + public int compareTo(Row o) + { + return 0; + } }; } diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index b068a994cb7..cdd182e3e1f 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.metamx.common.Granularity; import io.druid.data.input.InputRow; +import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -81,6 +82,12 @@ public class SinkTest return new DateTime("2013-01-01").getMillis(); } + @Override + public DateTime getTimestamp() + { + return new DateTime("2013-01-01"); + } + @Override public List getDimension(String dimension) { @@ -98,6 +105,12 @@ public class SinkTest { return null; } + + @Override + public int compareTo(Row o) + { + return 0; + } } ); @@ -122,6 +135,12 @@ public class SinkTest return new DateTime("2013-01-01").getMillis(); } + @Override + public DateTime getTimestamp() + { + return new DateTime("2013-01-01"); + } + @Override public List getDimension(String dimension) { @@ -139,6 +158,12 @@ public class SinkTest { return null; } + + @Override + public int compareTo(Row o) + { + return 0; + } } ); diff --git a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java index bfd573dae89..afab880ff81 100644 --- a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java +++ b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java @@ -25,10 +25,12 @@ import com.google.common.collect.Lists; import com.metamx.common.ISE; import io.druid.TestUtil; import io.druid.data.input.InputRow; +import io.druid.data.input.Row; import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.ShardSpec; import junit.framework.Assert; +import org.joda.time.DateTime; import org.junit.Test; import java.util.ArrayList; @@ -176,6 +178,12 @@ public class HashBasedNumberedShardSpecTest return 0; } + @Override + public DateTime getTimestamp() + { + return new DateTime(0); + } + @Override public List getDimension(String s) { @@ -193,5 +201,11 @@ public class HashBasedNumberedShardSpecTest { return 0; } + + @Override + public int compareTo(Row o) + { + return 0; + } } }
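
Note on the CacheConfig change above: both CachingClusteredClient and CachingQueryRunner now also consult cacheConfig.isQueryCacheable(query) before either reading from or populating the cache, and the default unCacheable list excludes groupBy and select query types. The snippet below is a minimal, self-contained stand-in for that gate, not the Druid classes themselves; the literal strings "groupBy" and "select" are assumed to match the values of Query.GROUP_BY and Query.SELECT.

import java.util.Arrays;
import java.util.List;

// Stand-in illustrating the isQueryCacheable() gate introduced by this patch.
public class CacheGateSketch
{
  // Mirrors the patch default: groupBy and select results are not cached.
  private static final List<String> UN_CACHEABLE = Arrays.asList("groupBy", "select");

  static boolean isQueryCacheable(String queryType)
  {
    // Same O(n) lookup as the patch; the list of query types stays tiny.
    return !UN_CACHEABLE.contains(queryType);
  }

  public static void main(String[] args)
  {
    System.out.println(isQueryCacheable("timeseries")); // true  -> cache may be used and populated
    System.out.println(isQueryCacheable("groupBy"));    // false -> cache is bypassed
    System.out.println(isQueryCacheable("select"));     // false -> cache is bypassed
  }
}

This is also why CachingClusteredClientTest constructs its client with an anonymous CacheConfig whose isQueryCacheable() always returns true: the new testGroupByCaching and testSelectCaching cases still need to exercise the cache code path despite the default exclusion.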