1) Adjust the GroupByQuery to also be able to merge results on the local node. Fixes #116
2) Make the GroupByQuery operate in a multi-threaded fashion by default (configurable via druid.query.groupBy.singleThreaded). Fixes #96
3) Fix post-aggregation computation on GroupBy queries. I believe #72 is fixed
4) Fix case-sensitivity issue with post-aggregations on GroupBy queries
Eric Tschetter 2013-04-09 19:13:08 -07:00
parent a678f08b67
commit 55648c47a7
9 changed files with 187 additions and 153 deletions
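The commit introduces two user-facing knobs: a "groupByMerge" query-context key (treated as "true" when absent) and a druid.query.groupBy.singleThreaded runtime property (default false). A minimal usage sketch, assuming an existing GroupByQuery named query and Guava's ImmutableMap on the classpath (names outside the diff are hypothetical):

    // Opt one query out of the new local-node merge; the context key
    // defaults to "true" when not set.
    GroupByQuery noLocalMerge = query.withOverriddenContext(
        ImmutableMap.of("groupByMerge", "false")
    );

    // Process-wide fallback to the old single-threaded merge path, read by
    // the skife @Config binding added later in this diff:
    //   druid.query.groupBy.singleThreaded=true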

View File: QueryableNode.java

@@ -330,12 +330,15 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
private void initializeEmitter()
{
if (emitter == null) {
final HttpClient httpClient = HttpClientInit.createClient(
HttpClientConfig.builder()
.withNumConnections(1)
.withReadTimeout(new Duration(PropUtils.getProperty(props, "druid.emitter.timeOut")))
.build(), lifecycle
);
final HttpClientConfig.Builder configBuilder = HttpClientConfig.builder()
.withNumConnections(1);
final String emitterTimeoutDuration = props.getProperty("druid.emitter.timeOut");
if (emitterTimeoutDuration != null) {
configBuilder.withReadTimeout(new Duration(emitterTimeoutDuration));
}
final HttpClient httpClient = HttpClientInit.createClient(configBuilder.build(), lifecycle);
setEmitter(
new ServiceEmitter(
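The net effect of this hunk is that the emitter read timeout becomes optional, so startup no longer fails when druid.emitter.timeOut is unset. A hedged sketch of the configured case, assuming java.util.Properties and org.joda.time.Duration; the value shown is illustrative, in the ISO-8601 seconds form Joda-Time's String-based Duration constructor accepts:

    // Illustrative only: when the property is present, initializeEmitter()
    // parses it with Joda-Time, e.g. a 300-second read timeout.
    Properties props = new Properties();
    props.setProperty("druid.emitter.timeOut", "PT300S");
    Duration readTimeout = new Duration(props.getProperty("druid.emitter.timeOut"));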

View File: GroupByQuery.java

@@ -30,6 +30,7 @@ import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.post.PostAggregator;
import com.metamx.druid.input.Row;
import com.metamx.druid.query.Queries;
import com.metamx.druid.query.dimension.DefaultDimensionSpec;
import com.metamx.druid.query.dimension.DimensionSpec;
import com.metamx.druid.query.filter.DimFilter;
@@ -75,6 +76,7 @@ public class GroupByQuery extends BaseQuery<Row>
Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
Preconditions.checkNotNull(this.aggregatorSpecs, "Must specify at least one aggregator");
Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
}
@JsonProperty("filter")
@@ -120,7 +122,7 @@ public class GroupByQuery extends BaseQuery<Row>
}
@Override
public Query withOverriddenContext(Map<String, String> contextOverride)
public GroupByQuery withOverriddenContext(Map<String, String> contextOverride)
{
return new GroupByQuery(
getDataSource(),
@@ -135,7 +137,7 @@ public class GroupByQuery extends BaseQuery<Row>
}
@Override
public Query withQuerySegmentSpec(QuerySegmentSpec spec)
public GroupByQuery withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new GroupByQuery(
getDataSource(),
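Both overrides above narrow their declared return type from Query to GroupByQuery, a covariant return that lets callers re-wrap a group-by query without casting. A small sketch, assuming a GroupByQuery named query:

    // No cast needed: the result is statically a GroupByQuery, so its
    // specific accessors stay available after rewriting.
    GroupByQuery rewritten = query.withOverriddenContext(ImmutableMap.of("groupByMerge", "false"));
    List<DimensionSpec> dims = rewritten.getDimensions();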

View File: GroupByQueryQueryToolChest.java

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.guava.Accumulator;
@@ -49,6 +50,7 @@ import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
@@ -57,6 +59,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
{
private static final TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>(){};
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
private static final int maxRows;
@@ -75,77 +79,85 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
@Override
public Sequence<Row> run(Query<Row> input)
{
final GroupByQuery query = (GroupByQuery) input;
final QueryGranularity gran = query.getGranularity();
final long timeStart = query.getIntervals().get(0).getStartMillis();
// use gran.iterable instead of gran.truncate so that
// AllGranularity returns timeStart instead of Long.MIN_VALUE
final long granTimeStart = gran.iterable(timeStart, timeStart+1).iterator().next();
final List<AggregatorFactory> aggs = Lists.transform(
query.getAggregatorSpecs(),
new Function<AggregatorFactory, AggregatorFactory>()
{
@Override
public AggregatorFactory apply(@Nullable AggregatorFactory input)
{
return input.getCombiningFactory();
}
}
);
final List<String> dimensions = Lists.transform(
query.getDimensions(),
new Function<DimensionSpec, String>()
{
@Override
public String apply(@Nullable DimensionSpec input)
{
return input.getOutputName();
}
}
);
final IncrementalIndex index = runner.run(query).accumulate(
new IncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()])
),
new Accumulator<IncrementalIndex, Row>()
{
@Override
public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
{
if (accumulated.add(Rows.toInputRow(in, dimensions)) > maxRows) {
throw new ISE("Computation exceeds maxRows limit[%s]", maxRows);
}
return accumulated;
}
}
);
// convert millis back to timestamp according to granularity to preserve time zone information
return Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>()
{
@Override
public Row apply(Row input)
{
final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
}
}
);
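// Recursion guard in the new code below: mergeGroupByResults() re-issues the
// query with NO_MERGE_CONTEXT, so the nested run() sees groupByMerge=false and
// falls through to runner.run(input) instead of merging again.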
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
}
else {
return runner.run(input);
}
}
};
}
private Sequence<Row> mergeGroupByResults(GroupByQuery query, QueryRunner<Row> runner)
{
final QueryGranularity gran = query.getGranularity();
final long timeStart = query.getIntervals().get(0).getStartMillis();
// use gran.iterable instead of gran.truncate so that
// AllGranularity returns timeStart instead of Long.MIN_VALUE
final long granTimeStart = gran.iterable(timeStart, timeStart+1).iterator().next();
final List<AggregatorFactory> aggs = Lists.transform(
query.getAggregatorSpecs(),
new Function<AggregatorFactory, AggregatorFactory>()
{
@Override
public AggregatorFactory apply(@Nullable AggregatorFactory input)
{
return input.getCombiningFactory();
}
}
);
final List<String> dimensions = Lists.transform(
query.getDimensions(),
new Function<DimensionSpec, String>()
{
@Override
public String apply(@Nullable DimensionSpec input)
{
return input.getOutputName();
}
}
);
final IncrementalIndex index = runner.run(query).accumulate(
new IncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()])
),
new Accumulator<IncrementalIndex, Row>()
{
@Override
public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
{
if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > maxRows) {
throw new ISE("Computation exceeds maxRows limit[%s]", maxRows);
}
return accumulated;
}
}
);
// convert millis back to timestamp according to granularity to preserve time zone information
return Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>()
{
@Override
public Row apply(Row input)
{
final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
}
}
);
}
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{

View File: Rows.java

@@ -19,45 +19,29 @@
package com.metamx.druid.input;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import java.util.List;
import java.util.TreeMap;
/**
*/
public class Rows
{
public static InputRow toInputRow(final Row row, final List<String> dimensions)
{
    return new InputRow()
    {
        @Override
        public List<String> getDimensions()
        {
            return dimensions;
        }

        @Override
        public long getTimestampFromEpoch()
        {
            return row.getTimestampFromEpoch();
        }

        @Override
        public List<String> getDimension(String dimension)
        {
            return row.getDimension(dimension);
        }

        @Override
        public float getFloatMetric(String metric)
        {
            return row.getFloatMetric(metric);
        }

        @Override
        public String toString()
        {
            return row.toString();
        }
    };
}

public static InputRow toCaseInsensitiveInputRow(final Row row, final List<String> dimensions)
{
    if (row instanceof MapBasedRow) {
        MapBasedRow mapBasedRow = (MapBasedRow) row;

        TreeMap<String, Object> caseInsensitiveMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
        caseInsensitiveMap.putAll(mapBasedRow.getEvent());
        return new MapBasedInputRow(
            mapBasedRow.getTimestampFromEpoch(),
            dimensions,
            caseInsensitiveMap
        );
    }
    throw new ISE("Can only convert MapBasedRow objects because we are ghetto like that.");
}
}
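This is where the case-sensitivity fix (item 4 of the commit message) lands: copying the event into a TreeMap ordered by String.CASE_INSENSITIVE_ORDER makes field lookups ignore case. A minimal illustration with made-up values:

    // Illustrative: a post-aggregation referencing "totalcount" now resolves
    // against an event field stored as "totalCount".
    TreeMap<String, Object> event = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
    event.put("totalCount", 42L);
    Object hit = event.get("totalcount"); // returns 42L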

View File: ServerInit.java

@@ -47,6 +47,7 @@ import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.group.GroupByQueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQueryRunnerFactoryConfig;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
import com.metamx.druid.query.search.SearchQuery;
@@ -147,7 +148,8 @@ public class ServerInit
new GroupByQueryEngine(
configFactory.build(GroupByQueryEngineConfig.class),
computationBufferPool
)
),
configFactory.build(GroupByQueryRunnerFactoryConfig.class)
)
);
queryRunners.put(SearchQuery.class, new SearchQueryRunnerFactory());

View File: GroupByQueryRunnerFactory.java

@@ -21,7 +21,8 @@ package com.metamx.druid.query.group;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.metamx.common.ISE;
import com.metamx.common.guava.ExecutorExecutingSequence;
import com.metamx.common.guava.Sequence;
@@ -30,6 +31,7 @@ import com.metamx.druid.Query;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.index.Segment;
import com.metamx.druid.input.Row;
import com.metamx.druid.query.ChainedExecutionQueryRunner;
import com.metamx.druid.query.ConcatQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
@@ -44,21 +46,18 @@ import java.util.concurrent.Future;
*/
public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
{
private static final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(){
@Override
public QueryRunner<Row> mergeResults(QueryRunner<Row> runner)
{
return new ConcatQueryRunner<Row>(Sequences.simple(ImmutableList.of(runner)));
}
};
private static final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest();
private final GroupByQueryEngine engine;
private final GroupByQueryRunnerFactoryConfig config;
public GroupByQueryRunnerFactory(
GroupByQueryEngine engine
GroupByQueryEngine engine,
GroupByQueryRunnerFactoryConfig config
)
{
this.engine = engine;
this.config = config;
}
@Override
@@ -70,48 +69,53 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
@Override
public QueryRunner<Row> mergeRunners(final ExecutorService queryExecutor, Iterable<QueryRunner<Row>> queryRunners)
{
if (config.isSingleThreaded()) {
    return new ConcatQueryRunner<Row>(
        Sequences.map(
            Sequences.simple(queryRunners),
            new Function<QueryRunner<Row>, QueryRunner<Row>>()
            {
                @Override
                public QueryRunner<Row> apply(final QueryRunner<Row> input)
                {
                    return new QueryRunner<Row>()
                    {
                        @Override
                        public Sequence<Row> run(final Query<Row> query)
                        {
                            Future<Sequence<Row>> future = queryExecutor.submit(
                                new Callable<Sequence<Row>>()
                                {
                                    @Override
                                    public Sequence<Row> call() throws Exception
                                    {
                                        return new ExecutorExecutingSequence<Row>(
                                            input.run(query),
                                            queryExecutor
                                        );
                                    }
                                }
                            );
                            try {
                                return future.get();
                            }
                            catch (InterruptedException e) {
                                throw Throwables.propagate(e);
                            }
                            catch (ExecutionException e) {
                                throw Throwables.propagate(e);
                            }
                        }
                    };
                }
            }
        )
    );
}
else {
    return new ChainedExecutionQueryRunner<Row>(queryExecutor, new RowOrdering(), queryRunners);
}
}
@Override
@@ -141,4 +145,13 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
return engine.process((GroupByQuery) input, adapter);
}
}
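// Passed to ChainedExecutionQueryRunner in the multi-threaded branch above to
// order rows from the parallel per-segment runners by ascending timestamp.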
private static class RowOrdering extends Ordering<Row>
{
@Override
public int compare(Row left, Row right)
{
return Longs.compare(left.getTimestampFromEpoch(), right.getTimestampFromEpoch());
}
}
}

View File: GroupByQueryRunnerFactoryConfig.java (new file)

@@ -0,0 +1,14 @@
package com.metamx.druid.query.group;
import org.skife.config.Config;
/**
*/
public abstract class GroupByQueryRunnerFactoryConfig
{
@Config("druid.query.groupBy.singleThreaded")
public boolean isSingleThreaded()
{
return false;
}
}
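Because the class is abstract, the default can be overridden either through the runtime property or programmatically, as the test fixtures below do. A minimal sketch of an inline override, equivalent to setting druid.query.groupBy.singleThreaded=true:

    GroupByQueryRunnerFactoryConfig singleThreaded = new GroupByQueryRunnerFactoryConfig()
    {
        @Override
        public boolean isSingleThreaded()
        {
            return true;
        }
    };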

View File: GroupByQueryRunnerTest.java

@@ -87,7 +87,8 @@ public class GroupByQueryRunnerTest
}
}
)
)
),
new GroupByQueryRunnerFactoryConfig(){}
);
return Lists.newArrayList(

View File: GroupByTimeseriesQueryRunnerTest.java

@@ -72,7 +72,10 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
}
}
)
)
),
new GroupByQueryRunnerFactoryConfig()
{
}
);
final Collection<?> objects = QueryRunnerTestHelper.makeQueryRunners(factory);