Use datasketches version 3.2.0 (#12509)

Changes:
- Use apache datasketches version 3.2.0.
- Remove the unsafe reflection-based access to datasketches internals that was added in #12022.
This commit is contained in:
Kashif Faraz 2022-05-13 11:28:15 +05:30 committed by GitHub
parent 39b3487aa9
commit 7ab2170802
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 4 additions and 48 deletions

View File

@ -21,7 +21,6 @@ package org.apache.druid.query.aggregation.datasketches.theta;
import org.apache.datasketches.Family;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
@ -29,7 +28,6 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.util.List;
public class SketchAggregator implements Aggregator
@ -41,31 +39,6 @@ public class SketchAggregator implements Aggregator
@Nullable
private Union union;
@Nullable
private Sketch sketch;
@Nullable
private static Field SKETCH_FIELD;
/**
 * Initializes the static state that SketchAggregator needs for memory
 * estimation: a reflective handle on the {@code gadget_} field declared by
 * {@code org.apache.datasketches.theta.UnionImpl}.
 * <p>
 * Calling this method repeatedly is harmless; once the handle has been set
 * up, later calls return immediately.
 *
 * @throws ISE if the {@code UnionImpl} class or its {@code gadget_} field
 *             cannot be found
 */
public static synchronized void initialize()
{
  if (SKETCH_FIELD != null) {
    return;
  }

  try {
    final Field gadgetField = Class.forName("org.apache.datasketches.theta.UnionImpl")
                                   .getDeclaredField("gadget_");
    gadgetField.setAccessible(true);
    // Publish the handle only after setAccessible() has succeeded. If it were
    // stored first and setAccessible() threw, the non-null but inaccessible
    // field would make every later initialize() call return early and each
    // subsequent read would fail with IllegalAccessException.
    SKETCH_FIELD = gadgetField;
  }
  catch (NoSuchFieldException | ClassNotFoundException e) {
    throw new ISE(e, "Could not initialize SketchAggregator");
  }
}
public SketchAggregator(BaseObjectColumnValueSelector selector, int size)
{
this.selector = selector;
@ -77,16 +50,6 @@ public class SketchAggregator implements Aggregator
union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION);
}
/**
 * Reads the object stored in the reflectively accessed field of
 * {@code union} and keeps it in {@code sketch}, so that its size can be
 * queried before and after each update.
 *
 * @throws ISE if {@code initialize()} has not been called yet, or if the
 *             field cannot be read
 */
private void initSketch()
{
  // The reflective handle is set up by the static initialize() method. Fail
  // with a descriptive error instead of a bare NullPointerException when that
  // step was skipped.
  if (SKETCH_FIELD == null) {
    throw new ISE("SketchAggregator.initialize() must be called before aggregating");
  }

  try {
    sketch = (Sketch) SKETCH_FIELD.get(union);
  }
  catch (IllegalAccessException e) {
    throw new ISE(e, "Could not initialize sketch field in SketchAggregator");
  }
}
@Override
public void aggregate()
{
@ -111,24 +74,20 @@ public class SketchAggregator implements Aggregator
}
synchronized (this) {
long unionSizeDelta = 0;
long initialSketchSize = 0;
if (union == null) {
initUnion();
// Size of UnionImpl = 16B (object header) + 8B (sketch ref) + 2B (short)
// + 8B (long) + 1B (boolean) + 5B (padding) = 40B
unionSizeDelta = 40L;
}
long initialSketchSize = 0;
if (sketch == null) {
initSketch();
} else {
initialSketchSize = sketch.getCurrentBytes();
initialSketchSize = union.getCurrentBytes();
}
updateUnion(union, update);
long sketchSizeDelta = sketch.getCurrentBytes() - initialSketchSize;
long sketchSizeDelta = union.getCurrentBytes() - initialSketchSize;
return sketchSizeDelta + unionSizeDelta;
}
}

View File

@ -71,7 +71,6 @@ public class SketchModule implements DruidModule
ThetaSketchApproxCountDistinctSqlAggregator.NAME,
ThetaSketchApproxCountDistinctSqlAggregator.class
);
SketchAggregator.initialize();
}
@Override

View File

@ -549,8 +549,6 @@ public class SketchAggregationTest
@Test
public void testAggregateWithSize()
{
SketchAggregator.initialize();
final String[] columnValues = new String[20];
for (int i = 0; i < columnValues.length; ++i) {
columnValues[i] = "" + i;

View File

@ -84,7 +84,7 @@
<!-- sql/src/main/codegen/config.fmpp is based on a file from calcite-core, and needs to be
updated when upgrading Calcite. Refer to the top-level comments in that file for details. -->
<calcite.version>1.21.0</calcite.version>
<datasketches.version>3.1.0</datasketches.version>
<datasketches.version>3.2.0</datasketches.version>
<datasketches.memory.version>2.0.0</datasketches.memory.version>
<derby.version>10.14.2.0</derby.version>
<dropwizard.metrics.version>4.0.0</dropwizard.metrics.version>