Use an HllSketchHolder object to enable optimized merge (#13737)

* Use an HllSketchHolder object to enable optimized merge

HllSketchAggregatorFactory.combine had been implemented using a
pure pair-wise, "make a union -> add 2 things to union -> get sketch"
algorithm.  This algorithm does 2 things that waste CPU:

1) The Union object always builds an HLL_8 sketch regardless of the
  target type.  This means that when the target type is not HLL_8, we
  spend CPU cycles converting to HLL_8 and back over and over again.
2) By throwing away the Union object and converting back to the
  HllSketch only to build another Union object, we do lots and lots
  of copies and conversions of the HllSketch.

This change introduces an HllSketchHolder object which can hold onto
a Union object and delay conversion back into an HllSketch until
it is actually needed.  This follows the same pattern as the
SketchHolder object for theta sketches.
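
For illustration, a minimal standalone sketch (not part of the commit) contrasting the two merge patterns the message describes. The class name CombineComparison, the lgK value, and the input data are made-up assumptions; the DataSketches calls match the library used in the diffs below.

import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;

public class CombineComparison
{
  public static void main(String[] args)
  {
    final int lgK = 12;
    HllSketch[] sketches = new HllSketch[100];
    for (int i = 0; i < sketches.length; i++) {
      HllSketch s = new HllSketch(lgK, TgtHllType.HLL_4);
      for (int j = 0; j < 1000; j++) {
        s.update((long) (i * 1000 + j));
      }
      sketches[i] = s;
    }

    // Old pattern: a fresh Union per pair. Because the Union works in
    // HLL_8, every iteration converts HLL_4 -> HLL_8 and back, and
    // materializes a brand-new intermediate HllSketch.
    HllSketch pairwise = sketches[0];
    for (int i = 1; i < sketches.length; i++) {
      Union u = new Union(lgK);
      u.update(pairwise);
      u.update(sketches[i]);
      pairwise = u.getResult(TgtHllType.HLL_4);  // convert back each time
    }

    // New pattern (what the holder enables): keep one Union alive and
    // convert to the target type exactly once, after all inputs fold in.
    Union u = new Union(lgK);
    for (HllSketch s : sketches) {
      u.update(s);
    }
    HllSketch merged = u.getResult(TgtHllType.HLL_4);

    System.out.printf("pairwise=%f held=%f%n", pairwise.getEstimate(), merged.getEstimate());
  }
}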
imply-cheddar 2023-02-08 06:57:48 +09:00 committed by GitHub
parent dcdae84888
commit f684df4c22
20 changed files with 384 additions and 153 deletions

View File

@ -48,8 +48,8 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
public static final int DEFAULT_LG_K = 12;
public static final TgtHllType DEFAULT_TGT_HLL_TYPE = TgtHllType.HLL_4;
static final Comparator<HllSketch> COMPARATOR =
Comparator.nullsFirst(Comparator.comparingDouble(HllSketch::getEstimate));
static final Comparator<HllSketchHolder> COMPARATOR =
Comparator.nullsFirst(Comparator.comparingDouble(HllSketchHolder::getEstimate));
private final String name;
private final String fieldName;
@ -133,24 +133,31 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
}
@Override
public HllSketch deserialize(final Object object)
public HllSketchHolder deserialize(final Object object)
{
return HllSketchMergeComplexMetricSerde.deserializeSketch(object);
if (object == null) {
return HllSketchHolder.of(new HllSketch(lgK, tgtHllType));
}
return HllSketchHolder.fromObj(object);
}
@Override
public HllSketch combine(final Object objectA, final Object objectB)
public Object combine(final Object lhs, final Object rhs)
{
final Union union = new Union(lgK);
union.update((HllSketch) objectA);
union.update((HllSketch) objectB);
return union.getResult(tgtHllType);
if (lhs == null) {
return rhs;
}
if (rhs == null) {
return lhs;
}
return ((HllSketchHolder) lhs).merge((HllSketchHolder) rhs);
}
@Override
public AggregateCombiner makeAggregateCombiner()
{
return new ObjectAggregateCombiner<HllSketch>()
return new ObjectAggregateCombiner<HllSketchHolder>()
{
private final Union union = new Union(lgK);
@ -164,21 +171,21 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
@Override
public void fold(final ColumnValueSelector selector)
{
final HllSketch sketch = (HllSketch) selector.getObject();
union.update(sketch);
final HllSketchHolder sketch = (HllSketchHolder) selector.getObject();
union.update(sketch.getSketch());
}
@Nullable
@Override
public HllSketch getObject()
public HllSketchHolder getObject()
{
return union.getResult(tgtHllType);
return HllSketchHolder.of(union.getResult(tgtHllType));
}
@Override
public Class<HllSketch> classOfObject()
public Class<HllSketchHolder> classOfObject()
{
return HllSketch.class;
return HllSketchHolder.class;
}
};
}
@ -197,7 +204,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
return object;
}
final HllSketch sketch = (HllSketch) object;
final HllSketchHolder sketch = HllSketchHolder.fromObj(object);
final double estimate = sketch.getEstimate();
if (round) {
@ -208,7 +215,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
}
@Override
public Comparator<HllSketch> getComparator()
public Comparator<HllSketchHolder> getComparator()
{
return COMPARATOR;
}

View File

@ -73,7 +73,7 @@ public class HllSketchBuildAggregator implements Aggregator
@Override
public synchronized Object get()
{
return sketch.copy();
return HllSketchHolder.of(sketch.copy());
}
@Override

View File

@ -66,7 +66,7 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
@Override
public Object get(final ByteBuffer buf, final int position)
{
return helper.get(buf, position);
return HllSketchHolder.of(helper.get(buf, position));
}
@Override

View File

@ -82,7 +82,7 @@ public class HllSketchBuildBufferAggregatorHelper
* Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and
* {@link org.apache.druid.query.aggregation.VectorAggregator#get}.
*/
public Object get(ByteBuffer buf, int position)
public HllSketch get(ByteBuffer buf, int position)
{
return sketchCache.get(buf).get(position).copy();
}

View File

@ -93,7 +93,7 @@ public class HllSketchBuildVectorAggregator implements VectorAggregator
@Override
public Object get(final ByteBuffer buf, final int position)
{
return helper.get(buf, position);
return HllSketchHolder.of(helper.get(buf, position));
}
/**

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll;
import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
public class HllSketchHolder
{
public static HllSketchHolder fromObj(Object obj)
{
if (obj == null) {
throw new NullPointerException("HllSketchHolder.fromObj cannot take a null argument");
}
if (obj instanceof HllSketchHolder) {
return (HllSketchHolder) obj;
} else if (obj instanceof HllSketch) {
return HllSketchHolder.of((HllSketch) obj);
} else if (obj instanceof Union) {
return HllSketchHolder.of((Union) obj);
} else if (obj instanceof byte[]) {
return HllSketchHolder.of(HllSketch.heapify((byte[]) obj));
} else if (obj instanceof Memory) {
return HllSketchHolder.of(HllSketch.wrap((Memory) obj));
} else if (obj instanceof String) {
return HllSketchHolder.of(HllSketch.heapify(StringUtils.decodeBase64(StringUtils.toUtf8((String) obj))));
}
throw new ISE("Object is not of a type[%s] that can be deserialized to sketch.", obj.getClass());
}
public static HllSketchHolder of(Union union)
{
return new HllSketchHolder(union, null);
}
public static HllSketchHolder of(HllSketch sketch)
{
return new HllSketchHolder(null, sketch);
}
private Union union;
private HllSketch sketch;
public HllSketchHolder(
Union union,
HllSketch sketch
)
{
this.union = union;
this.sketch = sketch;
if (this.union == null && this.sketch == null) {
throw new ISE("Both union and sketch were null!");
}
}
@JsonValue
public HllSketch getSketch()
{
if (sketch == null) {
sketch = union.getResult();
}
return sketch;
}
public HllSketch getSketch(TgtHllType type)
{
if (sketch == null) {
sketch = union.getResult(type);
}
if (sketch.getTgtHllType() != type) {
sketch = sketch.copyAs(type);
}
return sketch;
}
public void add(HllSketch sketchToAdd)
{
if (union == null) {
union = new Union(sketchToAdd.getLgConfigK());
union.update(this.sketch);
}
union.update(sketchToAdd);
sketch = null;
}
public double getEstimate()
{
if (sketch != null) {
return sketch.getEstimate();
} else if (union != null) {
return union.getEstimate();
}
return 0;
}
public double getLowerBound(int stdDevs)
{
if (sketch != null) {
return sketch.getLowerBound(stdDevs);
} else if (union != null) {
return union.getLowerBound(stdDevs);
}
return 0;
}
public double getUpperBound(int stdDevs)
{
if (sketch != null) {
return sketch.getUpperBound(stdDevs);
} else if (union != null) {
return union.getUpperBound(stdDevs);
}
return 0;
}
@SuppressWarnings("VariableNotUsedInsideIf")
public HllSketchHolder merge(HllSketchHolder other)
{
// It appears like we could make this code cleaner by checking for other.union first and then delegating to add
// if it's not. But, we check ourselves first because callers would tend to expect that the object they are
// calling a method on is more likely to get mutated, so we prefer to check and merge into this object if
// possible first.
if (union == null) {
if (other.union == null) {
add(other.getSketch());
return this;
} else {
other.add(sketch);
return other;
}
} else {
add(other.getSketch());
return this;
}
}
}
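
For illustration, a hypothetical usage of the new holder (not part of the commit). The class name HolderUsage and the input values are assumptions; fromObj, merge, getEstimate, and getSketch behave as defined in the class above.

import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchHolder;

public class HolderUsage
{
  public static void main(String[] args)
  {
    HllSketch a = new HllSketch(12);
    a.update("row-1");
    HllSketch b = new HllSketch(12);
    b.update("row-2");

    // fromObj normalizes any of the supported input types (holder,
    // HllSketch, Union, byte[], Memory, base64 String) into a holder.
    HllSketchHolder left = HllSketchHolder.fromObj(a);
    HllSketchHolder right = HllSketchHolder.fromObj(b.toCompactByteArray());

    // merge keeps a Union alive rather than materializing an HllSketch
    // per step; note it may mutate and return either side.
    HllSketchHolder merged = left.merge(right);
    double estimate = merged.getEstimate();                  // no sketch materialized
    HllSketch result = merged.getSketch(TgtHllType.HLL_4);   // convert once, at the end
    System.out.printf("estimate=%f bytes=%d%n", estimate, result.toCompactByteArray().length);
  }
}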

View File

@ -28,41 +28,43 @@ import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public class HllSketchObjectStrategy implements ObjectStrategy<HllSketch>
public class HllSketchHolderObjectStrategy implements ObjectStrategy<HllSketchHolder>
{
static final HllSketchObjectStrategy STRATEGY = new HllSketchObjectStrategy();
static final HllSketchHolderObjectStrategy STRATEGY = new HllSketchHolderObjectStrategy();
@Override
public Class<HllSketch> getClazz()
public Class<HllSketchHolder> getClazz()
{
return HllSketch.class;
return HllSketchHolder.class;
}
@Override
public int compare(final HllSketch sketch1, final HllSketch sketch2)
public int compare(final HllSketchHolder sketch1, final HllSketchHolder sketch2)
{
return HllSketchAggregatorFactory.COMPARATOR.compare(sketch1, sketch2);
}
@Override
public HllSketch fromByteBuffer(final ByteBuffer buf, final int size)
public HllSketchHolder fromByteBuffer(final ByteBuffer buf, final int size)
{
return HllSketch.wrap(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN).region(buf.position(), size));
return HllSketchHolder.of(HllSketch.wrap(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN).region(buf.position(), size)));
}
@Override
public byte[] toBytes(final HllSketch sketch)
public byte[] toBytes(final HllSketchHolder sketch)
{
return sketch.toCompactByteArray();
return sketch.getSketch().toCompactByteArray();
}
@Nullable
@Override
public HllSketch fromByteBufferSafe(ByteBuffer buffer, int numBytes)
public HllSketchHolder fromByteBufferSafe(ByteBuffer buffer, int numBytes)
{
return HllSketch.wrap(
SafeWritableMemory.wrap(buffer, ByteOrder.LITTLE_ENDIAN).region(buffer.position(), numBytes)
return HllSketchHolder.of(
HllSketch.wrap(
SafeWritableMemory.wrap(buffer, ByteOrder.LITTLE_ENDIAN).region(buffer.position(), numBytes)
)
);
}
}

View File

@ -32,12 +32,12 @@ import org.apache.druid.segment.ColumnValueSelector;
public class HllSketchMergeAggregator implements Aggregator
{
private final ColumnValueSelector<HllSketch> selector;
private final ColumnValueSelector<HllSketchHolder> selector;
private final TgtHllType tgtHllType;
private Union union;
public HllSketchMergeAggregator(
final ColumnValueSelector<HllSketch> selector,
final ColumnValueSelector<HllSketchHolder> selector,
final int lgK,
final TgtHllType tgtHllType
)
@ -55,12 +55,12 @@ public class HllSketchMergeAggregator implements Aggregator
@Override
public void aggregate()
{
final HllSketch sketch = selector.getObject();
final HllSketchHolder sketch = selector.getObject();
if (sketch == null) {
return;
}
synchronized (this) {
union.update(sketch);
union.update(sketch.getSketch());
}
}
@ -72,7 +72,7 @@ public class HllSketchMergeAggregator implements Aggregator
@Override
public synchronized Object get()
{
return union.getResult(tgtHllType);
return HllSketchHolder.of(union.getResult(tgtHllType));
}
@Override

View File

@ -92,17 +92,19 @@ public class HllSketchMergeAggregatorFactory extends HllSketchAggregatorFactory
return AggregatorUtil.HLL_SKETCH_MERGE_CACHE_TYPE_ID;
}
@SuppressWarnings("unchecked")
@Override
public Aggregator factorize(final ColumnSelectorFactory columnSelectorFactory)
{
final ColumnValueSelector<HllSketch> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
final ColumnValueSelector<HllSketchHolder> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
return new HllSketchMergeAggregator(selector, getLgK(), TgtHllType.valueOf(getTgtHllType()));
}
@SuppressWarnings("unchecked")
@Override
public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSelectorFactory)
{
final ColumnValueSelector<HllSketch> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
final ColumnValueSelector<HllSketchHolder> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
return new HllSketchMergeBufferAggregator(
selector,
getLgK(),

View File

@ -36,11 +36,11 @@ import java.nio.ByteOrder;
*/
public class HllSketchMergeBufferAggregator implements BufferAggregator
{
private final ColumnValueSelector<HllSketch> selector;
private final ColumnValueSelector<HllSketchHolder> selector;
private final HllSketchMergeBufferAggregatorHelper helper;
public HllSketchMergeBufferAggregator(
final ColumnValueSelector<HllSketch> selector,
final ColumnValueSelector<HllSketchHolder> selector,
final int lgK,
final TgtHllType tgtHllType,
final int size
@ -59,7 +59,7 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
final HllSketch sketch = selector.getObject();
final HllSketchHolder sketch = selector.getObject();
if (sketch == null) {
return;
}
@ -68,13 +68,13 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
.writableRegion(position, helper.getSize());
final Union union = Union.writableWrap(mem);
union.update(sketch);
union.update(sketch.getSketch());
}
@Override
public Object get(final ByteBuffer buf, final int position)
{
return helper.get(buf, position);
return HllSketchHolder.of(helper.get(buf, position));
}
@Override

View File

@ -77,7 +77,7 @@ public class HllSketchMergeBufferAggregatorHelper
* Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and
* {@link org.apache.druid.query.aggregation.VectorAggregator#get}.
*/
public Object get(ByteBuffer buf, int position)
public HllSketch get(ByteBuffer buf, int position)
{
final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
final Union union = Union.writableWrap(mem);

View File

@ -20,10 +20,7 @@
package org.apache.druid.query.aggregation.datasketches.hll;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
@ -36,7 +33,6 @@ import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerialize
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class HllSketchMergeComplexMetricSerde extends ComplexMetricSerde
{
@ -50,7 +46,7 @@ public class HllSketchMergeComplexMetricSerde extends ComplexMetricSerde
@Override
public ObjectStrategy getObjectStrategy()
{
return HllSketchObjectStrategy.STRATEGY;
return HllSketchHolderObjectStrategy.STRATEGY;
}
@Override
@ -65,13 +61,16 @@ public class HllSketchMergeComplexMetricSerde extends ComplexMetricSerde
}
@Override
public HllSketch extractValue(final InputRow inputRow, final String metricName)
public HllSketchHolder extractValue(final InputRow inputRow, final String metricName)
{
final Object object = inputRow.getRaw(metricName);
Object object = inputRow.getRaw(metricName);
if (object == null) {
return null;
}
return deserializeSketchSafe(object);
if (object instanceof byte[]) {
object = SafeWritableMemory.wrap((byte[]) object);
}
return HllSketchHolder.fromObj(object);
}
};
}
@ -82,36 +81,11 @@ public class HllSketchMergeComplexMetricSerde extends ComplexMetricSerde
columnBuilder.setComplexColumnSupplier(
new ComplexColumnPartSupplier(
getTypeName(),
GenericIndexed.read(buf, HllSketchObjectStrategy.STRATEGY, columnBuilder.getFileMapper())
GenericIndexed.read(buf, HllSketchHolderObjectStrategy.STRATEGY, columnBuilder.getFileMapper())
)
);
}
static HllSketch deserializeSketch(final Object object)
{
if (object instanceof String) {
return HllSketch.wrap(Memory.wrap(StringUtils.decodeBase64(((String) object).getBytes(StandardCharsets.UTF_8))));
} else if (object instanceof byte[]) {
return HllSketch.wrap(Memory.wrap((byte[]) object));
} else if (object instanceof HllSketch) {
return (HllSketch) object;
}
throw new IAE("Object is not of a type that can be deserialized to an HllSketch:" + object.getClass().getName());
}
static HllSketch deserializeSketchSafe(final Object object)
{
if (object instanceof String) {
return HllSketch.wrap(SafeWritableMemory.wrap(StringUtils.decodeBase64(((String) object).getBytes(StandardCharsets.UTF_8))));
} else if (object instanceof byte[]) {
return HllSketch.wrap(SafeWritableMemory.wrap((byte[]) object));
} else if (object instanceof HllSketch) {
return (HllSketch) object;
}
throw new IAE("Object is not of a type that can be deserialized to an HllSketch:" + object.getClass().getName());
}
// support large columns
@Override
public GenericColumnSerializer getSerializer(final SegmentWriteOutMedium segmentWriteOutMedium, final String column)
{

View File

@ -19,7 +19,6 @@
package org.apache.druid.query.aggregation.datasketches.hll;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.WritableMemory;
@ -71,7 +70,9 @@ public class HllSketchMergeVectorAggregator implements VectorAggregator
final Union union = Union.writableWrap(mem);
for (int i = startRow; i < endRow; i++) {
union.update((HllSketch) vector[i]);
if (vector[i] != null) {
union.update(((HllSketchHolder) vector[i]).getSketch());
}
}
}
@ -87,7 +88,7 @@ public class HllSketchMergeVectorAggregator implements VectorAggregator
final Object[] vector = objectSupplier.get();
for (int i = 0; i < numRows; i++) {
final HllSketch o = (HllSketch) vector[rows != null ? rows[i] : i];
final HllSketchHolder o = (HllSketchHolder) vector[rows != null ? rows[i] : i];
if (o != null) {
final int position = positions[i] + positionOffset;
@ -96,7 +97,7 @@ public class HllSketchMergeVectorAggregator implements VectorAggregator
.writableRegion(position, helper.getSize());
final Union union = Union.writableWrap(mem);
union.update(o);
union.update(o.getSketch());
}
}
}
@ -104,7 +105,7 @@ public class HllSketchMergeVectorAggregator implements VectorAggregator
@Override
public Object get(final ByteBuffer buf, final int position)
{
return helper.get(buf, position);
return HllSketchHolder.of(helper.get(buf, position));
}
@Override

View File

@ -97,8 +97,11 @@ public class HllSketchToEstimatePostAggregator implements PostAggregator
@Override
public Object compute(final Map<String, Object> combinedAggregators)
{
final HllSketch sketch = (HllSketch) field.compute(combinedAggregators);
return round ? Math.round(sketch.getEstimate()) : sketch.getEstimate();
final HllSketchHolder holder = HllSketchHolder.fromObj(field.compute(combinedAggregators));
// The union object always uses an HLL_8 sketch, so we always get that. The target type doesn't actually impact
// the estimate anyway, so whatever gives us the "cheapest" operation should be good.
double estimate = holder.getEstimate();
return round ? Math.round(estimate) : estimate;
}
@Override
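
The comment added to HllSketchToEstimatePostAggregator above asserts that the target type does not affect the estimate. A tiny standalone check of that claim (not part of the commit, class name EstimateVsTgtType is made up), relying on copyAs changing only the storage format, not the register values; it should print true:

import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;

public class EstimateVsTgtType
{
  public static void main(String[] args)
  {
    HllSketch hll8 = new HllSketch(12, TgtHllType.HLL_8);
    for (long i = 0; i < 100_000; i++) {
      hll8.update(i);
    }
    double e8 = hll8.getEstimate();
    double e4 = hll8.copyAs(TgtHllType.HLL_4).getEstimate();
    System.out.println(e8 == e4);  // same estimator state, so skipping the conversion is safe
  }
}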

View File

@ -103,7 +103,7 @@ public class HllSketchToEstimateWithBoundsPostAggregator
@Override
public double[] compute(final Map<String, Object> combinedAggregators)
{
final HllSketch sketch = (HllSketch) field.compute(combinedAggregators);
final HllSketchHolder sketch = HllSketchHolder.fromObj(field.compute(combinedAggregators));
return new double[] {sketch.getEstimate(), sketch.getLowerBound(numStdDevs), sketch.getUpperBound(numStdDevs)};
}

View File

@ -83,7 +83,7 @@ public class HllSketchToStringPostAggregator implements PostAggregator
@Override
public String compute(final Map<String, Object> combinedAggregators)
{
final HllSketch sketch = (HllSketch) field.compute(combinedAggregators);
final HllSketch sketch = HllSketchHolder.fromObj(field.compute(combinedAggregators)).getSketch();
return sketch.toString();
}

View File

@ -111,20 +111,20 @@ public class HllSketchUnionPostAggregator implements PostAggregator
}
@Override
public Comparator<HllSketch> getComparator()
public Comparator<HllSketchHolder> getComparator()
{
return HllSketchAggregatorFactory.COMPARATOR;
}
@Override
public HllSketch compute(final Map<String, Object> combinedAggregators)
public HllSketchHolder compute(final Map<String, Object> combinedAggregators)
{
final Union union = new Union(lgK);
for (final PostAggregator field : fields) {
final HllSketch sketch = (HllSketch) field.compute(combinedAggregators);
union.update(sketch);
final HllSketchHolder sketch = HllSketchHolder.fromObj(field.compute(combinedAggregators));
union.update(sketch.getSketch());
}
return union.getResult(tgtHllType);
return HllSketchHolder.of(union.getResult(tgtHllType));
}
@Override

View File

@ -25,14 +25,17 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Rule;
@ -54,17 +57,25 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
{
private static final boolean ROUND = true;
private final AggregationTestHelper helper;
private final AggregationTestHelper groupByHelper;
private final AggregationTestHelper timeseriesHelper;
private final QueryContexts.Vectorize vectorize;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
public final TemporaryFolder groupByFolder = new TemporaryFolder();
@Rule
public final TemporaryFolder timeseriesFolder = new TemporaryFolder();
public HllSketchAggregatorTest(GroupByQueryConfig config, String vectorize)
{
HllSketchModule.registerSerde();
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
new HllSketchModule().getJacksonModules(), config, tempFolder);
groupByHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
new HllSketchModule().getJacksonModules(), config, groupByFolder
);
timeseriesHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
new HllSketchModule().getJacksonModules(), timeseriesFolder
);
this.vectorize = QueryContexts.Vectorize.fromString(vectorize);
}
@ -83,7 +94,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
@Test
public void ingestSketches() throws Exception
{
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
Sequence<ResultRow> seq = groupByHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_sketches.tsv").getFile()),
buildParserJson(
Arrays.asList("dim", "multiDim"),
@ -92,7 +103,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
buildAggregatorJson("HLLSketchMerge", "sketch", !ROUND),
0, // minTimestamp
Granularities.NONE,
200, // maxRowCount
10, // maxRowCount
buildGroupByQueryJson("HLLSketchMerge", "sketch", !ROUND)
);
List<ResultRow> results = seq.toList();
@ -101,10 +112,37 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
Assert.assertEquals(200, (double) row.get(0), 0.1);
}
@Test
public void ingestSketchesTimeseries() throws Exception
{
final File inputFile = new File(this.getClass().getClassLoader().getResource("hll/hll_sketches.tsv").getFile());
final String parserJson = buildParserJson(
Arrays.asList("dim", "multiDim"),
Arrays.asList("timestamp", "dim", "multiDim", "sketch")
);
final String aggregators = buildAggregatorJson("HLLSketchMerge", "sketch", !ROUND);
final int minTimestamp = 0;
final Granularity gran = Granularities.NONE;
final int maxRowCount = 10;
final String queryJson = buildTimeseriesQueryJson("HLLSketchMerge", "sketch", !ROUND);
File segmentDir1 = timeseriesFolder.newFolder();
timeseriesHelper.createIndex(inputFile, parserJson, aggregators, segmentDir1, minTimestamp, gran, maxRowCount, true);
File segmentDir2 = timeseriesFolder.newFolder();
timeseriesHelper.createIndex(inputFile, parserJson, aggregators, segmentDir2, minTimestamp, gran, maxRowCount, true);
Sequence<Result> seq = timeseriesHelper.runQueryOnSegments(Arrays.asList(segmentDir1, segmentDir2), queryJson);
List<Result> results = seq.toList();
Assert.assertEquals(1, results.size());
Result row = results.get(0);
Assert.assertEquals(200, (double) ((TimeseriesResultValue) row.getValue()).getMetric("sketch"), 0.1);
}
@Test
public void buildSketchesAtIngestionTime() throws Exception
{
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
Sequence<ResultRow> seq = groupByHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()),
buildParserJson(
Collections.singletonList("dim"),
@ -113,7 +151,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
buildAggregatorJson("HLLSketchBuild", "id", !ROUND),
0, // minTimestamp
Granularities.NONE,
200, // maxRowCount
10, // maxRowCount
buildGroupByQueryJson("HLLSketchMerge", "sketch", !ROUND)
);
List<ResultRow> results = seq.toList();
@ -122,10 +160,31 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
Assert.assertEquals(200, (double) row.get(0), 0.1);
}
@Test
public void buildSketchesAtIngestionTimeTimeseries() throws Exception
{
Sequence<Result> seq = timeseriesHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()),
buildParserJson(
Collections.singletonList("dim"),
Arrays.asList("timestamp", "dim", "multiDim", "id")
),
buildAggregatorJson("HLLSketchBuild", "id", !ROUND),
0, // minTimestamp
Granularities.NONE,
10, // maxRowCount
buildTimeseriesQueryJson("HLLSketchMerge", "sketch", !ROUND)
);
List<Result> results = seq.toList();
Assert.assertEquals(1, results.size());
Result row = results.get(0);
Assert.assertEquals(200, (double) ((TimeseriesResultValue) row.getValue()).getMetric("sketch"), 0.1);
}
@Test
public void buildSketchesAtQueryTime() throws Exception
{
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
Sequence<ResultRow> seq = groupByHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()),
buildParserJson(
Arrays.asList("dim", "multiDim", "id"),
@ -134,7 +193,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
"[]",
0, // minTimestamp
Granularities.NONE,
200, // maxRowCount
10, // maxRowCount
buildGroupByQueryJson("HLLSketchBuild", "id", !ROUND)
);
List<ResultRow> results = seq.toList();
@ -143,6 +202,27 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
Assert.assertEquals(200, (double) row.get(0), 0.1);
}
@Test
public void buildSketchesAtQueryTimeTimeseries() throws Exception
{
Sequence<Result> seq = timeseriesHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()),
buildParserJson(
Arrays.asList("dim", "multiDim", "id"),
Arrays.asList("timestamp", "dim", "multiDim", "id")
),
"[]",
0, // minTimestamp
Granularities.NONE,
10, // maxRowCount
buildTimeseriesQueryJson("HLLSketchBuild", "id", !ROUND)
);
List<Result> results = seq.toList();
Assert.assertEquals(1, results.size());
Result row = results.get(0);
Assert.assertEquals(200, (double) ((TimeseriesResultValue) row.getValue()).getMetric("sketch"), 0.1);
}
@Test
public void unsuccessfulComplexTypesInHLL() throws Exception
{
@ -152,7 +232,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
+ "\"fieldName\": \"id\""
+ "}]";
try {
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
Sequence<ResultRow> seq = groupByHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_sketches.tsv").getFile()),
buildParserJson(
Arrays.asList("dim", "multiDim", "id"),
@ -161,7 +241,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
metricSpec,
0, // minTimestamp
Granularities.NONE,
200, // maxRowCount
10, // maxRowCount
buildGroupByQueryJson("HLLSketchBuild", "index_hll", !ROUND)
);
}
@ -169,13 +249,12 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
Assert.assertTrue(
e.getMessage().contains("Invalid input [index_hll] of type [COMPLEX<hyperUnique>] for [HLLSketchBuild]"));
}
}
@Test
public void buildSketchesAtQueryTimeMultiValue() throws Exception
{
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
Sequence<ResultRow> seq = groupByHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()),
buildParserJson(
Arrays.asList("dim", "multiDim", "id"),
@ -184,7 +263,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
"[]",
0, // minTimestamp
Granularities.NONE,
200, // maxRowCount
10, // maxRowCount
buildGroupByQueryJson("HLLSketchBuild", "multiDim", !ROUND)
);
List<ResultRow> results = seq.toList();
@ -196,7 +275,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
@Test
public void roundBuildSketch() throws Exception
{
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
Sequence<ResultRow> seq = groupByHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()),
buildParserJson(
Arrays.asList("dim", "multiDim", "id"),
@ -205,7 +284,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
"[]",
0, // minTimestamp
Granularities.NONE,
200, // maxRowCount
10, // maxRowCount
buildGroupByQueryJson("HLLSketchBuild", "id", ROUND)
);
List<ResultRow> results = seq.toList();
@ -217,7 +296,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
@Test
public void roundMergeSketch() throws Exception
{
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
Sequence<ResultRow> seq = groupByHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_sketches.tsv").getFile()),
buildParserJson(
Arrays.asList("dim", "multiDim"),
@ -226,7 +305,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
buildAggregatorJson("HLLSketchMerge", "sketch", ROUND),
0, // minTimestamp
Granularities.NONE,
200, // maxRowCount
10, // maxRowCount
buildGroupByQueryJson("HLLSketchMerge", "sketch", ROUND)
);
List<ResultRow> results = seq.toList();
@ -238,7 +317,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
@Test
public void testPostAggs() throws Exception
{
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
Sequence<ResultRow> seq = groupByHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_sketches.tsv").getFile()),
buildParserJson(
Arrays.asList("dim", "multiDim"),
@ -247,8 +326,8 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
buildAggregatorJson("HLLSketchMerge", "sketch", ROUND),
0, // minTimestamp
Granularities.NONE,
200, // maxRowCount
helper.getObjectMapper().writeValueAsString(
10, // maxRowCount
groupByHelper.getObjectMapper().writeValueAsString(
GroupByQuery.builder()
.setDataSource("test_datasource")
.setGranularity(Granularities.ALL)
@ -307,7 +386,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
Assert.assertArrayEquals(new double[]{200, 200, 200}, (double[]) row.get(2), 0.1);
Assert.assertEquals(expectedSummary, row.get(3));
// union with self = self
Assert.assertEquals(expectedSummary, row.get(4).toString());
Assert.assertEquals(expectedSummary, ((HllSketchHolder) row.get(4)).getSketch().toString());
}
private static String buildParserJson(List<String> dimensions, List<String> columns)
@ -398,4 +477,27 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
.build();
return toJson(object);
}
private String buildTimeseriesQueryJson(
String aggregationType,
String aggregationFieldName,
boolean aggregationRound
)
{
Map<String, Object> aggregation = buildAggregatorObject(
aggregationType,
aggregationFieldName,
aggregationRound
);
Map<String, Object> object = new ImmutableMap.Builder<String, Object>()
.put("queryType", "timeseries")
.put("dataSource", "test_dataSource")
.put("granularity", "ALL")
.put("aggregations", Collections.singletonList(aggregation))
.put("intervals", Collections.singletonList("2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z"))
.put("context", ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize.toString()))
.build();
return toJson(object);
}
}

View File

@ -39,10 +39,10 @@ public class HllSketchObjectStrategyTest
final byte[] bytes = sketch.toCompactByteArray();
ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
HllSketchObjectStrategy objectStrategy = new HllSketchObjectStrategy();
HllSketchHolderObjectStrategy objectStrategy = new HllSketchHolderObjectStrategy();
// valid sketch should not explode when copied, which reads the memory
objectStrategy.fromByteBufferSafe(buf, bytes.length).copy();
objectStrategy.fromByteBufferSafe(buf, bytes.length).getSketch().copy();
// corrupted sketch should fail with a regular java buffer exception
for (int subset = 3; subset < bytes.length - 1; subset++) {
@ -54,7 +54,7 @@ public class HllSketchObjectStrategyTest
final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
IndexOutOfBoundsException.class,
() -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).copy()
() -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).getSketch().copy()
);
}
@ -63,7 +63,7 @@ public class HllSketchObjectStrategyTest
final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
IndexOutOfBoundsException.class,
() -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).copy()
() -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).getSketch().copy()
);
// non sketch that is long enough to check (this one doesn't actually need 'safe' read)
@ -71,7 +71,7 @@ public class HllSketchObjectStrategyTest
final ByteBuffer buf4 = ByteBuffer.wrap(garbageLonger).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
SketchesArgumentException.class,
() -> objectStrategy.fromByteBufferSafe(buf4, garbageLonger.length).copy()
() -> objectStrategy.fromByteBufferSafe(buf4, garbageLonger.length).getSketch().copy()
);
}
}

View File

@ -28,16 +28,14 @@ import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.metadata.input.SqlTestUtils;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.transform.TransformSpec;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
@ -47,11 +45,12 @@ import java.util.List;
public class SqlFirehoseFactoryTest
{
private static final List<File> FIREHOSE_TMP_DIRS = new ArrayList<>();
private static File TEST_DIR;
private static final String TABLE_NAME_1 = "FOOS_TABLE_1";
private static final String TABLE_NAME_2 = "FOOS_TABLE_2";
@Rule
public TemporaryFolder test_dir = new TemporaryFolder();
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private final ObjectMapper mapper = TestHelper.makeSmileMapper();
@ -68,23 +67,6 @@ public class SqlFirehoseFactoryTest
);
private TestDerbyConnector derbyConnector;
@BeforeClass
public static void setup() throws IOException
{
TEST_DIR = File.createTempFile(SqlFirehoseFactoryTest.class.getSimpleName(), "testDir");
org.apache.commons.io.FileUtils.forceDelete(TEST_DIR);
FileUtils.mkdirp(TEST_DIR);
}
@AfterClass
public static void teardown() throws IOException
{
org.apache.commons.io.FileUtils.forceDelete(TEST_DIR);
for (File dir : FIREHOSE_TMP_DIRS) {
org.apache.commons.io.FileUtils.forceDelete(dir);
}
}
private void assertNumRemainingCacheFiles(File firehoseTmpDir, int expectedNumFiles)
{
final String[] files = firehoseTmpDir.list();
@ -94,14 +76,7 @@ public class SqlFirehoseFactoryTest
private File createFirehoseTmpDir(String dirSuffix) throws IOException
{
final File firehoseTempDir = File.createTempFile(
SqlFirehoseFactoryTest.class.getSimpleName(),
dirSuffix
);
org.apache.commons.io.FileUtils.forceDelete(firehoseTempDir);
FileUtils.mkdirp(firehoseTempDir);
FIREHOSE_TMP_DIRS.add(firehoseTempDir);
return firehoseTempDir;
return test_dir.newFolder(dirSuffix);
}
@Test