From 3c9b8dc9999bcc2c7980d672a9010ef87084e914 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 24 Apr 2014 22:08:25 -0700 Subject: [PATCH] cardinality aggregator --- .../io/druid/jackson/AggregatorsModule.java | 4 +- .../cardinality/CardinalityAggregator.java | 148 +++++++ .../CardinalityAggregatorFactory.java | 228 +++++++++++ .../CardinalityBufferAggregator.java | 91 +++++ .../CardinalityAggregatorBenchmark.java | 124 ++++++ .../CardinalityAggregatorTest.java | 370 ++++++++++++++++++ 6 files changed, 964 insertions(+), 1 deletion(-) create mode 100644 processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java create mode 100644 processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorBenchmark.java create mode 100644 processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java diff --git a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java index 3029d2bcc4e..908a17cb058 100644 --- a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java +++ b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java @@ -32,6 +32,7 @@ import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.MaxAggregatorFactory; import io.druid.query.aggregation.MinAggregatorFactory; import io.druid.query.aggregation.PostAggregator; +import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import 
io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; @@ -66,7 +67,8 @@ public class AggregatorsModule extends SimpleModule @JsonSubTypes.Type(name = "min", value = MinAggregatorFactory.class), @JsonSubTypes.Type(name = "javascript", value = JavaScriptAggregatorFactory.class), @JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class), - @JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class) + @JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class), + @JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class) }) public static interface AggregatorFactoryMixin { diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java new file mode 100644 index 00000000000..cde4e643110 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java @@ -0,0 +1,148 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */
+
+package io.druid.query.aggregation.cardinality;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
+import io.druid.segment.DimensionSelector;
+import io.druid.segment.data.IndexedInts;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Aggregator that adds murmur3-128 hashes of dimension values to a {@link HyperLogLogCollector}.
+ *
+ * When {@code byRow} is set, the values of all selectors for the current row are folded into a single
+ * hash (see {@link #hashRow}); otherwise every value of every selector is hashed on its own
+ * (see {@link #hashValues}).
+ */
+public class CardinalityAggregator implements Aggregator
+{
+  /** Character hashed between the sorted values of a multi-valued dimension in {@link #hashRow}. */
+  public static final char SEPARATOR = '\u0001';
+
+  /** Stand-in hashed in place of a null dimension value. */
+  private static final String NULL_STRING = "\u0000";
+
+  private static final HashFunction hashFn = Hashing.murmur3_128();
+
+  private final String name;
+  private final List<DimensionSelector> selectorList;
+  private final boolean byRow;
+
+  private HyperLogLogCollector collector;
+
+  /**
+   * Hashes the current row of every selector into one hash and adds it to the collector.
+   *
+   * @param selectorList selectors whose current rows make up the combined row
+   * @param collector    collector receiving the resulting hash bytes
+   */
+  protected static void hashRow(List<DimensionSelector> selectorList, HyperLogLogCollector collector)
+  {
+    final Hasher hasher = hashFn.newHasher();
+    for (int k = 0; k < selectorList.size(); ++k) {
+      if (k != 0) {
+        // a zero byte delimits consecutive dimensions
+        hasher.putByte((byte) 0);
+      }
+      final DimensionSelector selector = selectorList.get(k);
+      final IndexedInts row = selector.getRow();
+      final int size = row.size();
+      if (size == 1) {
+        final String value = selector.lookupName(row.get(0));
+        hasher.putString(value != null ? value : NULL_STRING);
+      } else if (size == 0) {
+        // nothing to add to hasher
+      } else {
+        final String[] values = new String[size];
+        for (int i = 0; i < size; ++i) {
+          final String value = selector.lookupName(row.get(i));
+          values[i] = value != null ? value : NULL_STRING;
+        }
+        // sort so the order of values within a multi-valued row does not change the hash
+        Arrays.sort(values);
+        for (int i = 0; i < size; ++i) {
+          if (i != 0) {
+            hasher.putChar(SEPARATOR);
+          }
+          hasher.putString(values[i]);
+        }
+      }
+    }
+    collector.add(hasher.hash().asBytes());
+  }
+
+  /**
+   * Hashes each value in the current row of each selector separately and adds every hash to the collector.
+   *
+   * @param selectors selectors whose current row values are hashed
+   * @param collector collector receiving the hash bytes
+   */
+  protected static void hashValues(final List<DimensionSelector> selectors, HyperLogLogCollector collector)
+  {
+    for (final DimensionSelector selector : selectors) {
+      for (final Integer index : selector.getRow()) {
+        final String value = selector.lookupName(index);
+        collector.add(hashFn.hashString(value == null ? NULL_STRING : value).asBytes());
+      }
+    }
+  }
+
+  /**
+   * @param name         name reported by {@link #getName()}
+   * @param selectorList dimension selectors to read from; copied into a new list
+   * @param byRow        true to hash whole rows, false to hash individual values
+   */
+  public CardinalityAggregator(
+      String name,
+      List<DimensionSelector> selectorList,
+      boolean byRow
+  )
+  {
+    this.name = name;
+    this.selectorList = Lists.newArrayList(selectorList);
+    this.collector = HyperLogLogCollector.makeLatestCollector();
+    this.byRow = byRow;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    if (byRow) {
+      hashRow(selectorList, collector);
+    } else {
+      hashValues(selectorList, collector);
+    }
+  }
+
+  @Override
+  public void reset()
+  {
+    // start over with a fresh collector rather than clearing the old one
+    collector = HyperLogLogCollector.makeLatestCollector();
+  }
+
+  @Override
+  public Object get()
+  {
+    return collector;
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("CardinalityAggregator does not support getFloat()");
+  }
+
+  @Override
+  public String getName()
+  {
+    return name;
+  }
+
+  @Override
+  public Aggregator clone()
+  {
+    // the copy reads from the same selectors but gets its own, empty collector
+    return new CardinalityAggregator(name, selectorList, byRow);
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
new file mode 100644
index 00000000000..95303187e8d
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
@@
-0,0 +1,228 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation.cardinality;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.NoopAggregator;
+import io.druid.query.aggregation.NoopBufferAggregator;
+import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.DimensionSelector;
+import org.apache.commons.codec.binary.Base64;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Factory for {@link CardinalityAggregator} and {@link CardinalityBufferAggregator}, which estimate the
+ * cardinality of one or more dimensions with a {@link HyperLogLogCollector}.
+ *
+ * JSON properties: {@code name}, {@code fieldNames} and the optional {@code byRow} (defaults to false).
+ */
+public class CardinalityAggregatorFactory implements AggregatorFactory
+{
+  private static final byte CACHE_TYPE_ID = (byte) 0x8;
+
+  private final String name;
+  private final List<String> fieldNames;
+  private final boolean byRow;
+
+  /**
+   * Turns an intermediate collector into its cardinality estimate.
+   *
+   * @param object a {@link HyperLogLogCollector}, or null
+   * @return the collector's estimate, or 0 when {@code object} is null
+   */
+  public static Object estimateCardinality(Object object)
+  {
+    if (object == null) {
+      // NOTE(review): this boxes to Integer while callers in the tests cast the non-null result to
+      // Double -- confirm that consumers tolerate both types.
+      return 0;
+    }
+
+    return ((HyperLogLogCollector) object).estimateCardinality();
+  }
+
+  @JsonCreator
+  public CardinalityAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldNames") final List<String> fieldNames,
+      @JsonProperty("byRow") final Boolean byRow
+  )
+  {
+    this.name = name;
+    this.fieldNames = fieldNames;
+    this.byRow = byRow == null ? false : byRow;
+  }
+
+  @Override
+  public Aggregator factorize(final ColumnSelectorFactory columnFactory)
+  {
+    final List<DimensionSelector> selectors = makeDimensionSelectors(columnFactory);
+
+    if (selectors.isEmpty()) {
+      return new NoopAggregator(name);
+    }
+
+    return new CardinalityAggregator(name, selectors, byRow);
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
+  {
+    final List<DimensionSelector> selectors = makeDimensionSelectors(columnFactory);
+
+    if (selectors.isEmpty()) {
+      return new NoopBufferAggregator();
+    }
+
+    return new CardinalityBufferAggregator(selectors, byRow);
+  }
+
+  /** Builds one selector per field name, dropping fields for which the column factory returns null. */
+  private List<DimensionSelector> makeDimensionSelectors(final ColumnSelectorFactory columnFactory)
+  {
+    return Lists.newArrayList(
+        Iterables.filter(
+            Iterables.transform(
+                fieldNames, new Function<String, DimensionSelector>()
+            {
+              @Nullable
+              @Override
+              public DimensionSelector apply(@Nullable String input)
+              {
+                return columnFactory.makeDimensionSelector(input);
+              }
+            }
+            ), Predicates.notNull()
+        )
+    );
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    return new Comparator<HyperLogLogCollector>()
+    {
+      @Override
+      public int compare(HyperLogLogCollector lhs, HyperLogLogCollector rhs)
+      {
+        return lhs.compareTo(rhs);
+      }
+    };
+  }
+
+  @Override
+  public Object combine(Object lhs, Object rhs)
+  {
+    if (rhs == null) {
+      return lhs;
+    }
+    if (lhs == null) {
+      return rhs;
+    }
+    return ((HyperLogLogCollector) lhs).fold((HyperLogLogCollector) rhs);
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    // NOTE(review): the returned factory still builds selectors over the raw dimensions in fieldNames.
+    // Confirm whether callers that combine already-aggregated collectors stored under `name` need a
+    // factory that folds collectors instead.
+    return new CardinalityAggregatorFactory(name, fieldNames, byRow);
+  }
+
+  /**
+   * Rebuilds a collector from its serialized form.
+   *
+   * @param object a byte[], a ByteBuffer, or a Base64 string; any other object is returned unchanged
+   */
+  @Override
+  public Object deserialize(Object object)
+  {
+    if (object instanceof byte[]) {
+      return HyperLogLogCollector.makeCollector(ByteBuffer.wrap((byte[]) object));
+    } else if (object instanceof ByteBuffer) {
+      return HyperLogLogCollector.makeCollector((ByteBuffer) object);
+    } else if (object instanceof String) {
+      return HyperLogLogCollector.makeCollector(
+          ByteBuffer.wrap(Base64.decodeBase64(((String) object).getBytes(Charsets.UTF_8)))
+      );
+    }
+    return object;
+  }
+
+  @Override
+  public Object finalizeComputation(Object object)
+  {
+    return estimateCardinality(object);
+  }
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @Override
+  public List<String> requiredFields()
+  {
+    return fieldNames;
+  }
+
+  @JsonProperty
+  public List<String> getFieldNames()
+  {
+    return fieldNames;
+  }
+
+  /**
+   * Exposes {@code byRow} so that it survives JSON serialization; the constructor already reads it.
+   */
+  @JsonProperty
+  public boolean isByRow()
+  {
+    return byRow;
+  }
+
+  /**
+   * Cache key layout: type id byte, byRow flag byte, then the field names joined by the 0x01 character.
+   * The flag is part of the key because by-row and by-value aggregation over the same fields produce
+   * different results and must not share cache entries.
+   */
+  @Override
+  public byte[] getCacheKey()
+  {
+    final byte[] fieldNameBytes = Joiner.on("\u0001").join(fieldNames).getBytes(Charsets.UTF_8);
+
+    return ByteBuffer.allocate(2 + fieldNameBytes.length)
+                     .put(CACHE_TYPE_ID)
+                     .put(byRow ? (byte) 1 : (byte) 0)
+                     .put(fieldNameBytes)
+                     .array();
+  }
+
+  @Override
+  public String getTypeName()
+  {
+    return "hyperUnique";
+  }
+
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    return HyperLogLogCollector.getLatestNumBytesForDenseStorage();
+  }
+
+  @Override
+  public Object getAggregatorStartValue()
+  {
+    return HyperLogLogCollector.makeLatestCollector();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CardinalityAggregatorFactory{" +
+           "name='" + name + '\'' +
+           ", fieldNames='" + fieldNames + '\'' +
+           '}';
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
new file mode 100644
index
00000000000..eb4b4e25101
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
@@ -0,0 +1,91 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation.cardinality;
+
+import com.google.common.collect.Lists;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
+import io.druid.segment.DimensionSelector;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Buffer-backed counterpart of {@link CardinalityAggregator}: the collector state lives in a caller-supplied
+ * {@link ByteBuffer} at a given position, and hashing is delegated to
+ * {@link CardinalityAggregator#hashRow} or {@link CardinalityAggregator#hashValues} depending on {@code byRow}.
+ */
+public class CardinalityBufferAggregator implements BufferAggregator
+{
+  private static final byte[] EMPTY_BYTES = HyperLogLogCollector.makeEmptyVersionedByteArray();
+
+  private final List<DimensionSelector> selectorList;
+  private final boolean byRow;
+
+  /**
+   * @param selectorList dimension selectors to read from; copied into a new list
+   * @param byRow        true to hash whole rows, false to hash individual values
+   */
+  public CardinalityBufferAggregator(
+      List<DimensionSelector> selectorList,
+      boolean byRow
+  )
+  {
+    this.selectorList = Lists.newArrayList(selectorList);
+    this.byRow = byRow;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    // write through a duplicate so the caller's position and limit are left untouched
+    final ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    mutationBuffer.put(EMPTY_BYTES);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    // wrap the slot [position, position + dense size) of the buffer in a collector
+    final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
+        (ByteBuffer) buf.duplicate().position(position).limit(
+            position
+            + HyperLogLogCollector.getLatestNumBytesForDenseStorage()
+        )
+    );
+    if (byRow) {
+      CardinalityAggregator.hashRow(selectorList, collector);
+    } else {
+      CardinalityAggregator.hashValues(selectorList, collector);
+    }
+  }
+
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    // copy the slot into a fresh heap buffer and build the returned collector on that copy
+    final ByteBuffer dataCopyBuffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+    final ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    mutationBuffer.get(dataCopyBuffer.array());
+    return HyperLogLogCollector.makeCollector(dataCopyBuffer);
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("CardinalityBufferAggregator does not support getFloat()");
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorBenchmark.java b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorBenchmark.java
new file mode 100644
index 00000000000..9adc153fb50
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorBenchmark.java
@@ -0,0 +1,124 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation.cardinality;
+
+import com.google.caliper.Param;
+import com.google.caliper.Runner;
+import com.google.caliper.SimpleBenchmark;
+import com.google.common.base.Function;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import io.druid.segment.DimensionSelector;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Caliper benchmark for {@link CardinalityBufferAggregator#aggregate}, parameterized over the number of
+ * values per row ({@code multivaluedSized}) and the {@code byRow} mode.
+ */
+public class CardinalityAggregatorBenchmark extends SimpleBenchmark
+{
+  /** Number of rows in the generated column. */
+  private static final int MAX = 5_000_000;
+
+  CardinalityBufferAggregator agg;
+  List<DimensionSelector> selectorList;
+  ByteBuffer buf;
+  int pos;
+
+  @Param({"1", "5"})
+  int multivaluedSized;
+
+  @Param({"true", "false"})
+  boolean byRow;
+
+  /**
+   * Builds a column of MAX rows that cycles through the integers 0..499, each expanded to
+   * {@code multivaluedSized} values, and initializes the aggregator's slot in the buffer.
+   */
+  @Override
+  protected void setUp()
+  {
+    final Iterable<String[]> values = FluentIterable
+        .from(ContiguousSet.create(Range.closedOpen(0, 500), DiscreteDomain.integers()))
+        .transform(
+            new Function<Integer, String[]>()
+            {
+              @Override
+              public String[] apply(Integer input)
+              {
+                if (multivaluedSized == 1) {
+                  return new String[]{input.toString()};
+                } else {
+                  final String[] res = new String[multivaluedSized];
+                  final String value = input.toString();
+                  for (int i = 0; i < multivaluedSized; ++i) {
+                    res[i] = value + i;
+                  }
+                  return res;
+                }
+              }
+            }
+        )
+        .cycle()
+        .limit(MAX);
+
+
+    final CardinalityAggregatorTest.TestDimensionSelector dim1 =
+        new CardinalityAggregatorTest.TestDimensionSelector(values);
+
+    selectorList = Lists.newArrayList(
+        (DimensionSelector) dim1
+    );
+
+    agg = new CardinalityBufferAggregator(
+        selectorList,
+        byRow
+    );
+
+    final CardinalityAggregatorFactory factory = new CardinalityAggregatorFactory(
+        "billy",
+        Lists.newArrayList("dim1"),
+        byRow
+    );
+
+    // place the aggregator's slot at a non-zero offset inside a slightly larger buffer
+    final int maxSize = factory.getMaxIntermediateSize();
+    buf = ByteBuffer.allocate(maxSize + 64);
+    pos = 10;
+    buf.limit(pos + maxSize);
+
+    agg.init(buf, pos);
+  }
+
+  public Object timeBufferAggregate(int reps) throws Exception
+  {
+    for (int i = 0; i < reps; ++i) {
+      agg.aggregate(buf, pos);
+
+      for (final DimensionSelector selector : selectorList) {
+        // rewind before the selector would step past its last row, otherwise advance one row
+        if (i % (MAX - 1) == 0) {
+          ((CardinalityAggregatorTest.TestDimensionSelector) selector).reset();
+        } else {
+          ((CardinalityAggregatorTest.TestDimensionSelector) selector).increment();
+        }
+      }
+    }
+    return agg.get(buf, pos);
+  }
+
+
+  @Override
+  protected void tearDown()
+  {
+
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    Runner.main(CardinalityAggregatorBenchmark.class, args);
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java
new file mode 100644
index 00000000000..239d249a5d6
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java
@@ -0,0 +1,370 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation.cardinality;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.segment.DimensionSelector;
+import io.druid.segment.data.IndexedInts;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for {@link CardinalityAggregator}, {@link CardinalityBufferAggregator} and the combine/finalize
+ * path of {@link CardinalityAggregatorFactory}, in both by-row and by-value mode.
+ */
+public class CardinalityAggregatorTest
+{
+  /**
+   * In-memory {@link DimensionSelector} over a fixed list of (possibly multi-valued) rows.
+   * Tests move the cursor explicitly with {@link #increment()} and {@link #reset()}.
+   */
+  public static class TestDimensionSelector implements DimensionSelector
+  {
+    private final List<Integer[]> column;
+    private final Map<String, Integer> ids;
+    private final Map<Integer, String> lookup;
+
+    private int pos = 0;
+
+    /**
+     * @param values rows of the column; each row is the array of values for that row
+     */
+    public TestDimensionSelector(Iterable<String[]> values)
+    {
+      this.lookup = Maps.newHashMap();
+      this.ids = Maps.newHashMap();
+
+      // assign dictionary ids in order of first appearance
+      int index = 0;
+      for (String[] multiValue : values) {
+        for (String value : multiValue) {
+          if (!ids.containsKey(value)) {
+            ids.put(value, index);
+            lookup.put(index, value);
+            index++;
+          }
+        }
+      }
+
+      // encode every row as the array of its values' ids
+      this.column = Lists.newArrayList(
+          Iterables.transform(
+              values, new Function<String[], Integer[]>()
+          {
+            @Nullable
+            @Override
+            public Integer[] apply(@Nullable String[] input)
+            {
+              return Iterators.toArray(
+                  Iterators.transform(
+                      Iterators.forArray(input), new Function<String, Integer>()
+                  {
+                    @Nullable
+                    @Override
+                    public Integer apply(@Nullable String input)
+                    {
+                      return ids.get(input);
+                    }
+                  }
+                  ), Integer.class
+              );
+            }
+          }
+          )
+      );
+    }
+
+    public void increment()
+    {
+      pos++;
+    }
+
+    public void reset()
+    {
+      pos = 0;
+    }
+
+    @Override
+    public IndexedInts getRow()
+    {
+      // capture the cursor so the returned row keeps pointing at the row current at call time
+      final int p = this.pos;
+      return new IndexedInts()
+      {
+        @Override
+        public int size()
+        {
+          return column.get(p).length;
+        }
+
+        @Override
+        public int get(int i)
+        {
+          return column.get(p)[i];
+        }
+
+        @Override
+        public Iterator<Integer> iterator()
+        {
+          return Iterators.forArray(column.get(p));
+        }
+      };
+    }
+
+    @Override
+    public int getValueCardinality()
+    {
+      // fixed stub value; not derived from the column contents
+      return 1;
+    }
+
+    @Override
+    public String lookupName(int i)
+    {
+      return lookup.get(i);
+    }
+
+    @Override
+    public int lookupId(String s)
+    {
+      return ids.get(s);
+    }
+  }
+
+  /*
+    values1: 4 distinct rows
+    values1: 4 distinct values
+    values2: 8 distinct rows
+    values2: 7 distinct values
+    groupBy(values1, values2): 9 distinct rows
+    groupBy(values1, values2): 7 distinct values
+    combine(values1, values2): 8 distinct rows
+    combine(values1, values2): 7 distinct values
+   */
+  private static final List<String[]> values1 = dimensionValues(
+      "a", "b", "c", "a", "a", null, "b", "b", "b", "b", "a", "a"
+  );
+  private static final List<String[]> values2 = dimensionValues(
+      "a",
+      "b",
+      "c",
+      "x",
+      "a",
+      "e",
+      "b",
+      new String[]{null, "x"},
+      new String[]{"x", null},
+      new String[]{"y", "x"},
+      new String[]{"x", "y"},
+      new String[]{"x", "y", "a"}
+  );
+
+  /** Wraps each argument into a row: a String[] is used as is, anything else becomes a single-value row. */
+  private static List<String[]> dimensionValues(Object... values)
+  {
+    return Lists.transform(
+        Lists.newArrayList(values), new Function<Object, String[]>()
+    {
+      @Nullable
+      @Override
+      public String[] apply(@Nullable Object input)
+      {
+        if (input instanceof String[]) {
+          return (String[]) input;
+        } else {
+          return new String[]{(String) input};
+        }
+      }
+    }
+    );
+  }
+
+  /** Aggregates the current row, then advances every selector by one row. */
+  private static void aggregate(List<DimensionSelector> selectorList, Aggregator agg)
+  {
+    agg.aggregate();
+
+    for (DimensionSelector selector : selectorList) {
+      ((TestDimensionSelector) selector).increment();
+    }
+  }
+
+  /** Buffer-aggregator variant of {@link #aggregate}. */
+  private static void bufferAggregate(
+      List<DimensionSelector> selectorList,
+      BufferAggregator agg,
+      ByteBuffer buf,
+      int pos
+  )
+  {
+    agg.aggregate(buf, pos);
+
+    for (DimensionSelector selector : selectorList) {
+      ((TestDimensionSelector) selector).increment();
+    }
+  }
+
+  List<DimensionSelector> selectorList;
+  CardinalityAggregatorFactory rowAggregatorFactory;
+  CardinalityAggregatorFactory valueAggregatorFactory;
+  final TestDimensionSelector dim1;
+  final TestDimensionSelector dim2;
+
+  public CardinalityAggregatorTest()
+  {
+    dim1 = new TestDimensionSelector(values1);
+    dim2 = new TestDimensionSelector(values2);
+
+    selectorList = Lists.newArrayList(
+        (DimensionSelector) dim1,
+        dim2
+    );
+
+    rowAggregatorFactory = new CardinalityAggregatorFactory(
+        "billy",
+        Lists.newArrayList("dim1", "dim2"),
+        true
+    );
+
+    // by-value factory: byRow must be false here (it was a copy of the row factory before)
+    valueAggregatorFactory = new CardinalityAggregatorFactory(
+        "billy",
+        Lists.newArrayList("dim1", "dim2"),
+        false
+    );
+  }
+
+  @Test
+  public void testAggregateRows() throws Exception
+  {
+    CardinalityAggregator agg = new CardinalityAggregator(
+        "billy",
+        selectorList,
+        true
+    );
+
+
+    for (int i = 0; i < values1.size(); ++i) {
+      aggregate(selectorList, agg);
+    }
+    Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get()), 0.05);
+  }
+
+  @Test
+  public void testAggregateValues() throws Exception
+  {
+    CardinalityAggregator agg = new CardinalityAggregator(
+        "billy",
+        selectorList,
+        false
+    );
+
+    for (int i = 0; i < values1.size(); ++i) {
+      aggregate(selectorList, agg);
+    }
+    Assert.assertEquals(7.0, (Double) valueAggregatorFactory.finalizeComputation(agg.get()), 0.05);
+  }
+
+  @Test
+  public void testBufferAggregateRows() throws Exception
+  {
+    CardinalityBufferAggregator agg = new CardinalityBufferAggregator(
+        selectorList,
+        true
+    );
+
+    int maxSize = rowAggregatorFactory.getMaxIntermediateSize();
+    ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
+    int pos = 10;
+    buf.limit(pos + maxSize);
+
+    agg.init(buf, pos);
+
+    for (int i = 0; i < values1.size(); ++i) {
+      bufferAggregate(selectorList, agg, buf, pos);
+    }
+    Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get(buf, pos)), 0.05);
+  }
+
+  @Test
+  public void testBufferAggregateValues() throws Exception
+  {
+    CardinalityBufferAggregator agg = new CardinalityBufferAggregator(
+        selectorList,
+        false
+    );
+
+    int maxSize = valueAggregatorFactory.getMaxIntermediateSize();
+    ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
+    int pos = 10;
+    buf.limit(pos + maxSize);
+
+    agg.init(buf, pos);
+
+    for (int i = 0; i < values1.size(); ++i) {
+      bufferAggregate(selectorList, agg, buf, pos);
+    }
+    Assert.assertEquals(7.0, (Double) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)), 0.05);
+  }
+
+  @Test
+  public void testCombineRows()
+  {
+    List<DimensionSelector> selector1 = Lists.newArrayList((DimensionSelector) dim1);
+    List<DimensionSelector> selector2 = Lists.newArrayList((DimensionSelector) dim2);
+
+    CardinalityAggregator agg1 = new CardinalityAggregator("billy", selector1, true);
+    CardinalityAggregator agg2 = new CardinalityAggregator("billy", selector2, true);
+
+    for (int i = 0; i < values1.size(); ++i) {
+      aggregate(selector1, agg1);
+    }
+    for (int i = 0; i < values2.size(); ++i) {
+      aggregate(selector2, agg2);
+    }
+
+    Assert.assertEquals(4.0, (Double) rowAggregatorFactory.finalizeComputation(agg1.get()), 0.05);
+    Assert.assertEquals(8.0, (Double) rowAggregatorFactory.finalizeComputation(agg2.get()), 0.05);
+
+    Assert.assertEquals(
+        9.0,
+        (Double) rowAggregatorFactory.finalizeComputation(
+            rowAggregatorFactory.combine(
+                agg1.get(),
+                agg2.get()
+            )
+        ),
+        0.05
+    );
+  }
+
+  @Test
+  public void testCombineValues()
+  {
+    List<DimensionSelector> selector1 = Lists.newArrayList((DimensionSelector) dim1);
+    List<DimensionSelector> selector2 = Lists.newArrayList((DimensionSelector) dim2);
+
+    CardinalityAggregator agg1 = new CardinalityAggregator("billy", selector1, false);
+    CardinalityAggregator agg2 = new CardinalityAggregator("billy", selector2, false);
+
+    for (int i = 0; i < values1.size(); ++i) {
+      aggregate(selector1, agg1);
+    }
+    for (int i = 0; i < values2.size(); ++i) {
+      aggregate(selector2, agg2);
+    }
+
+    Assert.assertEquals(4.0, (Double) valueAggregatorFactory.finalizeComputation(agg1.get()), 0.05);
+    Assert.assertEquals(7.0, (Double) valueAggregatorFactory.finalizeComputation(agg2.get()), 0.05);
+
+    // combine and finalize through the by-value factory, matching the aggregators under test
+    Assert.assertEquals(
+        7.0,
+        (Double) valueAggregatorFactory.finalizeComputation(
+            valueAggregatorFactory.combine(
+                agg1.get(),
+                agg2.get()
+            )
+        ),
+        0.05
+    );
+  }
+}