diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java index 8ad8dfe25fc..f7e386a730c 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java @@ -23,6 +23,8 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; +import org.apache.datasketches.memory.DefaultMemoryRequestServer; +import org.apache.datasketches.memory.MemoryRequestServer; import org.apache.datasketches.memory.WritableMemory; import java.nio.ByteBuffer; @@ -31,6 +33,7 @@ import java.util.IdentityHashMap; public class HllSketchBuildBufferAggregatorHelper { + private static final MemoryRequestServer MEM_REQ_SERVER = new DefaultMemoryRequestServer(); private final int lgK; private final int size; private final IdentityHashMap memCache = new IdentityHashMap<>(); @@ -123,7 +126,8 @@ public class HllSketchBuildBufferAggregatorHelper private WritableMemory getMemory(final ByteBuffer buf) { - return memCache.computeIfAbsent(buf, b -> WritableMemory.writableWrap(b, ByteOrder.LITTLE_ENDIAN)); + return memCache.computeIfAbsent(buf, + b -> WritableMemory.writableWrap(b, ByteOrder.LITTLE_ENDIAN, MEM_REQ_SERVER)); } private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java index c2529ac7c30..74be19bfd27 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java @@ -57,7 +57,7 @@ public class DoublesSketchBuildBufferAggregator implements BufferAggregator } final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buffer, position); - DoublesSketches.handleMaxStreamLengthLimit(() -> sketch.update(selector.getDouble())); + sketch.update(selector.getDouble()); } @Nullable diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java index 2acad90b08d..dc7e2f19fe4 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java @@ -21,6 +21,8 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.datasketches.memory.DefaultMemoryRequestServer; +import org.apache.datasketches.memory.MemoryRequestServer; import org.apache.datasketches.memory.WritableMemory; import org.apache.datasketches.quantiles.CompactDoublesSketch; import org.apache.datasketches.quantiles.DoublesSketch; @@ -32,6 +34,7 @@ import java.util.IdentityHashMap; public class DoublesSketchBuildBufferAggregatorHelper { + private static final MemoryRequestServer MEM_REQ_SERVER = new DefaultMemoryRequestServer(); private final int size; private final int maxIntermediateSize; private final IdentityHashMap memCache = new IdentityHashMap<>(); @@ -92,7 +95,8 @@ public class DoublesSketchBuildBufferAggregatorHelper private WritableMemory getMemory(final ByteBuffer buffer) { - return memCache.computeIfAbsent(buffer, buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN)); + return memCache.computeIfAbsent(buffer, + buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN, MEM_REQ_SERVER)); } private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java index c1074f5f0bc..af29c5b9cde 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java @@ -55,13 +55,11 @@ public class DoublesSketchBuildVectorAggregator implements VectorAggregator final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buf, position); - DoublesSketches.handleMaxStreamLengthLimit(() -> { - for (int i = startRow; i < endRow; i++) { - if (nulls == null || !nulls[i]) { - sketch.update(doubles[i]); - } + for (int i = startRow; i < endRow; i++) { + if (nulls == null || !nulls[i]) { + sketch.update(doubles[i]); } - }); + } } @Override @@ -76,16 +74,14 @@ public class DoublesSketchBuildVectorAggregator implements VectorAggregator final double[] doubles = selector.getDoubleVector(); final boolean[] nulls = selector.getNullVector(); - DoublesSketches.handleMaxStreamLengthLimit(() -> { - for (int i = 0; i < numRows; i++) { - final int idx = rows != null ? rows[i] : i; + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? rows[i] : i; - if (nulls == null || !nulls[idx]) { - final int position = positions[i] + positionOffset; - helper.getSketchAtPosition(buf, position).update(doubles[idx]); - } + if (nulls == null || !nulls[idx]) { + final int position = positions[i] + positionOffset; + helper.getSketchAtPosition(buf, position).update(doubles[idx]); } - }); + } } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java index a5f12d2227d..6693742aefd 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java @@ -76,12 +76,10 @@ public class DoublesSketchMergeAggregator implements Aggregator if (object == null) { return; } - DoublesSketches.handleMaxStreamLengthLimit(() -> { - if (object instanceof DoublesSketch) { - union.update((DoublesSketch) object); - } else { - union.update(selector.getDouble()); - } - }); + if (object instanceof DoublesSketch) { + union.update((DoublesSketch) object); + } else { + union.update(selector.getDouble()); + } } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java index d9f4badca81..6cb0f3a4b28 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java @@ -21,6 +21,8 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.datasketches.memory.DefaultMemoryRequestServer; +import org.apache.datasketches.memory.MemoryRequestServer; import org.apache.datasketches.memory.WritableMemory; import org.apache.datasketches.quantiles.DoublesUnion; @@ -30,6 +32,7 @@ import java.util.IdentityHashMap; public class DoublesSketchMergeBufferAggregatorHelper { + private static final MemoryRequestServer MEM_REQ_SERVER = new DefaultMemoryRequestServer(); private final int k; private final int maxIntermediateSize; private final IdentityHashMap memCache = new IdentityHashMap<>(); @@ -93,7 +96,8 @@ public class DoublesSketchMergeBufferAggregatorHelper private WritableMemory getMemory(final ByteBuffer buffer) { - return memCache.computeIfAbsent(buffer, buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN)); + return memCache.computeIfAbsent(buffer, + buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN, MEM_REQ_SERVER)); } private void putUnion(final ByteBuffer buffer, final int position, final DoublesUnion union) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java index 92437d09bec..8a8e10b48a0 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java @@ -55,14 +55,12 @@ public class DoublesSketchMergeVectorAggregator implements VectorAggregator final DoublesUnion union = helper.getSketchAtPosition(buf, position); - DoublesSketches.handleMaxStreamLengthLimit(() -> { - for (int i = startRow; i < endRow; i++) { - final DoublesSketch sketch = (DoublesSketch) vector[i]; - if (sketch != null) { - union.update(sketch); - } + for (int i = startRow; i < endRow; i++) { + final DoublesSketch sketch = (DoublesSketch) vector[i]; + if (sketch != null) { + union.update(sketch); } - }); + } } @Override @@ -76,17 +74,15 @@ public class DoublesSketchMergeVectorAggregator implements VectorAggregator { final Object[] vector = selector.getObjectVector(); - DoublesSketches.handleMaxStreamLengthLimit(() -> { - for (int i = 0; i < numRows; i++) { - final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i]; + for (int i = 0; i < numRows; i++) { + final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i]; - if (sketch != null) { - final int position = positions[i] + positionOffset; - final DoublesUnion union = helper.getSketchAtPosition(buf, position); - union.update(sketch); - } + if (sketch != null) { + final int position = positions[i] + positionOffset; + final DoublesUnion union = helper.getSketchAtPosition(buf, position); + union.update(sketch); } - }); + } } @Nullable diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketches.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketches.java deleted file mode 100644 index d7fc420e7f3..00000000000 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketches.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.datasketches.quantiles; - -import org.apache.druid.java.util.common.ISE; - -public final class DoublesSketches -{ - /** - * Runs the given task that updates a Doubles sketch backed by a direct byte buffer. This method intentionally - * accepts the update task as a {@link Runnable} instead of accpeting parameters of a sketch and other values - * needed for the update. This is to avoid any potential performance impact especially when the sketch is updated - * in a tight loop. The update task can throw NullPointerException because of the known issue filed in - * https://github.com/apache/druid/issues/11544. This method catches NPE and converts it to a more user-friendly - * exception. This method should be removed once the known bug above is fixed. - */ - public static void handleMaxStreamLengthLimit(Runnable updateSketchTask) - { - try { - updateSketchTask.run(); - } - catch (NullPointerException e) { - throw new ISE( - e, - "NullPointerException was thrown while updating Doubles sketch. " - + "This exception could be potentially because of the known bug filed in https://github.com/apache/druid/issues/11544. " - + "You could try a higher maxStreamLength than current to work around this bug if that is the case. " - + "See https://druid.apache.org/docs/latest/development/extensions-core/datasketches-quantiles.html for more details." - ); - } - } - - private DoublesSketches() - { - } -} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java index d20f369a9d4..2c05509e7d1 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java @@ -33,13 +33,10 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.testing.InitializedNullHandlingTest; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; import org.junit.After; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -60,9 +57,6 @@ public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); - @Rule - public final ExpectedException expectedException = ExpectedException.none(); - public DoublesSketchAggregatorTest(final GroupByQueryConfig config, final String vectorize) { this.config = config; @@ -544,12 +538,9 @@ public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest } @Test - public void testFailureWhenMaxStreamLengthHit() throws Exception + public void testSuccessWhenMaxStreamLengthHit() throws Exception { if (GroupByStrategySelector.STRATEGY_V1.equals(config.getDefaultStrategy())) { - expectedException.expect(new RecursiveExceptionMatcher(IllegalStateException.class)); - expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch"); - helper.createIndexAndRunQueryOnSegment( new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()), String.join( @@ -633,39 +624,8 @@ public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest "}" ) ); - - expectedException.expect(new RecursiveExceptionMatcher(IllegalStateException.class)); - expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch"); seq.toList(); } } - private static class RecursiveExceptionMatcher extends BaseMatcher - { - private final Class expected; - - private RecursiveExceptionMatcher(Class expected) - { - this.expected = expected; - } - - @Override - public boolean matches(Object item) - { - if (expected.isInstance(item)) { - return true; - } else if (item instanceof Throwable) { - if (((Throwable) item).getCause() != null) { - return matches(((Throwable) item).getCause()); - } - } - return false; - } - - @Override - public void describeTo(Description description) - { - description.appendText("a recursive instance of ").appendText(expected.getName()); - } - } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java index 75229872dd1..4959e4ce5b4 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java @@ -828,14 +828,14 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest } @Test - public void testFailWithSmallMaxStreamLength() throws Exception + public void testSuccessWithSmallMaxStreamLength() throws Exception { final Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); context.put( DoublesSketchApproxQuantileSqlAggregator.CTX_APPROX_QUANTILE_DS_MAX_STREAM_LENGTH, 1 ); - testQueryThrows( + testQuery( "SELECT\n" + "APPROX_QUANTILE_DS(m1, 0.01),\n" + "APPROX_QUANTILE_DS(cnt, 0.5)\n" @@ -856,11 +856,13 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest ) .context(context) .build() - ), - expectedException -> { - expectedException.expect(IllegalStateException.class); - expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch"); - } + ), + ImmutableList.of( + new Object[]{ + 1.0, + 1.0 + } + ) ); } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java index 7c37097860c..badbf0a1110 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java @@ -242,6 +242,7 @@ public class ArrayOfDoublesSketchAggregationTest extends InitializedNullHandling " \"name\": \"intersection\",", " \"operation\": \"INTERSECT\",", " \"nominalEntries\": 1024,", + " \"numberOfValues\": 2,", " \"fields\": [{\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}, {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}]", " }},", " {\"type\": \"arrayOfDoublesSketchToEstimate\", \"name\": \"anotb\", \"field\": {", @@ -249,6 +250,7 @@ public class ArrayOfDoublesSketchAggregationTest extends InitializedNullHandling " \"name\": \"anotb\",", " \"operation\": \"NOT\",", " \"nominalEntries\": 1024,", + " \"numberOfValues\": 2,", " \"fields\": [{\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}, {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}]", " }},", " {", diff --git a/licenses.yaml b/licenses.yaml index 93333e0517a..abf4200a432 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3731,7 +3731,7 @@ name: DataSketches license_category: binary module: java-core license_name: Apache License version 2.0 -version: 3.0.0 +version: 3.1.0 libraries: - org.apache.datasketches: datasketches-java diff --git a/pom.xml b/pom.xml index c4829ccd8cc..425de34be98 100644 --- a/pom.xml +++ b/pom.xml @@ -84,7 +84,7 @@ 1.21.0 - 3.0.0 + 3.1.0 2.0.0 10.14.2.0 4.0.0