use latest sketches-core-0.10.1 and memory-0.10.3 (#4828)

* use latest sketches-core-0.10.1 and memory-0.10.3

* style fix

* better variable name

* removed explicit dependency on memory
This commit is contained in:
Alexander Saydakov 2017-09-27 13:18:33 -07:00 committed by Himanshu
parent 999c6d800e
commit c3fbe5158d
11 changed files with 37 additions and 34 deletions

View File

@ -38,7 +38,7 @@
<dependency> <dependency>
<groupId>com.yahoo.datasketches</groupId> <groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId> <artifactId>sketches-core</artifactId>
<version>0.8.4</version> <version>0.10.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>

View File

@ -37,7 +37,7 @@ public class SketchAggregator implements Aggregator
public SketchAggregator(ObjectColumnSelector selector, int size) public SketchAggregator(ObjectColumnSelector selector, int size)
{ {
this.selector = selector; this.selector = selector;
union = new SynchronizedUnion((Union) SetOperation.builder().build(size, Family.UNION)); union = new SynchronizedUnion((Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION));
} }
@Override @Override

View File

@ -109,7 +109,7 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
{ {
return new ObjectAggregateCombiner<SketchHolder>() return new ObjectAggregateCombiner<SketchHolder>()
{ {
private final Union union = (Union) SetOperation.builder().build(size, Family.UNION); private final Union union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION);
private final SketchHolder combined = SketchHolder.of(union); private final SketchHolder combined = SketchHolder.of(union);
@Override @Override

View File

@ -19,9 +19,7 @@
package io.druid.query.aggregation.datasketches.theta; package io.druid.query.aggregation.datasketches.theta;
import com.yahoo.memory.Memory; import com.yahoo.memory.WritableMemory;
import com.yahoo.memory.MemoryRegion;
import com.yahoo.memory.NativeMemory;
import com.yahoo.sketches.Family; import com.yahoo.sketches.Family;
import com.yahoo.sketches.theta.SetOperation; import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Union; import com.yahoo.sketches.theta.Union;
@ -40,7 +38,7 @@ public class SketchBufferAggregator implements BufferAggregator
private final int size; private final int size;
private final int maxIntermediateSize; private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Union>> unions = new IdentityHashMap<>(); private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Union>> unions = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, NativeMemory> nmCache = new IdentityHashMap<>(); private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIntermediateSize) public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIntermediateSize)
{ {
@ -91,11 +89,10 @@ public class SketchBufferAggregator implements BufferAggregator
private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped)
{ {
NativeMemory nm = getNativeMemory(buf); WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize);
Memory mem = new MemoryRegion(nm, position, maxIntermediateSize);
Union union = isWrapped Union union = isWrapped
? (Union) SetOperation.wrap(mem) ? (Union) SetOperation.wrap(mem)
: (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION); : (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem);
Int2ObjectMap<Union> unionMap = unions.get(buf); Int2ObjectMap<Union> unionMap = unions.get(buf);
if (unionMap == null) { if (unionMap == null) {
unionMap = new Int2ObjectOpenHashMap<>(); unionMap = new Int2ObjectOpenHashMap<>();
@ -127,6 +124,7 @@ public class SketchBufferAggregator implements BufferAggregator
public void close() public void close()
{ {
unions.clear(); unions.clear();
memCache.clear();
} }
@Override @Override
@ -144,19 +142,19 @@ public class SketchBufferAggregator implements BufferAggregator
unionMap.remove(oldPosition); unionMap.remove(oldPosition);
if (unionMap.isEmpty()) { if (unionMap.isEmpty()) {
unions.remove(oldBuffer); unions.remove(oldBuffer);
nmCache.remove(oldBuffer); memCache.remove(oldBuffer);
} }
} }
} }
private NativeMemory getNativeMemory(ByteBuffer buffer) private WritableMemory getMemory(ByteBuffer buffer)
{ {
NativeMemory nm = nmCache.get(buffer); WritableMemory mem = memCache.get(buffer);
if (nm == null) { if (mem == null) {
nm = new NativeMemory(buffer); mem = WritableMemory.wrap(buffer);
nmCache.put(buffer, nm); memCache.put(buffer, mem);
} }
return nm; return mem;
} }
} }

View File

@ -24,7 +24,6 @@ import com.google.common.collect.Ordering;
import com.google.common.primitives.Doubles; import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import com.yahoo.memory.Memory; import com.yahoo.memory.Memory;
import com.yahoo.memory.NativeMemory;
import com.yahoo.sketches.Family; import com.yahoo.sketches.Family;
import com.yahoo.sketches.theta.AnotB; import com.yahoo.sketches.theta.AnotB;
import com.yahoo.sketches.theta.Intersection; import com.yahoo.sketches.theta.Intersection;
@ -187,7 +186,7 @@ public class SketchHolder
holder2.invalidateCache(); holder2.invalidateCache();
return holder2; return holder2;
} else { } else {
Union union = (Union) SetOperation.builder().build(nomEntries, Family.UNION); Union union = (Union) SetOperation.builder().setNominalEntries(nomEntries).build(Family.UNION);
holder1.updateUnion(union); holder1.updateUnion(union);
holder2.updateUnion(union); holder2.updateUnion(union);
return SketchHolder.of(union); return SketchHolder.of(union);
@ -227,7 +226,7 @@ public class SketchHolder
private static Sketch deserializeFromByteArray(byte[] data) private static Sketch deserializeFromByteArray(byte[] data)
{ {
return deserializeFromMemory(new NativeMemory(data)); return deserializeFromMemory(Memory.wrap(data));
} }
private static Sketch deserializeFromMemory(Memory mem) private static Sketch deserializeFromMemory(Memory mem)
@ -255,13 +254,13 @@ public class SketchHolder
//the final stages of query processing, ordered sketch would be of no use. //the final stages of query processing, ordered sketch would be of no use.
switch (func) { switch (func) {
case UNION: case UNION:
Union union = (Union) SetOperation.builder().build(sketchSize, Family.UNION); Union union = (Union) SetOperation.builder().setNominalEntries(sketchSize).build(Family.UNION);
for (Object o : holders) { for (Object o : holders) {
((SketchHolder) o).updateUnion(union); ((SketchHolder) o).updateUnion(union);
} }
return SketchHolder.of(union); return SketchHolder.of(union);
case INTERSECT: case INTERSECT:
Intersection intersection = (Intersection) SetOperation.builder().build(sketchSize, Family.INTERSECTION); Intersection intersection = (Intersection) SetOperation.builder().setNominalEntries(sketchSize).build(Family.INTERSECTION);
for (Object o : holders) { for (Object o : holders) {
intersection.update(((SketchHolder) o).getSketch()); intersection.update(((SketchHolder) o).getSketch());
} }
@ -277,7 +276,7 @@ public class SketchHolder
Sketch result = ((SketchHolder) holders[0]).getSketch(); Sketch result = ((SketchHolder) holders[0]).getSketch();
for (int i = 1; i < holders.length; i++) { for (int i = 1; i < holders.length; i++) {
AnotB anotb = (AnotB) SetOperation.builder().build(sketchSize, Family.A_NOT_B); AnotB anotb = (AnotB) SetOperation.builder().setNominalEntries(sketchSize).build(Family.A_NOT_B);
anotb.update(result, ((SketchHolder) holders[i]).getSketch()); anotb.update(result, ((SketchHolder) holders[i]).getSketch());
result = anotb.getResult(false, null); result = anotb.getResult(false, null);
} }

View File

@ -19,8 +19,7 @@
package io.druid.query.aggregation.datasketches.theta; package io.druid.query.aggregation.datasketches.theta;
import com.yahoo.memory.MemoryRegion; import com.yahoo.memory.Memory;
import com.yahoo.memory.NativeMemory;
import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketch;
import io.druid.java.util.common.IAE; import io.druid.java.util.common.IAE;
import io.druid.segment.data.ObjectStrategy; import io.druid.segment.data.ObjectStrategy;
@ -51,7 +50,7 @@ public class SketchObjectStrategy implements ObjectStrategy
return SketchHolder.EMPTY; return SketchHolder.EMPTY;
} }
return SketchHolder.of(new MemoryRegion(new NativeMemory(buffer), buffer.position(), numBytes)); return SketchHolder.of(Memory.wrap(buffer).region(buffer.position(), numBytes));
} }
@Override @Override

View File

@ -155,7 +155,7 @@ public class SketchSetPostAggregator implements PostAggregator
if (!fields.equals(that.fields)) { if (!fields.equals(that.fields)) {
return false; return false;
} }
return func == that.func; return func.equals(that.func);
} }

View File

@ -20,6 +20,7 @@
package io.druid.query.aggregation.datasketches.theta; package io.druid.query.aggregation.datasketches.theta;
import com.yahoo.memory.Memory; import com.yahoo.memory.Memory;
import com.yahoo.memory.WritableMemory;
import com.yahoo.sketches.theta.CompactSketch; import com.yahoo.sketches.theta.CompactSketch;
import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union; import com.yahoo.sketches.theta.Union;
@ -90,7 +91,7 @@ public class SynchronizedUnion implements Union
} }
@Override @Override
public synchronized CompactSketch getResult(boolean b, Memory memory) public synchronized CompactSketch getResult(boolean b, WritableMemory memory)
{ {
return delegate.getResult(b, memory); return delegate.getResult(b, memory);
} }
@ -112,4 +113,10 @@ public class SynchronizedUnion implements Union
{ {
delegate.reset(); delegate.reset();
} }
@Override
public boolean isSameResource(Memory mem)
{
return delegate.isSameResource(mem);
}
} }

View File

@ -69,7 +69,7 @@ public class BufferHashGrouperUsingSketchMergeAggregatorFactoryTest
try { try {
final int expectedMaxSize = 5; final int expectedMaxSize = 5;
SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16)); SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(16).build());
UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch(); UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch();
updateSketch.update(1); updateSketch.update(1);

View File

@ -229,7 +229,7 @@ public class SketchAggregationTest
@Test @Test
public void testSketchMergeFinalization() throws Exception public void testSketchMergeFinalization() throws Exception
{ {
SketchHolder sketch = SketchHolder.of(Sketches.updateSketchBuilder().build(128)); SketchHolder sketch = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(128).build());
SketchMergeAggregatorFactory agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null); SketchMergeAggregatorFactory agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null);
Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001); Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001);
@ -370,7 +370,7 @@ public class SketchAggregationTest
Comparator<Object> comparator = SketchHolder.COMPARATOR; Comparator<Object> comparator = SketchHolder.COMPARATOR;
Assert.assertEquals(0, comparator.compare(null, null)); Assert.assertEquals(0, comparator.compare(null, null));
Union union1 = (Union) SetOperation.builder().build(1 << 4, Family.UNION); Union union1 = (Union) SetOperation.builder().setNominalEntries(1 << 4).build(Family.UNION);
union1.update("a"); union1.update("a");
union1.update("b"); union1.update("b");
Sketch sketch1 = union1.getResult(); Sketch sketch1 = union1.getResult();
@ -378,7 +378,7 @@ public class SketchAggregationTest
Assert.assertEquals(-1, comparator.compare(null, SketchHolder.of(sketch1))); Assert.assertEquals(-1, comparator.compare(null, SketchHolder.of(sketch1)));
Assert.assertEquals(1, comparator.compare(SketchHolder.of(sketch1), null)); Assert.assertEquals(1, comparator.compare(SketchHolder.of(sketch1), null));
Union union2 = (Union) SetOperation.builder().build(1 << 4, Family.UNION); Union union2 = (Union) SetOperation.builder().setNominalEntries(1 << 4).build(Family.UNION);
union2.update("a"); union2.update("a");
union2.update("b"); union2.update("b");
union2.update("c"); union2.update("c");
@ -396,7 +396,7 @@ public class SketchAggregationTest
public void testRelocation() public void testRelocation()
{ {
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16)); SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(16).build());
UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch(); UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch();
updateSketch.update(1); updateSketch.update(1);

View File

@ -203,7 +203,7 @@ public class OldApiSketchAggregationTest
public void testRelocation() public void testRelocation()
{ {
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16)); SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(16).build());
UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch(); UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch();
updateSketch.update(1); updateSketch.update(1);