From 9576fd3141e81b77e4a8da4bfa6bdecdf0be90b9 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 3 Nov 2023 08:31:22 +0530 Subject: [PATCH] HllSketch Merge Aggregator optimizations (#15162) * Null byte serde for empty sketches * Cache for HllSketchMerge * Check for empty sketches * Address review comments * Revert changes to HllSketchHolder * Handle null sketch holders instead of null sketches * Add unit test for MSQ HllSketch * Add comments * Fix style --- .../hll/HllSketchAggregatorFactory.java | 6 +- .../hll/HllSketchHolderObjectStrategy.java | 67 ++++++++++++- .../hll/HllSketchMergeBufferAggregator.java | 15 +-- .../HllSketchMergeBufferAggregatorHelper.java | 94 ++++++++++++++++--- .../hll/HllSketchMergeVectorAggregator.java | 18 ++-- .../HllSketchToEstimatePostAggregator.java | 6 +- ...tchToEstimateWithBoundsPostAggregator.java | 6 +- .../hll/HllSketchToStringPostAggregator.java | 6 +- .../hll/HllSketchUnionPostAggregator.java | 7 +- .../hll/sql/HllPostAggExprMacros.java | 2 +- .../hll/HllSketchAggregatorFactoryTest.java | 2 +- .../hll/HllSketchAggregatorTest.java | 20 ++++ .../druid/msq/exec/MSQDataSketchesTest.java | 46 +++++++++ 13 files changed, 254 insertions(+), 41 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java index 043c977f8be..1c908be244a 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java @@ -205,10 +205,14 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory @Override public Object finalizeComputation(@Nullable final Object object) { - if (!shouldFinalize || object == null) { + if (!shouldFinalize) { return object; } + if (object == null) { + return 0.0D; + } + final HllSketchHolder sketch = HllSketchHolder.fromObj(object); final double estimate = sketch.getEstimate(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolderObjectStrategy.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolderObjectStrategy.java index 21a94ecf31d..18b7f03319e 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolderObjectStrategy.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolderObjectStrategy.java @@ -48,13 +48,23 @@ public class HllSketchHolderObjectStrategy implements ObjectStrategy 0; // (flags & EMPTY_FLAG_MASK) > 0 + default: // Unknown implementation + // Can't say for sure, so return false. + return false; + } + } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java index 27920d1b4d1..8e3bfffa073 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java @@ -22,13 +22,11 @@ package org.apache.druid.query.aggregation.datasketches.hll; import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.Union; -import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -import java.nio.ByteOrder; /** * This aggregator merges existing sketches. @@ -64,10 +62,7 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator return; } - final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN) - .writableRegion(position, helper.getSize()); - - final Union union = Union.writableWrap(mem); + final Union union = helper.getOrCreateUnion(buf, position); union.update(sketch.getSketch()); } @@ -80,7 +75,7 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator @Override public void close() { - // nothing to close + helper.close(); } @Override @@ -104,4 +99,10 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator // See https://github.com/apache/druid/pull/6893#discussion_r250726028 inspector.visit("lgK", helper.getLgK()); } + + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer); + } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java index 95d1e1b5bca..22653019772 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java @@ -19,6 +19,8 @@ package org.apache.druid.query.aggregation.datasketches.hll; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.Union; @@ -26,15 +28,18 @@ import org.apache.datasketches.memory.WritableMemory; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.IdentityHashMap; public class HllSketchMergeBufferAggregatorHelper { private final int lgK; private final TgtHllType tgtHllType; private final int size; + private final IdentityHashMap> unions = new IdentityHashMap<>(); + private final IdentityHashMap memCache = new IdentityHashMap<>(); /** - * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image. + * Used by {@link #initializeEmptyUnion(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image. * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects. */ @@ -57,20 +62,7 @@ public class HllSketchMergeBufferAggregatorHelper */ public void init(final ByteBuffer buf, final int position) { - // Copy prebuilt empty union object. - // Not necessary to cache a Union wrapper around the initialized memory, because: - // - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get". - // - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the - // max size and therefore do not need to be potentially moved in-heap. - - final int oldPosition = buf.position(); - try { - buf.position(position); - buf.put(emptyUnion); - } - finally { - buf.position(oldPosition); - } + createNewUnion(buf, position, false); } /** @@ -93,4 +85,76 @@ public class HllSketchMergeBufferAggregatorHelper { return size; } + + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + createNewUnion(newBuffer, newPosition, true); + Int2ObjectMap unionMap = unions.get(oldBuffer); + if (unionMap != null) { + unionMap.remove(oldPosition); + if (unionMap.isEmpty()) { + unions.remove(oldBuffer); + memCache.remove(oldBuffer); + } + } + } + + public Union getOrCreateUnion(ByteBuffer buf, int position) + { + Int2ObjectMap unionMap = unions.get(buf); + Union union = unionMap != null ? unionMap.get(position) : null; + if (union != null) { + return union; + } + return createNewUnion(buf, position, true); + } + + private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) + { + if (!isWrapped) { + initializeEmptyUnion(buf, position); + } + + final WritableMemory mem = getMemory(buf).writableRegion(position, size); + Union union = Union.writableWrap(mem); + + Int2ObjectMap unionMap = unions.get(buf); + if (unionMap == null) { + unionMap = new Int2ObjectOpenHashMap<>(); + unions.put(buf, unionMap); + } + unionMap.put(position, union); + return union; + } + + /** + * Copy prebuilt empty union object into the specified buffer and position + */ + private void initializeEmptyUnion(ByteBuffer buf, int position) + { + final int oldPosition = buf.position(); + try { + buf.position(position); + buf.put(emptyUnion); + } + finally { + buf.position(oldPosition); + } + } + + public void close() + { + unions.clear(); + memCache.clear(); + } + + private WritableMemory getMemory(ByteBuffer buffer) + { + WritableMemory mem = memCache.get(buffer); + if (mem == null) { + mem = WritableMemory.writableWrap(buffer, ByteOrder.LITTLE_ENDIAN); + memCache.put(buffer, mem); + } + return mem; + } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java index 8c7e214aa98..5fec9b94ba2 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java @@ -21,7 +21,6 @@ package org.apache.druid.query.aggregation.datasketches.hll; import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.Union; -import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory; import org.apache.druid.segment.ColumnProcessors; @@ -29,7 +28,6 @@ import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.util.function.Supplier; public class HllSketchMergeVectorAggregator implements VectorAggregator @@ -65,10 +63,7 @@ public class HllSketchMergeVectorAggregator implements VectorAggregator { final Object[] vector = objectSupplier.get(); - final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN) - .writableRegion(position, helper.getSize()); - - final Union union = Union.writableWrap(mem); + final Union union = helper.getOrCreateUnion(buf, position); for (int i = startRow; i < endRow; i++) { if (vector[i] != null) { union.update(((HllSketchHolder) vector[i]).getSketch()); @@ -85,7 +80,6 @@ public class HllSketchMergeVectorAggregator implements VectorAggregator final int positionOffset ) { - final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN); final Object[] vector = objectSupplier.get(); for (int i = 0; i < numRows; i++) { @@ -93,7 +87,7 @@ public class HllSketchMergeVectorAggregator implements VectorAggregator if (o != null) { final int position = positions[i] + positionOffset; - final Union union = Union.writableWrap(mem.writableRegion(position, helper.getSize())); + final Union union = helper.getOrCreateUnion(buf, position); union.update(o.getSketch()); } } @@ -108,6 +102,12 @@ public class HllSketchMergeVectorAggregator implements VectorAggregator @Override public void close() { - // Nothing to close. + helper.close(); + } + + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregator.java index cbbd389c6f8..0ce05ba7e21 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregator.java @@ -97,7 +97,11 @@ public class HllSketchToEstimatePostAggregator implements PostAggregator @Override public Object compute(final Map combinedAggregators) { - final HllSketchHolder holder = HllSketchHolder.fromObj(field.compute(combinedAggregators)); + Object hllSketchHolderObject = field.compute(combinedAggregators); + if (hllSketchHolderObject == null) { + return 0.0D; + } + final HllSketchHolder holder = HllSketchHolder.fromObj(hllSketchHolderObject); // The union object always uses an HLL_8 sketch, so we always get that. The target type doesn't actually impact // the estimate anyway, so whatever gives us the "cheapest" operation should be good. double estimate = holder.getEstimate(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimateWithBoundsPostAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimateWithBoundsPostAggregator.java index 8e85e18ab5d..0d7c2e3b90b 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimateWithBoundsPostAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimateWithBoundsPostAggregator.java @@ -103,7 +103,11 @@ public class HllSketchToEstimateWithBoundsPostAggregator implements PostAggregat @Override public double[] compute(final Map combinedAggregators) { - final HllSketchHolder sketch = HllSketchHolder.fromObj(field.compute(combinedAggregators)); + Object hllSketchHolderObject = field.compute(combinedAggregators); + if (hllSketchHolderObject == null) { + return new double[] {0.0D, 0.0D, 0.0D}; + } + final HllSketchHolder sketch = HllSketchHolder.fromObj(hllSketchHolderObject); return new double[] {sketch.getEstimate(), sketch.getLowerBound(numStdDevs), sketch.getUpperBound(numStdDevs)}; } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToStringPostAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToStringPostAggregator.java index 2f6d9a991d9..e8e43bf4dd4 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToStringPostAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToStringPostAggregator.java @@ -83,7 +83,11 @@ public class HllSketchToStringPostAggregator implements PostAggregator @Override public String compute(final Map combinedAggregators) { - final HllSketch sketch = HllSketchHolder.fromObj(field.compute(combinedAggregators)).getSketch(); + Object hllSketchHolderObject = field.compute(combinedAggregators); + if (hllSketchHolderObject == null) { + return "Null Sketch"; + } + final HllSketch sketch = HllSketchHolder.fromObj(hllSketchHolderObject).getSketch(); return sketch.toString(); } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUnionPostAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUnionPostAggregator.java index 9d96e31cdfe..2ce213520d3 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUnionPostAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUnionPostAggregator.java @@ -121,8 +121,11 @@ public class HllSketchUnionPostAggregator implements PostAggregator { final Union union = new Union(lgK); for (final PostAggregator field : fields) { - final HllSketchHolder sketch = HllSketchHolder.fromObj(field.compute(combinedAggregators)); - union.update(sketch.getSketch()); + Object hllSketchHolderObject = field.compute(combinedAggregators); + if (hllSketchHolderObject != null) { + final HllSketchHolder holder = HllSketchHolder.fromObj(hllSketchHolderObject); + union.update(holder.getSketch()); + } } return HllSketchHolder.of(union.getResult(tgtHllType)); } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllPostAggExprMacros.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllPostAggExprMacros.java index 8e310d19d13..7786f8be98d 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllPostAggExprMacros.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllPostAggExprMacros.java @@ -82,7 +82,7 @@ public class HllPostAggExprMacros final Object valObj = eval.value(); if (valObj == null) { - return ExprEval.of(null); + return ExprEval.of(0.0D); } HllSketchHolder h = HllSketchHolder.fromObj(valObj); double estimate = h.getEstimate(); diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java index 53b0ddbdf89..09a20ab8213 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java @@ -82,7 +82,7 @@ public class HllSketchAggregatorFactoryTest @Test public void testFinalizeComputationNull() { - Assert.assertNull(target.finalizeComputation(null)); + Assert.assertEquals(0.0D, target.finalizeComputation(null)); } @Test diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java index 0cd4d8cf39b..7c218934034 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.datasketches.hll.HllSketch; +import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.java.util.common.granularity.Granularities; @@ -36,6 +38,8 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.GroupByTestColumnSelectorFactory; +import org.apache.druid.query.groupby.epinephelinae.GrouperTestUtil; import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; @@ -417,6 +421,22 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest Assert.assertEquals(expectedSummary, ((HllSketchHolder) row.get(4)).getSketch().toString()); } + @Test + public void testRelocation() + { + final GroupByTestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); + HllSketchHolder sketchHolder = new HllSketchHolder(null, new HllSketch()); + sketchHolder.getSketch().update(1); + + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); + HllSketchHolder[] holders = groupByHelper.runRelocateVerificationTest( + new HllSketchMergeAggregatorFactory("sketch", "sketch", null, null, null, true, true), + columnSelectorFactory, + HllSketchHolder.class + ); + Assert.assertEquals(holders[0].getEstimate(), holders[1].getEstimate(), 0); + } + private static String buildParserJson(List dimensions, List columns) { Map timestampSpec = ImmutableMap.of( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQDataSketchesTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQDataSketchesTest.java index 716239ac056..1f856027de3 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQDataSketchesTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQDataSketchesTest.java @@ -26,6 +26,7 @@ import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQTuningConfig; import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; import org.apache.druid.msq.test.MSQTestBase; +import org.apache.druid.query.aggregation.FilteredAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.groupby.GroupByQuery; @@ -95,4 +96,49 @@ public class MSQDataSketchesTest extends MSQTestBase ) .verifyResults(); } + + @Test + public void testEmptyHllSketch() + { + RowSignature resultSignature = + RowSignature.builder() + .add("c", ColumnType.LONG) + .build(); + + GroupByQuery query = + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs( + aggregators( + new FilteredAggregatorFactory( + new HllSketchBuildAggregatorFactory("a0", "dim2", 12, "HLL_4", null, true, true), + equality("dim1", "nonexistent", ColumnType.STRING), + "a0" + ) + ) + ) + .setContext(DEFAULT_MSQ_CONTEXT) + .build(); + + testSelectQuery() + .setSql("SELECT APPROX_COUNT_DISTINCT_DS_HLL(dim2) FILTER(WHERE dim1 = 'nonexistent') AS c FROM druid.foo") + .setExpectedMSQSpec(MSQSpec.builder() + .query(query) + .columnMappings(new ColumnMappings(ImmutableList.of( + new ColumnMapping("a0", "c")) + )) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(TaskReportMSQDestination.INSTANCE) + .build()) + .setQueryContext(DEFAULT_MSQ_CONTEXT) + .setExpectedRowSignature(resultSignature) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{0L} + ) + ) + .verifyResults(); + } }