mirror of https://github.com/apache/druid.git
groupBy improvements

commit 3302cff6db
parent 533a263fbd
io/druid/query/ChainedExecutionQueryRunner.java

@@ -25,7 +25,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
-import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.BaseSequence;
 import com.metamx.common.guava.MergeIterable;
 import com.metamx.common.guava.Sequence;
@@ -52,9 +51,10 @@ import java.util.concurrent.Future;
  * That is, the two sub queryables for A would run *after* B is run, effectively meaning that the results for B
  * must be fully cached in memory before the results for Aa and Ab are computed.
  */
-public class ChainedExecutionQueryRunner<T> implements ParallelQueryRunner<T>
+public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
 {
   private static final Logger log = new Logger(ChainedExecutionQueryRunner.class);
+
   private final Iterable<QueryRunner<T>> queryables;
   private final ExecutorService exec;
   private final Ordering<T> ordering;
@@ -157,63 +157,4 @@ public class ChainedExecutionQueryRunner<T> implements ParallelQueryRunner<T>
           }
       );
   }
-
-  @Override
-  public <OutType> OutType runAndAccumulate(
-      final Query<T> query,
-      final OutType outType, final Accumulator<OutType, T> outTypeTAccumulator
-  )
-  {
-    final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
-
-    if (Iterables.isEmpty(queryables)) {
-      log.warn("No queryables found.");
-      return outType;
-    }
-    List<Future<Boolean>> futures = Lists.newArrayList(
-        Iterables.transform(
-            queryables,
-            new Function<QueryRunner<T>, Future<Boolean>>()
-            {
-              @Override
-              public Future<Boolean> apply(final QueryRunner<T> input)
-              {
-                return exec.submit(
-                    new PrioritizedCallable<Boolean>(priority)
-                    {
-                      @Override
-                      public Boolean call() throws Exception
-                      {
-                        try {
-                          input.run(query).accumulate(outType, outTypeTAccumulator);
-                          return true;
-                        }
-                        catch (Exception e) {
-                          log.error(e, "Exception with one of the sequences!");
-                          throw Throwables.propagate(e);
-                        }
-                      }
-                    }
-                );
-              }
-            }
-        )
-    );
-
-    // Let the runners complete
-    for (Future<Boolean> future : futures) {
-      try {
-        future.get();
-      }
-      catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-      catch (ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-
-    return outType;
-  }
 }
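The fork-and-wait structure of the removed runAndAccumulate survives in the new GroupByParallelQueryRunner below: each sub-runner is submitted to the executor as a prioritized task and the caller blocks on the resulting futures. A minimal, self-contained sketch of that idiom using only java.util.concurrent; the thread count, loop bound, and task body are illustrative, not taken from Druid:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ForkAndWaitSketch
{
  public static void main(String[] args) throws Exception
  {
    ExecutorService exec = Executors.newFixedThreadPool(4);
    List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();

    // submit one task per sub-runner (four dummy shards here)
    for (int i = 0; i < 4; i++) {
      futures.add(
          exec.submit(
              new Callable<Boolean>()
              {
                @Override
                public Boolean call() throws Exception
                {
                  // the real code runs one sub-query here and folds its rows
                  // into a shared, thread-safe accumulator
                  return true;
                }
              }
          )
      );
    }

    // let the runners complete before reading the merged result
    for (Future<Boolean> future : futures) {
      future.get(); // rethrows a task failure as ExecutionException
    }
    exec.shutdown();
  }
}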
io/druid/query/GroupByParallelQueryRunner.java (new file)

@@ -0,0 +1,117 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.metamx.common.Pair;
+import com.metamx.common.guava.Accumulator;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+import com.metamx.common.logger.Logger;
+import io.druid.data.input.Row;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.groupby.GroupByQueryConfig;
+import io.druid.query.groupby.GroupByQueryHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+
+public class GroupByParallelQueryRunner implements QueryRunner<Row>
+{
+  private static final Logger log = new Logger(GroupByParallelQueryRunner.class);
+  private final Iterable<QueryRunner<Row>> queryables;
+  private final ExecutorService exec;
+  private final Ordering<Row> ordering;
+  private final Supplier<GroupByQueryConfig> configSupplier;
+
+  public GroupByParallelQueryRunner(
+      ExecutorService exec,
+      Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
+      QueryRunner<Row>... queryables
+  )
+  {
+    this(exec, ordering, configSupplier, Arrays.asList(queryables));
+  }
+
+  public GroupByParallelQueryRunner(
+      ExecutorService exec,
+      Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
+      Iterable<QueryRunner<Row>> queryables
+  )
+  {
+    this.exec = exec;
+    this.ordering = ordering;
+    this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
+    this.configSupplier = configSupplier;
+  }
+
+  @Override
+  public Sequence<Row> run(final Query<Row> queryParam)
+  {
+
+    final GroupByQuery query = (GroupByQuery) queryParam;
+    final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
+        query,
+        configSupplier.get()
+    );
+    final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
+
+    if (Iterables.isEmpty(queryables)) {
+      log.warn("No queryables found.");
+    }
+    List<Future<Boolean>> futures = Lists.newArrayList(
+        Iterables.transform(
+            queryables,
+            new Function<QueryRunner<Row>, Future<Boolean>>()
+            {
+              @Override
+              public Future<Boolean> apply(final QueryRunner<Row> input)
+              {
+                return exec.submit(
+                    new PrioritizedCallable<Boolean>(priority)
+                    {
+                      @Override
+                      public Boolean call() throws Exception
+                      {
+                        try {
+                          input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
+                          return true;
+                        }
+                        catch (Exception e) {
+                          log.error(e, "Exception with one of the sequences!");
+                          throw Throwables.propagate(e);
+                        }
+                      }
+                    }
+                );
+              }
+            }
+        )
+    );
+
+    // Let the runners complete
+    for (Future<Boolean> future : futures) {
+      try {
+        future.get();
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      catch (ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null));
+  }
+}
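The ParallelQueryRunner interface deleted just below spelled out the contract this runner still depends on: all workers fold rows into the single shared indexAccumulatorPair, so the accumulation target must be thread safe. A small sketch of such an accumulator, with an AtomicLong standing in for the IncrementalIndex and a hypothetical local Accumulator interface standing in for com.metamx.common.guava.Accumulator:

import java.util.concurrent.atomic.AtomicLong;

public class ThreadSafeAccumulatorSketch
{
  // hypothetical stand-in for com.metamx.common.guava.Accumulator
  interface Accumulator<AccumulatedType, InType>
  {
    AccumulatedType accumulate(AccumulatedType accumulated, InType in);
  }

  public static void main(String[] args)
  {
    // AtomicLong plays the role the IncrementalIndex plays above: a mutable
    // target that is safe to update from several worker threads at once
    Accumulator<AtomicLong, Long> sum = new Accumulator<AtomicLong, Long>()
    {
      @Override
      public AtomicLong accumulate(AtomicLong accumulated, Long in)
      {
        accumulated.addAndGet(in); // atomic, so concurrent calls are safe
        return accumulated;
      }
    };

    AtomicLong total = new AtomicLong();
    for (long value : new long[]{1, 2, 3}) {
      total = sum.accumulate(total, value);
    }
    System.out.println(total.get()); // prints 6
  }
}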
io/druid/query/ParallelQueryRunner.java (deleted)

@@ -1,14 +0,0 @@
-package io.druid.query;
-
-public interface ParallelQueryRunner<T> extends QueryRunner<T>
-{
-
-  /**
-   * accumulator passed should be thread safe
-   */
-  <OutType> OutType runAndAccumulate(
-      Query<T> query,
-      OutType outType,
-      com.metamx.common.guava.Accumulator<OutType, T> outTypeTAccumulator
-  );
-}
io/druid/query/aggregation/Aggregators.java (deleted)

@@ -1,56 +0,0 @@
-package io.druid.query.aggregation;
-
-public class Aggregators
-{
-
-  public static Aggregator synchronizedAggregator(Aggregator aggregator){
-    return new SynchronizedAggregator(aggregator);
-  }
-
-  private static class SynchronizedAggregator implements Aggregator
-  {
-
-    private final Aggregator delegate;
-
-    SynchronizedAggregator(Aggregator delegate)
-    {
-      this.delegate = delegate;
-    }
-
-    @Override
-    public synchronized void aggregate()
-    {
-      delegate.aggregate();
-    }
-
-    @Override
-    public synchronized void reset()
-    {
-      delegate.reset();
-    }
-
-    @Override
-    public synchronized Object get()
-    {
-      return delegate.get();
-    }
-
-    @Override
-    public synchronized float getFloat()
-    {
-      return delegate.getFloat();
-    }
-
-    @Override
-    public synchronized String getName()
-    {
-      return delegate.getName();
-    }
-
-    @Override
-    public synchronized void close()
-    {
-      delegate.close();
-    }
-  }
-}
io/druid/query/groupby/GroupByQueryHelper.java (new file)

@@ -0,0 +1,99 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.groupby;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.common.Pair;
+import com.metamx.common.guava.Accumulator;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+import io.druid.data.input.MapBasedRow;
+import io.druid.data.input.Row;
+import io.druid.data.input.Rows;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.segment.incremental.IncrementalIndex;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class GroupByQueryHelper
+{
+  public static Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> createIndexAccumulatorPair(
+      final GroupByQuery query,
+      final GroupByQueryConfig config
+  )
+  {
+    final QueryGranularity gran = query.getGranularity();
+    final long timeStart = query.getIntervals().get(0).getStartMillis();
+
+    // use gran.iterable instead of gran.truncate so that
+    // AllGranularity returns timeStart instead of Long.MIN_VALUE
+    final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();
+
+    final List<AggregatorFactory> aggs = Lists.transform(
+        query.getAggregatorSpecs(),
+        new Function<AggregatorFactory, AggregatorFactory>()
+        {
+          @Override
+          public AggregatorFactory apply(@Nullable AggregatorFactory input)
+          {
+            return input.getCombiningFactory();
+          }
+        }
+    );
+    final List<String> dimensions = Lists.transform(
+        query.getDimensions(),
+        new Function<DimensionSpec, String>()
+        {
+          @Override
+          public String apply(@Nullable DimensionSpec input)
+          {
+            return input.getOutputName();
+          }
+        }
+    );
+    IncrementalIndex index = new IncrementalIndex(
+        // use granularity truncated min timestamp
+        // since incoming truncated timestamps may precede timeStart
+        granTimeStart,
+        gran,
+        aggs.toArray(new AggregatorFactory[aggs.size()])
+    );
+
+    Accumulator<IncrementalIndex, Row> accumulator = new Accumulator<IncrementalIndex, Row>()
+    {
+      @Override
+      public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
+      {
+        if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
+          throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
+        }
+
+        return accumulated;
+      }
+    };
+    return new Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>>(index, accumulator);
+  }
+
+}
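Both call sites use the helper the same way: the rhs accumulator folds rows into the lhs index while enforcing config.getMaxResults(), and the caller reads the merged result out of lhs afterwards. A tiny standalone sketch of that limit-enforcing fold; List and IllegalStateException are stand-ins for IncrementalIndex and ISE:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IndexAccumulatorSketch
{
  private static final int MAX_RESULTS = 2; // stands in for config.getMaxResults()

  public static void main(String[] args)
  {
    List<String> index = new ArrayList<String>(); // stands in for the IncrementalIndex

    for (String row : Arrays.asList("a", "b", "c")) {
      // fold the row in, then enforce the limit, as the accumulator above does
      index.add(row);
      if (index.size() > MAX_RESULTS) {
        // mirrors: throw new ISE("Computation exceeds maxRows limit[%s]", ...)
        throw new IllegalStateException("Computation exceeds maxRows limit[" + MAX_RESULTS + "]");
      }
    }
  }
}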
io/druid/query/groupby/GroupByQueryQueryToolChest.java

@@ -24,10 +24,9 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
-import com.metamx.common.ISE;
+import com.metamx.common.Pair;
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.ConcatSequence;
 import com.metamx.common.guava.Sequence;
@@ -35,22 +34,16 @@ import com.metamx.common.guava.Sequences;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
-import io.druid.data.input.Rows;
-import io.druid.granularity.QueryGranularity;
 import io.druid.query.IntervalChunkingQueryRunner;
-import io.druid.query.ParallelQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
-import io.druid.query.dimension.DimensionSpec;
 import io.druid.segment.incremental.IncrementalIndex;
 import org.joda.time.Interval;
 import org.joda.time.Minutes;
 
-import javax.annotation.Nullable;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -92,64 +85,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
   private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
   {
     final GroupByQueryConfig config = configSupplier.get();
-    final QueryGranularity gran = query.getGranularity();
-    final long timeStart = query.getIntervals().get(0).getStartMillis();
-
-    // use gran.iterable instead of gran.truncate so that
-    // AllGranularity returns timeStart instead of Long.MIN_VALUE
-    final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();
-
-    final List<AggregatorFactory> aggs = Lists.transform(
-        query.getAggregatorSpecs(),
-        new Function<AggregatorFactory, AggregatorFactory>()
-        {
-          @Override
-          public AggregatorFactory apply(@Nullable AggregatorFactory input)
-          {
-            return input.getCombiningFactory();
-          }
-        }
+    Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
+        query,
+        config
     );
-    final List<String> dimensions = Lists.transform(
-        query.getDimensions(),
-        new Function<DimensionSpec, String>()
-        {
-          @Override
-          public String apply(@Nullable DimensionSpec input)
-          {
-            return input.getOutputName();
-          }
-        }
-    );
-    IncrementalIndex index = new IncrementalIndex(
-        // use granularity truncated min timestamp
-        // since incoming truncated timestamps may precede timeStart
-        granTimeStart,
-        gran,
-        aggs.toArray(new AggregatorFactory[aggs.size()])
-    );
-    Accumulator<IncrementalIndex, Row> accumulator = new Accumulator<IncrementalIndex, Row>()
-    {
-      @Override
-      public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
-      {
-        if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
-          throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
-        }
-
-        return accumulated;
-      }
-    };
-
-
-    if (runner instanceof ParallelQueryRunner && Boolean.getBoolean("optimize")) {
-      index = (IncrementalIndex) ((ParallelQueryRunner) runner).runAndAccumulate(query, index, accumulator);
-    } else {
-      index = runner.run(query).accumulate(index, accumulator);
-    }
-
     // convert millis back to timestamp according to granularity to preserve time zone information
-    Sequence<Row> retVal = Sequences.map(
+    IncrementalIndex index = runner.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
+    Sequence<Row> sequence = Sequences.map(
         Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
         new Function<Row, Row>()
        {
@@ -157,12 +99,15 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
           public Row apply(Row input)
           {
             final MapBasedRow row = (MapBasedRow) input;
-            return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
+            return new MapBasedRow(
+                query.getGranularity()
+                     .toDateTime(row.getTimestampFromEpoch()),
+                row.getEvent()
+            );
           }
         }
     );
-
-    return query.applyLimit(retVal);
+    return query.applyLimit(sequence);
   }
 
   @Override
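The MapBasedRow rewrite above keeps the intent of the old comment: rows carry bare epoch millis internally, so they are mapped back through the query granularity on the way out to preserve time zone information. A sketch of that round trip with Joda-Time (already a dependency here); the zone and date are arbitrary, and re-attaching the zone when rebuilding the DateTime stands in for what gran.toDateTime does on a granularity configured with a time zone:

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class GranularityRoundTripSketch
{
  public static void main(String[] args)
  {
    // a bare epoch-millis long carries no time zone of its own
    DateTimeZone zone = DateTimeZone.forID("America/Los_Angeles");
    long millis = new DateTime(2014, 1, 1, 0, 0, zone).getMillis();

    // rebuilding the DateTime re-attaches the zone, so callers see the
    // timestamp in the zone the query was expressed in
    DateTime restored = new DateTime(millis, zone);
    System.out.println(restored); // 2014-01-01T00:00:00.000-08:00
  }
}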
io/druid/query/groupby/GroupByQueryRunnerFactory.java

@@ -30,8 +30,8 @@ import com.metamx.common.guava.ExecutorExecutingSequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import io.druid.data.input.Row;
-import io.druid.query.ChainedExecutionQueryRunner;
 import io.druid.query.ConcatQueryRunner;
+import io.druid.query.GroupByParallelQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactory;
@@ -116,9 +116,8 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
                 }
               )
           );
-        }
-        else {
-          return new ChainedExecutionQueryRunner<Row>(queryExecutor, new RowOrdering(), queryRunners);
+        } else {
+          return new GroupByParallelQueryRunner(queryExecutor, new RowOrdering(), config, queryRunners);
         }
       }
 
@@ -142,7 +141,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
     @Override
     public Sequence<Row> run(Query<Row> input)
     {
-      if (! (input instanceof GroupByQuery)) {
+      if (!(input instanceof GroupByQuery)) {
        throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class);
       }
 
io/druid/segment/incremental/IncrementalIndex.java

@@ -40,7 +40,6 @@ import io.druid.data.input.impl.SpatialDimensionSchema;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.Aggregators;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.segment.ColumnSelectorFactory;
 import io.druid.segment.DimensionSelector;
@@ -84,7 +83,7 @@ public class IncrementalIndex implements Iterable<Row>
   private final DimensionHolder dimValues;
   private final ConcurrentSkipListMap<TimeAndDims, Aggregator[]> facts;
   private volatile AtomicInteger numEntries = new AtomicInteger();
-  // This is modified on add() by a (hopefully) single thread.
+  // This is modified on add() in a critical section.
   private InputRow in;
 
   public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema)
@@ -287,7 +286,7 @@ public class IncrementalIndex implements Iterable<Row>
             }
           }
 
-        );
+      );
       }
 
     Aggregator[] prev = facts.putIfAbsent(key, aggs);
@@ -601,6 +600,7 @@ public class IncrementalIndex implements Iterable<Row>
     private final Map<String, Integer> falseIds;
     private final Map<Integer, String> falseIdsReverse;
     private volatile String[] sortedVals = null;
+    private final AtomicInteger falseIDSize = new AtomicInteger();
 
     public DimDim()
     {
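The putIfAbsent in the third hunk above is the standard lock-free insert idiom for the ConcurrentSkipListMap of facts: build a candidate value, try to install it, and aggregate into whichever value won the race. A self-contained sketch; String and long[] are placeholders for TimeAndDims and Aggregator[]:

import java.util.concurrent.ConcurrentSkipListMap;

public class PutIfAbsentSketch
{
  public static void main(String[] args)
  {
    ConcurrentSkipListMap<String, long[]> facts = new ConcurrentSkipListMap<String, long[]>();

    String key = "someTimeAndDims";
    long[] aggs = new long[]{0}; // freshly created candidate for this key

    // putIfAbsent returns null if our candidate was installed, or the value
    // some other thread installed first; either way all threads converge on
    // one shared slot per key
    long[] prev = facts.putIfAbsent(key, aggs);
    long[] winner = (prev != null) ? prev : aggs;

    winner[0]++; // every thread aggregates into the winning slot
    System.out.println(facts.get(key)[0]); // prints 1
  }
}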
io/druid/cli/CliRealtime.java

@@ -34,7 +34,7 @@ import java.util.List;
 )
 public class CliRealtime extends ServerRunnable
 {
-  private static final Logger log = new Logger(CliBroker.class);
+  private static final Logger log = new Logger(CliRealtime.class);
 
   public CliRealtime()
   {