From ec9d1827a05f4179f2189eac32287e14287d6126 Mon Sep 17 00:00:00 2001 From: Alexander Saydakov Date: Tue, 23 Oct 2018 11:20:19 -0700 Subject: [PATCH] updated to use the latest sketches-core-0.12.0 (#6381) --- extensions-core/datasketches/pom.xml | 2 +- .../hll/HllSketchBuildBufferAggregator.java | 3 +- .../hll/HllSketchMergeBufferAggregator.java | 7 +- .../hll/HllSketchObjectStrategy.java | 3 +- .../DoublesSketchBuildBufferAggregator.java | 3 +- .../DoublesSketchMergeBufferAggregator.java | 3 +- .../DoublesSketchObjectStrategy.java | 4 +- .../datasketches/theta/SketchAggregator.java | 14 +- .../theta/SketchBufferAggregator.java | 3 +- .../theta/SketchHolderObjectStrategy.java | 3 +- .../datasketches/theta/SynchronizedUnion.java | 133 ------------------ ...yOfDoublesSketchBuildBufferAggregator.java | 7 +- ...yOfDoublesSketchMergeBufferAggregator.java | 7 +- .../ArrayOfDoublesSketchObjectStrategy.java | 3 +- .../ArrayOfDoublesSketchAggregationTest.java | 1 + 15 files changed, 39 insertions(+), 157 deletions(-) delete mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml index 1abb026ffe0..ef44977eb27 100644 --- a/extensions-core/datasketches/pom.xml +++ b/extensions-core/datasketches/pom.xml @@ -38,7 +38,7 @@ com.yahoo.datasketches sketches-core - 0.10.3 + 0.12.0 org.apache.commons diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java index cab80195f8e..bd2d047cbe9 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java @@ -20,6 +20,7 @@ package org.apache.druid.query.aggregation.datasketches.hll; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.IdentityHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -135,7 +136,7 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator private WritableMemory getMemory(final ByteBuffer buf) { - return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b)); + return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN)); } /** diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java index 3477bb0e16c..8e72d236911 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java @@ -20,6 +20,7 @@ package org.apache.druid.query.aggregation.datasketches.hll; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -66,7 +67,7 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator @Override public void init(final ByteBuffer buf, final int position) { - final WritableMemory mem = WritableMemory.wrap(buf).writableRegion(position, size); + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); // Not necessary to keep the constructed object since it is cheap to reconstruct by wrapping the memory. // The objects are not cached as in BuildBufferAggregator since they never exceed the max size and never move. // So it is easier to reconstruct them by wrapping memory then to keep position-to-object mappings. @@ -85,7 +86,7 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator if (sketch == null) { return; } - final WritableMemory mem = WritableMemory.wrap(buf).writableRegion(position, size); + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).writeLock(); lock.lock(); try { @@ -105,7 +106,7 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator @Override public Object get(final ByteBuffer buf, final int position) { - final WritableMemory mem = WritableMemory.wrap(buf).writableRegion(position, size); + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).readLock(); lock.lock(); try { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchObjectStrategy.java index 2f8ab0d4424..6ae54f9e419 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchObjectStrategy.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchObjectStrategy.java @@ -20,6 +20,7 @@ package org.apache.druid.query.aggregation.datasketches.hll; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import com.yahoo.memory.Memory; import com.yahoo.sketches.hll.HllSketch; @@ -46,7 +47,7 @@ public class HllSketchObjectStrategy implements ObjectStrategy @Override public HllSketch fromByteBuffer(final ByteBuffer buf, final int size) { - return HllSketch.wrap(Memory.wrap(buf).region(buf.position(), size)); + return HllSketch.wrap(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN).region(buf.position(), size)); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java index 48cd62213ee..a1e52162800 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java @@ -28,6 +28,7 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.IdentityHashMap; public class DoublesSketchBuildBufferAggregator implements BufferAggregator @@ -112,7 +113,7 @@ public class DoublesSketchBuildBufferAggregator implements BufferAggregator private WritableMemory getMemory(final ByteBuffer buffer) { - return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf)); + return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)); } private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java index 337c1d744cd..ffe9009ed48 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java @@ -22,7 +22,6 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; import com.yahoo.memory.WritableMemory; import com.yahoo.sketches.quantiles.DoublesSketch; import com.yahoo.sketches.quantiles.DoublesUnion; -import com.yahoo.sketches.quantiles.DoublesUnionBuilder; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import org.apache.druid.query.aggregation.BufferAggregator; @@ -105,7 +104,7 @@ public class DoublesSketchMergeBufferAggregator implements BufferAggregator final WritableMemory oldMem = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize); if (union.isSameResource(oldMem)) { // union was not relocated on heap final WritableMemory newMem = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize); - union = DoublesUnionBuilder.wrap(newMem); + union = DoublesUnion.wrap(newMem); } putUnion(newBuffer, newPosition, union); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchObjectStrategy.java index aa66f272bd9..618799d5796 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchObjectStrategy.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchObjectStrategy.java @@ -22,9 +22,11 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; import com.yahoo.memory.Memory; import com.yahoo.sketches.quantiles.DoublesSketch; import it.unimi.dsi.fastutil.bytes.ByteArrays; + import org.apache.druid.segment.data.ObjectStrategy; import java.nio.ByteBuffer; +import java.nio.ByteOrder; public class DoublesSketchObjectStrategy implements ObjectStrategy { @@ -41,7 +43,7 @@ public class DoublesSketchObjectStrategy implements ObjectStrategy { @@ -49,7 +50,7 @@ public class SketchHolderObjectStrategy implements ObjectStrategy return SketchHolder.EMPTY; } - return SketchHolder.of(Memory.wrap(buffer).region(buffer.position(), numBytes)); + return SketchHolder.of(Memory.wrap(buffer, ByteOrder.LITTLE_ENDIAN).region(buffer.position(), numBytes)); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java deleted file mode 100644 index 1403a9d5431..00000000000 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SynchronizedUnion.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.datasketches.theta; - -import com.yahoo.memory.Memory; -import com.yahoo.memory.WritableMemory; -import com.yahoo.sketches.Family; -import com.yahoo.sketches.theta.CompactSketch; -import com.yahoo.sketches.theta.Sketch; -import com.yahoo.sketches.theta.Union; - -/** - */ -public class SynchronizedUnion extends Union -{ - private final Union delegate; - - public SynchronizedUnion(Union delegate) - { - this.delegate = delegate; - } - - @Override - public synchronized void update(Sketch sketchIn) - { - delegate.update(sketchIn); - } - - @Override - public synchronized void update(Memory mem) - { - delegate.update(mem); - } - - @Override - public synchronized void update(long datum) - { - delegate.update(datum); - } - - @Override - public synchronized void update(double datum) - { - delegate.update(datum); - } - - @Override - public synchronized void update(String datum) - { - delegate.update(datum); - } - - @Override - @SuppressWarnings("ParameterPackage") - public synchronized void update(byte[] data) - { - delegate.update(data); - } - - @Override - @SuppressWarnings("ParameterPackage") - public synchronized void update(int[] data) - { - delegate.update(data); - } - - @Override - @SuppressWarnings("ParameterPackage") - public synchronized void update(char[] chars) - { - delegate.update(chars); - } - - @Override - public synchronized void update(long[] data) - { - delegate.update(data); - } - - @Override - public synchronized CompactSketch getResult(boolean b, WritableMemory memory) - { - return delegate.getResult(b, memory); - } - - @Override - public synchronized CompactSketch getResult() - { - return delegate.getResult(); - } - - @Override - public synchronized byte[] toByteArray() - { - return delegate.toByteArray(); - } - - @Override - public synchronized void reset() - { - delegate.reset(); - } - - @Override - public synchronized boolean isSameResource(Memory mem) - { - return delegate.isSameResource(mem); - } - - @Override - public synchronized Family getFamily() - { - return delegate.getFamily(); - } - -} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java index aa0cf392790..051d59fca69 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java @@ -31,6 +31,7 @@ import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.data.IndexedInts; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -69,7 +70,7 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat @Override public void init(final ByteBuffer buf, final int position) { - final WritableMemory mem = WritableMemory.wrap(buf); + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries) .setNumberOfValues(valueSelectors.length) @@ -91,7 +92,7 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat // Wrapping memory and ArrayOfDoublesSketch is inexpensive compared to sketch operations. // Maintaining a cache of wrapped objects per buffer position like in Theta sketch aggregator // might might be considered, but it would increase complexity including relocate() support. - final WritableMemory mem = WritableMemory.wrap(buf); + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock(); lock.lock(); @@ -118,7 +119,7 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat @Override public Object get(final ByteBuffer buf, final int position) { - final WritableMemory mem = WritableMemory.wrap(buf); + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); final Lock lock = stripedLock.getAt(lockIndex(position)).readLock(); lock.lock(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeBufferAggregator.java index af6d3c7bae2..f96280d0e6b 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeBufferAggregator.java @@ -30,6 +30,7 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseObjectColumnValueSelector; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -65,7 +66,7 @@ public class ArrayOfDoublesSketchMergeBufferAggregator implements BufferAggregat @Override public void init(final ByteBuffer buf, final int position) { - final WritableMemory mem = WritableMemory.wrap(buf); + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); new ArrayOfDoublesSetOperationBuilder().setNominalEntries(nominalEntries) .setNumberOfValues(numberOfValues).buildUnion(region); @@ -86,7 +87,7 @@ public class ArrayOfDoublesSketchMergeBufferAggregator implements BufferAggregat // Wrapping memory and ArrayOfDoublesUnion is inexpensive compared to union operations. // Maintaining a cache of wrapped objects per buffer position like in Theta sketch aggregator // might might be considered, but it would increase complexity including relocate() support. - final WritableMemory mem = WritableMemory.wrap(buf); + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); final Lock lock = stripedLock.getAt(ArrayOfDoublesSketchBuildBufferAggregator.lockIndex(position)).writeLock(); lock.lock(); @@ -110,7 +111,7 @@ public class ArrayOfDoublesSketchMergeBufferAggregator implements BufferAggregat @Override public Object get(final ByteBuffer buf, final int position) { - final WritableMemory mem = WritableMemory.wrap(buf); + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); final Lock lock = stripedLock.getAt(ArrayOfDoublesSketchBuildBufferAggregator.lockIndex(position)).readLock(); lock.lock(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchObjectStrategy.java index e2157da3411..aceef651772 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchObjectStrategy.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchObjectStrategy.java @@ -26,6 +26,7 @@ import org.apache.druid.segment.data.ObjectStrategy; import javax.annotation.Nullable; import java.nio.ByteBuffer; +import java.nio.ByteOrder; public class ArrayOfDoublesSketchObjectStrategy implements ObjectStrategy { @@ -47,7 +48,7 @@ public class ArrayOfDoublesSketchObjectStrategy implements ObjectStrategy