time series post Aggregation improved

move post aggregation to finalise results
This commit is contained in:
nishantmonu51 2014-04-08 03:17:15 +05:30
parent 4bb36dd453
commit c322f44863
5 changed files with 21 additions and 56 deletions

View File

@ -24,7 +24,6 @@ import io.druid.granularity.AllGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import java.util.LinkedHashMap;
import java.util.List;
@ -37,17 +36,14 @@ public class TimeseriesBinaryFn
{
private final QueryGranularity gran;
private final List<AggregatorFactory> aggregations;
private final List<PostAggregator> postAggregations;
public TimeseriesBinaryFn(
QueryGranularity granularity,
List<AggregatorFactory> aggregations,
List<PostAggregator> postAggregations
List<AggregatorFactory> aggregations
)
{
this.gran = granularity;
this.aggregations = aggregations;
this.postAggregations = postAggregations;
}
@Override
@ -71,11 +67,6 @@ public class TimeseriesBinaryFn
retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName)));
}
for (PostAggregator pf : postAggregations) {
final String metricName = pf.getName();
retVal.put(metricName, pf.compute(retVal));
}
return (gran instanceof AllGranularity) ?
new Result<TimeseriesResultValue>(
arg1.getTimestamp(),

View File

@ -74,10 +74,6 @@ public class TimeseriesQueryEngine
bob.addMetric(aggregator);
}
for (PostAggregator postAgg : postAggregatorSpecs) {
bob.addMetric(postAgg);
}
Result<TimeseriesResultValue> retVal = bob.build();
// cleanup

View File

@ -101,8 +101,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
TimeseriesQuery query = (TimeseriesQuery) input;
return new TimeseriesBinaryFn(
query.getGranularity(),
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
query.getAggregatorSpecs()
);
}
};
@ -147,7 +146,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
}
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), holder.getMetric(postAgg.getName()));
Object computedPostAggValue = holder.getMetric(postAgg.getName());
if (computedPostAggValue != null) {
values.put(postAgg.getName(), computedPostAggValue);
} else {
values.put(postAgg.getName(), postAgg.compute(values));
}
}
return new Result<TimeseriesResultValue>(
result.getTimestamp(),
@ -169,7 +173,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
return new CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>()
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<PostAggregator> postAggs = query.getPostAggregatorSpecs();
@Override
public byte[] computeCacheKey(TimeseriesQuery query)
@ -238,10 +241,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
retVal.put(factory.getName(), factory.deserialize(resultIter.next()));
}
for (PostAggregator postAgg : postAggs) {
retVal.put(postAgg.getName(), postAgg.compute(retVal));
}
return new Result<TimeseriesResultValue>(
timestamp,
new TimeseriesResultValue(retVal)

View File

@ -20,16 +20,11 @@
package io.druid.query.timeseries;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.junit.Test;
@ -43,21 +38,10 @@ public class TimeseriesBinaryFnTest
{
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final ArithmeticPostAggregator addRowsIndexConstant = new ArithmeticPostAggregator(
"addRowsIndexConstant",
"+",
Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
);
final List<AggregatorFactory> aggregatorFactories = Arrays.asList(
rowsCount,
indexLongSum
);
final List<PostAggregator> postAggregators = Arrays.<PostAggregator>asList(
addRowsIndexConstant
);
final DateTime currTime = new DateTime();
@Test
@ -87,16 +71,14 @@ public class TimeseriesBinaryFnTest
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows", 3L,
"index", 5L,
"addRowsIndexConstant", 9.0
"index", 5L
)
)
);
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.ALL,
aggregatorFactories,
postAggregators
aggregatorFactories
).apply(
result1,
result2
@ -131,16 +113,14 @@ public class TimeseriesBinaryFnTest
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows", 3L,
"index", 5L,
"addRowsIndexConstant", 9.0
"index", 5L
)
)
);
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.DAY,
aggregatorFactories,
postAggregators
aggregatorFactories
).apply(
result1,
result2
@ -166,8 +146,7 @@ public class TimeseriesBinaryFnTest
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.ALL,
aggregatorFactories,
postAggregators
aggregatorFactories
).apply(
result1,
result2
@ -202,16 +181,14 @@ public class TimeseriesBinaryFnTest
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows", 3L,
"index", 5L,
"addRowsIndexConstant", 9.0
"index", 5L
)
)
);
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.ALL,
aggregatorFactories,
postAggregators
aggregatorFactories
).apply(
result1,
result2

View File

@ -216,8 +216,9 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
testQueryCaching(
client,
runner,
builder.build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000),
new Interval("2011-01-02/2011-01-03"), makeTimeResults(new DateTime("2011-01-02"), 30, 6000),
@ -258,7 +259,7 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09"), 18, 521,
new DateTime("2011-01-09T01"), 181, 52
),
client.run(
runner.run(
builder.intervals("2011-01-01/2011-01-10")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
@ -279,9 +280,10 @@ public class CachingClusteredClientTest
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
testQueryCaching(
client,
runner,
builder.build(),
new Interval("2011-11-04/2011-11-08"),
makeTimeResults(
@ -299,7 +301,7 @@ public class CachingClusteredClientTest
new DateTime("2011-11-06", TIMEZONE), 23, 85312,
new DateTime("2011-11-07", TIMEZONE), 85, 102
),
client.run(
runner.run(
builder.intervals("2011-11-04/2011-11-08")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)