fix post aggregator

This commit is contained in:
nishantmonu51 2014-04-19 04:24:07 +05:30
parent 265ac37cdc
commit 68e8cda2da
17 changed files with 220 additions and 30 deletions

View File

@ -50,12 +50,12 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
{
final boolean isBySegment = query.getContextBySegment(false);
final boolean shouldFinalize = query.getContextFinalize(true);
Function<T, T> finalizerFn;
if (shouldFinalize) {
Function<T, T> finalizerFn;
if (isBySegment) {
finalizerFn = new Function<T, T>()
{
final Function<T, T> baseFinalizer = toolChest.makeMetricManipulatorFn(
final Function<T, T> baseFinalizer = toolChest.makePostComputeManipulatorFn(
query,
new MetricManipulationFn()
{
@ -85,7 +85,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
}
};
} else {
finalizerFn = toolChest.makeMetricManipulatorFn(
finalizerFn = toolChest.makePostComputeManipulatorFn(
query,
new MetricManipulationFn()
{
@ -97,12 +97,25 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
}
);
}
return Sequences.map(
baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", false))),
finalizerFn
} else {
// finalize is false here.
finalizerFn = toolChest.makePostComputeManipulatorFn(
query,
new MetricManipulationFn()
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return object;
}
}
);
}
return baseRunner.run(query);
return Sequences.map(
baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", false))),
finalizerFn
);
}
}

View File

@ -44,8 +44,16 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* @return
*/
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
public abstract Function<ResultType, ResultType> makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn);
public abstract Function<ResultType, ResultType> makePreComputeManipulatorFn(QueryType query, MetricManipulationFn fn);
public Function<ResultType, ResultType> makePostComputeManipulatorFn(QueryType query, MetricManipulationFn fn)
{
return makePreComputeManipulatorFn(query, fn);
}
public abstract TypeReference<ResultType> getResultTypeReference();
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query) {

View File

@ -173,7 +173,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
@Override
public Function<Row, Row> makeMetricManipulatorFn(final GroupByQuery query, final MetricManipulationFn fn)
public Function<Row, Row> makePreComputeManipulatorFn(final GroupByQuery query, final MetricManipulationFn fn)
{
return new Function<Row, Row>()
{

View File

@ -155,7 +155,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
}
@Override
public Function<SegmentAnalysis, SegmentAnalysis> makeMetricManipulatorFn(
public Function<SegmentAnalysis, SegmentAnalysis> makePreComputeManipulatorFn(
SegmentMetadataQuery query, MetricManipulationFn fn
)
{

View File

@ -129,7 +129,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
@Override
public Function<Result<SearchResultValue>, Result<SearchResultValue>> makeMetricManipulatorFn(
public Function<Result<SearchResultValue>, Result<SearchResultValue>> makePreComputeManipulatorFn(
SearchQuery query, MetricManipulationFn fn
)
{

View File

@ -131,7 +131,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
}
@Override
public Function<Result<SelectResultValue>, Result<SelectResultValue>> makeMetricManipulatorFn(
public Function<Result<SelectResultValue>, Result<SelectResultValue>> makePreComputeManipulatorFn(
final SelectQuery query, final MetricManipulationFn fn
)
{

View File

@ -123,7 +123,7 @@ public class TimeBoundaryQueryQueryToolChest
}
@Override
public Function<Result<TimeBoundaryResultValue>, Result<TimeBoundaryResultValue>> makeMetricManipulatorFn(
public Function<Result<TimeBoundaryResultValue>, Result<TimeBoundaryResultValue>> makePreComputeManipulatorFn(
TimeBoundaryQuery query, MetricManipulationFn fn
)
{

View File

@ -131,9 +131,16 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
}
@Override
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makeMetricManipulatorFn(
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makePreComputeManipulatorFn(
final TimeseriesQuery query, final MetricManipulationFn fn
)
{
return makeComputeManipulatorFn(query, fn, false);
}
private Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makeComputeManipulatorFn(
final TimeseriesQuery query, final MetricManipulationFn fn, final boolean calculatePostAggs
)
{
return new Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>>()
{
@ -142,12 +149,15 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
{
final Map<String, Object> values = Maps.newHashMap();
final TimeseriesResultValue holder = result.getValue();
if (calculatePostAggs) {
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), postAgg.compute(holder.getBaseObject()));
}
}
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
}
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), postAgg.compute(values));
}
return new Result<TimeseriesResultValue>(
result.getTimestamp(),
new TimeseriesResultValue(values)
@ -262,4 +272,14 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
{
return Ordering.natural();
}
@Override
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makePostComputeManipulatorFn(
TimeseriesQuery query, MetricManipulationFn fn
)
{
return makeComputeManipulatorFn(query, fn, true);
}
}

View File

@ -90,7 +90,6 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
Map<String, DimensionAndMetricValueExtractor> retVals = new LinkedHashMap<String, DimensionAndMetricValueExtractor>();
String dimension = dimensionSpec.getOutputName();
String topNMetricName = topNMetricSpec.getMetricName(dimensionSpec);
for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) {
retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val);
}

View File

@ -46,6 +46,7 @@ import io.druid.query.Result;
import io.druid.query.ResultGranularTimestampComparator;
import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.filter.DimFilter;
@ -139,7 +140,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
@Override
public Function<Result<TopNResultValue>, Result<TopNResultValue>> makeMetricManipulatorFn(
public Function<Result<TopNResultValue>, Result<TopNResultValue>> makePreComputeManipulatorFn(
final TopNQuery query, final MetricManipulationFn fn
)
{
@ -162,7 +163,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName())));
}
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
for (PostAggregator postAgg : prunePostAggregators(query)) {
Object calculatedPostAgg = input.getMetric(postAgg.getName());
if (calculatedPostAgg != null) {
values.put(postAgg.getName(), calculatedPostAgg);
@ -186,6 +187,56 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
};
}
@Override
public Function<Result<TopNResultValue>, Result<TopNResultValue>> makePostComputeManipulatorFn(
final TopNQuery query, final MetricManipulationFn fn
)
{
return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
private String dimension = query.getDimensionSpec().getOutputName();
@Override
public Result<TopNResultValue> apply(@Nullable Result<TopNResultValue> result)
{
List<Map<String, Object>> serializedValues = Lists.newArrayList(
Iterables.transform(
result.getValue(),
new Function<DimensionAndMetricValueExtractor, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(@Nullable DimensionAndMetricValueExtractor input)
{
final Map<String, Object> values = Maps.newHashMap();
// compute all post aggs
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
Object calculatedPostAgg = input.getMetric(postAgg.getName());
if (calculatedPostAgg != null) {
values.put(postAgg.getName(), calculatedPostAgg);
} else {
values.put(postAgg.getName(), postAgg.compute(input.getBaseObject()));
}
}
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName())));
}
values.put(dimension, input.getDimensionValue(dimension));
return values;
}
}
)
);
return new Result<TopNResultValue>(
result.getTimestamp(),
new TopNResultValue(serializedValues)
);
}
};
}
@Override
public TypeReference<Result<TopNResultValue>> getResultTypeReference()
{
@ -405,4 +456,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
);
}
}
private static List<PostAggregator> prunePostAggregators(TopNQuery query)
{
return AggregatorUtil.pruneDependentPostAgg(
query.getPostAggregatorSpecs(),
query.getTopNMetricSpec().getMetricName(query.getDimensionSpec())
);
}
}

View File

@ -25,6 +25,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
@ -86,6 +87,12 @@ public class QueryRunnerTestHelper
)
);
public static final String hyperUniqueFinalizingPostAggMetric = "hyperUniqueFinalizingPostAggMetric";
public static ArithmeticPostAggregator hyperUniqueFinalizingPostAgg = new ArithmeticPostAggregator(
hyperUniqueFinalizingPostAggMetric,
"+",
Lists.newArrayList(new HyperUniqueFinalizingPostAggregator(uniqueMetric), new ConstantPostAggregator(null, 1, 1))
);
public static final List<AggregatorFactory> commonAggregators = Arrays.asList(
rowsCount,

View File

@ -145,4 +145,73 @@ public class AggregatorUtilTest
}
@Test
public void testNullPostAggregatorNames()
{
AggregatorFactory agg1 = new DoubleSumAggregatorFactory("agg1", "value");
AggregatorFactory agg2 = new DoubleSumAggregatorFactory("agg2", "count");
PostAggregator postAgg1 = new ArithmeticPostAggregator(
null, "*", Lists.<PostAggregator>newArrayList(
new FieldAccessPostAggregator(
null,
"agg1"
), new FieldAccessPostAggregator(null, "agg2")
)
);
PostAggregator postAgg2 = new ArithmeticPostAggregator(
"postAgg",
"/",
Lists.<PostAggregator>newArrayList(
new FieldAccessPostAggregator(
null,
"agg1"
), new FieldAccessPostAggregator(null, "agg2")
)
);
Assert.assertEquals(
new Pair(Lists.newArrayList(agg1, agg2), Lists.newArrayList(postAgg2)), AggregatorUtil.condensedAggregators(
Lists.newArrayList(agg1, agg2),
Lists.newArrayList(postAgg1, postAgg2),
"postAgg"
)
);
}
@Test
public void testCasing()
{
AggregatorFactory agg1 = new DoubleSumAggregatorFactory("Agg1", "value");
AggregatorFactory agg2 = new DoubleSumAggregatorFactory("Agg2", "count");
PostAggregator postAgg1 = new ArithmeticPostAggregator(
null, "*", Lists.<PostAggregator>newArrayList(
new FieldAccessPostAggregator(
null,
"Agg1"
), new FieldAccessPostAggregator(null, "Agg2")
)
);
PostAggregator postAgg2 = new ArithmeticPostAggregator(
"postAgg",
"/",
Lists.<PostAggregator>newArrayList(
new FieldAccessPostAggregator(
null,
"Agg1"
), new FieldAccessPostAggregator(null, "Agg2")
)
);
Assert.assertEquals(
new Pair(Lists.newArrayList(agg1, agg2), Lists.newArrayList(postAgg2)), AggregatorUtil.condensedAggregators(
Lists.newArrayList(agg1, agg2),
Lists.newArrayList(postAgg1, postAgg2),
"postAgg"
)
);
}
}

View File

@ -1200,7 +1200,8 @@ public class TopNQueryRunnerTest
.postAggregators(
Arrays.<PostAggregator>asList(
QueryRunnerTestHelper.addRowsIndexConstant,
QueryRunnerTestHelper.dependentPostAgg
QueryRunnerTestHelper.dependentPostAgg,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAgg
)
)
.build();
@ -1219,6 +1220,10 @@ public class TopNQueryRunnerTest
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.put(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_2 + 1.0
)
.build(),
ImmutableMap.<String, Object>builder()
.put(providerDimension, "upfront")
@ -1229,6 +1234,10 @@ public class TopNQueryRunnerTest
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.put(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_2 + 1.0
)
.build(),
ImmutableMap.<String, Object>builder()
.put(providerDimension, "spot")
@ -1237,6 +1246,10 @@ public class TopNQueryRunnerTest
.put("addRowsIndexConstant", 96444.57232284546D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 97282.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_9 + 1.0
)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.build()

View File

@ -357,7 +357,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
}
),
toolChest.makeMetricManipulatorFn(
toolChest.makePreComputeManipulatorFn(
rewrittenQuery,
new MetricManipulationFn()
{

View File

@ -218,7 +218,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
if (!isBySegment) {
retVal = Sequences.map(
retVal,
toolChest.makeMetricManipulatorFn(
toolChest.makePreComputeManipulatorFn(
query, new MetricManipulationFn()
{
@Override

View File

@ -116,7 +116,8 @@ import java.util.concurrent.Executor;
@RunWith(Parameterized.class)
public class CachingClusteredClientTest
{
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of();
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.<String, Object>of("finalize", false);
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
public static final String DATA_SOURCE = "test";
protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
@ -321,10 +322,11 @@ public class CachingClusteredClientTest
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS);
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
testQueryCaching(
client,
runner,
1,
true,
builder.context(
@ -343,7 +345,7 @@ public class CachingClusteredClientTest
cache.close("0_0");
testQueryCaching(
client,
runner,
1,
false,
builder.context(

View File

@ -578,7 +578,7 @@ public class ServerManagerTest
}
@Override
public Function<T, T> makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn)
public Function<T, T> makePreComputeManipulatorFn(QueryType query, MetricManipulationFn fn)
{
return Functions.identity();
}