mirror of https://github.com/apache/druid.git
HLL: avoid direct buffers + fix swap target buffer
This commit is contained in:
parent
4c7399bfbf
commit
30d1a5db75
|
@ -326,27 +326,26 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
convertToMutableByteBuffer();
|
convertToMutableByteBuffer();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
|
||||||
|
convertToDenseStorage();
|
||||||
|
}
|
||||||
|
|
||||||
estimatedCardinality = null;
|
estimatedCardinality = null;
|
||||||
|
|
||||||
if (getRegisterOffset() < other.getRegisterOffset()) {
|
if (getRegisterOffset() < other.getRegisterOffset()) {
|
||||||
// "Swap" the buffers so that we are folding into the one with the higher offset
|
// "Swap" the buffers so that we are folding into the one with the higher offset
|
||||||
ByteBuffer newStorage = ByteBuffer.allocateDirect(other.storageBuffer.remaining());
|
final ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining());
|
||||||
newStorage.put(other.storageBuffer.asReadOnlyBuffer());
|
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
|
||||||
newStorage.clear();
|
tmpBuffer.clear();
|
||||||
|
|
||||||
other.storageBuffer = storageBuffer;
|
storageBuffer.duplicate().put(other.storageBuffer.asReadOnlyBuffer());
|
||||||
other.initPosition = initPosition;
|
|
||||||
storageBuffer = newStorage;
|
other = HyperLogLogCollector.makeCollector(tmpBuffer);
|
||||||
initPosition = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
|
final ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
|
||||||
final byte otherOffset = other.getRegisterOffset();
|
final byte otherOffset = other.getRegisterOffset();
|
||||||
|
|
||||||
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
|
|
||||||
convertToDenseStorage();
|
|
||||||
}
|
|
||||||
|
|
||||||
byte myOffset = getRegisterOffset();
|
byte myOffset = getRegisterOffset();
|
||||||
short numNonZero = getNumNonZeroRegisters();
|
short numNonZero = getNumNonZeroRegisters();
|
||||||
|
|
||||||
|
@ -540,7 +539,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
|
|
||||||
private void convertToMutableByteBuffer()
|
private void convertToMutableByteBuffer()
|
||||||
{
|
{
|
||||||
ByteBuffer tmpBuffer = ByteBuffer.allocateDirect(storageBuffer.remaining());
|
ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining());
|
||||||
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
|
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
|
||||||
tmpBuffer.position(0);
|
tmpBuffer.position(0);
|
||||||
storageBuffer = tmpBuffer;
|
storageBuffer = tmpBuffer;
|
||||||
|
@ -549,7 +548,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
|
|
||||||
private void convertToDenseStorage()
|
private void convertToDenseStorage()
|
||||||
{
|
{
|
||||||
ByteBuffer tmpBuffer = ByteBuffer.allocateDirect(getNumBytesForDenseStorage());
|
ByteBuffer tmpBuffer = ByteBuffer.allocate(getNumBytesForDenseStorage());
|
||||||
// put header
|
// put header
|
||||||
setVersion(tmpBuffer);
|
setVersion(tmpBuffer);
|
||||||
setRegisterOffset(tmpBuffer, getRegisterOffset());
|
setRegisterOffset(tmpBuffer, getRegisterOffset());
|
||||||
|
|
|
@ -67,7 +67,7 @@ public class HyperUniquesBufferAggregator implements BufferAggregator
|
||||||
public Object get(ByteBuffer buf, int position)
|
public Object get(ByteBuffer buf, int position)
|
||||||
{
|
{
|
||||||
final int size = HyperLogLogCollector.getLatestNumBytesForDenseStorage();
|
final int size = HyperLogLogCollector.getLatestNumBytesForDenseStorage();
|
||||||
ByteBuffer dataCopyBuffer = ByteBuffer.allocateDirect(size);
|
ByteBuffer dataCopyBuffer = ByteBuffer.allocate(size);
|
||||||
ByteBuffer mutationBuffer = buf.duplicate();
|
ByteBuffer mutationBuffer = buf.duplicate();
|
||||||
mutationBuffer.position(position);
|
mutationBuffer.position(position);
|
||||||
mutationBuffer.limit(position + size);
|
mutationBuffer.limit(position + size);
|
||||||
|
|
Loading…
Reference in New Issue