From bc20658239807f74f7b6126eda85abebf16c9456 Mon Sep 17 00:00:00 2001 From: Dave Li Date: Mon, 1 Aug 2016 21:30:39 -0400 Subject: [PATCH] groupBy nested query using v2 strategy (#3269) * changed v2 nested query strategy * add test for #3239 * update for new ValueMatcher interface and add benchmarks * enable time filtering * address PR comments * add failing test for outer filter aggregator * add helper class for sharing code * update nested groupby doc * move temporary storage instantiation * address PR comment * address PR comment 2 --- .../benchmark/query/GroupByBenchmark.java | 44 +- docs/content/querying/groupbyquery.md | 10 +- .../query/groupby/GroupByQueryHelper.java | 4 +- .../groupby/GroupByQueryQueryToolChest.java | 97 +-- .../GroupByMergingQueryRunnerV2.java | 429 +----------- .../epinephelinae/GroupByRowProcessor.java | 271 ++++++++ .../epinephelinae/RowBasedGrouperHelper.java | 614 ++++++++++++++++++ .../groupby/strategy/GroupByStrategy.java | 6 + .../groupby/strategy/GroupByStrategyV1.java | 103 +++ .../groupby/strategy/GroupByStrategyV2.java | 24 + .../query/groupby/GroupByQueryRunnerTest.java | 217 +++++++ 11 files changed, 1303 insertions(+), 516 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java create mode 100644 processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index a5c3254cd4e..16efbbba13b 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -116,7 +116,7 @@ public class GroupByBenchmark @Param({"100000"}) private int rowsPerSegment; - @Param({"basic.A"}) + @Param({"basic.A", "basic.nested"}) private String schemaAndQuery; @Param({"v1", "v2"}) @@ -190,6 +190,44 @@ public class GroupByBenchmark basicQueries.put("A", queryA); } + { // basic.nested + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval())); + List queryAggs = new ArrayList<>(); + queryAggs.add(new LongSumAggregatorFactory( + "sumLongSequential", + "sumLongSequential" + )); + + GroupByQuery subqueryA = GroupByQuery + .builder() + .setDataSource("blah") + .setQuerySegmentSpec(intervalSpec) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("dimSequential", null), + new DefaultDimensionSpec("dimZipf", null) + )) + .setAggregatorSpecs( + queryAggs + ) + .setGranularity(QueryGranularities.DAY) + .build(); + + GroupByQuery queryA = GroupByQuery + .builder() + .setDataSource(subqueryA) + .setQuerySegmentSpec(intervalSpec) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("dimSequential", null) + )) + .setAggregatorSpecs( + queryAggs + ) + .setGranularity(QueryGranularities.WEEK) + .build(); + + basicQueries.put("nested", queryA); + } + SCHEMA_QUERY_MAP.put("basic", basicQueries); } @@ -257,9 +295,11 @@ public class GroupByBenchmark new OffheapBufferGenerator("compute", 250000000), Integer.MAX_VALUE ); + + // limit of 2 is required since we simulate both historical merge and broker merge in the same process BlockingPool mergePool = new BlockingPool<>( new OffheapBufferGenerator("merge", 250000000), - 1 + 2 ); final GroupByQueryConfig config = new GroupByQueryConfig() { diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md 
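The merge-pool capacity of 2 above is load-bearing for the new nested benchmark: under the v2 strategy, GroupByMergingQueryRunnerV2 (the simulated historical-side merge) holds one merge buffer while GroupByRowProcessor (the simulated broker-side outer query) takes a second from the same pool, so a capacity of 1 would make the nested query time out waiting for a buffer that is never released. A minimal sketch of that double acquisition, using the pool API as it appears elsewhere in this patch (exception handling elided; timeoutMillis is a placeholder):

    // Two consumers draw from one BlockingPool; with capacity 1 the second take() would time out.
    BlockingPool<ByteBuffer> mergePool = new BlockingPool<>(
        new OffheapBufferGenerator("merge", 250000000),
        2
    );
    // Held for the duration of the historical-style merge (GroupByMergingQueryRunnerV2)...
    ReferenceCountingResourceHolder<ByteBuffer> historicalBuffer = mergePool.take(timeoutMillis);
    // ...while the broker-side row processor takes the second buffer for the outer query.
    // take() returns null if the timeout elapses before a buffer frees up.
    ReferenceCountingResourceHolder<ByteBuffer> brokerBuffer = mergePool.take(timeoutMillis);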
index 57cdff5c3cd..d1858a0104f 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -150,11 +150,11 @@ results acceptable. #### Nested groupBys -Nested groupBys (dataSource of type "query") are performed the same way for both "v1" and "v2". The broker runs the -inner groupBy query in the usual way, then materializes the inner query's results, then runs the outer query on those -materialized results. In particular, the outer query is not distributed at all; it takes place completely on the broker. -Currently the materialized results are stored on-heap in the broker, and the outer query is done in a single-threaded -fashion. +Nested groupBys (dataSource of type "query") are performed differently for "v1" and "v2". The broker first runs the +inner groupBy query in the usual way. The "v1" strategy then materializes the inner query's results on-heap with Druid's +indexing mechanism and runs the outer query on these materialized results. The "v2" strategy runs the outer query on the +inner query's result stream, using an off-heap fact map and an on-heap string dictionary that can spill to disk. Both +strategies perform the outer query on the broker in a single-threaded fashion. #### Server configuration diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index d13ec68bc47..89eb145f3c5 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -163,7 +163,7 @@ public class GroupByQueryHelper return new Pair<>(init, accumulator); } - // Used by GroupByQueryQueryToolChest, GroupByStrategyV1 + // Used by GroupByStrategyV1 public static IncrementalIndex makeIncrementalIndex( GroupByQuery query, GroupByQueryConfig config, @@ -180,7 +180,7 @@ public class GroupByQueryHelper return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); } - // Used by GroupByQueryQueryToolChest, GroupByStrategyV1 + // Used by GroupByStrategyV1 public static Sequence postAggregate(final GroupByQuery query, IncrementalIndex index) { return Sequences.map( diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index dd66d743f02..ca3fbe60251 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -24,19 +24,14 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.Collections2; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.inject.Inject; -import com.metamx.common.IAE; import com.metamx.common.ISE; -import com.metamx.common.guava.ResourceClosingSequence; import com.metamx.common.guava.Sequence; -import com.metamx.common.guava.Sequences; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedRow; @@ -56,18 +51,12 @@ import io.druid.query.QueryToolChest; import
io.druid.query.SubqueryQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; -import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.DimFilter; -import io.druid.query.groupby.strategy.GroupByStrategy; import io.druid.query.groupby.strategy.GroupByStrategySelector; -import io.druid.query.spec.MultipleIntervalSegmentSpec; -import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexStorageAdapter; import org.joda.time.DateTime; -import org.joda.time.Interval; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -167,96 +156,12 @@ public class GroupByQueryQueryToolChest extends QueryToolChest aggs = Sets.newHashSet(); - - // Nested group-bys work by first running the inner query and then materializing the results in an incremental - // index which the outer query is then run against. To build the incremental index, we use the fieldNames from - // the aggregators for the outer query to define the column names so that the index will match the query. If - // there are multiple types of aggregators in the outer query referencing the same fieldName, we will try to build - // multiple columns of the same name using different aggregator types and will fail. Here, we permit multiple - // aggregators of the same type referencing the same fieldName (and skip creating identical columns for the - // subsequent ones) and return an error if the aggregator types are different. - for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) { - for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) { - if (Iterables.any(aggs, new Predicate() - { - @Override - public boolean apply(AggregatorFactory agg) - { - return agg.getName().equals(transferAgg.getName()) && !agg.equals(transferAgg); - } - })) { - throw new IAE("Inner aggregator can currently only be referenced by a single type of outer aggregator" + - " for '%s'", transferAgg.getName()); - } - - aggs.add(transferAgg); - } - } - - // We need the inner incremental index to have all the columns required by the outer query - final GroupByQuery innerQuery = new GroupByQuery.Builder(subquery) - .setAggregatorSpecs(Lists.newArrayList(aggs)) - .setInterval(subquery.getIntervals()) - .setPostAggregatorSpecs(Lists.newArrayList()) - .build(); - - final GroupByQuery outerQuery = new GroupByQuery.Builder(query) - .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec())) - .build(); - - final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex( - innerQuery.withOverriddenContext( - ImmutableMap.of( - GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true - ) - ), - subqueryResult - ); - - //Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which - //is ensured by QuerySegmentSpec. - //GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval - //and concatenate the results. 
- final GroupByStrategy strategy = strategySelector.strategize(query); - final IncrementalIndex outerQueryResultIndex = makeIncrementalIndex( - outerQuery, - Sequences.concat( - Sequences.map( - Sequences.simple(outerQuery.getIntervals()), - new Function>() - { - @Override - public Sequence apply(Interval interval) - { - return strategy.process( - outerQuery.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(ImmutableList.of(interval)) - ), - new IncrementalIndexStorageAdapter(innerQueryResultIndex) - ); - } - } - ) - ) - ); - - innerQueryResultIndex.close(); - - return new ResourceClosingSequence<>( - outerQuery.applyLimit(GroupByQueryHelper.postAggregate(query, outerQueryResultIndex)), - outerQueryResultIndex - ); + return strategySelector.strategize(query).processSubqueryResult(subquery, query, subqueryResult); } else { return strategySelector.strategize(query).mergeResults(runner, query, context); } } - private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence rows) - { - return GroupByQueryHelper.makeIncrementalIndex(query, configSupplier.get(), bufferPool, rows); - } - @Override public ServiceMetricEvent.Builder makeMetricBuilder(GroupByQuery query) { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 95f42743ee6..a7422e4c641 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -19,25 +19,19 @@ package io.druid.query.groupby.epinephelinae; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Predicates; -import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.primitives.Chars; -import com.google.common.primitives.Ints; -import com.google.common.primitives.Longs; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.CloseQuietly; @@ -47,8 +41,8 @@ import io.druid.collections.BlockingPool; import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.Releaser; import io.druid.common.utils.JodaUtils; -import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; +import io.druid.granularity.QueryGranularities; import io.druid.query.AbstractPrioritizedCallable; import io.druid.query.BaseQuery; import io.druid.query.ChainedExecutionQueryRunner; @@ -58,22 +52,14 @@ import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; 
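Net effect of the toolchest change above: mergeGroupByResults collapses to a strategy dispatch. A condensed sketch of the resulting shape, not verbatim from the file (the subquery extraction and surrounding fields are assumed):

    // Nested-query handling now lives behind GroupByStrategy.processSubqueryResult(),
    // so v1 keeps its on-heap IncrementalIndex path and v2 can stream instead.
    if (subquery != null) {
      return strategySelector.strategize(query).processSubqueryResult(subquery, query, subqueryResult);
    } else {
      return strategySelector.strategize(query).mergeResults(runner, query, context);
    }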
-import io.druid.query.groupby.strategy.GroupByStrategyV2; -import io.druid.segment.ColumnSelectorFactory; -import io.druid.segment.DimensionSelector; -import io.druid.segment.FloatColumnSelector; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.ObjectColumnSelector; -import io.druid.segment.column.ColumnCapabilities; +import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.UUID; @@ -156,24 +142,22 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner : System.currentTimeMillis() + queryTimeout.longValue(); return new BaseSequence<>( - new BaseSequence.IteratorMaker>() + new BaseSequence.IteratorMaker>() { @Override - public CloseableGrouperIterator make() + public CloseableGrouperIterator make() { final List closeOnFailure = Lists.newArrayList(); try { - final ReferenceCountingResourceHolder mergeBufferHolder; - final LimitedTemporaryStorage temporaryStorage; - final Grouper grouper; - - temporaryStorage = new LimitedTemporaryStorage( + final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( temporaryStorageDirectory, querySpecificConfig.getMaxOnDiskStorage() ); + closeOnFailure.add(temporaryStorage); + final ReferenceCountingResourceHolder mergeBufferHolder; try { // This will potentially block if there are no merge buffers left in the pool. final long timeout = timeoutAt - System.currentTimeMillis(); @@ -187,61 +171,23 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner throw Throwables.propagate(e); } - final GroupByMergingKeySerdeFactory keySerdeFactory = new GroupByMergingKeySerdeFactory( - query.getDimensions().size(), - querySpecificConfig.getMaxMergingDictionarySize() / concurrencyHint - ); - final GroupByMergingColumnSelectorFactory columnSelectorFactory = new GroupByMergingColumnSelectorFactory(); - - grouper = new ConcurrentGrouper<>( + Pair, Accumulator, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( + query, + false, + config, mergeBufferHolder.get(), concurrencyHint, temporaryStorage, spillMapper, - querySpecificConfig.getBufferGrouperMaxSize(), - querySpecificConfig.getBufferGrouperInitialBuckets(), - keySerdeFactory, - columnSelectorFactory, combiningAggregatorFactories ); + final Grouper grouper = pair.lhs; + final Accumulator, Row> accumulator = pair.rhs; closeOnFailure.add(grouper); - final Accumulator, Row> accumulator = new Accumulator, Row>() - { - @Override - public Grouper accumulate( - final Grouper theGrouper, - final Row row - ) - { - if (theGrouper == null) { - // Pass-through null returns without doing more work. - return null; - } - - final long timestamp = row.getTimestampFromEpoch(); - - final String[] dimensions = new String[query.getDimensions().size()]; - for (int i = 0; i < dimensions.length; i++) { - final Object dimValue = row.getRaw(query.getDimensions().get(i).getOutputName()); - dimensions[i] = Strings.nullToEmpty((String) dimValue); - } - - columnSelectorFactory.setRow(row); - final boolean didAggregate = theGrouper.aggregate(new GroupByMergingKey(timestamp, dimensions)); - if (!didAggregate) { - // null return means grouping resources were exhausted. 
- return null; - } - columnSelectorFactory.setRow(null); - - return theGrouper; - } - }; - final int priority = BaseQuery.getContextPriority(query, 0); - final ReferenceCountingResourceHolder> grouperHolder = new ReferenceCountingResourceHolder<>( + final ReferenceCountingResourceHolder> grouperHolder = new ReferenceCountingResourceHolder<>( grouper, new Closeable() { @@ -319,35 +265,9 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner waitForFutureCompletion(query, futures, timeoutAt - System.currentTimeMillis()); - return new CloseableGrouperIterator<>( + return RowBasedGrouperHelper.makeGrouperIterator( grouper, - true, - new Function, Row>() - { - @Override - public Row apply(Grouper.Entry entry) - { - Map theMap = Maps.newLinkedHashMap(); - - // Add dimensions. - for (int i = 0; i < entry.getKey().getDimensions().length; i++) { - theMap.put( - query.getDimensions().get(i).getOutputName(), - Strings.emptyToNull(entry.getKey().getDimensions()[i]) - ); - } - - // Add aggregations. - for (int i = 0; i < entry.getValues().length; i++) { - theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]); - } - - return new MapBasedRow( - query.getGranularity().toDateTime(entry.getKey().getTimestamp()), - theMap - ); - } - }, + query, new Closeable() { @Override @@ -370,7 +290,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner } @Override - public void cleanup(CloseableGrouperIterator iterFromMake) + public void cleanup(CloseableGrouperIterator iterFromMake) { iterFromMake.close(); } @@ -420,317 +340,4 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner } } - private static class GroupByMergingKey implements Comparable - { - private final long timestamp; - private final String[] dimensions; - - @JsonCreator - public GroupByMergingKey( - // Using short key names to reduce serialized size when spilling to disk. 
- @JsonProperty("t") long timestamp, - @JsonProperty("d") String[] dimensions - ) - { - this.timestamp = timestamp; - this.dimensions = dimensions; - } - - @JsonProperty("t") - public long getTimestamp() - { - return timestamp; - } - - @JsonProperty("d") - public String[] getDimensions() - { - return dimensions; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - GroupByMergingKey that = (GroupByMergingKey) o; - - if (timestamp != that.timestamp) { - return false; - } - // Probably incorrect - comparing Object[] arrays with Arrays.equals - return Arrays.equals(dimensions, that.dimensions); - - } - - @Override - public int hashCode() - { - int result = (int) (timestamp ^ (timestamp >>> 32)); - result = 31 * result + Arrays.hashCode(dimensions); - return result; - } - - @Override - public int compareTo(GroupByMergingKey other) - { - final int timeCompare = Longs.compare(timestamp, other.getTimestamp()); - if (timeCompare != 0) { - return timeCompare; - } - - for (int i = 0; i < dimensions.length; i++) { - final int cmp = dimensions[i].compareTo(other.getDimensions()[i]); - if (cmp != 0) { - return cmp; - } - } - - return 0; - } - - @Override - public String toString() - { - return "GroupByMergingKey{" + - "timestamp=" + timestamp + - ", dimensions=" + Arrays.toString(dimensions) + - '}'; - } - } - - private static class GroupByMergingKeySerdeFactory implements Grouper.KeySerdeFactory - { - private final int dimCount; - private final long maxDictionarySize; - - public GroupByMergingKeySerdeFactory(int dimCount, long maxDictionarySize) - { - this.dimCount = dimCount; - this.maxDictionarySize = maxDictionarySize; - } - - @Override - public Grouper.KeySerde factorize() - { - return new GroupByMergingKeySerde(dimCount, maxDictionarySize); - } - } - - private static class GroupByMergingKeySerde implements Grouper.KeySerde - { - // Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes - private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Longs.BYTES * 5 + Ints.BYTES; - - private final int dimCount; - private final int keySize; - private final ByteBuffer keyBuffer; - private final List dictionary = Lists.newArrayList(); - private final Map reverseDictionary = Maps.newHashMap(); - - // Size limiting for the dictionary, in (roughly estimated) bytes. 
- private final long maxDictionarySize; - private long currentEstimatedSize = 0; - - // dictionary id -> its position if it were sorted by dictionary value - private int[] sortableIds = null; - - public GroupByMergingKeySerde(final int dimCount, final long maxDictionarySize) - { - this.dimCount = dimCount; - this.maxDictionarySize = maxDictionarySize; - this.keySize = Longs.BYTES + dimCount * Ints.BYTES; - this.keyBuffer = ByteBuffer.allocate(keySize); - } - - @Override - public int keySize() - { - return keySize; - } - - @Override - public Class keyClazz() - { - return GroupByMergingKey.class; - } - - @Override - public ByteBuffer toByteBuffer(GroupByMergingKey key) - { - keyBuffer.rewind(); - keyBuffer.putLong(key.getTimestamp()); - for (int i = 0; i < key.getDimensions().length; i++) { - final int id = addToDictionary(key.getDimensions()[i]); - if (id < 0) { - return null; - } - keyBuffer.putInt(id); - } - keyBuffer.flip(); - return keyBuffer; - } - - @Override - public GroupByMergingKey fromByteBuffer(ByteBuffer buffer, int position) - { - final long timestamp = buffer.getLong(position); - final String[] dimensions = new String[dimCount]; - for (int i = 0; i < dimensions.length; i++) { - dimensions[i] = dictionary.get(buffer.getInt(position + Longs.BYTES + (Ints.BYTES * i))); - } - return new GroupByMergingKey(timestamp, dimensions); - } - - @Override - public Grouper.KeyComparator comparator() - { - if (sortableIds == null) { - Map sortedMap = Maps.newTreeMap(); - for (int id = 0; id < dictionary.size(); id++) { - sortedMap.put(dictionary.get(id), id); - } - sortableIds = new int[dictionary.size()]; - int index = 0; - for (final Integer id : sortedMap.values()) { - sortableIds[id] = index++; - } - } - - return new Grouper.KeyComparator() - { - @Override - public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) - { - final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); - if (timeCompare != 0) { - return timeCompare; - } - - for (int i = 0; i < dimCount; i++) { - final int cmp = Ints.compare( - sortableIds[lhsBuffer.getInt(lhsPosition + Longs.BYTES + (Ints.BYTES * i))], - sortableIds[rhsBuffer.getInt(rhsPosition + Longs.BYTES + (Ints.BYTES * i))] - ); - - if (cmp != 0) { - return cmp; - } - } - - return 0; - } - }; - } - - @Override - public void reset() - { - dictionary.clear(); - reverseDictionary.clear(); - sortableIds = null; - currentEstimatedSize = 0; - } - - /** - * Adds s to the dictionary. If the dictionary's size limit would be exceeded by adding this key, then - * this returns -1. 
- * - * @param s a string - * - * @return id for this string, or -1 - */ - private int addToDictionary(final String s) - { - Integer idx = reverseDictionary.get(s); - if (idx == null) { - final long additionalEstimatedSize = (long) s.length() * Chars.BYTES + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY; - if (currentEstimatedSize + additionalEstimatedSize > maxDictionarySize) { - return -1; - } - - idx = dictionary.size(); - reverseDictionary.put(s, idx); - dictionary.add(s); - currentEstimatedSize += additionalEstimatedSize; - } - return idx; - } - } - - private static class GroupByMergingColumnSelectorFactory implements ColumnSelectorFactory - { - private ThreadLocal row = new ThreadLocal<>(); - - public void setRow(Row row) - { - this.row.set(row); - } - - @Override - public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) - { - // Combining factories shouldn't need dimension selectors, that'd be weird. - throw new UnsupportedOperationException(); - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(final String columnName) - { - return new FloatColumnSelector() - { - @Override - public float get() - { - return row.get().getFloatMetric(columnName); - } - }; - } - - @Override - public LongColumnSelector makeLongColumnSelector(final String columnName) - { - return new LongColumnSelector() - { - @Override - public long get() - { - return row.get().getLongMetric(columnName); - } - }; - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(final String columnName) - { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Object.class; - } - - @Override - public Object get() - { - return row.get().getRaw(columnName); - } - }; - } - - @Override - public ColumnCapabilities getColumnCapabilities(String columnName) - { - // getColumnCapabilities() is only used by FilteredAggregatorFactory for determining dimension types. - // Since FilteredAggregatorFactory only works with ColumnSelectorFactory implementations - // that support makeDimensionSelector(), this method is also left unsupported. - throw new UnsupportedOperationException(); - } - } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java new file mode 100644 index 00000000000..8603ff4307c --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -0,0 +1,271 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.groupby.epinephelinae; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.metamx.common.Pair; +import com.metamx.common.guava.Accumulator; +import com.metamx.common.guava.BaseSequence; +import com.metamx.common.guava.CloseQuietly; +import com.metamx.common.guava.FilteredSequence; +import com.metamx.common.guava.Sequence; +import com.metamx.common.logger.Logger; +import io.druid.collections.BlockingPool; +import io.druid.collections.ReferenceCountingResourceHolder; +import io.druid.common.utils.JodaUtils; +import io.druid.data.input.Row; +import io.druid.query.Query; +import io.druid.query.QueryContextKeys; +import io.druid.query.QueryInterruptedException; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.filter.DruidLongPredicate; +import io.druid.query.filter.DruidPredicateFactory; +import io.druid.query.filter.Filter; +import io.druid.query.filter.ValueMatcher; +import io.druid.query.filter.ValueMatcherFactory; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.groupby.GroupByQueryConfig; +import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; +import io.druid.segment.column.Column; +import io.druid.segment.filter.BooleanValueMatcher; +import io.druid.segment.filter.Filters; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +public class GroupByRowProcessor +{ + private static final Logger log = new Logger(GroupByRowProcessor.class); + + public static Sequence process( + final Query queryParam, + final Sequence rows, + final GroupByQueryConfig config, + final BlockingPool mergeBufferPool, + final ObjectMapper spillMapper + ) + { + final GroupByQuery query = (GroupByQuery) queryParam; + final GroupByQueryConfig querySpecificConfig = config.withOverrides(query); + + final AggregatorFactory[] aggregatorFactories = new AggregatorFactory[query.getAggregatorSpecs().size()]; + for (int i = 0; i < query.getAggregatorSpecs().size(); i++) { + aggregatorFactories[i] = query.getAggregatorSpecs().get(i); + } + + final File temporaryStorageDirectory = new File( + System.getProperty("java.io.tmpdir"), + String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()) + ); + + final long timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT); + final List queryIntervals = query.getIntervals(); + final Filter filter = Filters.convertToCNFFromQueryContext( + query, + Filters.toFilter(query.getDimFilter()) + ); + final RowBasedValueMatcherFactory filterMatcherFactory = new RowBasedValueMatcherFactory(); + final ValueMatcher filterMatcher = filter == null + ? 
new BooleanValueMatcher(true) + : filter.makeMatcher(filterMatcherFactory); + + final FilteredSequence filteredSequence = new FilteredSequence<>( + rows, + new Predicate() + { + @Override + public boolean apply(Row input) + { + boolean inInterval = false; + DateTime rowTime = input.getTimestamp(); + for (Interval queryInterval : queryIntervals) { + if (queryInterval.contains(rowTime)) { + inInterval = true; + } + } + if (!inInterval) { + return false; + } + filterMatcherFactory.setRow(input); + return filterMatcher.matches(); + } + } + ); + + return new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public CloseableGrouperIterator make() + { + final List closeOnFailure = Lists.newArrayList(); + + try { + final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( + temporaryStorageDirectory, + querySpecificConfig.getMaxOnDiskStorage() + ); + + closeOnFailure.add(temporaryStorage); + + final ReferenceCountingResourceHolder mergeBufferHolder; + try { + // This will potentially block if there are no merge buffers left in the pool. + if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { + throw new QueryInterruptedException(new TimeoutException()); + } + closeOnFailure.add(mergeBufferHolder); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + + Pair, Accumulator, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( + query, + true, + config, + mergeBufferHolder.get(), + -1, + temporaryStorage, + spillMapper, + aggregatorFactories + ); + final Grouper grouper = pair.lhs; + final Accumulator, Row> accumulator = pair.rhs; + closeOnFailure.add(grouper); + + filteredSequence.accumulate(grouper, accumulator); + + return RowBasedGrouperHelper.makeGrouperIterator( + grouper, + query, + new Closeable() + { + @Override + public void close() throws IOException + { + grouper.close(); + mergeBufferHolder.close(); + CloseQuietly.close(temporaryStorage); + } + } + ); + } + catch (Throwable e) { + // Exception caught while setting up the iterator; release resources. + for (Closeable closeable : Lists.reverse(closeOnFailure)) { + CloseQuietly.close(closeable); + } + throw e; + } + } + + @Override + public void cleanup(CloseableGrouperIterator iterFromMake) + { + iterFromMake.close(); + } + } + ); + } + + private static class RowBasedValueMatcherFactory implements ValueMatcherFactory + { + private Row row; + + public void setRow(Row row) + { + this.row = row; + } + + @Override + public ValueMatcher makeValueMatcher(final String dimension, final Comparable value) + { + if (dimension.equals(Column.TIME_COLUMN_NAME)) { + return new ValueMatcher() + { + @Override + public boolean matches() + { + return value.equals(row.getTimestampFromEpoch()); + } + }; + } else { + return new ValueMatcher() + { + @Override + public boolean matches() + { + return row.getDimension(dimension).contains(value); + } + }; + } + } + + // There is no easy way to determine the dimension value type from the map based row, so this defaults all + // dimensions (except time) to string, and provide the string value matcher. This has some performance impact + // on filtering, but should provide the same result. 
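// (For example, a SelectorDimFilter on "quality" reduces here to row.getDimension("quality").contains(value): a plain string comparison against the current row, with no dictionary to consult.)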
It should be changed to support dimension types when better + // type hinting is implemented + @Override + public ValueMatcher makeValueMatcher(final String dimension, final DruidPredicateFactory predicateFactory) + { + if (dimension.equals(Column.TIME_COLUMN_NAME)) { + return new ValueMatcher() + { + final DruidLongPredicate predicate = predicateFactory.makeLongPredicate(); + + @Override + public boolean matches() + { + return predicate.applyLong(row.getTimestampFromEpoch()); + } + }; + } else { + return new ValueMatcher() + { + final Predicate predicate = predicateFactory.makeStringPredicate(); + + @Override + public boolean matches() + { + List values = row.getDimension(dimension); + for (String value : values) { + if (predicate.apply(value)) { + return true; + } + } + return false; + } + }; + } + } + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java new file mode 100644 index 00000000000..5ff9424d7f0 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -0,0 +1,614 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.groupby.epinephelinae; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.primitives.Chars; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.metamx.common.Pair; +import com.metamx.common.guava.Accumulator; +import io.druid.data.input.MapBasedRow; +import io.druid.data.input.Row; +import io.druid.granularity.AllGranularity; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.extraction.ExtractionFn; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.groupby.GroupByQueryConfig; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.DimensionSelector; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.column.Column; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.data.IndexedInts; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +// this class contains shared code between GroupByMergingQueryRunnerV2 and GroupByRowProcessor +public class RowBasedGrouperHelper +{ + + public static Pair, Accumulator, Row>> createGrouperAccumulatorPair( + final GroupByQuery query, + final boolean isInputRaw, + final GroupByQueryConfig config, + final ByteBuffer buffer, + final int concurrencyHint, + final LimitedTemporaryStorage temporaryStorage, + final ObjectMapper spillMapper, + final AggregatorFactory[] aggregatorFactories + ) + { + // concurrencyHint >= 1 for thread safe groupers, -1 for non-thread-safe + Preconditions.checkArgument(concurrencyHint >= 1 || concurrencyHint == -1, "invalid concurrencyHint"); + + final GroupByQueryConfig querySpecificConfig = config.withOverrides(query); + final Grouper.KeySerdeFactory keySerdeFactory = new RowBasedKeySerdeFactory( + query.getDimensions().size(), + querySpecificConfig.getMaxMergingDictionarySize() / (concurrencyHint == -1 ? 
1 : concurrencyHint) + ); + final RowBasedColumnSelectorFactory columnSelectorFactory = new RowBasedColumnSelectorFactory(); + final Grouper grouper; + if (concurrencyHint == -1) { + grouper = new SpillingGrouper<>( + buffer, + keySerdeFactory, + columnSelectorFactory, + aggregatorFactories, + temporaryStorage, + spillMapper, + querySpecificConfig.getBufferGrouperMaxSize(), + querySpecificConfig.getBufferGrouperInitialBuckets() + ); + } else { + grouper = new ConcurrentGrouper<>( + buffer, + concurrencyHint, + temporaryStorage, + spillMapper, + querySpecificConfig.getBufferGrouperMaxSize(), + querySpecificConfig.getBufferGrouperInitialBuckets(), + keySerdeFactory, + columnSelectorFactory, + aggregatorFactories + ); + } + + final DimensionSelector[] dimensionSelectors; + if (isInputRaw) { + dimensionSelectors = new DimensionSelector[query.getDimensions().size()]; + for (int i = 0; i < dimensionSelectors.length; i++) { + dimensionSelectors[i] = columnSelectorFactory.makeDimensionSelector(query.getDimensions().get(i)); + } + } else { + dimensionSelectors = null; + } + + final Accumulator, Row> accumulator = new Accumulator, Row>() + { + @Override + public Grouper accumulate( + final Grouper theGrouper, + final Row row + ) + { + if (theGrouper == null) { + // Pass-through null returns without doing more work. + return null; + } + + long timestamp = row.getTimestampFromEpoch(); + if (isInputRaw) { + if (query.getGranularity() instanceof AllGranularity) { + timestamp = query.getIntervals().get(0).getStartMillis(); + } else { + timestamp = query.getGranularity().truncate(timestamp); + } + } + + columnSelectorFactory.setRow(row); + final String[] dimensions = new String[query.getDimensions().size()]; + for (int i = 0; i < dimensions.length; i++) { + final String value; + if (isInputRaw) { + IndexedInts index = dimensionSelectors[i].getRow(); + value = index.size() == 0 ? "" : dimensionSelectors[i].lookupName(index.get(0)); + } else { + value = (String) row.getRaw(query.getDimensions().get(i).getOutputName()); + } + dimensions[i] = Strings.nullToEmpty(value); + } + + final boolean didAggregate = theGrouper.aggregate(new RowBasedKey(timestamp, dimensions)); + if (!didAggregate) { + // null return means grouping resources were exhausted. + return null; + } + columnSelectorFactory.setRow(null); + + return theGrouper; + } + }; + + return new Pair<>(grouper, accumulator); + } + + public static CloseableGrouperIterator makeGrouperIterator( + final Grouper grouper, + final GroupByQuery query, + final Closeable closeable + ) + { + return new CloseableGrouperIterator<>( + grouper, + true, + new Function, Row>() + { + @Override + public Row apply(Grouper.Entry entry) + { + Map theMap = Maps.newLinkedHashMap(); + + // Add dimensions. + for (int i = 0; i < entry.getKey().getDimensions().length; i++) { + theMap.put( + query.getDimensions().get(i).getOutputName(), + Strings.emptyToNull(entry.getKey().getDimensions()[i]) + ); + } + + // Add aggregations. + for (int i = 0; i < entry.getValues().length; i++) { + theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]); + } + + return new MapBasedRow( + query.getGranularity().toDateTime(entry.getKey().getTimestamp()), + theMap + ); + } + }, + closeable + ); + } + + static class RowBasedKey implements Comparable + { + private final long timestamp; + private final String[] dimensions; + + @JsonCreator + public RowBasedKey( + // Using short key names to reduce serialized size when spilling to disk. 
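// (A spilled key would serialize as roughly {"t":1301616000000,"d":["spot","mezzanine"]}; the values are hypothetical, shown only to illustrate the compact field names below.)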
+ @JsonProperty("t") long timestamp, + @JsonProperty("d") String[] dimensions + ) + { + this.timestamp = timestamp; + this.dimensions = dimensions; + } + + @JsonProperty("t") + public long getTimestamp() + { + return timestamp; + } + + @JsonProperty("d") + public String[] getDimensions() + { + return dimensions; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RowBasedKey that = (RowBasedKey) o; + + if (timestamp != that.timestamp) { + return false; + } + // Probably incorrect - comparing Object[] arrays with Arrays.equals + return Arrays.equals(dimensions, that.dimensions); + + } + + @Override + public int hashCode() + { + int result = (int) (timestamp ^ (timestamp >>> 32)); + result = 31 * result + Arrays.hashCode(dimensions); + return result; + } + + @Override + public int compareTo(RowBasedKey other) + { + final int timeCompare = Longs.compare(timestamp, other.getTimestamp()); + if (timeCompare != 0) { + return timeCompare; + } + + for (int i = 0; i < dimensions.length; i++) { + final int cmp = dimensions[i].compareTo(other.getDimensions()[i]); + if (cmp != 0) { + return cmp; + } + } + + return 0; + } + + @Override + public String toString() + { + return "RowBasedKey{" + + "timestamp=" + timestamp + + ", dimensions=" + Arrays.toString(dimensions) + + '}'; + } + } + + static class RowBasedKeySerdeFactory implements Grouper.KeySerdeFactory + { + private final int dimCount; + private final long maxDictionarySize; + + public RowBasedKeySerdeFactory(int dimCount, long maxDictionarySize) + { + this.dimCount = dimCount; + this.maxDictionarySize = maxDictionarySize; + } + + @Override + public Grouper.KeySerde factorize() + { + return new RowBasedKeySerde(dimCount, maxDictionarySize); + } + } + + static class RowBasedKeySerde implements Grouper.KeySerde + { + // Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes + private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Longs.BYTES * 5 + Ints.BYTES; + + private final int dimCount; + private final int keySize; + private final ByteBuffer keyBuffer; + private final List dictionary = Lists.newArrayList(); + private final Map reverseDictionary = Maps.newHashMap(); + + // Size limiting for the dictionary, in (roughly estimated) bytes. 
+ private final long maxDictionarySize; + private long currentEstimatedSize = 0; + + // dictionary id -> its position if it were sorted by dictionary value + private int[] sortableIds = null; + + public RowBasedKeySerde(final int dimCount, final long maxDictionarySize) + { + this.dimCount = dimCount; + this.maxDictionarySize = maxDictionarySize; + this.keySize = Longs.BYTES + dimCount * Ints.BYTES; + this.keyBuffer = ByteBuffer.allocate(keySize); + } + + @Override + public int keySize() + { + return keySize; + } + + @Override + public Class keyClazz() + { + return RowBasedKey.class; + } + + @Override + public ByteBuffer toByteBuffer(RowBasedKey key) + { + keyBuffer.rewind(); + keyBuffer.putLong(key.getTimestamp()); + for (int i = 0; i < key.getDimensions().length; i++) { + final int id = addToDictionary(key.getDimensions()[i]); + if (id < 0) { + return null; + } + keyBuffer.putInt(id); + } + keyBuffer.flip(); + return keyBuffer; + } + + @Override + public RowBasedKey fromByteBuffer(ByteBuffer buffer, int position) + { + final long timestamp = buffer.getLong(position); + final String[] dimensions = new String[dimCount]; + for (int i = 0; i < dimensions.length; i++) { + dimensions[i] = dictionary.get(buffer.getInt(position + Longs.BYTES + (Ints.BYTES * i))); + } + return new RowBasedKey(timestamp, dimensions); + } + + @Override + public Grouper.KeyComparator comparator() + { + if (sortableIds == null) { + Map sortedMap = Maps.newTreeMap(); + for (int id = 0; id < dictionary.size(); id++) { + sortedMap.put(dictionary.get(id), id); + } + sortableIds = new int[dictionary.size()]; + int index = 0; + for (final Integer id : sortedMap.values()) { + sortableIds[id] = index++; + } + } + + return new Grouper.KeyComparator() + { + @Override + public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) + { + final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); + if (timeCompare != 0) { + return timeCompare; + } + + for (int i = 0; i < dimCount; i++) { + final int cmp = Ints.compare( + sortableIds[lhsBuffer.getInt(lhsPosition + Longs.BYTES + (Ints.BYTES * i))], + sortableIds[rhsBuffer.getInt(rhsPosition + Longs.BYTES + (Ints.BYTES * i))] + ); + + if (cmp != 0) { + return cmp; + } + } + + return 0; + } + }; + } + + @Override + public void reset() + { + dictionary.clear(); + reverseDictionary.clear(); + sortableIds = null; + currentEstimatedSize = 0; + } + + /** + * Adds s to the dictionary. If the dictionary's size limit would be exceeded by adding this key, then + * this returns -1. + * + * @param s a string + * + * @return id for this string, or -1 + */ + private int addToDictionary(final String s) + { + Integer idx = reverseDictionary.get(s); + if (idx == null) { + final long additionalEstimatedSize = (long) s.length() * Chars.BYTES + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY; + if (currentEstimatedSize + additionalEstimatedSize > maxDictionarySize) { + return -1; + } + + idx = dictionary.size(); + reverseDictionary.put(s, idx); + dictionary.add(s); + currentEstimatedSize += additionalEstimatedSize; + } + return idx; + } + } + + private static class RowBasedColumnSelectorFactory implements ColumnSelectorFactory + { + private ThreadLocal row = new ThreadLocal<>(); + + public void setRow(Row row) + { + this.row.set(row); + } + + // This dimension selector does not have an associated lookup dictionary, which means lookup can only be done + // on the same row. 
This dimension selector is used for applying the extraction function on dimension, which + // requires a DimensionSelector implementation + @Override + public DimensionSelector makeDimensionSelector( + DimensionSpec dimensionSpec + ) + { + return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec)); + } + + private DimensionSelector makeDimensionSelectorUndecorated( + DimensionSpec dimensionSpec + ) + { + final String dimension = dimensionSpec.getDimension(); + final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); + + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + final List dimensionValues = row.get().getDimension(dimension); + final ArrayList vals = Lists.newArrayList(); + if (dimensionValues != null) { + for (int i = 0; i < dimensionValues.size(); ++i) { + vals.add(i); + } + } + + return new IndexedInts() + { + @Override + public int size() + { + return vals.size(); + } + + @Override + public int get(int index) + { + return vals.get(index); + } + + @Override + public Iterator iterator() + { + return vals.iterator(); + } + + @Override + public void close() throws IOException + { + + } + + @Override + public void fill(int index, int[] toFill) + { + throw new UnsupportedOperationException("fill not supported"); + } + }; + } + + @Override + public int getValueCardinality() + { + throw new UnsupportedOperationException("value cardinality is unknown"); + } + + @Override + public String lookupName(int id) + { + final String value = row.get().getDimension(dimension).get(id); + return extractionFn == null ? value : extractionFn.apply(value); + } + + @Override + public int lookupId(String name) + { + if (extractionFn != null) { + throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function"); + } + return row.get().getDimension(dimension).indexOf(name); + } + }; + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(final String columnName) + { + return new FloatColumnSelector() + { + @Override + public float get() + { + return row.get().getFloatMetric(columnName); + } + }; + } + + @Override + public LongColumnSelector makeLongColumnSelector(final String columnName) + { + if (columnName.equals(Column.TIME_COLUMN_NAME)) { + return new LongColumnSelector() + { + @Override + public long get() + { + return row.get().getTimestampFromEpoch(); + } + }; + } + return new LongColumnSelector() + { + @Override + public long get() + { + return row.get().getLongMetric(columnName); + } + }; + } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(final String columnName) + { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Object.class; + } + + @Override + public Object get() + { + return row.get().getRaw(columnName); + } + }; + } + + @Override + public ColumnCapabilities getColumnCapabilities(String columnName) + { + // We don't have any information on the column value type, returning null defaults type to string + return null; + } + } + +} diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java index 758325a6231..301a92487cc 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java @@ -36,6 +36,12 @@ public interface GroupByStrategy Map responseContext ); + Sequence processSubqueryResult( + 
GroupByQuery subquery, + GroupByQuery query, + Sequence subqueryResult + ); + QueryRunner mergeRunners( ListeningExecutorService exec, Iterable> queryRunners diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 546fd14c400..0786eda4a91 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -19,30 +19,42 @@ package io.druid.query.groupby.strategy; +import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; +import com.metamx.common.IAE; import com.metamx.common.guava.ResourceClosingSequence; import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; import io.druid.collections.StupidPool; import io.druid.data.input.Row; import io.druid.guice.annotations.Global; import io.druid.query.GroupByMergedQueryRunner; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryHelper; import io.druid.query.groupby.GroupByQueryQueryToolChest; +import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.StorageAdapter; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexStorageAdapter; +import org.joda.time.Interval; import java.nio.ByteBuffer; import java.util.Map; +import java.util.Set; public class GroupByStrategyV1 implements GroupByStrategy { @@ -109,6 +121,97 @@ public class GroupByStrategyV1 implements GroupByStrategy return new ResourceClosingSequence<>(query.applyLimit(GroupByQueryHelper.postAggregate(query, index)), index); } + @Override + public Sequence processSubqueryResult( + GroupByQuery subquery, GroupByQuery query, Sequence subqueryResult + ) + { + final Set aggs = Sets.newHashSet(); + + // Nested group-bys work by first running the inner query and then materializing the results in an incremental + // index which the outer query is then run against. To build the incremental index, we use the fieldNames from + // the aggregators for the outer query to define the column names so that the index will match the query. If + // there are multiple types of aggregators in the outer query referencing the same fieldName, we will try to build + // multiple columns of the same name using different aggregator types and will fail. Here, we permit multiple + // aggregators of the same type referencing the same fieldName (and skip creating identical columns for the + // subsequent ones) and return an error if the aggregator types are different. 
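// (Hypothetical illustration of this rule: outer aggregators new LongSumAggregatorFactory("a", "innerMetric") and new DoubleSumAggregatorFactory("b", "innerMetric") both require the inner column "innerMetric" with different aggregator types, so the check below throws the IAE; two LongSum aggregators over "innerMetric" would be accepted, with the duplicate column skipped.)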
+ for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) { + for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) { + if (Iterables.any(aggs, new Predicate() + { + @Override + public boolean apply(AggregatorFactory agg) + { + return agg.getName().equals(transferAgg.getName()) && !agg.equals(transferAgg); + } + })) { + throw new IAE("Inner aggregator can currently only be referenced by a single type of outer aggregator" + + " for '%s'", transferAgg.getName()); + } + + aggs.add(transferAgg); + } + } + + // We need the inner incremental index to have all the columns required by the outer query + final GroupByQuery innerQuery = new GroupByQuery.Builder(subquery) + .setAggregatorSpecs(Lists.newArrayList(aggs)) + .setInterval(subquery.getIntervals()) + .setPostAggregatorSpecs(Lists.newArrayList()) + .build(); + + final GroupByQuery outerQuery = new GroupByQuery.Builder(query) + .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec())) + .build(); + + final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex( + innerQuery.withOverriddenContext( + ImmutableMap.of( + GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true + ) + ), + subqueryResult + ); + + //Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which + //is ensured by QuerySegmentSpec. + //GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval + //and concatenate the results. + final IncrementalIndex outerQueryResultIndex = makeIncrementalIndex( + outerQuery, + Sequences.concat( + Sequences.map( + Sequences.simple(outerQuery.getIntervals()), + new Function>() + { + @Override + public Sequence apply(Interval interval) + { + return process( + outerQuery.withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(ImmutableList.of(interval)) + ), + new IncrementalIndexStorageAdapter(innerQueryResultIndex) + ); + } + } + ) + ) + ); + + innerQueryResultIndex.close(); + + return new ResourceClosingSequence<>( + outerQuery.applyLimit(GroupByQueryHelper.postAggregate(query, outerQueryResultIndex)), + outerQueryResultIndex + ); + } + + private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence rows) + { + return GroupByQueryHelper.makeIncrementalIndex(query, configSupplier.get(), bufferPool, rows); + } + @Override public QueryRunner mergeRunners( final ListeningExecutorService exec, diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index acc9856ffae..75648390840 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -41,6 +41,7 @@ import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Merging; import io.druid.guice.annotations.Smile; import io.druid.query.DruidProcessingConfig; +import io.druid.query.NoopQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; @@ -51,6 +52,7 @@ import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2; import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; +import io.druid.query.groupby.epinephelinae.GroupByRowProcessor; import io.druid.segment.StorageAdapter; import org.joda.time.DateTime; @@ 
@@ -178,6 +180,28 @@ public class GroupByStrategyV2 implements GroupByStrategy
     );
   }

+  @Override
+  public Sequence<Row> processSubqueryResult(
+      GroupByQuery subquery, GroupByQuery query, Sequence<Row> subqueryResult
+  )
+  {
+    final Sequence<Row> results = GroupByRowProcessor.process(
+        query,
+        subqueryResult,
+        configSupplier.get(),
+        mergeBufferPool,
+        spillMapper
+    );
+    return mergeResults(new QueryRunner<Row>()
+    {
+      @Override
+      public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
+      {
+        return results;
+      }
+    }, query, null);
+  }
+
   @Override
   public QueryRunner<Row> mergeRunners(
       ListeningExecutorService exec,
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
index 3c16b23a939..c30384b7770 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -58,6 +58,7 @@ import io.druid.query.aggregation.FilteredAggregatorFactory;
 import io.druid.query.aggregation.JavaScriptAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
@@ -4463,6 +4464,222 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, results, "");
   }

+  @Test
+  public void testSubqueryWithOuterFilterAggregator()
+  {
+    final GroupByQuery subquery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(
+            new DefaultDimensionSpec("market", "market"),
+            new DefaultDimensionSpec("quality", "quality")
+        ))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("index", "index")
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    final DimFilter filter = new SelectorDimFilter("market", "spot", null);
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(subquery)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
+        .setDimensions(Lists.<DimensionSpec>newArrayList())
+        .setAggregatorSpecs(
+            ImmutableList.<AggregatorFactory>of(
+                new FilteredAggregatorFactory(QueryRunnerTestHelper.rowsCount, filter)
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.allGran)
+        .build();
+
+    // The v2 strategy would throw an exception for this because the dimension selector in GroupByRowProcessor is
+    // not dictionary encoded and instead requires the current row to be set before doing a lookup. A
+    // NullPointerException occurs when the filter aggregator is initialized before any row has been set.
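+    // A simplified illustration of that failure mode (hypothetical shape, not actual Druid classes):
+    //
+    //   class RowBasedDimensionSelector {
+    //     Row currentRow;           // null until the processor supplies the first row
+    //     Object get(String dim) { return currentRow.getDimension(dim); }  // NPE if read early
+    //   }
+    //
+    // The filtered aggregator evaluates its ValueMatcher against such a selector while being
+    // initialized, i.e. before any row is set, so the lookup dereferences null.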
+    if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+      expectedException.expect(NullPointerException.class);
+      GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    } else {
+      List<Row> expectedResults = Arrays.asList(
+          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01", "rows", 837L)
+      );
+      Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+      TestHelper.assertExpectedObjects(expectedResults, results, "");
+    }
+  }
+
+  @Test
+  public void testSubqueryWithOuterTimeFilter()
+  {
+    final GroupByQuery subquery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(
+            new DefaultDimensionSpec("market", "market"),
+            new DefaultDimensionSpec("quality", "quality")
+        ))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("index", "index")
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    final DimFilter fridayFilter = new SelectorDimFilter(
+        Column.TIME_COLUMN_NAME,
+        "Friday",
+        new TimeFormatExtractionFn("EEEE", null, null)
+    );
+    final DimFilter firstDaysFilter = new InDimFilter(
+        Column.TIME_COLUMN_NAME,
+        ImmutableList.of("1", "2", "3"),
+        new TimeFormatExtractionFn("d", null, null)
+    );
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(subquery)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
+        .setDimensions(Lists.<DimensionSpec>newArrayList())
+        .setDimFilter(firstDaysFilter)
+        .setAggregatorSpecs(
+            ImmutableList.<AggregatorFactory>of(
+                new FilteredAggregatorFactory(QueryRunnerTestHelper.rowsCount, fridayFilter)
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    List<Row> expectedResults = Arrays.asList(
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-01", "rows", 0L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-02", "rows", 0L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-03", "rows", 0L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-01", "rows", 0L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-02", "rows", 0L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-03", "rows", 0L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "rows", 13L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "rows", 0L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-03", "rows", 0L)
+    );
+    Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
+
+  @Test
+  public void testSubqueryWithOuterCardinalityAggregator()
+  {
+    final GroupByQuery subquery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(
+            new DefaultDimensionSpec("market", "market"),
+            new DefaultDimensionSpec("quality", "quality")
+        ))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("index", "index")
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(subquery)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
+        .setDimensions(Lists.<DimensionSpec>newArrayList())
+        .setAggregatorSpecs(
+            ImmutableList.<AggregatorFactory>of(
+                new CardinalityAggregatorFactory("car", ImmutableList.of("quality"), false)
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.allGran)
+        .build();
+
+    // The v1 strategy would throw an exception for this because it calls AggregatorFactory.getRequiredColumns to
+    // get aggregators for all the fields needed to build the inner query's result incremental index. When a field's
+    // type does not match the aggregator's value type, a parse exception occurs. In this case, quality is a string
+    // field, but getRequiredColumns returns a cardinality aggregator for it, whose value type is hyperUnique. Since
+    // hyperUnique is a complex type, no value converter is found for it, and a NullPointerException occurs when the
+    // index tries to use the missing converter.
+    if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
+      expectedException.expect(NullPointerException.class);
+      GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    } else {
+      List<Row> expectedResults = Arrays.asList(
+          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01", "car", QueryRunnerTestHelper.UNIQUES_9)
+      );
+      Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+      TestHelper.assertExpectedObjects(expectedResults, results, "");
+    }
+  }
+
+  @Test
+  public void testSubqueryWithOuterJavascriptAggregators()
+  {
+    final GroupByQuery subquery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(
+            new DefaultDimensionSpec("market", "market"),
+            new DefaultDimensionSpec("quality", "quality")
+        ))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("index", "index")
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(subquery)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "quality")))
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                new JavaScriptAggregatorFactory(
+                    "js_agg",
+                    Arrays.asList("index", "market"),
+                    "function(current, index, dim){return current + index + dim.length;}",
+                    "function(){return 0;}",
+                    "function(a,b){return a + b;}",
+                    JavaScriptConfig.getDefault()
+                )
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    // The v1 strategy would throw an exception for this because it calls AggregatorFactory.getRequiredColumns to
+    // get aggregators for all the fields needed to build the inner query's result incremental index. When a field's
+    // type does not match the aggregator's value type, a parse exception occurs. In this case, market is a string
+    // field, but getRequiredColumns returns a JavaScript aggregator for it, which has type float.
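+    // Concretely (values from the test dataset): getRequiredColumns() yields a float-typed
+    // JavaScript aggregator for each of "index" and "market", so building the inner index tries
+    // to ingest string values such as "spot" into a float column, which is what raises the
+    // ParseException asserted below.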
+    if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
+      expectedException.expect(ParseException.class);
+      GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    } else {
+      List<Row> expectedResults = Arrays.asList(
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "automotive", "js_agg", 139D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "business", "js_agg", 122D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "entertainment", "js_agg", 162D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "health", "js_agg", 124D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "mezzanine", "js_agg", 2893D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "news", "js_agg", 125D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "premium", "js_agg", 2923D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "technology", "js_agg", 82D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "travel", "js_agg", 123D),
+
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "automotive", "js_agg", 151D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "business", "js_agg", 116D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "entertainment", "js_agg", 170D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "health", "js_agg", 117D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "mezzanine", "js_agg", 2470D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "news", "js_agg", 118D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "premium", "js_agg", 2528D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "technology", "js_agg", 101D),
+          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "travel", "js_agg", 130D)
+      );
+      Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+      TestHelper.assertExpectedObjects(expectedResults, results, "");
+    }
+  }
+
   @Test
   public void testSubqueryWithHyperUniques()
   {