diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml
index 9f7ebfe5d84..d3fc15adc2a 100644
--- a/extensions-core/datasketches/pom.xml
+++ b/extensions-core/datasketches/pom.xml
@@ -38,7 +38,7 @@
com.yahoo.datasketches
sketches-core
- 0.8.4
+ 0.10.1
io.druid
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java
index 8d2df6fb9a6..8587fe59ea3 100644
--- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java
@@ -37,7 +37,7 @@ public class SketchAggregator implements Aggregator
public SketchAggregator(ObjectColumnSelector selector, int size)
{
this.selector = selector;
- union = new SynchronizedUnion((Union) SetOperation.builder().build(size, Family.UNION));
+ union = new SynchronizedUnion((Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION));
}
@Override
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
index 8ec4afd55d9..7d5270df38d 100644
--- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
@@ -109,7 +109,7 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
{
return new ObjectAggregateCombiner()
{
- private final Union union = (Union) SetOperation.builder().build(size, Family.UNION);
+ private final Union union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION);
private final SketchHolder combined = SketchHolder.of(union);
@Override
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
index 266bdf17698..e7f239ad860 100644
--- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
@@ -19,9 +19,7 @@
package io.druid.query.aggregation.datasketches.theta;
-import com.yahoo.memory.Memory;
-import com.yahoo.memory.MemoryRegion;
-import com.yahoo.memory.NativeMemory;
+import com.yahoo.memory.WritableMemory;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Union;
@@ -40,7 +38,7 @@ public class SketchBufferAggregator implements BufferAggregator
private final int size;
private final int maxIntermediateSize;
private final IdentityHashMap> unions = new IdentityHashMap<>();
- private final IdentityHashMap nmCache = new IdentityHashMap<>();
+ private final IdentityHashMap memCache = new IdentityHashMap<>();
public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIntermediateSize)
{
@@ -91,11 +89,10 @@ public class SketchBufferAggregator implements BufferAggregator
private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped)
{
- NativeMemory nm = getNativeMemory(buf);
- Memory mem = new MemoryRegion(nm, position, maxIntermediateSize);
+ WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize);
Union union = isWrapped
? (Union) SetOperation.wrap(mem)
- : (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION);
+ : (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem);
Int2ObjectMap unionMap = unions.get(buf);
if (unionMap == null) {
unionMap = new Int2ObjectOpenHashMap<>();
@@ -127,6 +124,7 @@ public class SketchBufferAggregator implements BufferAggregator
public void close()
{
unions.clear();
+ memCache.clear();
}
@Override
@@ -144,19 +142,19 @@ public class SketchBufferAggregator implements BufferAggregator
unionMap.remove(oldPosition);
if (unionMap.isEmpty()) {
unions.remove(oldBuffer);
- nmCache.remove(oldBuffer);
+ memCache.remove(oldBuffer);
}
}
}
- private NativeMemory getNativeMemory(ByteBuffer buffer)
+ private WritableMemory getMemory(ByteBuffer buffer)
{
- NativeMemory nm = nmCache.get(buffer);
- if (nm == null) {
- nm = new NativeMemory(buffer);
- nmCache.put(buffer, nm);
+ WritableMemory mem = memCache.get(buffer);
+ if (mem == null) {
+ mem = WritableMemory.wrap(buffer);
+ memCache.put(buffer, mem);
}
- return nm;
+ return mem;
}
}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
index 33a1779b9e0..4f1147caf17 100644
--- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Ordering;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import com.yahoo.memory.Memory;
-import com.yahoo.memory.NativeMemory;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.theta.AnotB;
import com.yahoo.sketches.theta.Intersection;
@@ -187,7 +186,7 @@ public class SketchHolder
holder2.invalidateCache();
return holder2;
} else {
- Union union = (Union) SetOperation.builder().build(nomEntries, Family.UNION);
+ Union union = (Union) SetOperation.builder().setNominalEntries(nomEntries).build(Family.UNION);
holder1.updateUnion(union);
holder2.updateUnion(union);
return SketchHolder.of(union);
@@ -227,7 +226,7 @@ public class SketchHolder
private static Sketch deserializeFromByteArray(byte[] data)
{
- return deserializeFromMemory(new NativeMemory(data));
+ return deserializeFromMemory(Memory.wrap(data));
}
private static Sketch deserializeFromMemory(Memory mem)
@@ -255,13 +254,13 @@ public class SketchHolder
//the final stages of query processing, ordered sketch would be of no use.
switch (func) {
case UNION:
- Union union = (Union) SetOperation.builder().build(sketchSize, Family.UNION);
+ Union union = (Union) SetOperation.builder().setNominalEntries(sketchSize).build(Family.UNION);
for (Object o : holders) {
((SketchHolder) o).updateUnion(union);
}
return SketchHolder.of(union);
case INTERSECT:
- Intersection intersection = (Intersection) SetOperation.builder().build(sketchSize, Family.INTERSECTION);
+ Intersection intersection = (Intersection) SetOperation.builder().setNominalEntries(sketchSize).build(Family.INTERSECTION);
for (Object o : holders) {
intersection.update(((SketchHolder) o).getSketch());
}
@@ -277,7 +276,7 @@ public class SketchHolder
Sketch result = ((SketchHolder) holders[0]).getSketch();
for (int i = 1; i < holders.length; i++) {
- AnotB anotb = (AnotB) SetOperation.builder().build(sketchSize, Family.A_NOT_B);
+ AnotB anotb = (AnotB) SetOperation.builder().setNominalEntries(sketchSize).build(Family.A_NOT_B);
anotb.update(result, ((SketchHolder) holders[i]).getSketch());
result = anotb.getResult(false, null);
}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java
index 2e2550a0b3b..714c5d73bca 100644
--- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java
@@ -19,8 +19,7 @@
package io.druid.query.aggregation.datasketches.theta;
-import com.yahoo.memory.MemoryRegion;
-import com.yahoo.memory.NativeMemory;
+import com.yahoo.memory.Memory;
import com.yahoo.sketches.theta.Sketch;
import io.druid.java.util.common.IAE;
import io.druid.segment.data.ObjectStrategy;
@@ -51,7 +50,7 @@ public class SketchObjectStrategy implements ObjectStrategy
return SketchHolder.EMPTY;
}
- return SketchHolder.of(new MemoryRegion(new NativeMemory(buffer), buffer.position(), numBytes));
+ return SketchHolder.of(Memory.wrap(buffer).region(buffer.position(), numBytes));
}
@Override
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java
index 83ac19ba43b..df097048114 100644
--- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java
@@ -155,7 +155,7 @@ public class SketchSetPostAggregator implements PostAggregator
if (!fields.equals(that.fields)) {
return false;
}
- return func == that.func;
+ return func.equals(that.func);
}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java
index 819c68b8481..d3cb529cf75 100644
--- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java
@@ -20,6 +20,7 @@
package io.druid.query.aggregation.datasketches.theta;
import com.yahoo.memory.Memory;
+import com.yahoo.memory.WritableMemory;
import com.yahoo.sketches.theta.CompactSketch;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
@@ -90,7 +91,7 @@ public class SynchronizedUnion implements Union
}
@Override
- public synchronized CompactSketch getResult(boolean b, Memory memory)
+ public synchronized CompactSketch getResult(boolean b, WritableMemory memory)
{
return delegate.getResult(b, memory);
}
@@ -112,4 +113,10 @@ public class SynchronizedUnion implements Union
{
delegate.reset();
}
+
+ @Override
+ public boolean isSameResource(Memory mem)
+ {
+ return delegate.isSameResource(mem);
+ }
}
diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java
index c8995815258..b287712c181 100644
--- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java
+++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java
@@ -69,7 +69,7 @@ public class BufferHashGrouperUsingSketchMergeAggregatorFactoryTest
try {
final int expectedMaxSize = 5;
- SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16));
+ SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(16).build());
UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch();
updateSketch.update(1);
diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java
index e98c74bb706..49352ac18f1 100644
--- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java
+++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java
@@ -229,7 +229,7 @@ public class SketchAggregationTest
@Test
public void testSketchMergeFinalization() throws Exception
{
- SketchHolder sketch = SketchHolder.of(Sketches.updateSketchBuilder().build(128));
+ SketchHolder sketch = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(128).build());
SketchMergeAggregatorFactory agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null);
Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001);
@@ -370,7 +370,7 @@ public class SketchAggregationTest
Comparator