mirror of https://github.com/apache/druid.git
groupBy nested query using v2 strategy (#3269)
* changed v2 nested query strategy
* add test for #3239
* update for new ValueMatcher interface and add benchmarks
* enable time filtering
* address PR comments
* add failing test for outer filter aggregator
* add helper class for sharing code
* update nested groupby doc
* move temporary storage instantiation
* address PR comment
* address PR comment 2
parent d51ec398d4
commit bc20658239
@ -116,7 +116,7 @@ public class GroupByBenchmark
|
|||
@Param({"100000"})
|
||||
private int rowsPerSegment;
|
||||
|
||||
@Param({"basic.A"})
|
||||
@Param({"basic.A", "basic.nested"})
|
||||
private String schemaAndQuery;
|
||||
|
||||
@Param({"v1", "v2"})
|
||||
|
@ -190,6 +190,44 @@ public class GroupByBenchmark
|
|||
basicQueries.put("A", queryA);
|
||||
}
|
||||
|
||||
{ // basic.nested
|
||||
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval()));
|
||||
List<AggregatorFactory> queryAggs = new ArrayList<>();
|
||||
queryAggs.add(new LongSumAggregatorFactory(
|
||||
"sumLongSequential",
|
||||
"sumLongSequential"
|
||||
));
|
||||
|
||||
GroupByQuery subqueryA = GroupByQuery
|
||||
.builder()
|
||||
.setDataSource("blah")
|
||||
.setQuerySegmentSpec(intervalSpec)
|
||||
.setDimensions(Lists.<DimensionSpec>newArrayList(
|
||||
new DefaultDimensionSpec("dimSequential", null),
|
||||
new DefaultDimensionSpec("dimZipf", null)
|
||||
))
|
||||
.setAggregatorSpecs(
|
||||
queryAggs
|
||||
)
|
||||
.setGranularity(QueryGranularities.DAY)
|
||||
.build();
|
||||
|
||||
GroupByQuery queryA = GroupByQuery
|
||||
.builder()
|
||||
.setDataSource(subqueryA)
|
||||
.setQuerySegmentSpec(intervalSpec)
|
||||
.setDimensions(Lists.<DimensionSpec>newArrayList(
|
||||
new DefaultDimensionSpec("dimSequential", null)
|
||||
))
|
||||
.setAggregatorSpecs(
|
||||
queryAggs
|
||||
)
|
||||
.setGranularity(QueryGranularities.WEEK)
|
||||
.build();
|
||||
|
||||
basicQueries.put("nested", queryA);
|
||||
}
|
||||
|
||||
SCHEMA_QUERY_MAP.put("basic", basicQueries);
|
||||
}
|
||||
|
||||
|
@ -257,9 +295,11 @@ public class GroupByBenchmark
|
|||
new OffheapBufferGenerator("compute", 250000000),
|
||||
Integer.MAX_VALUE
|
||||
);
|
||||
|
||||
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
|
||||
BlockingPool<ByteBuffer> mergePool = new BlockingPool<>(
|
||||
new OffheapBufferGenerator("merge", 250000000),
|
||||
1
|
||||
2
|
||||
);
|
||||
final GroupByQueryConfig config = new GroupByQueryConfig()
|
||||
{
|
||||
|
|
|
@ -150,11 +150,11 @@ results acceptable.

#### Nested groupBys

Nested groupBys (dataSource of type "query") are performed the same way for both "v1" and "v2". The broker runs the
inner groupBy query in the usual way, then materializes the inner query's results, then runs the outer query on those
materialized results. In particular, the outer query is not distributed at all; it takes place completely on the broker.
Currently the materialized results are stored on-heap in the broker, and the outer query is done in a single-threaded
fashion.
Nested groupBys (dataSource of type "query") are performed differently for "v1" and "v2". The broker first runs the
inner groupBy query in the usual way. The "v1" strategy then materializes the inner query's results on-heap with Druid's
indexing mechanism and runs the outer query on those materialized results. The "v2" strategy runs the outer query on the
inner query's result stream, using an off-heap fact map and an on-heap string dictionary that can spill to disk. Both
strategies perform the outer query on the broker in a single-threaded fashion.

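For concreteness, the sketch below shows what such a nested query looks like when built programmatically. It is modeled directly on the basic.nested benchmark case added elsewhere in this commit; the datasource name ("blah"), the dimension and metric names, and the interval are illustrative placeholders rather than part of the documentation change itself.

import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Interval;

import java.util.Arrays;
import java.util.List;

public class NestedGroupByExample
{
  public static GroupByQuery buildNestedQuery()
  {
    // Placeholder interval; a real query would cover the datasource's time range.
    final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
        Arrays.asList(new Interval("2016-01-01/2016-02-01"))
    );
    final List<AggregatorFactory> queryAggs = Arrays.<AggregatorFactory>asList(
        new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")
    );

    // Inner query: groups the raw datasource by two dimensions at DAY granularity.
    final GroupByQuery subquery = GroupByQuery
        .builder()
        .setDataSource("blah")
        .setQuerySegmentSpec(intervalSpec)
        .setDimensions(Lists.<DimensionSpec>newArrayList(
            new DefaultDimensionSpec("dimSequential", null),
            new DefaultDimensionSpec("dimZipf", null)
        ))
        .setAggregatorSpecs(queryAggs)
        .setGranularity(QueryGranularities.DAY)
        .build();

    // Outer query: uses the inner query itself as the dataSource and re-aggregates
    // its results at WEEK granularity. As described above, this outer pass always
    // happens on the broker, over the inner query's results.
    return GroupByQuery
        .builder()
        .setDataSource(subquery)
        .setQuerySegmentSpec(intervalSpec)
        .setDimensions(Lists.<DimensionSpec>newArrayList(
            new DefaultDimensionSpec("dimSequential", null)
        ))
        .setAggregatorSpecs(queryAggs)
        .setGranularity(QueryGranularities.WEEK)
        .build();
  }
}

Issuing the returned outer query exercises the nested path described above: the broker first computes the subquery, then feeds its results to the outer aggregation.
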
#### Server configuration
|
||||
|
||||
|
|
|
@ -163,7 +163,7 @@ public class GroupByQueryHelper
|
|||
return new Pair<>(init, accumulator);
|
||||
}
|
||||
|
||||
// Used by GroupByQueryQueryToolChest, GroupByStrategyV1
|
||||
// Used by GroupByStrategyV1
|
||||
public static IncrementalIndex makeIncrementalIndex(
|
||||
GroupByQuery query,
|
||||
GroupByQueryConfig config,
|
||||
|
@ -180,7 +180,7 @@ public class GroupByQueryHelper
|
|||
return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
|
||||
}
|
||||
|
||||
// Used by GroupByQueryQueryToolChest, GroupByStrategyV1
|
||||
// Used by GroupByStrategyV1
|
||||
public static Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
|
||||
{
|
||||
return Sequences.map(
|
||||
|
|
|
@ -24,19 +24,14 @@ import com.google.common.base.Function;
|
|||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Collections2;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.ResourceClosingSequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import io.druid.collections.StupidPool;
|
||||
import io.druid.data.input.MapBasedRow;
|
||||
|
@ -56,18 +51,12 @@ import io.druid.query.QueryToolChest;
|
|||
import io.druid.query.SubqueryQueryRunner;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.MetricManipulationFn;
|
||||
import io.druid.query.aggregation.PostAggregator;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.groupby.strategy.GroupByStrategy;
|
||||
import io.druid.query.groupby.strategy.GroupByStrategySelector;
|
||||
import io.druid.query.spec.MultipleIntervalSegmentSpec;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -167,96 +156,12 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
|||
runner,
|
||||
context
|
||||
);
|
||||
final Set<AggregatorFactory> aggs = Sets.newHashSet();
|
||||
|
||||
// Nested group-bys work by first running the inner query and then materializing the results in an incremental
|
||||
// index which the outer query is then run against. To build the incremental index, we use the fieldNames from
|
||||
// the aggregators for the outer query to define the column names so that the index will match the query. If
|
||||
// there are multiple types of aggregators in the outer query referencing the same fieldName, we will try to build
|
||||
// multiple columns of the same name using different aggregator types and will fail. Here, we permit multiple
|
||||
// aggregators of the same type referencing the same fieldName (and skip creating identical columns for the
|
||||
// subsequent ones) and return an error if the aggregator types are different.
|
||||
for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
|
||||
for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) {
|
||||
if (Iterables.any(aggs, new Predicate<AggregatorFactory>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(AggregatorFactory agg)
|
||||
{
|
||||
return agg.getName().equals(transferAgg.getName()) && !agg.equals(transferAgg);
|
||||
}
|
||||
})) {
|
||||
throw new IAE("Inner aggregator can currently only be referenced by a single type of outer aggregator" +
|
||||
" for '%s'", transferAgg.getName());
|
||||
}
|
||||
|
||||
aggs.add(transferAgg);
|
||||
}
|
||||
}
|
||||
|
||||
// We need the inner incremental index to have all the columns required by the outer query
|
||||
final GroupByQuery innerQuery = new GroupByQuery.Builder(subquery)
|
||||
.setAggregatorSpecs(Lists.newArrayList(aggs))
|
||||
.setInterval(subquery.getIntervals())
|
||||
.setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
|
||||
.build();
|
||||
|
||||
final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
|
||||
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
|
||||
.build();
|
||||
|
||||
final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex(
|
||||
innerQuery.withOverriddenContext(
|
||||
ImmutableMap.<String, Object>of(
|
||||
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true
|
||||
)
|
||||
),
|
||||
subqueryResult
|
||||
);
|
||||
|
||||
//Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which
|
||||
//is ensured by QuerySegmentSpec.
|
||||
//GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval
|
||||
//and concatenate the results.
|
||||
final GroupByStrategy strategy = strategySelector.strategize(query);
|
||||
final IncrementalIndex outerQueryResultIndex = makeIncrementalIndex(
|
||||
outerQuery,
|
||||
Sequences.concat(
|
||||
Sequences.map(
|
||||
Sequences.simple(outerQuery.getIntervals()),
|
||||
new Function<Interval, Sequence<Row>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Row> apply(Interval interval)
|
||||
{
|
||||
return strategy.process(
|
||||
outerQuery.withQuerySegmentSpec(
|
||||
new MultipleIntervalSegmentSpec(ImmutableList.of(interval))
|
||||
),
|
||||
new IncrementalIndexStorageAdapter(innerQueryResultIndex)
|
||||
);
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
innerQueryResultIndex.close();
|
||||
|
||||
return new ResourceClosingSequence<>(
|
||||
outerQuery.applyLimit(GroupByQueryHelper.postAggregate(query, outerQueryResultIndex)),
|
||||
outerQueryResultIndex
|
||||
);
|
||||
return strategySelector.strategize(query).processSubqueryResult(subquery, query, subqueryResult);
|
||||
} else {
|
||||
return strategySelector.strategize(query).mergeResults(runner, query, context);
|
||||
}
|
||||
}
|
||||
|
||||
private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)
|
||||
{
|
||||
return GroupByQueryHelper.makeIncrementalIndex(query, configSupplier.get(), bufferPool, rows);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceMetricEvent.Builder makeMetricBuilder(GroupByQuery query)
|
||||
{
|
||||
|
|
|
@ -19,25 +19,19 @@
|
|||
|
||||
package io.druid.query.groupby.epinephelinae;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.primitives.Chars;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.guava.Accumulator;
|
||||
import com.metamx.common.guava.BaseSequence;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
|
@ -47,8 +41,8 @@ import io.druid.collections.BlockingPool;
|
|||
import io.druid.collections.ReferenceCountingResourceHolder;
|
||||
import io.druid.collections.Releaser;
|
||||
import io.druid.common.utils.JodaUtils;
|
||||
import io.druid.data.input.MapBasedRow;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.granularity.QueryGranularities;
|
||||
import io.druid.query.AbstractPrioritizedCallable;
|
||||
import io.druid.query.BaseQuery;
|
||||
import io.druid.query.ChainedExecutionQueryRunner;
|
||||
|
@ -58,22 +52,14 @@ import io.druid.query.QueryInterruptedException;
|
|||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryWatcher;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.groupby.GroupByQuery;
|
||||
import io.druid.query.groupby.GroupByQueryConfig;
|
||||
import io.druid.query.groupby.strategy.GroupByStrategyV2;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
@ -156,24 +142,22 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
|
|||
: System.currentTimeMillis() + queryTimeout.longValue();
|
||||
|
||||
return new BaseSequence<>(
|
||||
new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<GroupByMergingKey, Row>>()
|
||||
new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<RowBasedKey, Row>>()
|
||||
{
|
||||
@Override
|
||||
public CloseableGrouperIterator<GroupByMergingKey, Row> make()
|
||||
public CloseableGrouperIterator<RowBasedKey, Row> make()
|
||||
{
|
||||
final List<Closeable> closeOnFailure = Lists.newArrayList();
|
||||
|
||||
try {
|
||||
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
|
||||
final LimitedTemporaryStorage temporaryStorage;
|
||||
final Grouper<GroupByMergingKey> grouper;
|
||||
|
||||
temporaryStorage = new LimitedTemporaryStorage(
|
||||
final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
|
||||
temporaryStorageDirectory,
|
||||
querySpecificConfig.getMaxOnDiskStorage()
|
||||
);
|
||||
|
||||
closeOnFailure.add(temporaryStorage);
|
||||
|
||||
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
|
||||
try {
|
||||
// This will potentially block if there are no merge buffers left in the pool.
|
||||
final long timeout = timeoutAt - System.currentTimeMillis();
|
||||
|
@ -187,61 +171,23 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
|
|||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
final GroupByMergingKeySerdeFactory keySerdeFactory = new GroupByMergingKeySerdeFactory(
|
||||
query.getDimensions().size(),
|
||||
querySpecificConfig.getMaxMergingDictionarySize() / concurrencyHint
|
||||
);
|
||||
final GroupByMergingColumnSelectorFactory columnSelectorFactory = new GroupByMergingColumnSelectorFactory();
|
||||
|
||||
grouper = new ConcurrentGrouper<>(
|
||||
Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
|
||||
query,
|
||||
false,
|
||||
config,
|
||||
mergeBufferHolder.get(),
|
||||
concurrencyHint,
|
||||
temporaryStorage,
|
||||
spillMapper,
|
||||
querySpecificConfig.getBufferGrouperMaxSize(),
|
||||
querySpecificConfig.getBufferGrouperInitialBuckets(),
|
||||
keySerdeFactory,
|
||||
columnSelectorFactory,
|
||||
combiningAggregatorFactories
|
||||
);
|
||||
final Grouper<RowBasedKey> grouper = pair.lhs;
|
||||
final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
|
||||
closeOnFailure.add(grouper);
|
||||
|
||||
final Accumulator<Grouper<GroupByMergingKey>, Row> accumulator = new Accumulator<Grouper<GroupByMergingKey>, Row>()
|
||||
{
|
||||
@Override
|
||||
public Grouper<GroupByMergingKey> accumulate(
|
||||
final Grouper<GroupByMergingKey> theGrouper,
|
||||
final Row row
|
||||
)
|
||||
{
|
||||
if (theGrouper == null) {
|
||||
// Pass-through null returns without doing more work.
|
||||
return null;
|
||||
}
|
||||
|
||||
final long timestamp = row.getTimestampFromEpoch();
|
||||
|
||||
final String[] dimensions = new String[query.getDimensions().size()];
|
||||
for (int i = 0; i < dimensions.length; i++) {
|
||||
final Object dimValue = row.getRaw(query.getDimensions().get(i).getOutputName());
|
||||
dimensions[i] = Strings.nullToEmpty((String) dimValue);
|
||||
}
|
||||
|
||||
columnSelectorFactory.setRow(row);
|
||||
final boolean didAggregate = theGrouper.aggregate(new GroupByMergingKey(timestamp, dimensions));
|
||||
if (!didAggregate) {
|
||||
// null return means grouping resources were exhausted.
|
||||
return null;
|
||||
}
|
||||
columnSelectorFactory.setRow(null);
|
||||
|
||||
return theGrouper;
|
||||
}
|
||||
};
|
||||
|
||||
final int priority = BaseQuery.getContextPriority(query, 0);
|
||||
|
||||
final ReferenceCountingResourceHolder<Grouper<GroupByMergingKey>> grouperHolder = new ReferenceCountingResourceHolder<>(
|
||||
final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder = new ReferenceCountingResourceHolder<>(
|
||||
grouper,
|
||||
new Closeable()
|
||||
{
|
||||
|
@ -319,35 +265,9 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
|
|||
|
||||
waitForFutureCompletion(query, futures, timeoutAt - System.currentTimeMillis());
|
||||
|
||||
return new CloseableGrouperIterator<>(
|
||||
return RowBasedGrouperHelper.makeGrouperIterator(
|
||||
grouper,
|
||||
true,
|
||||
new Function<Grouper.Entry<GroupByMergingKey>, Row>()
|
||||
{
|
||||
@Override
|
||||
public Row apply(Grouper.Entry<GroupByMergingKey> entry)
|
||||
{
|
||||
Map<String, Object> theMap = Maps.newLinkedHashMap();
|
||||
|
||||
// Add dimensions.
|
||||
for (int i = 0; i < entry.getKey().getDimensions().length; i++) {
|
||||
theMap.put(
|
||||
query.getDimensions().get(i).getOutputName(),
|
||||
Strings.emptyToNull(entry.getKey().getDimensions()[i])
|
||||
);
|
||||
}
|
||||
|
||||
// Add aggregations.
|
||||
for (int i = 0; i < entry.getValues().length; i++) {
|
||||
theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
|
||||
}
|
||||
|
||||
return new MapBasedRow(
|
||||
query.getGranularity().toDateTime(entry.getKey().getTimestamp()),
|
||||
theMap
|
||||
);
|
||||
}
|
||||
},
|
||||
query,
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
|
@ -370,7 +290,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
|
|||
}
|
||||
|
||||
@Override
|
||||
public void cleanup(CloseableGrouperIterator<GroupByMergingKey, Row> iterFromMake)
|
||||
public void cleanup(CloseableGrouperIterator<RowBasedKey, Row> iterFromMake)
|
||||
{
|
||||
iterFromMake.close();
|
||||
}
|
||||
|
@ -420,317 +340,4 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
|
|||
}
|
||||
}
|
||||
|
||||
private static class GroupByMergingKey implements Comparable<GroupByMergingKey>
|
||||
{
|
||||
private final long timestamp;
|
||||
private final String[] dimensions;
|
||||
|
||||
@JsonCreator
|
||||
public GroupByMergingKey(
|
||||
// Using short key names to reduce serialized size when spilling to disk.
|
||||
@JsonProperty("t") long timestamp,
|
||||
@JsonProperty("d") String[] dimensions
|
||||
)
|
||||
{
|
||||
this.timestamp = timestamp;
|
||||
this.dimensions = dimensions;
|
||||
}
|
||||
|
||||
@JsonProperty("t")
|
||||
public long getTimestamp()
|
||||
{
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
@JsonProperty("d")
|
||||
public String[] getDimensions()
|
||||
{
|
||||
return dimensions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
GroupByMergingKey that = (GroupByMergingKey) o;
|
||||
|
||||
if (timestamp != that.timestamp) {
|
||||
return false;
|
||||
}
|
||||
// Probably incorrect - comparing Object[] arrays with Arrays.equals
|
||||
return Arrays.equals(dimensions, that.dimensions);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = (int) (timestamp ^ (timestamp >>> 32));
|
||||
result = 31 * result + Arrays.hashCode(dimensions);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(GroupByMergingKey other)
|
||||
{
|
||||
final int timeCompare = Longs.compare(timestamp, other.getTimestamp());
|
||||
if (timeCompare != 0) {
|
||||
return timeCompare;
|
||||
}
|
||||
|
||||
for (int i = 0; i < dimensions.length; i++) {
|
||||
final int cmp = dimensions[i].compareTo(other.getDimensions()[i]);
|
||||
if (cmp != 0) {
|
||||
return cmp;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "GroupByMergingKey{" +
|
||||
"timestamp=" + timestamp +
|
||||
", dimensions=" + Arrays.toString(dimensions) +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
private static class GroupByMergingKeySerdeFactory implements Grouper.KeySerdeFactory<GroupByMergingKey>
|
||||
{
|
||||
private final int dimCount;
|
||||
private final long maxDictionarySize;
|
||||
|
||||
public GroupByMergingKeySerdeFactory(int dimCount, long maxDictionarySize)
|
||||
{
|
||||
this.dimCount = dimCount;
|
||||
this.maxDictionarySize = maxDictionarySize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Grouper.KeySerde<GroupByMergingKey> factorize()
|
||||
{
|
||||
return new GroupByMergingKeySerde(dimCount, maxDictionarySize);
|
||||
}
|
||||
}
|
||||
|
||||
private static class GroupByMergingKeySerde implements Grouper.KeySerde<GroupByMergingKey>
|
||||
{
|
||||
// Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
|
||||
private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Longs.BYTES * 5 + Ints.BYTES;
|
||||
|
||||
private final int dimCount;
|
||||
private final int keySize;
|
||||
private final ByteBuffer keyBuffer;
|
||||
private final List<String> dictionary = Lists.newArrayList();
|
||||
private final Map<String, Integer> reverseDictionary = Maps.newHashMap();
|
||||
|
||||
// Size limiting for the dictionary, in (roughly estimated) bytes.
|
||||
private final long maxDictionarySize;
|
||||
private long currentEstimatedSize = 0;
|
||||
|
||||
// dictionary id -> its position if it were sorted by dictionary value
|
||||
private int[] sortableIds = null;
|
||||
|
||||
public GroupByMergingKeySerde(final int dimCount, final long maxDictionarySize)
|
||||
{
|
||||
this.dimCount = dimCount;
|
||||
this.maxDictionarySize = maxDictionarySize;
|
||||
this.keySize = Longs.BYTES + dimCount * Ints.BYTES;
|
||||
this.keyBuffer = ByteBuffer.allocate(keySize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int keySize()
|
||||
{
|
||||
return keySize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<GroupByMergingKey> keyClazz()
|
||||
{
|
||||
return GroupByMergingKey.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer toByteBuffer(GroupByMergingKey key)
|
||||
{
|
||||
keyBuffer.rewind();
|
||||
keyBuffer.putLong(key.getTimestamp());
|
||||
for (int i = 0; i < key.getDimensions().length; i++) {
|
||||
final int id = addToDictionary(key.getDimensions()[i]);
|
||||
if (id < 0) {
|
||||
return null;
|
||||
}
|
||||
keyBuffer.putInt(id);
|
||||
}
|
||||
keyBuffer.flip();
|
||||
return keyBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GroupByMergingKey fromByteBuffer(ByteBuffer buffer, int position)
|
||||
{
|
||||
final long timestamp = buffer.getLong(position);
|
||||
final String[] dimensions = new String[dimCount];
|
||||
for (int i = 0; i < dimensions.length; i++) {
|
||||
dimensions[i] = dictionary.get(buffer.getInt(position + Longs.BYTES + (Ints.BYTES * i)));
|
||||
}
|
||||
return new GroupByMergingKey(timestamp, dimensions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Grouper.KeyComparator comparator()
|
||||
{
|
||||
if (sortableIds == null) {
|
||||
Map<String, Integer> sortedMap = Maps.newTreeMap();
|
||||
for (int id = 0; id < dictionary.size(); id++) {
|
||||
sortedMap.put(dictionary.get(id), id);
|
||||
}
|
||||
sortableIds = new int[dictionary.size()];
|
||||
int index = 0;
|
||||
for (final Integer id : sortedMap.values()) {
|
||||
sortableIds[id] = index++;
|
||||
}
|
||||
}
|
||||
|
||||
return new Grouper.KeyComparator()
|
||||
{
|
||||
@Override
|
||||
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
|
||||
{
|
||||
final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
|
||||
if (timeCompare != 0) {
|
||||
return timeCompare;
|
||||
}
|
||||
|
||||
for (int i = 0; i < dimCount; i++) {
|
||||
final int cmp = Ints.compare(
|
||||
sortableIds[lhsBuffer.getInt(lhsPosition + Longs.BYTES + (Ints.BYTES * i))],
|
||||
sortableIds[rhsBuffer.getInt(rhsPosition + Longs.BYTES + (Ints.BYTES * i))]
|
||||
);
|
||||
|
||||
if (cmp != 0) {
|
||||
return cmp;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset()
|
||||
{
|
||||
dictionary.clear();
|
||||
reverseDictionary.clear();
|
||||
sortableIds = null;
|
||||
currentEstimatedSize = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds s to the dictionary. If the dictionary's size limit would be exceeded by adding this key, then
|
||||
* this returns -1.
|
||||
*
|
||||
* @param s a string
|
||||
*
|
||||
* @return id for this string, or -1
|
||||
*/
|
||||
private int addToDictionary(final String s)
|
||||
{
|
||||
Integer idx = reverseDictionary.get(s);
|
||||
if (idx == null) {
|
||||
final long additionalEstimatedSize = (long) s.length() * Chars.BYTES + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY;
|
||||
if (currentEstimatedSize + additionalEstimatedSize > maxDictionarySize) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
idx = dictionary.size();
|
||||
reverseDictionary.put(s, idx);
|
||||
dictionary.add(s);
|
||||
currentEstimatedSize += additionalEstimatedSize;
|
||||
}
|
||||
return idx;
|
||||
}
|
||||
}
|
||||
|
||||
private static class GroupByMergingColumnSelectorFactory implements ColumnSelectorFactory
|
||||
{
|
||||
private ThreadLocal<Row> row = new ThreadLocal<>();
|
||||
|
||||
public void setRow(Row row)
|
||||
{
|
||||
this.row.set(row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
|
||||
{
|
||||
// Combining factories shouldn't need dimension selectors, that'd be weird.
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
|
||||
{
|
||||
return new FloatColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public float get()
|
||||
{
|
||||
return row.get().getFloatMetric(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongColumnSelector makeLongColumnSelector(final String columnName)
|
||||
{
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return row.get().getLongMetric(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectColumnSelector makeObjectColumnSelector(final String columnName)
|
||||
{
|
||||
return new ObjectColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return Object.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return row.get().getRaw(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
// getColumnCapabilities() is only used by FilteredAggregatorFactory for determining dimension types.
|
||||
// Since FilteredAggregatorFactory only works with ColumnSelectorFactory implementations
|
||||
// that support makeDimensionSelector(), this method is also left unsupported.
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,271 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.groupby.epinephelinae;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.guava.Accumulator;
|
||||
import com.metamx.common.guava.BaseSequence;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
import com.metamx.common.guava.FilteredSequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.collections.BlockingPool;
|
||||
import io.druid.collections.ReferenceCountingResourceHolder;
|
||||
import io.druid.common.utils.JodaUtils;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryContextKeys;
|
||||
import io.druid.query.QueryInterruptedException;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.query.groupby.GroupByQuery;
|
||||
import io.druid.query.groupby.GroupByQueryConfig;
|
||||
import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.filter.BooleanValueMatcher;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class GroupByRowProcessor
|
||||
{
|
||||
private static final Logger log = new Logger(GroupByRowProcessor.class);
|
||||
|
||||
public static Sequence<Row> process(
|
||||
final Query queryParam,
|
||||
final Sequence<Row> rows,
|
||||
final GroupByQueryConfig config,
|
||||
final BlockingPool<ByteBuffer> mergeBufferPool,
|
||||
final ObjectMapper spillMapper
|
||||
)
|
||||
{
|
||||
final GroupByQuery query = (GroupByQuery) queryParam;
|
||||
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
|
||||
|
||||
final AggregatorFactory[] aggregatorFactories = new AggregatorFactory[query.getAggregatorSpecs().size()];
|
||||
for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
|
||||
aggregatorFactories[i] = query.getAggregatorSpecs().get(i);
|
||||
}
|
||||
|
||||
final File temporaryStorageDirectory = new File(
|
||||
System.getProperty("java.io.tmpdir"),
|
||||
String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId())
|
||||
);
|
||||
|
||||
final long timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT);
|
||||
final List<Interval> queryIntervals = query.getIntervals();
|
||||
final Filter filter = Filters.convertToCNFFromQueryContext(
|
||||
query,
|
||||
Filters.toFilter(query.getDimFilter())
|
||||
);
|
||||
final RowBasedValueMatcherFactory filterMatcherFactory = new RowBasedValueMatcherFactory();
|
||||
final ValueMatcher filterMatcher = filter == null
|
||||
? new BooleanValueMatcher(true)
|
||||
: filter.makeMatcher(filterMatcherFactory);
|
||||
|
||||
final FilteredSequence<Row> filteredSequence = new FilteredSequence<>(
|
||||
rows,
|
||||
new Predicate<Row>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(Row input)
|
||||
{
|
||||
boolean inInterval = false;
|
||||
DateTime rowTime = input.getTimestamp();
|
||||
for (Interval queryInterval : queryIntervals) {
|
||||
if (queryInterval.contains(rowTime)) {
|
||||
inInterval = true;
|
||||
}
|
||||
}
|
||||
if (!inInterval) {
|
||||
return false;
|
||||
}
|
||||
filterMatcherFactory.setRow(input);
|
||||
return filterMatcher.matches();
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return new BaseSequence<>(
|
||||
new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<RowBasedKey, Row>>()
|
||||
{
|
||||
@Override
|
||||
public CloseableGrouperIterator<RowBasedKey, Row> make()
|
||||
{
|
||||
final List<Closeable> closeOnFailure = Lists.newArrayList();
|
||||
|
||||
try {
|
||||
final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
|
||||
temporaryStorageDirectory,
|
||||
querySpecificConfig.getMaxOnDiskStorage()
|
||||
);
|
||||
|
||||
closeOnFailure.add(temporaryStorage);
|
||||
|
||||
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
|
||||
try {
|
||||
// This will potentially block if there are no merge buffers left in the pool.
|
||||
if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
|
||||
throw new QueryInterruptedException(new TimeoutException());
|
||||
}
|
||||
closeOnFailure.add(mergeBufferHolder);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
|
||||
query,
|
||||
true,
|
||||
config,
|
||||
mergeBufferHolder.get(),
|
||||
-1,
|
||||
temporaryStorage,
|
||||
spillMapper,
|
||||
aggregatorFactories
|
||||
);
|
||||
final Grouper<RowBasedKey> grouper = pair.lhs;
|
||||
final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
|
||||
closeOnFailure.add(grouper);
|
||||
|
||||
filteredSequence.accumulate(grouper, accumulator);
|
||||
|
||||
return RowBasedGrouperHelper.makeGrouperIterator(
|
||||
grouper,
|
||||
query,
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
grouper.close();
|
||||
mergeBufferHolder.close();
|
||||
CloseQuietly.close(temporaryStorage);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
catch (Throwable e) {
|
||||
// Exception caught while setting up the iterator; release resources.
|
||||
for (Closeable closeable : Lists.reverse(closeOnFailure)) {
|
||||
CloseQuietly.close(closeable);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup(CloseableGrouperIterator<RowBasedKey, Row> iterFromMake)
|
||||
{
|
||||
iterFromMake.close();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private static class RowBasedValueMatcherFactory implements ValueMatcherFactory
|
||||
{
|
||||
private Row row;
|
||||
|
||||
public void setRow(Row row)
|
||||
{
|
||||
this.row = row;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(final String dimension, final Comparable value)
|
||||
{
|
||||
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
|
||||
return new ValueMatcher()
|
||||
{
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
return value.equals(row.getTimestampFromEpoch());
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new ValueMatcher()
|
||||
{
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
return row.getDimension(dimension).contains(value);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// There is no easy way to determine the dimension value type from the map based row, so this defaults all
|
||||
// dimensions (except time) to string, and provides the string value matcher. This has some performance impact
|
||||
// on filtering, but should provide the same result. It should be changed to support dimension types when better
|
||||
// type hinting is implemented.
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(final String dimension, final DruidPredicateFactory predicateFactory)
|
||||
{
|
||||
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
|
||||
return new ValueMatcher()
|
||||
{
|
||||
final DruidLongPredicate predicate = predicateFactory.makeLongPredicate();
|
||||
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
return predicate.applyLong(row.getTimestampFromEpoch());
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new ValueMatcher()
|
||||
{
|
||||
final Predicate<String> predicate = predicateFactory.makeStringPredicate();
|
||||
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
List<String> values = row.getDimension(dimension);
|
||||
for (String value : values) {
|
||||
if (predicate.apply(value)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
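Since GroupByRowProcessor exposes a single static entry point, here is a brief hedged sketch of how a caller (for example the v2 strategy) might hand it the inner query's result stream. The wrapper class, method, and parameter names below are illustrative assumptions; only the process(...) signature is taken from the class above.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.guava.Sequence;
import io.druid.collections.BlockingPool;
import io.druid.data.input.Row;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.epinephelinae.GroupByRowProcessor;

import java.nio.ByteBuffer;

public class GroupByRowProcessorUsageSketch
{
  // Runs the outer query over an already-computed stream of inner-query rows.
  // The processor filters rows by the outer query's intervals and dimFilter, then
  // aggregates them with an off-heap fact map and a dictionary that can spill to disk.
  public static Sequence<Row> runOuterQuery(
      GroupByQuery outerQuery,
      Sequence<Row> innerResults,
      GroupByQueryConfig config,
      BlockingPool<ByteBuffer> mergeBufferPool,
      ObjectMapper spillMapper
  )
  {
    return GroupByRowProcessor.process(outerQuery, innerResults, config, mergeBufferPool, spillMapper);
  }
}
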
@ -0,0 +1,614 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.groupby.epinephelinae;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.primitives.Chars;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.guava.Accumulator;
|
||||
import io.druid.data.input.MapBasedRow;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.granularity.AllGranularity;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.groupby.GroupByQuery;
|
||||
import io.druid.query.groupby.GroupByQueryConfig;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
// This class contains code shared between GroupByMergingQueryRunnerV2 and GroupByRowProcessor.
|
||||
public class RowBasedGrouperHelper
|
||||
{
|
||||
|
||||
public static Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> createGrouperAccumulatorPair(
|
||||
final GroupByQuery query,
|
||||
final boolean isInputRaw,
|
||||
final GroupByQueryConfig config,
|
||||
final ByteBuffer buffer,
|
||||
final int concurrencyHint,
|
||||
final LimitedTemporaryStorage temporaryStorage,
|
||||
final ObjectMapper spillMapper,
|
||||
final AggregatorFactory[] aggregatorFactories
|
||||
)
|
||||
{
|
||||
// concurrencyHint >= 1 for thread-safe groupers, -1 for non-thread-safe
|
||||
Preconditions.checkArgument(concurrencyHint >= 1 || concurrencyHint == -1, "invalid concurrencyHint");
|
||||
|
||||
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
|
||||
final Grouper.KeySerdeFactory<RowBasedKey> keySerdeFactory = new RowBasedKeySerdeFactory(
|
||||
query.getDimensions().size(),
|
||||
querySpecificConfig.getMaxMergingDictionarySize() / (concurrencyHint == -1 ? 1 : concurrencyHint)
|
||||
);
|
||||
final RowBasedColumnSelectorFactory columnSelectorFactory = new RowBasedColumnSelectorFactory();
|
||||
final Grouper<RowBasedKey> grouper;
|
||||
if (concurrencyHint == -1) {
|
||||
grouper = new SpillingGrouper<>(
|
||||
buffer,
|
||||
keySerdeFactory,
|
||||
columnSelectorFactory,
|
||||
aggregatorFactories,
|
||||
temporaryStorage,
|
||||
spillMapper,
|
||||
querySpecificConfig.getBufferGrouperMaxSize(),
|
||||
querySpecificConfig.getBufferGrouperInitialBuckets()
|
||||
);
|
||||
} else {
|
||||
grouper = new ConcurrentGrouper<>(
|
||||
buffer,
|
||||
concurrencyHint,
|
||||
temporaryStorage,
|
||||
spillMapper,
|
||||
querySpecificConfig.getBufferGrouperMaxSize(),
|
||||
querySpecificConfig.getBufferGrouperInitialBuckets(),
|
||||
keySerdeFactory,
|
||||
columnSelectorFactory,
|
||||
aggregatorFactories
|
||||
);
|
||||
}
|
||||
|
||||
final DimensionSelector[] dimensionSelectors;
|
||||
if (isInputRaw) {
|
||||
dimensionSelectors = new DimensionSelector[query.getDimensions().size()];
|
||||
for (int i = 0; i < dimensionSelectors.length; i++) {
|
||||
dimensionSelectors[i] = columnSelectorFactory.makeDimensionSelector(query.getDimensions().get(i));
|
||||
}
|
||||
} else {
|
||||
dimensionSelectors = null;
|
||||
}
|
||||
|
||||
final Accumulator<Grouper<RowBasedKey>, Row> accumulator = new Accumulator<Grouper<RowBasedKey>, Row>()
|
||||
{
|
||||
@Override
|
||||
public Grouper<RowBasedKey> accumulate(
|
||||
final Grouper<RowBasedKey> theGrouper,
|
||||
final Row row
|
||||
)
|
||||
{
|
||||
if (theGrouper == null) {
|
||||
// Pass-through null returns without doing more work.
|
||||
return null;
|
||||
}
|
||||
|
||||
long timestamp = row.getTimestampFromEpoch();
|
||||
if (isInputRaw) {
|
||||
if (query.getGranularity() instanceof AllGranularity) {
|
||||
timestamp = query.getIntervals().get(0).getStartMillis();
|
||||
} else {
|
||||
timestamp = query.getGranularity().truncate(timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
columnSelectorFactory.setRow(row);
|
||||
final String[] dimensions = new String[query.getDimensions().size()];
|
||||
for (int i = 0; i < dimensions.length; i++) {
|
||||
final String value;
|
||||
if (isInputRaw) {
|
||||
IndexedInts index = dimensionSelectors[i].getRow();
|
||||
value = index.size() == 0 ? "" : dimensionSelectors[i].lookupName(index.get(0));
|
||||
} else {
|
||||
value = (String) row.getRaw(query.getDimensions().get(i).getOutputName());
|
||||
}
|
||||
dimensions[i] = Strings.nullToEmpty(value);
|
||||
}
|
||||
|
||||
final boolean didAggregate = theGrouper.aggregate(new RowBasedKey(timestamp, dimensions));
|
||||
if (!didAggregate) {
|
||||
// null return means grouping resources were exhausted.
|
||||
return null;
|
||||
}
|
||||
columnSelectorFactory.setRow(null);
|
||||
|
||||
return theGrouper;
|
||||
}
|
||||
};
|
||||
|
||||
return new Pair<>(grouper, accumulator);
|
||||
}
|
||||
|
||||
public static CloseableGrouperIterator<RowBasedKey, Row> makeGrouperIterator(
|
||||
final Grouper<RowBasedKey> grouper,
|
||||
final GroupByQuery query,
|
||||
final Closeable closeable
|
||||
)
|
||||
{
|
||||
return new CloseableGrouperIterator<>(
|
||||
grouper,
|
||||
true,
|
||||
new Function<Grouper.Entry<RowBasedKey>, Row>()
|
||||
{
|
||||
@Override
|
||||
public Row apply(Grouper.Entry<RowBasedKey> entry)
|
||||
{
|
||||
Map<String, Object> theMap = Maps.newLinkedHashMap();
|
||||
|
||||
// Add dimensions.
|
||||
for (int i = 0; i < entry.getKey().getDimensions().length; i++) {
|
||||
theMap.put(
|
||||
query.getDimensions().get(i).getOutputName(),
|
||||
Strings.emptyToNull(entry.getKey().getDimensions()[i])
|
||||
);
|
||||
}
|
||||
|
||||
// Add aggregations.
|
||||
for (int i = 0; i < entry.getValues().length; i++) {
|
||||
theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
|
||||
}
|
||||
|
||||
return new MapBasedRow(
|
||||
query.getGranularity().toDateTime(entry.getKey().getTimestamp()),
|
||||
theMap
|
||||
);
|
||||
}
|
||||
},
|
||||
closeable
|
||||
);
|
||||
}
|
||||
|
||||
static class RowBasedKey implements Comparable<RowBasedKey>
|
||||
{
|
||||
private final long timestamp;
|
||||
private final String[] dimensions;
|
||||
|
||||
@JsonCreator
|
||||
public RowBasedKey(
|
||||
// Using short key names to reduce serialized size when spilling to disk.
|
||||
@JsonProperty("t") long timestamp,
|
||||
@JsonProperty("d") String[] dimensions
|
||||
)
|
||||
{
|
||||
this.timestamp = timestamp;
|
||||
this.dimensions = dimensions;
|
||||
}
|
||||
|
||||
@JsonProperty("t")
|
||||
public long getTimestamp()
|
||||
{
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
@JsonProperty("d")
|
||||
public String[] getDimensions()
|
||||
{
|
||||
return dimensions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
RowBasedKey that = (RowBasedKey) o;
|
||||
|
||||
if (timestamp != that.timestamp) {
|
||||
return false;
|
||||
}
|
||||
// Probably incorrect - comparing Object[] arrays with Arrays.equals
|
||||
return Arrays.equals(dimensions, that.dimensions);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = (int) (timestamp ^ (timestamp >>> 32));
|
||||
result = 31 * result + Arrays.hashCode(dimensions);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(RowBasedKey other)
|
||||
{
|
||||
final int timeCompare = Longs.compare(timestamp, other.getTimestamp());
|
||||
if (timeCompare != 0) {
|
||||
return timeCompare;
|
||||
}
|
||||
|
||||
for (int i = 0; i < dimensions.length; i++) {
|
||||
final int cmp = dimensions[i].compareTo(other.getDimensions()[i]);
|
||||
if (cmp != 0) {
|
||||
return cmp;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "RowBasedKey{" +
|
||||
"timestamp=" + timestamp +
|
||||
", dimensions=" + Arrays.toString(dimensions) +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
static class RowBasedKeySerdeFactory implements Grouper.KeySerdeFactory<RowBasedKey>
|
||||
{
|
||||
private final int dimCount;
|
||||
private final long maxDictionarySize;
|
||||
|
||||
public RowBasedKeySerdeFactory(int dimCount, long maxDictionarySize)
|
||||
{
|
||||
this.dimCount = dimCount;
|
||||
this.maxDictionarySize = maxDictionarySize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Grouper.KeySerde<RowBasedKey> factorize()
|
||||
{
|
||||
return new RowBasedKeySerde(dimCount, maxDictionarySize);
|
||||
}
|
||||
}
|
||||
|
||||
static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedKey>
|
||||
{
|
||||
// Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
|
||||
private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Longs.BYTES * 5 + Ints.BYTES;
|
||||
|
||||
private final int dimCount;
|
||||
private final int keySize;
|
||||
private final ByteBuffer keyBuffer;
|
||||
private final List<String> dictionary = Lists.newArrayList();
|
||||
private final Map<String, Integer> reverseDictionary = Maps.newHashMap();
|
||||
|
||||
// Size limiting for the dictionary, in (roughly estimated) bytes.
|
||||
private final long maxDictionarySize;
|
||||
private long currentEstimatedSize = 0;
|
||||
|
||||
// dictionary id -> its position if it were sorted by dictionary value
|
||||
private int[] sortableIds = null;
|
||||
|
||||
public RowBasedKeySerde(final int dimCount, final long maxDictionarySize)
|
||||
{
|
||||
this.dimCount = dimCount;
|
||||
this.maxDictionarySize = maxDictionarySize;
|
||||
this.keySize = Longs.BYTES + dimCount * Ints.BYTES;
|
||||
this.keyBuffer = ByteBuffer.allocate(keySize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int keySize()
|
||||
{
|
||||
return keySize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<RowBasedKey> keyClazz()
|
||||
{
|
||||
return RowBasedKey.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer toByteBuffer(RowBasedKey key)
|
||||
{
|
||||
keyBuffer.rewind();
|
||||
keyBuffer.putLong(key.getTimestamp());
|
||||
for (int i = 0; i < key.getDimensions().length; i++) {
|
||||
final int id = addToDictionary(key.getDimensions()[i]);
|
||||
if (id < 0) {
|
||||
return null;
|
||||
}
|
||||
keyBuffer.putInt(id);
|
||||
}
|
||||
keyBuffer.flip();
|
||||
return keyBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RowBasedKey fromByteBuffer(ByteBuffer buffer, int position)
|
||||
{
|
||||
final long timestamp = buffer.getLong(position);
|
||||
final String[] dimensions = new String[dimCount];
|
||||
for (int i = 0; i < dimensions.length; i++) {
|
||||
dimensions[i] = dictionary.get(buffer.getInt(position + Longs.BYTES + (Ints.BYTES * i)));
|
||||
}
|
||||
return new RowBasedKey(timestamp, dimensions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Grouper.KeyComparator comparator()
|
||||
{
|
||||
if (sortableIds == null) {
|
||||
Map<String, Integer> sortedMap = Maps.newTreeMap();
|
||||
for (int id = 0; id < dictionary.size(); id++) {
|
||||
sortedMap.put(dictionary.get(id), id);
|
||||
}
|
||||
sortableIds = new int[dictionary.size()];
|
||||
int index = 0;
|
||||
for (final Integer id : sortedMap.values()) {
|
||||
sortableIds[id] = index++;
|
||||
}
|
||||
}
|
||||
|
||||
return new Grouper.KeyComparator()
|
||||
{
|
||||
@Override
|
||||
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
|
||||
{
|
||||
final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
|
||||
if (timeCompare != 0) {
|
||||
return timeCompare;
|
||||
}
|
||||
|
||||
for (int i = 0; i < dimCount; i++) {
|
||||
final int cmp = Ints.compare(
|
||||
sortableIds[lhsBuffer.getInt(lhsPosition + Longs.BYTES + (Ints.BYTES * i))],
|
||||
sortableIds[rhsBuffer.getInt(rhsPosition + Longs.BYTES + (Ints.BYTES * i))]
|
||||
);
|
||||
|
||||
if (cmp != 0) {
|
||||
return cmp;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset()
|
||||
{
|
||||
dictionary.clear();
|
||||
reverseDictionary.clear();
|
||||
sortableIds = null;
|
||||
currentEstimatedSize = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds s to the dictionary. If the dictionary's size limit would be exceeded by adding this key, then
|
||||
* this returns -1.
|
||||
*
|
||||
* @param s a string
|
||||
*
|
||||
* @return id for this string, or -1
|
||||
*/
|
||||
private int addToDictionary(final String s)
|
||||
{
|
||||
Integer idx = reverseDictionary.get(s);
|
||||
if (idx == null) {
|
||||
final long additionalEstimatedSize = (long) s.length() * Chars.BYTES + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY;
|
||||
if (currentEstimatedSize + additionalEstimatedSize > maxDictionarySize) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
idx = dictionary.size();
|
||||
reverseDictionary.put(s, idx);
|
||||
dictionary.add(s);
|
||||
currentEstimatedSize += additionalEstimatedSize;
|
||||
}
|
||||
return idx;
|
||||
}
|
||||
}
|
||||
|
||||
private static class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
|
||||
{
|
||||
private ThreadLocal<Row> row = new ThreadLocal<>();
|
||||
|
||||
public void setRow(Row row)
|
||||
{
|
||||
this.row.set(row);
|
||||
}
|
||||
|
||||
// This dimension selector does not have an associated lookup dictionary, which means lookup can only be done
|
||||
// on the same row. This dimension selector is used for applying the extraction function on a dimension, which
|
||||
// requires a DimensionSelector implementation.
|
||||
@Override
|
||||
public DimensionSelector makeDimensionSelector(
|
||||
DimensionSpec dimensionSpec
|
||||
)
|
||||
{
|
||||
return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec));
|
||||
}
|
||||
|
||||
private DimensionSelector makeDimensionSelectorUndecorated(
|
||||
DimensionSpec dimensionSpec
|
||||
)
|
||||
{
|
||||
final String dimension = dimensionSpec.getDimension();
|
||||
final ExtractionFn extractionFn = dimensionSpec.getExtractionFn();
|
||||
|
||||
return new DimensionSelector()
|
||||
{
|
||||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
final List<String> dimensionValues = row.get().getDimension(dimension);
|
||||
final ArrayList<Integer> vals = Lists.newArrayList();
|
||||
if (dimensionValues != null) {
|
||||
for (int i = 0; i < dimensionValues.size(); ++i) {
|
||||
vals.add(i);
|
||||
}
|
||||
}
|
||||
|
||||
return new IndexedInts()
|
||||
{
|
||||
@Override
|
||||
public int size()
|
||||
{
|
||||
return vals.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int get(int index)
|
||||
{
|
||||
return vals.get(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Integer> iterator()
|
||||
{
|
||||
return vals.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fill(int index, int[] toFill)
|
||||
{
|
||||
throw new UnsupportedOperationException("fill not supported");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
throw new UnsupportedOperationException("value cardinality is unknown");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
final String value = row.get().getDimension(dimension).get(id);
|
||||
return extractionFn == null ? value : extractionFn.apply(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lookupId(String name)
|
||||
{
|
||||
if (extractionFn != null) {
|
||||
throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
|
||||
}
|
||||
return row.get().getDimension(dimension).indexOf(name);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
|
||||
{
|
||||
return new FloatColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public float get()
|
||||
{
|
||||
return row.get().getFloatMetric(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongColumnSelector makeLongColumnSelector(final String columnName)
|
||||
{
|
||||
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return row.get().getTimestampFromEpoch();
|
||||
}
|
||||
};
|
||||
}
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return row.get().getLongMetric(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectColumnSelector makeObjectColumnSelector(final String columnName)
|
||||
{
|
||||
return new ObjectColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return Object.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return row.get().getRaw(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
// We don't have any information on the column value type, returning null defaults type to string
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@@ -36,6 +36,12 @@ public interface GroupByStrategy
      Map<String, Object> responseContext
  );

  Sequence<Row> processSubqueryResult(
      GroupByQuery subquery,
      GroupByQuery query,
      Sequence<Row> subqueryResult
  );

  QueryRunner<Row> mergeRunners(
      ListeningExecutorService exec,
      Iterable<QueryRunner<Row>> queryRunners
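The interface change above is the hook both strategies implement: the caller supplies the inner (sub)query, the outer query, and the inner query's already-computed rows. As a rough sketch only (this helper, its name, and the `subqueryRunner` parameter are illustrative and not part of this patch; the real wiring lives elsewhere in the groupBy query tool chest), a caller could look like this:

// Illustrative helper, not from the patch: unwrap the nested dataSource, run the
// inner query, then let the strategy handle the outer pass over its rows.
private static Sequence<Row> runNestedGroupBy(
    GroupByStrategy strategy,
    QueryRunner<Row> subqueryRunner,
    GroupByQuery outerQuery,
    Map<String, Object> responseContext
)
{
  // A nested groupBy has a dataSource of type "query"; unwrap it to get the inner groupBy.
  final GroupByQuery subquery = (GroupByQuery) ((QueryDataSource) outerQuery.getDataSource()).getQuery();
  final Sequence<Row> subqueryResult = subqueryRunner.run(subquery, responseContext);
  return strategy.processSubqueryResult(subquery, outerQuery, subqueryResult);
}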
@@ -19,30 +19,42 @@

package io.druid.query.groupby.strategy;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.guice.annotations.Global;
import io.druid.query.GroupByMergedQueryRunner;
import io.druid.query.QueryRunner;
import io.druid.query.QueryWatcher;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.StorageAdapter;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.Interval;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;

public class GroupByStrategyV1 implements GroupByStrategy
{

@@ -109,6 +121,97 @@ public class GroupByStrategyV1 implements GroupByStrategy
    return new ResourceClosingSequence<>(query.applyLimit(GroupByQueryHelper.postAggregate(query, index)), index);
  }

  @Override
  public Sequence<Row> processSubqueryResult(
      GroupByQuery subquery, GroupByQuery query, Sequence<Row> subqueryResult
  )
  {
    final Set<AggregatorFactory> aggs = Sets.newHashSet();

    // Nested group-bys work by first running the inner query and then materializing the results in an incremental
    // index which the outer query is then run against. To build the incremental index, we use the fieldNames from
    // the aggregators for the outer query to define the column names so that the index will match the query. If
    // there are multiple types of aggregators in the outer query referencing the same fieldName, we will try to build
    // multiple columns of the same name using different aggregator types and will fail. Here, we permit multiple
    // aggregators of the same type referencing the same fieldName (and skip creating identical columns for the
    // subsequent ones) and return an error if the aggregator types are different.
    for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
      for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) {
        if (Iterables.any(aggs, new Predicate<AggregatorFactory>()
        {
          @Override
          public boolean apply(AggregatorFactory agg)
          {
            return agg.getName().equals(transferAgg.getName()) && !agg.equals(transferAgg);
          }
        })) {
          throw new IAE("Inner aggregator can currently only be referenced by a single type of outer aggregator" +
                        " for '%s'", transferAgg.getName());
        }

        aggs.add(transferAgg);
      }
    }

    // We need the inner incremental index to have all the columns required by the outer query
    final GroupByQuery innerQuery = new GroupByQuery.Builder(subquery)
        .setAggregatorSpecs(Lists.newArrayList(aggs))
        .setInterval(subquery.getIntervals())
        .setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
        .build();

    final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
        .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
        .build();

    final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex(
        innerQuery.withOverriddenContext(
            ImmutableMap.<String, Object>of(
                GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true
            )
        ),
        subqueryResult
    );

    // Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which
    // is ensured by QuerySegmentSpec.
    // GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval
    // and concatenate the results.
    final IncrementalIndex outerQueryResultIndex = makeIncrementalIndex(
        outerQuery,
        Sequences.concat(
            Sequences.map(
                Sequences.simple(outerQuery.getIntervals()),
                new Function<Interval, Sequence<Row>>()
                {
                  @Override
                  public Sequence<Row> apply(Interval interval)
                  {
                    return process(
                        outerQuery.withQuerySegmentSpec(
                            new MultipleIntervalSegmentSpec(ImmutableList.of(interval))
                        ),
                        new IncrementalIndexStorageAdapter(innerQueryResultIndex)
                    );
                  }
                }
            )
        )
    );

    innerQueryResultIndex.close();

    return new ResourceClosingSequence<>(
        outerQuery.applyLimit(GroupByQueryHelper.postAggregate(query, outerQueryResultIndex)),
        outerQueryResultIndex
    );
  }

  private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)
  {
    return GroupByQueryHelper.makeIncrementalIndex(query, configSupplier.get(), bufferPool, rows);
  }

  @Override
  public QueryRunner<Row> mergeRunners(
      final ListeningExecutorService exec,
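The column-building loop in processSubqueryResult above leans on AggregatorFactory.getRequiredColumns(): each outer aggregator is expected to report an aggregator keyed on the inner column it reads, and that name becomes a column of the inner incremental index. A small hedged illustration of that assumption, with made-up column names "outerSum" and "innerSum":

// Hedged example: assumes LongSumAggregatorFactory.getRequiredColumns() yields an
// aggregator named after its fieldName, which is what the loop above uses as the
// inner index's column name.
AggregatorFactory outerAgg = new LongSumAggregatorFactory("outerSum", "innerSum");
for (AggregatorFactory transferAgg : outerAgg.getRequiredColumns()) {
  System.out.println(transferAgg.getName()); // expected under that assumption: innerSum
}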
@@ -41,6 +41,7 @@ import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Merging;
import io.druid.guice.annotations.Smile;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryWatcher;

@@ -51,6 +52,7 @@ import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import io.druid.query.groupby.epinephelinae.GroupByRowProcessor;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
@@ -178,6 +180,28 @@ public class GroupByStrategyV2 implements GroupByStrategy
    );
  }

  @Override
  public Sequence<Row> processSubqueryResult(
      GroupByQuery subquery, GroupByQuery query, Sequence<Row> subqueryResult
  )
  {
    final Sequence<Row> results = GroupByRowProcessor.process(
        query,
        subqueryResult,
        configSupplier.get(),
        mergeBufferPool,
        spillMapper
    );
    return mergeResults(new QueryRunner<Row>()
    {
      @Override
      public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
      {
        return results;
      }
    }, query, null);
  }

  @Override
  public QueryRunner<Row> mergeRunners(
      ListeningExecutorService exec,
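The v2 implementation above feeds the subquery rows through GroupByRowProcessor and then wraps the resulting Sequence in a throwaway QueryRunner so that mergeResults() can be reused. That wrapping idiom, factored out as a standalone sketch (not code from this patch):

// Sketch of the wrapping idiom used above: expose a precomputed Sequence<Row>
// through the QueryRunner interface, ignoring the query and response context.
static QueryRunner<Row> constantRunner(final Sequence<Row> rows)
{
  return new QueryRunner<Row>()
  {
    @Override
    public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
    {
      return rows;
    }
  };
}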
@@ -58,6 +58,7 @@ import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
@@ -4463,6 +4464,222 @@ public class GroupByQueryRunnerTest
    TestHelper.assertExpectedObjects(expectedResults, results, "");
  }

  @Test
  public void testSubqueryWithOuterFilterAggregator()
  {
    final GroupByQuery subquery = GroupByQuery
        .builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
        .setDimensions(Lists.<DimensionSpec>newArrayList(
            new DefaultDimensionSpec("market", "market"),
            new DefaultDimensionSpec("quality", "quality")
        ))
        .setAggregatorSpecs(
            Arrays.asList(
                QueryRunnerTestHelper.rowsCount,
                new LongSumAggregatorFactory("index", "index")
            )
        )
        .setGranularity(QueryRunnerTestHelper.dayGran)
        .build();

    final DimFilter filter = new SelectorDimFilter("market", "spot", null);
    final GroupByQuery query = GroupByQuery
        .builder()
        .setDataSource(subquery)
        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
        .setDimensions(Lists.<DimensionSpec>newArrayList())
        .setAggregatorSpecs(
            ImmutableList.<AggregatorFactory>of(
                new FilteredAggregatorFactory(QueryRunnerTestHelper.rowsCount, filter)
            )
        )
        .setGranularity(QueryRunnerTestHelper.allGran)
        .build();

    // v2 strategy would throw an exception for this because the dimension selector in GroupByRowProcessor is not
    // dictionary-encoded, but instead requires the row to be set when doing a lookup. A NullPointerException occurs
    // when the filter aggregator is initialized with no row set.
    if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
      expectedException.expect(NullPointerException.class);
      GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
    } else {
      List<Row> expectedResults = Arrays.asList(
          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01", "rows", 837L)
      );
      Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
      TestHelper.assertExpectedObjects(expectedResults, results, "");
    }
  }

  @Test
  public void testSubqueryWithOuterTimeFilter()
  {
    final GroupByQuery subquery = GroupByQuery
        .builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
        .setDimensions(Lists.<DimensionSpec>newArrayList(
            new DefaultDimensionSpec("market", "market"),
            new DefaultDimensionSpec("quality", "quality")
        ))
        .setAggregatorSpecs(
            Arrays.asList(
                QueryRunnerTestHelper.rowsCount,
                new LongSumAggregatorFactory("index", "index")
            )
        )
        .setGranularity(QueryRunnerTestHelper.dayGran)
        .build();

    final DimFilter fridayFilter = new SelectorDimFilter(Column.TIME_COLUMN_NAME, "Friday", new TimeFormatExtractionFn("EEEE", null, null));
    final DimFilter firstDaysFilter = new InDimFilter(Column.TIME_COLUMN_NAME, ImmutableList.of("1", "2", "3"), new TimeFormatExtractionFn("d", null, null));
    final GroupByQuery query = GroupByQuery
        .builder()
        .setDataSource(subquery)
        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
        .setDimensions(Lists.<DimensionSpec>newArrayList())
        .setDimFilter(firstDaysFilter)
        .setAggregatorSpecs(
            ImmutableList.<AggregatorFactory>of(
                new FilteredAggregatorFactory(QueryRunnerTestHelper.rowsCount, fridayFilter)
            )
        )
        .setGranularity(QueryRunnerTestHelper.dayGran)
        .build();

    List<Row> expectedResults = Arrays.asList(
        GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-01", "rows", 0L),
        GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-02", "rows", 0L),
        GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-03", "rows", 0L),
        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-01", "rows", 0L),
        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-02", "rows", 0L),
        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-03", "rows", 0L),
        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "rows", 13L),
        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "rows", 0L),
        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-03", "rows", 0L)
    );
    Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
    TestHelper.assertExpectedObjects(expectedResults, results, "");
  }

  @Test
  public void testSubqueryWithOuterCardinalityAggregator()
  {
    final GroupByQuery subquery = GroupByQuery
        .builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
        .setDimensions(Lists.<DimensionSpec>newArrayList(
            new DefaultDimensionSpec("market", "market"),
            new DefaultDimensionSpec("quality", "quality")
        ))
        .setAggregatorSpecs(
            Arrays.asList(
                QueryRunnerTestHelper.rowsCount,
                new LongSumAggregatorFactory("index", "index")
            )
        )
        .setGranularity(QueryRunnerTestHelper.dayGran)
        .build();

    final GroupByQuery query = GroupByQuery
        .builder()
        .setDataSource(subquery)
        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
        .setDimensions(Lists.<DimensionSpec>newArrayList())
        .setAggregatorSpecs(
            ImmutableList.<AggregatorFactory>of(
                new CardinalityAggregatorFactory("car", ImmutableList.of("quality"), false)
            )
        )
        .setGranularity(QueryRunnerTestHelper.allGran)
        .build();

    // v1 strategy would throw an exception for this because it calls AggregatorFactory.getRequiredColumns to get
    // an aggregator for each field when building the inner query result incremental index. When a field type does
    // not match the aggregator value type, a parse exception occurs. In this case, quality is a string field but
    // getRequiredColumns returns a cardinality aggregator for it, which has type hyperUnique. Since this is a
    // complex type, no converter is found for it and a NullPointerException occurs when it tries to use the converter.
    if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
      expectedException.expect(NullPointerException.class);
      GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
    } else {
      List<Row> expectedResults = Arrays.asList(
          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01", "car", QueryRunnerTestHelper.UNIQUES_9)
      );
      Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
      TestHelper.assertExpectedObjects(expectedResults, results, "");
    }
  }

  @Test
  public void testSubqueryWithOuterJavascriptAggregators()
  {
    final GroupByQuery subquery = GroupByQuery
        .builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
        .setDimensions(Lists.<DimensionSpec>newArrayList(
            new DefaultDimensionSpec("market", "market"),
            new DefaultDimensionSpec("quality", "quality")
        ))
        .setAggregatorSpecs(
            Arrays.asList(
                QueryRunnerTestHelper.rowsCount,
                new LongSumAggregatorFactory("index", "index")
            )
        )
        .setGranularity(QueryRunnerTestHelper.dayGran)
        .build();

    final GroupByQuery query = GroupByQuery
        .builder()
        .setDataSource(subquery)
        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "quality")))
        .setAggregatorSpecs(
            Arrays.<AggregatorFactory>asList(
                new JavaScriptAggregatorFactory(
                    "js_agg",
                    Arrays.asList("index", "market"),
                    "function(current, index, dim){return current + index + dim.length;}",
                    "function(){return 0;}",
                    "function(a,b){return a + b;}",
                    JavaScriptConfig.getDefault()
                )
            )
        )
        .setGranularity(QueryRunnerTestHelper.dayGran)
        .build();

    // v1 strategy would throw an exception for this because it calls AggregatorFactory.getRequiredColumns to get
    // an aggregator for each field when building the inner query result incremental index. When a field type does
    // not match the aggregator value type, a parse exception occurs. In this case, market is a string field but
    // getRequiredColumns returns a JavaScript aggregator for it, which has type float.
    if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
      expectedException.expect(ParseException.class);
      GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
    } else {
      List<Row> expectedResults = Arrays.asList(
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "automotive", "js_agg", 139D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "business", "js_agg", 122D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "entertainment", "js_agg", 162D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "health", "js_agg", 124D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "mezzanine", "js_agg", 2893D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "news", "js_agg", 125D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "premium", "js_agg", 2923D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "technology", "js_agg", 82D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "travel", "js_agg", 123D),

          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "automotive", "js_agg", 151D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "business", "js_agg", 116D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "entertainment", "js_agg", 170D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "health", "js_agg", 117D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "mezzanine", "js_agg", 2470D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "news", "js_agg", 118D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "premium", "js_agg", 2528D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "technology", "js_agg", 101D),
          GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "travel", "js_agg", 130D)
      );
      Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
      TestHelper.assertExpectedObjects(expectedResults, results, "");
    }
  }

  @Test
  public void testSubqueryWithHyperUniques()
  {