diff --git a/hll/pom.xml b/hll/pom.xml new file mode 100644 index 00000000000..58e17295c96 --- /dev/null +++ b/hll/pom.xml @@ -0,0 +1,82 @@ + + + + + 4.0.0 + io.druid.extensions + druid-hll + druid-hll + druid-hll + + + io.druid + druid + 0.6.53-SNAPSHOT + + + + + io.druid + druid-api + + + io.druid + druid-processing + ${project.parent.version} + + + com.metamx + emitter + + + net.sf.trove4j + trove4j + 3.0.3 + + + commons-codec + commons-codec + + + + + junit + junit + test + + + + + + + maven-jar-plugin + + + + true + true + + + + + + + diff --git a/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregator.java b/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregator.java new file mode 100755 index 00000000000..24013a2e391 --- /dev/null +++ b/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregator.java @@ -0,0 +1,137 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation; + +import com.google.common.hash.Hashing; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import gnu.trove.map.TIntByteMap; +import gnu.trove.map.hash.TIntByteHashMap; +import io.druid.segment.ObjectColumnSelector; + +import java.util.Comparator; + +public class HyperloglogAggregator implements Aggregator +{ + private static final Logger log = new Logger(HyperloglogAggregator.class); + + public static final int log2m = 12; + public static final int m = (int) Math.pow(2, log2m); + public static final double alphaMM = (0.7213 / (1 + 1.079 / m)) * m * m; + + private final String name; + private final ObjectColumnSelector selector; + + private TIntByteHashMap ibMap; + + static final Comparator COMPARATOR = new Comparator() + { + @Override + public int compare(Object o, Object o1) + { + return o.equals(o1) ? 0 : 1; + } + }; + + static Object combine(Object lhs, Object rhs) + { + final TIntByteMap newIbMap = new TIntByteHashMap((TIntByteMap) lhs); + final TIntByteMap rightIbMap = (TIntByteMap) rhs; + final int[] keys = rightIbMap.keys(); + + for (int key : keys) { + if (newIbMap.get(key) == newIbMap.getNoEntryValue() || rightIbMap.get(key) > newIbMap.get(key)) { + newIbMap.put(key, rightIbMap.get(key)); + } + } + + return newIbMap; + } + + public HyperloglogAggregator(String name, ObjectColumnSelector selector) + { + this.name = name; + this.selector = selector; + this.ibMap = new TIntByteHashMap(); + } + + @Override + public void aggregate() + { + final Object value = selector.get(); + + if (value == null) { + return; + } + + if (value instanceof TIntByteHashMap) { + final TIntByteHashMap newIbMap = (TIntByteHashMap) value; + final int[] indexes = newIbMap.keys(); + + for (int index : indexes) { + if (ibMap.get(index) == ibMap.getNoEntryValue() || newIbMap.get(index) > ibMap.get(index)) { + ibMap.put(index, newIbMap.get(index)); + } + } + } else if (value instanceof String) { + log.debug("value [%s]", selector.get()); + + final long id = Hashing.murmur3_128().hashString((String) (value)).asLong(); + final int bucket = (int) (id >>> (Long.SIZE - log2m)); + final int zerolength = Long.numberOfLeadingZeros((id << log2m) | (1 << (log2m - 1)) + 1) + 1; + + if (ibMap.get(bucket) == ibMap.getNoEntryValue() || ibMap.get(bucket) < (byte) zerolength) { + ibMap.put(bucket, (byte) zerolength); + } + } else { + throw new ISE("Aggregate does not support values of type[%s]", value.getClass().getName()); + } + } + + @Override + public void reset() + { + this.ibMap = new TIntByteHashMap(); + } + + @Override + public Object get() + { + return ibMap; + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("HyperloglogAggregator does not support getFloat()"); + } + + @Override + public String getName() + { + return name; + } + + @Override + public void close() + { + // do nothing + } +} diff --git a/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregatorFactory.java b/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregatorFactory.java new file mode 100755 index 00000000000..3f7150b9221 --- /dev/null +++ b/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregatorFactory.java @@ -0,0 +1,209 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.metamx.common.logger.Logger; +import gnu.trove.map.hash.TIntByteHashMap; +import io.druid.segment.ColumnSelectorFactory; +import org.apache.commons.codec.binary.Base64; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +public class HyperloglogAggregatorFactory implements AggregatorFactory +{ + private static final Logger log = new Logger(HyperloglogAggregatorFactory.class); + private static final byte[] CACHE_KEY = new byte[]{0x37}; + + private final String name; + private final String fieldName; + + @JsonCreator + public HyperloglogAggregatorFactory( + @JsonProperty("name") final String name, + @JsonProperty("fieldName") final String fieldName + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + + this.name = name; + this.fieldName = fieldName; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return new HyperloglogAggregator( + name, + metricFactory.makeObjectColumnSelector(fieldName) + ); + } + + @Override + public BufferAggregator factorizeBuffered( + ColumnSelectorFactory metricFactory + ) + { + return new HyperloglogBufferAggregator( + metricFactory.makeObjectColumnSelector(fieldName) + ); + } + + @Override + public Comparator getComparator() + { + return HyperloglogAggregator.COMPARATOR; + } + + @Override + public Object combine(Object lhs, Object rhs) + { + if (rhs == null) { + return lhs; + } + if (lhs == null) { + return rhs; + } + return HyperloglogAggregator.combine(lhs, rhs); + } + + @Override + public AggregatorFactory getCombiningFactory() + { + log.debug("factory name: %s", name); + return new HyperloglogAggregatorFactory(name, fieldName); + } + + @Override + public Object deserialize(Object object) + { + log.debug("class name: [%s]:value [%s]", object.getClass().getName(), object); + + final String k = (String) object; + final byte[] ibmapByte = Base64.decodeBase64(k); + + final ByteBuffer buffer = ByteBuffer.wrap(ibmapByte); + final int keylength = buffer.getInt(); + final int valuelength = buffer.getInt(); + + TIntByteHashMap newIbMap; + + if (keylength == 0) { + newIbMap = new TIntByteHashMap(); + } else { + final int[] keys = new int[keylength]; + final byte[] values = new byte[valuelength]; + + for (int i = 0; i < keylength; i++) { + keys[i] = buffer.getInt(); + } + buffer.get(values); + + newIbMap = new TIntByteHashMap(keys, values); + } + + return newIbMap; + } + + @Override + public Object finalizeComputation(Object object) + { + final TIntByteHashMap ibMap = (TIntByteHashMap) object; + final int[] keys = ibMap.keys(); + final int count = keys.length; + + double registerSum = 0; + double zeros = 0.0; + + for (int key : keys) { + int val = ibMap.get(key); + + registerSum += 1.0 / (1 << val); + + if (val == 0) { + zeros++; + } + } + + registerSum += (HyperloglogAggregator.m - count); + zeros += HyperloglogAggregator.m - count; + + double estimate = HyperloglogAggregator.alphaMM * (1.0 / registerSum); + + if (estimate <= (5.0 / 2.0) * (HyperloglogAggregator.m)) { + // Small Range Estimate + return Math.round(HyperloglogAggregator.m * Math.log(HyperloglogAggregator.m / zeros)); + } else { + return Math.round(estimate); + } + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @Override + public List requiredFields() + { + return Arrays.asList(fieldName); + } + + @Override + public byte[] getCacheKey() + { + + byte[] fieldNameBytes = fieldName.getBytes(); + return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_KEY) + .put(fieldNameBytes).array(); + } + + @Override + public String getTypeName() + { + return "hyperloglog"; + } + + @Override + public int getMaxIntermediateSize() + { + return HyperloglogAggregator.m; + } + + @Override + public Object getAggregatorStartValue() + { + return new TIntByteHashMap(); + } +} diff --git a/hll/src/main/java/io/druid/query/aggregation/HyperloglogBufferAggregator.java b/hll/src/main/java/io/druid/query/aggregation/HyperloglogBufferAggregator.java new file mode 100755 index 00000000000..3681fc8bd40 --- /dev/null +++ b/hll/src/main/java/io/druid/query/aggregation/HyperloglogBufferAggregator.java @@ -0,0 +1,94 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation; + +import gnu.trove.map.hash.TIntByteHashMap; +import gnu.trove.procedure.TIntByteProcedure; +import io.druid.segment.ObjectColumnSelector; + +import java.nio.ByteBuffer; + +public class HyperloglogBufferAggregator implements BufferAggregator +{ + private final ObjectColumnSelector selector; + + public HyperloglogBufferAggregator(ObjectColumnSelector selector) + { + this.selector = selector; + } + + /* + * byte 1 key length byte 2 value length byte 3...n key array byte n+1.... + * value array + */ + @Override + public void init(ByteBuffer buf, int position) + { + for (int i = 0; i < HyperloglogAggregator.m; i++) { + buf.put(position + i, (byte) 0); + } + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + final ByteBuffer fb = buf; + final int fp = position; + final TIntByteHashMap newObj = (TIntByteHashMap) (selector.get()); + + newObj.forEachEntry( + new TIntByteProcedure() + { + public boolean execute(int a, byte b) + { + if (b > fb.get(fp + a)) { + fb.put(fp + a, b); + } + return true; + } + } + ); + } + + @Override + public Object get(ByteBuffer buf, int position) + { + final TIntByteHashMap ret = new TIntByteHashMap(); + + for (int i = 0; i < HyperloglogAggregator.m; i++) { + if (buf.get(position + i) != 0) { + ret.put(i, buf.get(position + i)); + } + } + return ret; + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("HyperloglogAggregator does not support getFloat()"); + } + + @Override + public void close() + { + // do nothing + } +} diff --git a/hll/src/main/java/io/druid/query/aggregation/HyperloglogComplexMetricSerde.java b/hll/src/main/java/io/druid/query/aggregation/HyperloglogComplexMetricSerde.java new file mode 100755 index 00000000000..8ba20b4a458 --- /dev/null +++ b/hll/src/main/java/io/druid/query/aggregation/HyperloglogComplexMetricSerde.java @@ -0,0 +1,137 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation; + +import gnu.trove.map.hash.TIntByteHashMap; +import io.druid.data.input.InputRow; +import io.druid.segment.column.ColumnBuilder; +import io.druid.segment.column.ValueType; +import io.druid.segment.data.GenericIndexed; +import io.druid.segment.data.ObjectStrategy; +import io.druid.segment.serde.ColumnPartSerde; +import io.druid.segment.serde.ComplexColumnPartSerde; +import io.druid.segment.serde.ComplexColumnPartSupplier; +import io.druid.segment.serde.ComplexMetricExtractor; +import io.druid.segment.serde.ComplexMetricSerde; + +import java.nio.ByteBuffer; +import java.util.List; + +public class HyperloglogComplexMetricSerde extends ComplexMetricSerde +{ + @Override + public String getTypeName() + { + return "hyperloglog"; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + return new HyperloglogComplexMetricExtractor(); + } + + @Override + public ColumnPartSerde deserializeColumn(ByteBuffer buffer, ColumnBuilder builder) + { + GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy()); + builder.setType(ValueType.COMPLEX); + builder.setComplexColumn(new ComplexColumnPartSupplier("hyperloglog", column)); + return new ComplexColumnPartSerde(column, "hyperloglog"); + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return new HyperloglogObjectStrategy(); + } + + public static class HyperloglogObjectStrategy implements ObjectStrategy + { + @Override + public Class getClazz() + { + return TIntByteHashMap.class; + } + + @Override + public TIntByteHashMap fromByteBuffer(ByteBuffer buffer, int numBytes) + { + int keylength = buffer.getInt(); + int valuelength = buffer.getInt(); + if (keylength == 0) { + return new TIntByteHashMap(); + } + int[] keys = new int[keylength]; + byte[] values = new byte[valuelength]; + + for (int i = 0; i < keylength; i++) { + keys[i] = buffer.getInt(); + } + + buffer.get(values); + + TIntByteHashMap tib = new TIntByteHashMap(keys, values); + return tib; + } + + @Override + public byte[] toBytes(TIntByteHashMap val) + { + TIntByteHashMap ibmap = val; + int[] indexesResult = ibmap.keys(); + byte[] valueResult = ibmap.values(); + ByteBuffer buffer = ByteBuffer.allocate(4 * indexesResult.length + valueResult.length + 8); + byte[] result = new byte[4 * indexesResult.length + valueResult.length + 8]; + buffer.putInt((int) indexesResult.length); + buffer.putInt((int) valueResult.length); + for (int i = 0; i < indexesResult.length; i++) { + buffer.putInt(indexesResult[i]); + } + + buffer.put(valueResult); + buffer.flip(); + buffer.get(result); + return result; + } + + @Override + public int compare(TIntByteHashMap o1, TIntByteHashMap o2) + { + return o1.equals(o2) ? 0 : 1; + } + } + + public static class HyperloglogComplexMetricExtractor implements ComplexMetricExtractor + { + @Override + public Class extractedClass() + { + return List.class; + } + + @Override + public Object extractValue(InputRow inputRow, String metricName) + { + return inputRow.getRaw(metricName); + } + } +} + diff --git a/hll/src/main/java/io/druid/query/aggregation/HyperloglogDruidModule.java b/hll/src/main/java/io/druid/query/aggregation/HyperloglogDruidModule.java new file mode 100644 index 00000000000..f7ef5cad748 --- /dev/null +++ b/hll/src/main/java/io/druid/query/aggregation/HyperloglogDruidModule.java @@ -0,0 +1,140 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import gnu.trove.map.hash.TIntByteHashMap; +import io.druid.initialization.DruidModule; +import io.druid.segment.serde.ComplexMetrics; +import org.apache.commons.codec.binary.Base64; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +/** + */ +public class HyperloglogDruidModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new HyperloglogJacksonSerdeModule().registerSubtypes( + new NamedType(HyperloglogAggregatorFactory.class, "hyperloglog") + ) + ); + } + + @Override + public void configure(Binder binder) + { + if (ComplexMetrics.getSerdeForType("hyperloglog") == null) { + ComplexMetrics.registerSerde("hyperloglog", new HyperloglogComplexMetricSerde()); + } + } + + public static class HyperloglogJacksonSerdeModule extends SimpleModule + { + public HyperloglogJacksonSerdeModule() + { + super("Hyperloglog deserializers"); + + addDeserializer( + TIntByteHashMap.class, + new JsonDeserializer() + { + @Override + public TIntByteHashMap deserialize( + JsonParser jp, + DeserializationContext ctxt + ) throws IOException + { + byte[] ibmapByte = Base64.decodeBase64(jp.getText()); + + ByteBuffer buffer = ByteBuffer.wrap(ibmapByte); + int keylength = buffer.getInt(); + int valuelength = buffer.getInt(); + if (keylength == 0) { + return (new TIntByteHashMap()); + } + int[] keys = new int[keylength]; + byte[] values = new byte[valuelength]; + + for (int i = 0; i < keylength; i++) { + keys[i] = buffer.getInt(); + } + buffer.get(values); + + return (new TIntByteHashMap(keys, values)); + } + } + ); + + addSerializer( + TIntByteHashMap.class, + new JsonSerializer() + { + @Override + public void serialize( + TIntByteHashMap ibmap, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider + ) + throws IOException, JsonProcessingException + { + int[] indexesResult = ibmap.keys(); + byte[] valueResult = ibmap.values(); + ByteBuffer buffer = ByteBuffer + .allocate( + 4 * indexesResult.length + + valueResult.length + 8 + ); + byte[] result = new byte[4 * indexesResult.length + + valueResult.length + 8]; + buffer.putInt((int) indexesResult.length); + buffer.putInt((int) valueResult.length); + for (int i = 0; i < indexesResult.length; i++) { + buffer.putInt(indexesResult[i]); + } + + buffer.put(valueResult); + buffer.flip(); + buffer.get(result); + String str = Base64.encodeBase64String(result); + jsonGenerator.writeString(str); + } + } + ); + + } + } +} diff --git a/hll/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/hll/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..75977329c70 --- /dev/null +++ b/hll/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.query.aggregation.HyperloglogDruidModule \ No newline at end of file diff --git a/pom.xml b/pom.xml index 182c624b95f..127a80413b9 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ UTF-8 0.25.2 2.3.0 - 0.1.7 + 0.1.8 @@ -59,6 +59,7 @@ kafka-seven kafka-eight rabbitmq + hll diff --git a/server/src/main/java/io/druid/initialization/Initialization.java b/server/src/main/java/io/druid/initialization/Initialization.java index 5e8e0461202..2186e66fea3 100644 --- a/server/src/main/java/io/druid/initialization/Initialization.java +++ b/server/src/main/java/io/druid/initialization/Initialization.java @@ -196,7 +196,7 @@ public class Initialization if (!exclusions.contains(artifact.getGroupId())) { urls.add(artifact.getFile().toURI().toURL()); } else { - log.debug("Skipped Artifact[%s]", artifact); + log.info("Skipped Artifact[%s]", artifact); } }