adding a new method getMergingFactory(..) to AggregatorFactory

This commit is contained in:
Himanshu Gupta 2015-09-18 12:17:06 -05:00
parent 77fc86c015
commit 52eb0f04a7
7 changed files with 133 additions and 2 deletions

View File

@ -41,7 +41,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public abstract class SketchAggregatorFactory implements AggregatorFactory
public abstract class SketchAggregatorFactory extends AggregatorFactory
{
public static final int DEFAULT_MAX_SKETCH_SIZE = 16384;

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.yahoo.sketches.theta.Sketch;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import java.util.Collections;
import java.util.List;
@ -66,7 +67,25 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
@Override
public AggregatorFactory getCombiningFactory()
{
return new SketchMergeAggregatorFactory(name, name, size, shouldFinalize, isInputThetaSketch);
return new SketchMergeAggregatorFactory(name, name, size, shouldFinalize, true);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && other instanceof SketchMergeAggregatorFactory) {
SketchMergeAggregatorFactory castedOther = (SketchMergeAggregatorFactory) other;
return new SketchMergeAggregatorFactory(
name,
name,
Math.max(size, castedOther.size),
shouldFinalize,
true
);
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@JsonProperty

View File

@ -29,6 +29,7 @@ import com.google.common.primitives.Ints;
import com.metamx.common.StringUtils;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import org.apache.commons.codec.binary.Base64;
@ -116,6 +117,26 @@ public class ApproximateHistogramAggregatorFactory extends AggregatorFactory
return new ApproximateHistogramFoldingAggregatorFactory(name, name, resolution, numBuckets, lowerLimit, upperLimit);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && other instanceof ApproximateHistogramAggregatorFactory) {
ApproximateHistogramAggregatorFactory castedOther = (ApproximateHistogramAggregatorFactory) other;
return new ApproximateHistogramFoldingAggregatorFactory(
name,
name,
Math.max(resolution, castedOther.resolution),
numBuckets,
Math.min(lowerLimit, castedOther.lowerLimit),
Math.max(upperLimit, castedOther.upperLimit)
);
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -64,6 +64,22 @@ public abstract class AggregatorFactory
*/
public abstract AggregatorFactory getCombiningFactory();
/**
* Returns an AggregatorFactory that can be used to merge the output of aggregators from this factory and
* other factory.
* This method is relevant only for AggregatorFactory which can be used at ingestion time.
*
* @return a new Factory that can be used for merging the output of aggregators from this factory and other.
*/
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
/**
* Gets a list of all columns that this AggregatorFactory will scan
*

View File

@ -0,0 +1,57 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
/**
*/
public class AggregatorFactoryNotMergeableException extends Exception
{
public AggregatorFactoryNotMergeableException()
{
}
public AggregatorFactoryNotMergeableException(String formatText, Object... arguments)
{
super(String.format(formatText, arguments));
}
public AggregatorFactoryNotMergeableException(Throwable cause, String formatText, Object... arguments)
{
super(String.format(formatText, arguments), cause);
}
public AggregatorFactoryNotMergeableException(Throwable cause)
{
super(cause);
}
public AggregatorFactoryNotMergeableException(AggregatorFactory af1, AggregatorFactory af2)
{
this(
"can't merge [%s : %s] and [%s : %s] , with detailed info [%s] and [%s]",
af1.getName(),
af1.getClass().getName(),
af2.getName(),
af2.getClass().getName(),
af1,
af2
);
}
}

View File

@ -100,6 +100,12 @@ public class HistogramAggregatorFactory extends AggregatorFactory
return new HistogramAggregatorFactory(name, name, breaksList);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
throw new UnsupportedOperationException("can't merge HistogramAggregatorFactory");
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -137,6 +137,18 @@ public class JavaScriptAggregatorFactory extends AggregatorFactory
return new JavaScriptAggregatorFactory(name, Lists.newArrayList(name), fnCombine, fnReset, fnCombine);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && other.getClass() == this.getClass()) {
JavaScriptAggregatorFactory castedOther = (JavaScriptAggregatorFactory) other;
if (this.fnCombine.equals(castedOther.fnCombine) && this.fnReset.equals(castedOther.fnReset)) {
return getCombiningFactory();
}
}
throw new AggregatorFactoryNotMergeableException(this, other);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{