long max/min aggregators implementation

This commit is contained in:
Himanshu Gupta 2015-05-10 10:30:46 -05:00
parent 9fb9b8a510
commit 00436f93e2
7 changed files with 701 additions and 0 deletions

View File

@ -27,6 +27,8 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.HistogramAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongMinAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.MaxAggregatorFactory;
import io.druid.query.aggregation.MinAggregatorFactory;
@ -64,6 +66,8 @@ public class AggregatorsModule extends SimpleModule
@JsonSubTypes.Type(name = "doubleSum", value = DoubleSumAggregatorFactory.class),
@JsonSubTypes.Type(name = "max", value = MaxAggregatorFactory.class),
@JsonSubTypes.Type(name = "min", value = MinAggregatorFactory.class),
@JsonSubTypes.Type(name = "longMax", value = LongMaxAggregatorFactory.class),
@JsonSubTypes.Type(name = "longMin", value = LongMinAggregatorFactory.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptAggregatorFactory.class),
@JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class),
@JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class),

View File

@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.segment.LongColumnSelector;
import java.util.Comparator;
/**
*/
public class LongMaxAggregator implements Aggregator
{
static final Comparator COMPARATOR = LongSumAggregator.COMPARATOR;
static long combineValues(Object lhs, Object rhs)
{
return Math.max(((Number) lhs).longValue(), ((Number) rhs).longValue());
}
private final LongColumnSelector selector;
private final String name;
private long max;
public LongMaxAggregator(String name, LongColumnSelector selector)
{
this.name = name;
this.selector = selector;
reset();
}
@Override
public void aggregate()
{
max = Math.max(max, selector.get());
}
@Override
public void reset()
{
max = Long.MIN_VALUE;
}
@Override
public Object get()
{
return max;
}
@Override
public float getFloat()
{
return (float) max;
}
@Override
public long getLong()
{
return max;
}
@Override
public String getName()
{
return this.name;
}
@Override
public Aggregator clone()
{
return new LongMaxAggregator(name, selector);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,179 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import io.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
/**
*/
public class LongMaxAggregatorFactory implements AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0xA;
private final String fieldName;
private final String name;
@JsonCreator
public LongMaxAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name;
this.fieldName = fieldName;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new LongMaxAggregator(name, metricFactory.makeLongColumnSelector(fieldName));
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new LongMaxBufferAggregator(metricFactory.makeLongColumnSelector(fieldName));
}
@Override
public Comparator getComparator()
{
return LongMaxAggregator.COMPARATOR;
}
@Override
public Object combine(Object lhs, Object rhs)
{
return LongMaxAggregator.combineValues(lhs, rhs);
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new LongMaxAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new LongMaxAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{
return object;
}
@Override
public Object finalizeComputation(Object object)
{
return object;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public List<String> requiredFields()
{
return Arrays.asList(fieldName);
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
}
@Override
public String getTypeName()
{
return "long";
}
@Override
public int getMaxIntermediateSize()
{
return Longs.BYTES;
}
@Override
public Object getAggregatorStartValue()
{
return Long.MIN_VALUE;
}
@Override
public String toString()
{
return "LongMaxAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LongMaxAggregatorFactory that = (LongMaxAggregatorFactory) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer;
/**
*/
public class LongMaxBufferAggregator implements BufferAggregator
{
private final LongColumnSelector selector;
public LongMaxBufferAggregator(LongColumnSelector selector)
{
this.selector = selector;
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.putLong(position, Long.MIN_VALUE);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
buf.putLong(position, Math.max(buf.getLong(position), selector.get()));
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.segment.LongColumnSelector;
import java.util.Comparator;
/**
*/
public class LongMinAggregator implements Aggregator
{
static final Comparator COMPARATOR = LongSumAggregator.COMPARATOR;
static long combineValues(Object lhs, Object rhs)
{
return Math.min(((Number) lhs).longValue(), ((Number) rhs).longValue());
}
private final LongColumnSelector selector;
private final String name;
private long min;
public LongMinAggregator(String name, LongColumnSelector selector)
{
this.name = name;
this.selector = selector;
reset();
}
@Override
public void aggregate()
{
min = Math.min(min, selector.get());
}
@Override
public void reset()
{
min = Long.MAX_VALUE;
}
@Override
public Object get()
{
return min;
}
@Override
public float getFloat()
{
return (float) min;
}
@Override
public long getLong()
{
return min;
}
@Override
public String getName()
{
return this.name;
}
@Override
public Aggregator clone()
{
return new LongMinAggregator(name, selector);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,179 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import io.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
/**
*/
public class LongMinAggregatorFactory implements AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0xB;
private final String fieldName;
private final String name;
@JsonCreator
public LongMinAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name;
this.fieldName = fieldName;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new LongMinAggregator(name, metricFactory.makeLongColumnSelector(fieldName));
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new LongMinBufferAggregator(metricFactory.makeLongColumnSelector(fieldName));
}
@Override
public Comparator getComparator()
{
return LongMinAggregator.COMPARATOR;
}
@Override
public Object combine(Object lhs, Object rhs)
{
return LongMinAggregator.combineValues(lhs, rhs);
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new LongMinAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new LongMinAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{
return object;
}
@Override
public Object finalizeComputation(Object object)
{
return object;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public List<String> requiredFields()
{
return Arrays.asList(fieldName);
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
}
@Override
public String getTypeName()
{
return "long";
}
@Override
public int getMaxIntermediateSize()
{
return Longs.BYTES;
}
@Override
public Object getAggregatorStartValue()
{
return Long.MAX_VALUE;
}
@Override
public String toString()
{
return "LongMinAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LongMinAggregatorFactory that = (LongMinAggregatorFactory) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer;
/**
*/
public class LongMinBufferAggregator implements BufferAggregator
{
private final LongColumnSelector selector;
public LongMinBufferAggregator(LongColumnSelector selector)
{
this.selector = selector;
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.putLong(position, Long.MAX_VALUE);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
buf.putLong(position, Math.min(buf.getLong(position), selector.get()));
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}