latest datasketches-java-3.1.0 (#12224)

These changes are to use the latest datasketches-java-3.1.0 and also to restore support for quantile and HLL4 sketches to be able to grow larger than a given buffer in a buffer aggregator and move to heap in rare cases. This was discussed in #11544.

Co-authored-by: AlexanderSaydakov <AlexanderSaydakov@users.noreply.github.com>
This commit is contained in:
Alexander Saydakov 2022-03-01 17:14:42 -08:00 committed by GitHub
parent 3f709db173
commit 50038d9344
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 57 additions and 144 deletions

View File

@ -23,6 +23,8 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableMemory;
import java.nio.ByteBuffer;
@ -31,6 +33,7 @@ import java.util.IdentityHashMap;
public class HllSketchBuildBufferAggregatorHelper
{
private static final MemoryRequestServer MEM_REQ_SERVER = new DefaultMemoryRequestServer();
private final int lgK;
private final int size;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
@ -123,7 +126,8 @@ public class HllSketchBuildBufferAggregatorHelper
private WritableMemory getMemory(final ByteBuffer buf)
{
return memCache.computeIfAbsent(buf, b -> WritableMemory.writableWrap(b, ByteOrder.LITTLE_ENDIAN));
return memCache.computeIfAbsent(buf,
b -> WritableMemory.writableWrap(b, ByteOrder.LITTLE_ENDIAN, MEM_REQ_SERVER));
}
private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch)

View File

@ -57,7 +57,7 @@ public class DoublesSketchBuildBufferAggregator implements BufferAggregator
}
final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buffer, position);
DoublesSketches.handleMaxStreamLengthLimit(() -> sketch.update(selector.getDouble()));
sketch.update(selector.getDouble());
}
@Nullable

View File

@ -21,6 +21,8 @@ package org.apache.druid.query.aggregation.datasketches.quantiles;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.CompactDoublesSketch;
import org.apache.datasketches.quantiles.DoublesSketch;
@ -32,6 +34,7 @@ import java.util.IdentityHashMap;
public class DoublesSketchBuildBufferAggregatorHelper
{
private static final MemoryRequestServer MEM_REQ_SERVER = new DefaultMemoryRequestServer();
private final int size;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
@ -92,7 +95,8 @@ public class DoublesSketchBuildBufferAggregatorHelper
private WritableMemory getMemory(final ByteBuffer buffer)
{
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN));
return memCache.computeIfAbsent(buffer,
buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN, MEM_REQ_SERVER));
}
private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch)

View File

@ -55,13 +55,11 @@ public class DoublesSketchBuildVectorAggregator implements VectorAggregator
final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buf, position);
DoublesSketches.handleMaxStreamLengthLimit(() -> {
for (int i = startRow; i < endRow; i++) {
if (nulls == null || !nulls[i]) {
sketch.update(doubles[i]);
}
for (int i = startRow; i < endRow; i++) {
if (nulls == null || !nulls[i]) {
sketch.update(doubles[i]);
}
});
}
}
@Override
@ -76,16 +74,14 @@ public class DoublesSketchBuildVectorAggregator implements VectorAggregator
final double[] doubles = selector.getDoubleVector();
final boolean[] nulls = selector.getNullVector();
DoublesSketches.handleMaxStreamLengthLimit(() -> {
for (int i = 0; i < numRows; i++) {
final int idx = rows != null ? rows[i] : i;
for (int i = 0; i < numRows; i++) {
final int idx = rows != null ? rows[i] : i;
if (nulls == null || !nulls[idx]) {
final int position = positions[i] + positionOffset;
helper.getSketchAtPosition(buf, position).update(doubles[idx]);
}
if (nulls == null || !nulls[idx]) {
final int position = positions[i] + positionOffset;
helper.getSketchAtPosition(buf, position).update(doubles[idx]);
}
});
}
}
@Override

View File

@ -76,12 +76,10 @@ public class DoublesSketchMergeAggregator implements Aggregator
if (object == null) {
return;
}
DoublesSketches.handleMaxStreamLengthLimit(() -> {
if (object instanceof DoublesSketch) {
union.update((DoublesSketch) object);
} else {
union.update(selector.getDouble());
}
});
if (object instanceof DoublesSketch) {
union.update((DoublesSketch) object);
} else {
union.update(selector.getDouble());
}
}
}

View File

@ -21,6 +21,8 @@ package org.apache.druid.query.aggregation.datasketches.quantiles;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.DoublesUnion;
@ -30,6 +32,7 @@ import java.util.IdentityHashMap;
public class DoublesSketchMergeBufferAggregatorHelper
{
private static final MemoryRequestServer MEM_REQ_SERVER = new DefaultMemoryRequestServer();
private final int k;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
@ -93,7 +96,8 @@ public class DoublesSketchMergeBufferAggregatorHelper
private WritableMemory getMemory(final ByteBuffer buffer)
{
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN));
return memCache.computeIfAbsent(buffer,
buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN, MEM_REQ_SERVER));
}
private void putUnion(final ByteBuffer buffer, final int position, final DoublesUnion union)

View File

@ -55,14 +55,12 @@ public class DoublesSketchMergeVectorAggregator implements VectorAggregator
final DoublesUnion union = helper.getSketchAtPosition(buf, position);
DoublesSketches.handleMaxStreamLengthLimit(() -> {
for (int i = startRow; i < endRow; i++) {
final DoublesSketch sketch = (DoublesSketch) vector[i];
if (sketch != null) {
union.update(sketch);
}
for (int i = startRow; i < endRow; i++) {
final DoublesSketch sketch = (DoublesSketch) vector[i];
if (sketch != null) {
union.update(sketch);
}
});
}
}
@Override
@ -76,17 +74,15 @@ public class DoublesSketchMergeVectorAggregator implements VectorAggregator
{
final Object[] vector = selector.getObjectVector();
DoublesSketches.handleMaxStreamLengthLimit(() -> {
for (int i = 0; i < numRows; i++) {
final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i];
for (int i = 0; i < numRows; i++) {
final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i];
if (sketch != null) {
final int position = positions[i] + positionOffset;
final DoublesUnion union = helper.getSketchAtPosition(buf, position);
union.update(sketch);
}
if (sketch != null) {
final int position = positions[i] + positionOffset;
final DoublesUnion union = helper.getSketchAtPosition(buf, position);
union.update(sketch);
}
});
}
}
@Nullable

View File

@ -1,53 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles;
import org.apache.druid.java.util.common.ISE;
public final class DoublesSketches
{
/**
* Runs the given task that updates a Doubles sketch backed by a direct byte buffer. This method intentionally
* accepts the update task as a {@link Runnable} instead of accpeting parameters of a sketch and other values
* needed for the update. This is to avoid any potential performance impact especially when the sketch is updated
* in a tight loop. The update task can throw NullPointerException because of the known issue filed in
* https://github.com/apache/druid/issues/11544. This method catches NPE and converts it to a more user-friendly
* exception. This method should be removed once the known bug above is fixed.
*/
public static void handleMaxStreamLengthLimit(Runnable updateSketchTask)
{
try {
updateSketchTask.run();
}
catch (NullPointerException e) {
throw new ISE(
e,
"NullPointerException was thrown while updating Doubles sketch. "
+ "This exception could be potentially because of the known bug filed in https://github.com/apache/druid/issues/11544. "
+ "You could try a higher maxStreamLength than current to work around this bug if that is the case. "
+ "See https://druid.apache.org/docs/latest/development/extensions-core/datasketches-quantiles.html for more details."
);
}
}
private DoublesSketches()
{
}
}

View File

@ -33,13 +33,10 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -60,9 +57,6 @@ public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@Rule
public final ExpectedException expectedException = ExpectedException.none();
public DoublesSketchAggregatorTest(final GroupByQueryConfig config, final String vectorize)
{
this.config = config;
@ -544,12 +538,9 @@ public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest
}
@Test
public void testFailureWhenMaxStreamLengthHit() throws Exception
public void testSuccessWhenMaxStreamLengthHit() throws Exception
{
if (GroupByStrategySelector.STRATEGY_V1.equals(config.getDefaultStrategy())) {
expectedException.expect(new RecursiveExceptionMatcher(IllegalStateException.class));
expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch");
helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()),
String.join(
@ -633,39 +624,8 @@ public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest
"}"
)
);
expectedException.expect(new RecursiveExceptionMatcher(IllegalStateException.class));
expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch");
seq.toList();
}
}
private static class RecursiveExceptionMatcher extends BaseMatcher<Object>
{
private final Class<? extends Throwable> expected;
private RecursiveExceptionMatcher(Class<? extends Throwable> expected)
{
this.expected = expected;
}
@Override
public boolean matches(Object item)
{
if (expected.isInstance(item)) {
return true;
} else if (item instanceof Throwable) {
if (((Throwable) item).getCause() != null) {
return matches(((Throwable) item).getCause());
}
}
return false;
}
@Override
public void describeTo(Description description)
{
description.appendText("a recursive instance of ").appendText(expected.getName());
}
}
}

View File

@ -828,14 +828,14 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
}
@Test
public void testFailWithSmallMaxStreamLength() throws Exception
public void testSuccessWithSmallMaxStreamLength() throws Exception
{
final Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
context.put(
DoublesSketchApproxQuantileSqlAggregator.CTX_APPROX_QUANTILE_DS_MAX_STREAM_LENGTH,
1
);
testQueryThrows(
testQuery(
"SELECT\n"
+ "APPROX_QUANTILE_DS(m1, 0.01),\n"
+ "APPROX_QUANTILE_DS(cnt, 0.5)\n"
@ -856,11 +856,13 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
)
.context(context)
.build()
),
expectedException -> {
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch");
}
),
ImmutableList.of(
new Object[]{
1.0,
1.0
}
)
);
}

View File

@ -242,6 +242,7 @@ public class ArrayOfDoublesSketchAggregationTest extends InitializedNullHandling
" \"name\": \"intersection\",",
" \"operation\": \"INTERSECT\",",
" \"nominalEntries\": 1024,",
" \"numberOfValues\": 2,",
" \"fields\": [{\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}, {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}]",
" }},",
" {\"type\": \"arrayOfDoublesSketchToEstimate\", \"name\": \"anotb\", \"field\": {",
@ -249,6 +250,7 @@ public class ArrayOfDoublesSketchAggregationTest extends InitializedNullHandling
" \"name\": \"anotb\",",
" \"operation\": \"NOT\",",
" \"nominalEntries\": 1024,",
" \"numberOfValues\": 2,",
" \"fields\": [{\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}, {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}]",
" }},",
" {",

View File

@ -3731,7 +3731,7 @@ name: DataSketches
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 3.0.0
version: 3.1.0
libraries:
- org.apache.datasketches: datasketches-java

View File

@ -84,7 +84,7 @@
<!-- sql/src/main/codegen/config.fmpp is based on a file from calcite-core, and needs to be
updated when upgrading Calcite. Refer to the top-level comments in that file for details. -->
<calcite.version>1.21.0</calcite.version>
<datasketches.version>3.0.0</datasketches.version>
<datasketches.version>3.1.0</datasketches.version>
<datasketches.memory.version>2.0.0</datasketches.memory.version>
<derby.version>10.14.2.0</derby.version>
<dropwizard.metrics.version>4.0.0</dropwizard.metrics.version>