From c3fbe5158d9f4e84aa7bffe9802052b61878b178 Mon Sep 17 00:00:00 2001 From: Alexander Saydakov Date: Wed, 27 Sep 2017 13:18:33 -0700 Subject: [PATCH] use latest sketches-core-0.10.1 and memory-0.10.3 (#4828) * use latest sketches-core-0.10.1 and memory-0.10.3 * style fix * better variable name * removed explicit dependency on memory --- extensions-core/datasketches/pom.xml | 2 +- .../datasketches/theta/SketchAggregator.java | 2 +- .../theta/SketchAggregatorFactory.java | 2 +- .../theta/SketchBufferAggregator.java | 26 +++++++++---------- .../datasketches/theta/SketchHolder.java | 11 ++++---- .../theta/SketchObjectStrategy.java | 5 ++-- .../theta/SketchSetPostAggregator.java | 2 +- .../datasketches/theta/SynchronizedUnion.java | 9 ++++++- ...UsingSketchMergeAggregatorFactoryTest.java | 2 +- .../theta/SketchAggregationTest.java | 8 +++--- .../oldapi/OldApiSketchAggregationTest.java | 2 +- 11 files changed, 37 insertions(+), 34 deletions(-) diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml index 9f7ebfe5d84..d3fc15adc2a 100644 --- a/extensions-core/datasketches/pom.xml +++ b/extensions-core/datasketches/pom.xml @@ -38,7 +38,7 @@ com.yahoo.datasketches sketches-core - 0.8.4 + 0.10.1 io.druid diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java index 8d2df6fb9a6..8587fe59ea3 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java @@ -37,7 +37,7 @@ public class SketchAggregator implements Aggregator public SketchAggregator(ObjectColumnSelector selector, int size) { this.selector = selector; - union = new SynchronizedUnion((Union) SetOperation.builder().build(size, Family.UNION)); + union = new SynchronizedUnion((Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION)); } @Override diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 8ec4afd55d9..7d5270df38d 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -109,7 +109,7 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory { return new ObjectAggregateCombiner() { - private final Union union = (Union) SetOperation.builder().build(size, Family.UNION); + private final Union union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION); private final SketchHolder combined = SketchHolder.of(union); @Override diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index 266bdf17698..e7f239ad860 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -19,9 +19,7 @@ package io.druid.query.aggregation.datasketches.theta; -import com.yahoo.memory.Memory; -import com.yahoo.memory.MemoryRegion; -import com.yahoo.memory.NativeMemory; +import com.yahoo.memory.WritableMemory; import com.yahoo.sketches.Family; import com.yahoo.sketches.theta.SetOperation; import com.yahoo.sketches.theta.Union; @@ -40,7 +38,7 @@ public class SketchBufferAggregator implements BufferAggregator private final int size; private final int maxIntermediateSize; private final IdentityHashMap> unions = new IdentityHashMap<>(); - private final IdentityHashMap nmCache = new IdentityHashMap<>(); + private final IdentityHashMap memCache = new IdentityHashMap<>(); public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIntermediateSize) { @@ -91,11 +89,10 @@ public class SketchBufferAggregator implements BufferAggregator private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) { - NativeMemory nm = getNativeMemory(buf); - Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); + WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize); Union union = isWrapped ? (Union) SetOperation.wrap(mem) - : (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION); + : (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem); Int2ObjectMap unionMap = unions.get(buf); if (unionMap == null) { unionMap = new Int2ObjectOpenHashMap<>(); @@ -127,6 +124,7 @@ public class SketchBufferAggregator implements BufferAggregator public void close() { unions.clear(); + memCache.clear(); } @Override @@ -144,19 +142,19 @@ public class SketchBufferAggregator implements BufferAggregator unionMap.remove(oldPosition); if (unionMap.isEmpty()) { unions.remove(oldBuffer); - nmCache.remove(oldBuffer); + memCache.remove(oldBuffer); } } } - private NativeMemory getNativeMemory(ByteBuffer buffer) + private WritableMemory getMemory(ByteBuffer buffer) { - NativeMemory nm = nmCache.get(buffer); - if (nm == null) { - nm = new NativeMemory(buffer); - nmCache.put(buffer, nm); + WritableMemory mem = memCache.get(buffer); + if (mem == null) { + mem = WritableMemory.wrap(buffer); + memCache.put(buffer, mem); } - return nm; + return mem; } } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java index 33a1779b9e0..4f1147caf17 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java @@ -24,7 +24,6 @@ import com.google.common.collect.Ordering; import com.google.common.primitives.Doubles; import com.google.common.primitives.Longs; import com.yahoo.memory.Memory; -import com.yahoo.memory.NativeMemory; import com.yahoo.sketches.Family; import com.yahoo.sketches.theta.AnotB; import com.yahoo.sketches.theta.Intersection; @@ -187,7 +186,7 @@ public class SketchHolder holder2.invalidateCache(); return holder2; } else { - Union union = (Union) SetOperation.builder().build(nomEntries, Family.UNION); + Union union = (Union) SetOperation.builder().setNominalEntries(nomEntries).build(Family.UNION); holder1.updateUnion(union); holder2.updateUnion(union); return SketchHolder.of(union); @@ -227,7 +226,7 @@ public class SketchHolder private static Sketch deserializeFromByteArray(byte[] data) { - return deserializeFromMemory(new NativeMemory(data)); + return deserializeFromMemory(Memory.wrap(data)); } private static Sketch deserializeFromMemory(Memory mem) @@ -255,13 +254,13 @@ public class SketchHolder //the final stages of query processing, ordered sketch would be of no use. switch (func) { case UNION: - Union union = (Union) SetOperation.builder().build(sketchSize, Family.UNION); + Union union = (Union) SetOperation.builder().setNominalEntries(sketchSize).build(Family.UNION); for (Object o : holders) { ((SketchHolder) o).updateUnion(union); } return SketchHolder.of(union); case INTERSECT: - Intersection intersection = (Intersection) SetOperation.builder().build(sketchSize, Family.INTERSECTION); + Intersection intersection = (Intersection) SetOperation.builder().setNominalEntries(sketchSize).build(Family.INTERSECTION); for (Object o : holders) { intersection.update(((SketchHolder) o).getSketch()); } @@ -277,7 +276,7 @@ public class SketchHolder Sketch result = ((SketchHolder) holders[0]).getSketch(); for (int i = 1; i < holders.length; i++) { - AnotB anotb = (AnotB) SetOperation.builder().build(sketchSize, Family.A_NOT_B); + AnotB anotb = (AnotB) SetOperation.builder().setNominalEntries(sketchSize).build(Family.A_NOT_B); anotb.update(result, ((SketchHolder) holders[i]).getSketch()); result = anotb.getResult(false, null); } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java index 2e2550a0b3b..714c5d73bca 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java @@ -19,8 +19,7 @@ package io.druid.query.aggregation.datasketches.theta; -import com.yahoo.memory.MemoryRegion; -import com.yahoo.memory.NativeMemory; +import com.yahoo.memory.Memory; import com.yahoo.sketches.theta.Sketch; import io.druid.java.util.common.IAE; import io.druid.segment.data.ObjectStrategy; @@ -51,7 +50,7 @@ public class SketchObjectStrategy implements ObjectStrategy return SketchHolder.EMPTY; } - return SketchHolder.of(new MemoryRegion(new NativeMemory(buffer), buffer.position(), numBytes)); + return SketchHolder.of(Memory.wrap(buffer).region(buffer.position(), numBytes)); } @Override diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java index 83ac19ba43b..df097048114 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java @@ -155,7 +155,7 @@ public class SketchSetPostAggregator implements PostAggregator if (!fields.equals(that.fields)) { return false; } - return func == that.func; + return func.equals(that.func); } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java index 819c68b8481..d3cb529cf75 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java @@ -20,6 +20,7 @@ package io.druid.query.aggregation.datasketches.theta; import com.yahoo.memory.Memory; +import com.yahoo.memory.WritableMemory; import com.yahoo.sketches.theta.CompactSketch; import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Union; @@ -90,7 +91,7 @@ public class SynchronizedUnion implements Union } @Override - public synchronized CompactSketch getResult(boolean b, Memory memory) + public synchronized CompactSketch getResult(boolean b, WritableMemory memory) { return delegate.getResult(b, memory); } @@ -112,4 +113,10 @@ public class SynchronizedUnion implements Union { delegate.reset(); } + + @Override + public boolean isSameResource(Memory mem) + { + return delegate.isSameResource(mem); + } } diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java index c8995815258..b287712c181 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java @@ -69,7 +69,7 @@ public class BufferHashGrouperUsingSketchMergeAggregatorFactoryTest try { final int expectedMaxSize = 5; - SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16)); + SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(16).build()); UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch(); updateSketch.update(1); diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java index e98c74bb706..49352ac18f1 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java @@ -229,7 +229,7 @@ public class SketchAggregationTest @Test public void testSketchMergeFinalization() throws Exception { - SketchHolder sketch = SketchHolder.of(Sketches.updateSketchBuilder().build(128)); + SketchHolder sketch = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(128).build()); SketchMergeAggregatorFactory agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null); Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001); @@ -370,7 +370,7 @@ public class SketchAggregationTest Comparator comparator = SketchHolder.COMPARATOR; Assert.assertEquals(0, comparator.compare(null, null)); - Union union1 = (Union) SetOperation.builder().build(1 << 4, Family.UNION); + Union union1 = (Union) SetOperation.builder().setNominalEntries(1 << 4).build(Family.UNION); union1.update("a"); union1.update("b"); Sketch sketch1 = union1.getResult(); @@ -378,7 +378,7 @@ public class SketchAggregationTest Assert.assertEquals(-1, comparator.compare(null, SketchHolder.of(sketch1))); Assert.assertEquals(1, comparator.compare(SketchHolder.of(sketch1), null)); - Union union2 = (Union) SetOperation.builder().build(1 << 4, Family.UNION); + Union union2 = (Union) SetOperation.builder().setNominalEntries(1 << 4).build(Family.UNION); union2.update("a"); union2.update("b"); union2.update("c"); @@ -396,7 +396,7 @@ public class SketchAggregationTest public void testRelocation() { final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); - SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16)); + SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(16).build()); UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch(); updateSketch.update(1); diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java index b756673b26c..a001ed77f7a 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java @@ -203,7 +203,7 @@ public class OldApiSketchAggregationTest public void testRelocation() { final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); - SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16)); + SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(16).build()); UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch(); updateSketch.update(1);