updated to use the latest sketches-core-0.12.0 (#6381)

This commit is contained in:
Alexander Saydakov 2018-10-23 11:20:19 -07:00 committed by Himanshu
parent 84ac18dc1b
commit ec9d1827a0
15 changed files with 39 additions and 157 deletions

View File

@ -38,7 +38,7 @@
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
<version>0.10.3</version>
<version>0.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.aggregation.datasketches.hll;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -135,7 +136,7 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
private WritableMemory getMemory(final ByteBuffer buf)
{
return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b));
return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN));
}
/**

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.aggregation.datasketches.hll;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -66,7 +67,7 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
@Override
public void init(final ByteBuffer buf, final int position)
{
final WritableMemory mem = WritableMemory.wrap(buf).writableRegion(position, size);
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
// Not necessary to keep the constructed object since it is cheap to reconstruct by wrapping the memory.
// The objects are not cached as in BuildBufferAggregator since they never exceed the max size and never move.
// So it is easier to reconstruct them by wrapping memory then to keep position-to-object mappings.
@ -85,7 +86,7 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
if (sketch == null) {
return;
}
final WritableMemory mem = WritableMemory.wrap(buf).writableRegion(position, size);
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).writeLock();
lock.lock();
try {
@ -105,7 +106,7 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
@Override
public Object get(final ByteBuffer buf, final int position)
{
final WritableMemory mem = WritableMemory.wrap(buf).writableRegion(position, size);
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).readLock();
lock.lock();
try {

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.aggregation.datasketches.hll;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import com.yahoo.memory.Memory;
import com.yahoo.sketches.hll.HllSketch;
@ -46,7 +47,7 @@ public class HllSketchObjectStrategy implements ObjectStrategy<HllSketch>
@Override
public HllSketch fromByteBuffer(final ByteBuffer buf, final int size)
{
return HllSketch.wrap(Memory.wrap(buf).region(buf.position(), size));
return HllSketch.wrap(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN).region(buf.position(), size));
}
@Override

View File

@ -28,6 +28,7 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;
public class DoublesSketchBuildBufferAggregator implements BufferAggregator
@ -112,7 +113,7 @@ public class DoublesSketchBuildBufferAggregator implements BufferAggregator
private WritableMemory getMemory(final ByteBuffer buffer)
{
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf));
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
}
private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch)

View File

@ -22,7 +22,6 @@ package org.apache.druid.query.aggregation.datasketches.quantiles;
import com.yahoo.memory.WritableMemory;
import com.yahoo.sketches.quantiles.DoublesSketch;
import com.yahoo.sketches.quantiles.DoublesUnion;
import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.druid.query.aggregation.BufferAggregator;
@ -105,7 +104,7 @@ public class DoublesSketchMergeBufferAggregator implements BufferAggregator
final WritableMemory oldMem = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
if (union.isSameResource(oldMem)) { // union was not relocated on heap
final WritableMemory newMem = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
union = DoublesUnionBuilder.wrap(newMem);
union = DoublesUnion.wrap(newMem);
}
putUnion(newBuffer, newPosition, union);

View File

@ -22,9 +22,11 @@ package org.apache.druid.query.aggregation.datasketches.quantiles;
import com.yahoo.memory.Memory;
import com.yahoo.sketches.quantiles.DoublesSketch;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.segment.data.ObjectStrategy;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public class DoublesSketchObjectStrategy implements ObjectStrategy<DoublesSketch>
{
@ -41,7 +43,7 @@ public class DoublesSketchObjectStrategy implements ObjectStrategy<DoublesSketch
if (numBytes == 0) {
return DoublesSketchOperations.EMPTY_SKETCH;
}
return DoublesSketch.wrap(Memory.wrap(buffer).region(buffer.position(), numBytes));
return DoublesSketch.wrap(Memory.wrap(buffer, ByteOrder.LITTLE_ENDIAN).region(buffer.position(), numBytes));
}
@Override

View File

@ -42,7 +42,7 @@ public class SketchAggregator implements Aggregator
private void initUnion()
{
union = new SynchronizedUnion((Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION));
union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION);
}
@Override
@ -52,10 +52,12 @@ public class SketchAggregator implements Aggregator
if (update == null) {
return;
}
if (union == null) {
initUnion();
synchronized (this) {
if (union == null) {
initUnion();
}
updateUnion(union, update);
}
updateUnion(union, update);
}
@Override
@ -69,7 +71,9 @@ public class SketchAggregator implements Aggregator
//however, advantage of ordered sketch is that they are faster to "union" later
//given that results from the aggregator will be combined further, it is better
//to return the ordered sketch here
return SketchHolder.of(union.getResult(true, null));
synchronized (this) {
return SketchHolder.of(union.getResult(true, null));
}
}
@Override

View File

@ -30,6 +30,7 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;
public class SketchBufferAggregator implements BufferAggregator
@ -155,7 +156,7 @@ public class SketchBufferAggregator implements BufferAggregator
{
WritableMemory mem = memCache.get(buffer);
if (mem == null) {
mem = WritableMemory.wrap(buffer);
mem = WritableMemory.wrap(buffer, ByteOrder.LITTLE_ENDIAN);
memCache.put(buffer, mem);
}
return mem;

View File

@ -26,6 +26,7 @@ import org.apache.druid.segment.data.ObjectStrategy;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public class SketchHolderObjectStrategy implements ObjectStrategy<SketchHolder>
{
@ -49,7 +50,7 @@ public class SketchHolderObjectStrategy implements ObjectStrategy<SketchHolder>
return SketchHolder.EMPTY;
}
return SketchHolder.of(Memory.wrap(buffer).region(buffer.position(), numBytes));
return SketchHolder.of(Memory.wrap(buffer, ByteOrder.LITTLE_ENDIAN).region(buffer.position(), numBytes));
}
@Override

View File

@ -1,133 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.theta;
import com.yahoo.memory.Memory;
import com.yahoo.memory.WritableMemory;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.theta.CompactSketch;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
/**
*/
public class SynchronizedUnion extends Union
{
private final Union delegate;
public SynchronizedUnion(Union delegate)
{
this.delegate = delegate;
}
@Override
public synchronized void update(Sketch sketchIn)
{
delegate.update(sketchIn);
}
@Override
public synchronized void update(Memory mem)
{
delegate.update(mem);
}
@Override
public synchronized void update(long datum)
{
delegate.update(datum);
}
@Override
public synchronized void update(double datum)
{
delegate.update(datum);
}
@Override
public synchronized void update(String datum)
{
delegate.update(datum);
}
@Override
@SuppressWarnings("ParameterPackage")
public synchronized void update(byte[] data)
{
delegate.update(data);
}
@Override
@SuppressWarnings("ParameterPackage")
public synchronized void update(int[] data)
{
delegate.update(data);
}
@Override
@SuppressWarnings("ParameterPackage")
public synchronized void update(char[] chars)
{
delegate.update(chars);
}
@Override
public synchronized void update(long[] data)
{
delegate.update(data);
}
@Override
public synchronized CompactSketch getResult(boolean b, WritableMemory memory)
{
return delegate.getResult(b, memory);
}
@Override
public synchronized CompactSketch getResult()
{
return delegate.getResult();
}
@Override
public synchronized byte[] toByteArray()
{
return delegate.toByteArray();
}
@Override
public synchronized void reset()
{
delegate.reset();
}
@Override
public synchronized boolean isSameResource(Memory mem)
{
return delegate.isSameResource(mem);
}
@Override
public synchronized Family getFamily()
{
return delegate.getFamily();
}
}

View File

@ -31,6 +31,7 @@ import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.data.IndexedInts;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -69,7 +70,7 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat
@Override
public void init(final ByteBuffer buf, final int position)
{
final WritableMemory mem = WritableMemory.wrap(buf);
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries)
.setNumberOfValues(valueSelectors.length)
@ -91,7 +92,7 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat
// Wrapping memory and ArrayOfDoublesSketch is inexpensive compared to sketch operations.
// Maintaining a cache of wrapped objects per buffer position like in Theta sketch aggregator
// might might be considered, but it would increase complexity including relocate() support.
final WritableMemory mem = WritableMemory.wrap(buf);
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock();
lock.lock();
@ -118,7 +119,7 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat
@Override
public Object get(final ByteBuffer buf, final int position)
{
final WritableMemory mem = WritableMemory.wrap(buf);
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final Lock lock = stripedLock.getAt(lockIndex(position)).readLock();
lock.lock();

View File

@ -30,6 +30,7 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -65,7 +66,7 @@ public class ArrayOfDoublesSketchMergeBufferAggregator implements BufferAggregat
@Override
public void init(final ByteBuffer buf, final int position)
{
final WritableMemory mem = WritableMemory.wrap(buf);
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
new ArrayOfDoublesSetOperationBuilder().setNominalEntries(nominalEntries)
.setNumberOfValues(numberOfValues).buildUnion(region);
@ -86,7 +87,7 @@ public class ArrayOfDoublesSketchMergeBufferAggregator implements BufferAggregat
// Wrapping memory and ArrayOfDoublesUnion is inexpensive compared to union operations.
// Maintaining a cache of wrapped objects per buffer position like in Theta sketch aggregator
// might might be considered, but it would increase complexity including relocate() support.
final WritableMemory mem = WritableMemory.wrap(buf);
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final Lock lock = stripedLock.getAt(ArrayOfDoublesSketchBuildBufferAggregator.lockIndex(position)).writeLock();
lock.lock();
@ -110,7 +111,7 @@ public class ArrayOfDoublesSketchMergeBufferAggregator implements BufferAggregat
@Override
public Object get(final ByteBuffer buf, final int position)
{
final WritableMemory mem = WritableMemory.wrap(buf);
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final Lock lock = stripedLock.getAt(ArrayOfDoublesSketchBuildBufferAggregator.lockIndex(position)).readLock();
lock.lock();

View File

@ -26,6 +26,7 @@ import org.apache.druid.segment.data.ObjectStrategy;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public class ArrayOfDoublesSketchObjectStrategy implements ObjectStrategy<ArrayOfDoublesSketch>
{
@ -47,7 +48,7 @@ public class ArrayOfDoublesSketchObjectStrategy implements ObjectStrategy<ArrayO
@Override
public ArrayOfDoublesSketch fromByteBuffer(final ByteBuffer buffer, final int numBytes)
{
return ArrayOfDoublesSketches.wrapSketch(Memory.wrap(buffer).region(buffer.position(), numBytes));
return ArrayOfDoublesSketches.wrapSketch(Memory.wrap(buffer, ByteOrder.LITTLE_ENDIAN).region(buffer.position(), numBytes));
}
@Override

View File

@ -200,6 +200,7 @@ public class ArrayOfDoublesSketchAggregationTest
" \"name\": \"union\",",
" \"operation\": \"UNION\",",
" \"nominalEntries\": 1024,",
" \"numberOfValues\": 2,",
" \"fields\": [{\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}, {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}]",
" }},",
" {\"type\": \"arrayOfDoublesSketchToEstimate\", \"name\": \"intersection\", \"field\": {",