clean up code

This commit is contained in:
fjy 2014-04-18 17:41:47 -07:00
parent a0c8d9d413
commit e3fbbff237
6 changed files with 120 additions and 81 deletions

View File

@ -25,6 +25,8 @@ import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.FinalizeMetricManipulationFn;
import io.druid.query.aggregation.IdentityMetricManipulationFn;
import io.druid.query.aggregation.MetricManipulationFn;
import javax.annotation.Nullable;
@ -62,14 +64,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
{
final Function<T, T> baseFinalizer = toolChest.makePostComputeManipulatorFn(
query,
new MetricManipulationFn()
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return factory.finalizeComputation(factory.deserialize(object));
}
}
new FinalizeMetricManipulationFn()
);
@Override
@ -90,31 +85,14 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
}
};
} else {
finalizerFn = toolChest.makePostComputeManipulatorFn(
query,
new MetricManipulationFn()
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return factory.finalizeComputation(object);
}
}
);
finalizerFn = toolChest.makePostComputeManipulatorFn(query, new FinalizeMetricManipulationFn());
}
} else {
// finalize is false here.
queryToRun = query;
finalizerFn = toolChest.makePostComputeManipulatorFn(
query,
new MetricManipulationFn()
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return object;
}
}
new IdentityMetricManipulationFn()
);
}

View File

@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
/**
*/
public class FinalizeMetricManipulationFn implements MetricManipulationFn
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return factory.finalizeComputation(object);
}
}

View File

@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
/**
*/
public class IdentityMetricManipulationFn implements MetricManipulationFn
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return object;
}
}

View File

@ -130,40 +130,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
.setUser9(Minutes.minutes(numMinutes).toString());
}
@Override
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makePreComputeManipulatorFn(
final TimeseriesQuery query, final MetricManipulationFn fn
)
{
return makeManipulatorFn(query, fn);
}
private Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makeManipulatorFn(
final TimeseriesQuery query, final MetricManipulationFn fn
)
{
return new Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>>()
{
@Override
public Result<TimeseriesResultValue> apply(Result<TimeseriesResultValue> result)
{
final Map<String, Object> values = Maps.newHashMap();
final TimeseriesResultValue holder = result.getValue();
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
}
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), postAgg.compute(values));
}
return new Result<TimeseriesResultValue>(
result.getTimestamp(),
new TimeseriesResultValue(values)
);
}
};
}
@Override
public TypeReference<Result<TimeseriesResultValue>> getResultTypeReference()
{
@ -271,13 +237,47 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
return Ordering.natural();
}
@Override
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makePreComputeManipulatorFn(
final TimeseriesQuery query, final MetricManipulationFn fn
)
{
return makeComputeManipulatorFn(query, fn, false);
}
@Override
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makePostComputeManipulatorFn(
TimeseriesQuery query, MetricManipulationFn fn
)
{
return makeManipulatorFn(query, fn);
return makeComputeManipulatorFn(query, fn, true);
}
private Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makeComputeManipulatorFn(
final TimeseriesQuery query, final MetricManipulationFn fn, final boolean calculatePostAggs
)
{
return new Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>>()
{
@Override
public Result<TimeseriesResultValue> apply(Result<TimeseriesResultValue> result)
{
final Map<String, Object> values = Maps.newHashMap();
final TimeseriesResultValue holder = result.getValue();
if (calculatePostAggs) {
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), postAgg.compute(holder.getBaseObject()));
}
}
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
}
return new Result<TimeseriesResultValue>(
result.getTimestamp(),
new TimeseriesResultValue(values)
);
}
};
}
}

View File

@ -149,7 +149,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
private String dimension = query.getDimensionSpec().getOutputName();
@Override
public Result<TopNResultValue> apply(@Nullable Result<TopNResultValue> result)
public Result<TopNResultValue> apply(Result<TopNResultValue> result)
{
List<Map<String, Object>> serializedValues = Lists.newArrayList(
Iterables.transform(
@ -157,7 +157,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
new Function<DimensionAndMetricValueExtractor, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(@Nullable DimensionAndMetricValueExtractor input)
public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
{
final Map<String, Object> values = Maps.newHashMap();
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
@ -197,7 +197,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
private String dimension = query.getDimensionSpec().getOutputName();
@Override
public Result<TopNResultValue> apply(@Nullable Result<TopNResultValue> result)
public Result<TopNResultValue> apply(Result<TopNResultValue> result)
{
List<Map<String, Object>> serializedValues = Lists.newArrayList(
Iterables.transform(
@ -205,7 +205,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
new Function<DimensionAndMetricValueExtractor, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(@Nullable DimensionAndMetricValueExtractor input)
public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
{
final Map<String, Object> values = Maps.newHashMap();
// compute all post aggs
@ -249,7 +249,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return new CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>()
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<PostAggregator> postAggs = query.getPostAggregatorSpecs();
@Override
public byte[] computeCacheKey(TopNQuery query)
@ -289,7 +288,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return new Function<Result<TopNResultValue>, Object>()
{
@Override
public Object apply(@Nullable final Result<TopNResultValue> input)
public Object apply(final Result<TopNResultValue> input)
{
List<DimensionAndMetricValueExtractor> results = Lists.newArrayList(input.getValue());
final List<Object> retVal = Lists.newArrayListWithCapacity(results.size() + 1);
@ -317,7 +316,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
private final QueryGranularity granularity = query.getGranularity();
@Override
public Result<TopNResultValue> apply(@Nullable Object input)
public Result<TopNResultValue> apply(Object input)
{
List<Object> results = (List<Object>) input;
List<Map<String, Object>> retVal = Lists.newArrayListWithCapacity(results.size());
@ -418,7 +417,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
@Override
public Result<TopNResultValue> apply(@Nullable Result<TopNResultValue> input)
public Result<TopNResultValue> apply(Result<TopNResultValue> input)
{
return new Result<TopNResultValue>(
input.getTimestamp(),

View File

@ -122,8 +122,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
final JavaType typeRef;
if (isBySegment) {
typeRef = types.rhs;
}
else {
} else {
typeRef = types.lhs;
}
@ -219,14 +218,15 @@ public class DirectDruidClient<T> implements QueryRunner<T>
retVal = Sequences.map(
retVal,
toolChest.makePreComputeManipulatorFn(
query, new MetricManipulationFn()
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return factory.deserialize(object);
}
}
query,
new MetricManipulationFn()
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return factory.deserialize(object);
}
}
)
);
}
@ -313,7 +313,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
@Override
public void close() throws IOException
{
if(jp != null) {
if (jp != null) {
jp.close();
}
}