Add HashVectorGrouper based on MemoryOpenHashTable. (#9314)

* Add HashVectorGrouper based on MemoryOpenHashTable.

Additional supporting changes:

1) Modifies VectorGrouper interface to use Memory instead of ByteBuffers.
2) Modifies BufferArrayGrouper to match the new VectorGrouper interface.
3) Removes "implements VectorGrouper" from BufferHashGrouper.

* Fix comment.

* Fix another comment.

* Remove unused stuff.

* Include hoisted bounds checks.

* Checks against too-large keySpaces.
This commit is contained in:
Gian Merlino 2020-02-06 15:29:14 -08:00 committed by GitHub
parent 2e1dbe598c
commit 0aa7a2a3ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 612 additions and 325 deletions

View File

@ -21,6 +21,8 @@ package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@ -29,9 +31,9 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.NoSuchElementException;
@ -57,7 +59,7 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
private final int recordSize; // size of all aggregated values
private boolean initialized = false;
private ByteBuffer usedFlagBuffer;
private WritableMemory usedFlagMemory;
private ByteBuffer valBuffer;
// Scratch objects used by aggregateVector(). Only set if initVectorized() is called.
@ -127,7 +129,7 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
// Slice up the buffer.
buffer.position(0);
buffer.limit(usedFlagBufferEnd);
usedFlagBuffer = buffer.slice();
usedFlagMemory = WritableMemory.wrap(buffer.slice(), ByteOrder.nativeOrder());
buffer.position(usedFlagBufferEnd);
buffer.limit(buffer.capacity());
@ -169,9 +171,21 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
}
@Override
public AggregateResult aggregateVector(int[] keySpace, int startRow, int endRow)
public AggregateResult aggregateVector(Memory keySpace, int startRow, int endRow)
{
if (keySpace.length == 0) {
final int numRows = endRow - startRow;
// Hoisted bounds check on keySpace.
if (keySpace.getCapacity() < (long) numRows * Integer.BYTES) {
throw new IAE("Not enough keySpace capacity for the provided start/end rows");
}
// We use integer indexes into the keySpace.
if (keySpace.getCapacity() > Integer.MAX_VALUE) {
throw new ISE("keySpace too large to handle");
}
if (keySpace.getCapacity() == 0) {
// Empty key space, assume keys are all zeroes.
final int dimIndex = 1;
@ -184,11 +198,9 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
endRow
);
} else {
final int numRows = endRow - startRow;
for (int i = 0; i < numRows; i++) {
// +1 matches what hashFunction() would do.
final int dimIndex = keySpace[i] + 1;
final int dimIndex = keySpace.getInt(i * Integer.BYTES) + 1;
if (dimIndex < 0 || dimIndex >= cardinalityWithMissingValue) {
throw new IAE("Invalid dimIndex[%s]", dimIndex);
@ -214,10 +226,12 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
{
final int index = dimIndex / Byte.SIZE;
final int extraIndex = dimIndex % Byte.SIZE;
final int usedFlagByte = 1 << extraIndex;
final int usedFlagMask = 1 << extraIndex;
if ((usedFlagBuffer.get(index) & usedFlagByte) == 0) {
usedFlagBuffer.put(index, (byte) (usedFlagBuffer.get(index) | (1 << extraIndex)));
final byte currentByte = usedFlagMemory.getByte(index);
if ((currentByte & usedFlagMask) == 0) {
usedFlagMemory.putByte(index, (byte) (currentByte | usedFlagMask));
aggregators.init(valBuffer, dimIndex * recordSize);
}
}
@ -226,26 +240,16 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
{
final int index = dimIndex / Byte.SIZE;
final int extraIndex = dimIndex % Byte.SIZE;
final int usedFlagByte = 1 << extraIndex;
final int usedFlagMask = 1 << extraIndex;
return (usedFlagBuffer.get(index) & usedFlagByte) != 0;
return (usedFlagMemory.getByte(index) & usedFlagMask) != 0;
}
@Override
public void reset()
{
// Clear the entire usedFlagBuffer
final int usedFlagBufferCapacity = usedFlagBuffer.capacity();
// putLong() instead of put() can boost the performance of clearing the buffer
final int n = (usedFlagBufferCapacity / Long.BYTES) * Long.BYTES;
for (int i = 0; i < n; i += Long.BYTES) {
usedFlagBuffer.putLong(i, 0L);
}
for (int i = n; i < usedFlagBufferCapacity; i++) {
usedFlagBuffer.put(i, (byte) 0);
}
usedFlagMemory.clear();
}
@Override
@ -261,11 +265,11 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
}
@Override
public CloseableIterator<Entry<ByteBuffer>> iterator()
public CloseableIterator<Entry<Memory>> iterator()
{
final CloseableIterator<Entry<Integer>> iterator = iterator(false);
final ByteBuffer keyBuffer = ByteBuffer.allocate(Integer.BYTES);
return new CloseableIterator<Entry<ByteBuffer>>()
final WritableMemory keyMemory = WritableMemory.allocate(Integer.BYTES);
return new CloseableIterator<Entry<Memory>>()
{
@Override
public boolean hasNext()
@ -274,11 +278,11 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
}
@Override
public Entry<ByteBuffer> next()
public Entry<Memory> next()
{
final Entry<Integer> integerEntry = iterator.next();
keyBuffer.putInt(0, integerEntry.getKey());
return new Entry<>(keyBuffer, integerEntry.getValues());
keyMemory.putInt(0, integerEntry.getKey());
return new Entry<>(keyMemory, integerEntry.getValues());
}
@Override

View File

@ -20,16 +20,13 @@
package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.AbstractList;
import java.util.Collections;
@ -37,7 +34,7 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.ToIntFunction;
public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyType> implements VectorGrouper
public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyType>
{
private static final int MIN_INITIAL_BUCKETS = 4;
private static final int DEFAULT_INITIAL_BUCKETS = 1024;
@ -57,16 +54,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
@Nullable
private ByteBufferIntList offsetList;
// Scratch objects used by aggregateVector(). Only set if initVectorized() is called.
@Nullable
private ByteBuffer vKeyBuffer = null;
@Nullable
private int[] vKeyHashCodes = null;
@Nullable
private int[] vAggregationPositions = null;
@Nullable
private int[] vAggregationRows = null;
public BufferHashGrouper(
final Supplier<ByteBuffer> bufferSupplier,
final KeySerde<KeyType> keySerde,
@ -135,96 +122,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
}
}
@Override
public void initVectorized(final int maxVectorSize)
{
if (!ByteBuffer.class.equals(keySerde.keyClazz())) {
throw new ISE("keyClazz[%s] must be ByteBuffer", keySerde.keyClazz());
}
if (keySize % Integer.BYTES != 0) {
throw new ISE("keySize[%s] must be a multiple of[%s]", keySize, Integer.BYTES);
}
init();
this.vKeyBuffer = ByteBuffer.allocate(keySize);
this.vKeyHashCodes = new int[maxVectorSize];
this.vAggregationPositions = new int[maxVectorSize];
this.vAggregationRows = new int[maxVectorSize];
}
@Override
public AggregateResult aggregateVector(final int[] keySpace, final int startRow, final int endRow)
{
final int keyIntSize = keySize / Integer.BYTES;
final int numRows = endRow - startRow;
// Initialize vKeyHashCodes: one int per key.
// Does *not* use hashFunction(). This is okay because the API of VectorGrouper does not expose any way of messing
// about with hash codes.
for (int i = 0, rowStart = 0; i < numRows; i++, rowStart += keyIntSize) {
vKeyHashCodes[i] = Groupers.hashIntArray(keySpace, rowStart, keyIntSize);
}
final MutableInt aggregationStartRow = new MutableInt(startRow);
final MutableInt aggregationNumRows = new MutableInt(0);
for (int rowNum = 0, keySpacePosition = 0; rowNum < numRows; rowNum++, keySpacePosition += keyIntSize) {
// Copy current key into keyBuffer.
vKeyBuffer.rewind();
for (int i = 0; i < keyIntSize; i++) {
vKeyBuffer.putInt(keySpace[keySpacePosition + i]);
}
vKeyBuffer.rewind();
// Find, and if the table is full, expand and find again.
int bucket = hashTable.findBucketWithAutoGrowth(
vKeyBuffer,
vKeyHashCodes[rowNum],
() -> {
if (aggregationNumRows.intValue() > 0) {
doAggregateVector(aggregationStartRow.intValue(), aggregationNumRows.intValue());
aggregationStartRow.setValue(aggregationStartRow.intValue() + aggregationNumRows.intValue());
aggregationNumRows.setValue(0);
}
}
);
if (bucket < 0) {
// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
// be correct.
// Aggregate any remaining rows.
if (aggregationNumRows.intValue() > 0) {
doAggregateVector(aggregationStartRow.intValue(), aggregationNumRows.intValue());
}
return Groupers.hashTableFull(rowNum);
}
final int bucketStartOffset = hashTable.getOffsetForBucket(bucket);
final boolean bucketWasUsed = hashTable.isBucketUsed(bucket);
// Set up key and initialize the aggs if this is a new bucket.
if (!bucketWasUsed) {
hashTable.initializeNewBucketKey(bucket, vKeyBuffer, vKeyHashCodes[rowNum]);
aggregators.init(hashTable.getTableBuffer(), bucketStartOffset + baseAggregatorOffset);
}
// Schedule the current row for aggregation.
vAggregationPositions[aggregationNumRows.intValue()] = bucketStartOffset + Integer.BYTES + keySize;
aggregationNumRows.increment();
}
// Aggregate any remaining rows.
if (aggregationNumRows.intValue() > 0) {
doAggregateVector(aggregationStartRow.intValue(), aggregationNumRows.intValue());
}
return AggregateResult.ok();
}
@Override
public boolean isInitialized()
{
@ -263,15 +160,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
keySerde.reset();
}
@Override
@SuppressWarnings("unchecked")
public CloseableIterator<Entry<ByteBuffer>> iterator()
{
// Unchecked cast, since this method is only called through the VectorGrouper interface, which uses
// ByteBuffer keys (and this is verified in initVectorized).
return (CloseableIterator) iterator(false);
}
@Override
public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
{
@ -403,16 +291,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
}
}
private void doAggregateVector(final int startRow, final int numRows)
{
aggregators.aggregateVector(
hashTable.getTableBuffer(),
numRows,
vAggregationPositions,
Groupers.writeAggregationRows(vAggregationRows, startRow, startRow + numRows)
);
}
private class BufferGrouperBucketUpdateHandler implements ByteBufferHashTable.BucketUpdateHandler
{
@Override

View File

@ -1,91 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.aggregation.AggregatorFactory;
import java.nio.ByteBuffer;
import java.util.List;
public class ByteBufferKeySerde implements Grouper.KeySerde<ByteBuffer>
{
private final int keySize;
public ByteBufferKeySerde(final int keySize)
{
this.keySize = keySize;
}
@Override
public int keySize()
{
return keySize;
}
@Override
public Class<ByteBuffer> keyClazz()
{
return ByteBuffer.class;
}
@Override
public List<String> getDictionary()
{
return ImmutableList.of();
}
@Override
public ByteBuffer toByteBuffer(ByteBuffer key)
{
return key;
}
@Override
public ByteBuffer fromByteBuffer(ByteBuffer buffer, int position)
{
final ByteBuffer dup = buffer.duplicate();
dup.position(position).limit(position + keySize);
return dup.slice();
}
@Override
public Grouper.BufferComparator bufferComparator()
{
// This class is used by segment processing engines, where bufferComparator will not be called.
throw new UnsupportedOperationException();
}
@Override
public Grouper.BufferComparator bufferComparatorWithAggregators(
AggregatorFactory[] aggregatorFactories,
int[] aggregatorOffsets
)
{
// This class is used by segment processing engines, where bufferComparatorWithAggregators will not be called.
throw new UnsupportedOperationException();
}
@Override
public void reset()
{
// No state, nothing to reset
}
}

View File

@ -86,18 +86,6 @@ public class Groupers
return smear(obj.hashCode()) & USED_FLAG_MASK;
}
public static int hashIntArray(final int[] ints, final int start, final int length)
{
// Similar to what Arrays.hashCode would do.
// Also apply the "smear" function, to improve distribution.
int hashCode = 1;
for (int i = 0; i < length; i++) {
hashCode = 31 * hashCode + ints[start + i];
}
return smear(hashCode) & USED_FLAG_MASK;
}
static int getUsedFlag(int keyHash)
{
return keyHash | 0x80000000;

View File

@ -0,0 +1,477 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import it.unimi.dsi.fastutil.HashCommon;
import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.groupby.epinephelinae.collection.HashTableUtils;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryOpenHashTable;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collections;
/**
* An implementation of {@link VectorGrouper} backed by a growable {@link MemoryOpenHashTable}. Growability is
* implemented in this class because {@link MemoryOpenHashTable} is not innately growable.
*/
public class HashVectorGrouper implements VectorGrouper
{
private static final int MIN_BUCKETS = 4;
private static final int DEFAULT_INITIAL_BUCKETS = 1024;
private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;
private boolean initialized = false;
private int maxNumBuckets;
private final Supplier<ByteBuffer> bufferSupplier;
private final AggregatorAdapters aggregators;
private final int keySize;
private final int bufferGrouperMaxSize;
private final int configuredInitialNumBuckets;
private final int bucketSize;
private final float maxLoadFactor;
private ByteBuffer buffer;
private int tableStart = 0;
@Nullable
private MemoryOpenHashTable hashTable;
// Scratch objects used by aggregateVector(). Set by initVectorized().
@Nullable
private int[] vKeyHashCodes = null;
@Nullable
private int[] vAggregationPositions = null;
@Nullable
private int[] vAggregationRows = null;
public HashVectorGrouper(
final Supplier<ByteBuffer> bufferSupplier,
final int keySize,
final AggregatorAdapters aggregators,
final int bufferGrouperMaxSize,
final float maxLoadFactor,
final int configuredInitialNumBuckets
)
{
this.bufferSupplier = bufferSupplier;
this.keySize = keySize;
this.aggregators = aggregators;
this.bufferGrouperMaxSize = bufferGrouperMaxSize;
this.maxLoadFactor = maxLoadFactor > 0 ? maxLoadFactor : DEFAULT_MAX_LOAD_FACTOR;
this.configuredInitialNumBuckets = configuredInitialNumBuckets >= MIN_BUCKETS
? configuredInitialNumBuckets
: DEFAULT_INITIAL_BUCKETS;
this.bucketSize = MemoryOpenHashTable.bucketSize(keySize, aggregators.spaceNeeded());
if (this.maxLoadFactor >= 1.0f) {
throw new IAE("Invalid maxLoadFactor[%f], must be < 1.0", maxLoadFactor);
}
}
@Override
public void initVectorized(final int maxVectorSize)
{
if (!initialized) {
this.buffer = bufferSupplier.get();
this.maxNumBuckets = Math.max(
computeRoundedInitialNumBuckets(buffer.capacity(), bucketSize, configuredInitialNumBuckets),
computeMaxNumBucketsAfterGrowth(buffer.capacity(), bucketSize)
);
reset();
this.vKeyHashCodes = new int[maxVectorSize];
this.vAggregationPositions = new int[maxVectorSize];
this.vAggregationRows = new int[maxVectorSize];
initialized = true;
}
}
@Override
public AggregateResult aggregateVector(final Memory keySpace, final int startRow, final int endRow)
{
final int numRows = endRow - startRow;
// Hoisted bounds check on keySpace.
if (keySpace.getCapacity() < (long) numRows * keySize) {
throw new IAE("Not enough keySpace capacity for the provided start/end rows");
}
// We use integer indexes into the keySpace.
if (keySpace.getCapacity() > Integer.MAX_VALUE) {
throw new ISE("keySpace too large to handle");
}
// Initialize vKeyHashCodes: one int per key.
// Does *not* use hashFunction(). This is okay because the API of VectorGrouper does not expose any way of messing
// about with hash codes.
for (int rowNum = 0, keySpacePosition = 0; rowNum < numRows; rowNum++, keySpacePosition += keySize) {
vKeyHashCodes[rowNum] = Groupers.smear(HashTableUtils.hashMemory(keySpace, keySpacePosition, keySize));
}
int aggregationStartRow = startRow;
int aggregationNumRows = 0;
final int aggregatorStartOffset = hashTable.bucketValueOffset();
for (int rowNum = 0, keySpacePosition = 0; rowNum < numRows; rowNum++, keySpacePosition += keySize) {
// Find, and if the table is full, expand and find again.
int bucket = hashTable.findBucket(vKeyHashCodes[rowNum], keySpace, keySpacePosition);
if (bucket < 0) {
// Bucket not yet initialized.
if (hashTable.canInsertNewBucket()) {
// There's space, initialize it and move on.
bucket = -(bucket + 1);
initBucket(bucket, keySpace, keySpacePosition);
} else {
// Out of space. Finish up unfinished aggregations, then try to grow.
if (aggregationNumRows > 0) {
doAggregateVector(aggregationStartRow, aggregationNumRows);
aggregationStartRow = aggregationStartRow + aggregationNumRows;
aggregationNumRows = 0;
}
if (grow() && hashTable.canInsertNewBucket()) {
bucket = hashTable.findBucket(vKeyHashCodes[rowNum], keySpace, keySpacePosition);
bucket = -(bucket + 1);
initBucket(bucket, keySpace, keySpacePosition);
} else {
// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message
// will be correct.
return Groupers.hashTableFull(rowNum);
}
}
}
// Schedule the current row for aggregation.
vAggregationPositions[aggregationNumRows] = bucket * bucketSize + aggregatorStartOffset;
aggregationNumRows++;
}
// Aggregate any remaining rows.
if (aggregationNumRows > 0) {
doAggregateVector(aggregationStartRow, aggregationNumRows);
}
return AggregateResult.ok();
}
@Override
public void reset()
{
// Compute initial hash table size (numBuckets).
final int numBuckets = computeRoundedInitialNumBuckets(buffer.capacity(), bucketSize, configuredInitialNumBuckets);
assert numBuckets <= maxNumBuckets;
if (numBuckets == maxNumBuckets) {
// Maximum-sized tables start at zero.
tableStart = 0;
} else {
// The first table, if not maximum-sized, starts at the latest possible position (where the penultimate
// table ends at the end of the buffer).
tableStart = buffer.capacity() - bucketSize * (maxNumBuckets - numBuckets);
}
final ByteBuffer tableBuffer = buffer.duplicate();
tableBuffer.position(0);
tableBuffer.limit(MemoryOpenHashTable.memoryNeeded(numBuckets, bucketSize));
this.hashTable = new MemoryOpenHashTable(
WritableMemory.wrap(tableBuffer.slice(), ByteOrder.nativeOrder()),
numBuckets,
Math.max(1, Math.min(bufferGrouperMaxSize, (int) (numBuckets * maxLoadFactor))),
keySize,
aggregators.spaceNeeded()
);
}
@Override
public CloseableIterator<Grouper.Entry<Memory>> iterator()
{
if (!initialized) {
// it's possible for iterator() to be called before initialization when
// a nested groupBy's subquery has an empty result set (see testEmptySubquery() in GroupByQueryRunnerTest)
return CloseableIterators.withEmptyBaggage(Collections.emptyIterator());
}
final IntIterator baseIterator = hashTable.bucketIterator();
return new CloseableIterator<Grouper.Entry<Memory>>()
{
@Override
public boolean hasNext()
{
return baseIterator.hasNext();
}
@Override
public Grouper.Entry<Memory> next()
{
final int bucket = baseIterator.nextInt();
final int bucketPosition = hashTable.bucketMemoryPosition(bucket);
final Memory keyMemory = hashTable.memory().region(
bucketPosition + hashTable.bucketKeyOffset(),
hashTable.keySize()
);
final Object[] values = new Object[aggregators.size()];
final int aggregatorsOffset = bucketPosition + hashTable.bucketValueOffset();
for (int i = 0; i < aggregators.size(); i++) {
values[i] = aggregators.get(hashTable.memory().getByteBuffer(), aggregatorsOffset, i);
}
return new Grouper.Entry<>(keyMemory, values);
}
@Override
public void close()
{
// Do nothing.
}
};
}
@Override
public void close()
{
// Nothing to do.
}
/**
* Initializes the given bucket with the given key and fresh, empty aggregation state. Must only be called if
* {@code hashTable.canInsertNewBucket()} returns true and if this bucket is currently unused.
*/
private void initBucket(final int bucket, final Memory keySpace, final int keySpacePosition)
{
assert bucket >= 0 && bucket < maxNumBuckets && hashTable != null && hashTable.canInsertNewBucket();
hashTable.initBucket(bucket, keySpace, keySpacePosition);
aggregators.init(hashTable.memory().getByteBuffer(), bucket * bucketSize + hashTable.bucketValueOffset());
}
/**
* Aggregate the current vector from "startRow" (inclusive) to "endRow" (exclusive) into aggregation positions
* given by {@link #vAggregationPositions}.
*/
private void doAggregateVector(final int startRow, final int numRows)
{
aggregators.aggregateVector(
hashTable.memory().getByteBuffer(),
numRows,
vAggregationPositions,
Groupers.writeAggregationRows(vAggregationRows, startRow, startRow + numRows)
);
}
/**
* Attempts to grow the table and returns whether or not it was possible. Each growth doubles the number of buckets
* in the table.
*/
private boolean grow()
{
if (hashTable.numBuckets() >= maxNumBuckets) {
return false;
}
final int newNumBuckets = nextTableNumBuckets();
final int newTableStart = nextTableStart();
final ByteBuffer newTableBuffer = buffer.duplicate();
newTableBuffer.position(newTableStart);
newTableBuffer.limit(newTableStart + MemoryOpenHashTable.memoryNeeded(newNumBuckets, bucketSize));
final MemoryOpenHashTable newHashTable = new MemoryOpenHashTable(
WritableMemory.wrap(newTableBuffer.slice(), ByteOrder.nativeOrder()),
newNumBuckets,
maxSizeForNumBuckets(newNumBuckets, maxLoadFactor, bufferGrouperMaxSize),
keySize,
aggregators.spaceNeeded()
);
hashTable.copyTo(newHashTable, new HashVectorGrouperBucketCopyHandler(aggregators, hashTable.bucketValueOffset()));
hashTable = newHashTable;
tableStart = newTableStart;
return true;
}
/**
* Returns the table size after the next growth. Each growth doubles the number of buckets, so this will be
* double the current number of buckets.
*
* @throws IllegalStateException if not initialized or if growing is not possible
*/
private int nextTableNumBuckets()
{
if (!initialized) {
throw new ISE("Must be initialized");
}
if (hashTable.numBuckets() >= maxNumBuckets) {
throw new ISE("No room left to grow");
}
return hashTable.numBuckets() * 2;
}
/**
* Returns the start of the table within {@link #buffer} after the next growth. Each growth starts from the end of
* the previous table.
*
* @throws IllegalStateException if not initialized or if growing is not possible
*/
private int nextTableStart()
{
if (!initialized) {
throw new ISE("Must be initialized");
}
final int nextNumBuckets = nextTableNumBuckets();
final int currentEnd = tableStart + MemoryOpenHashTable.memoryNeeded(hashTable.numBuckets(), bucketSize);
final int nextTableStart;
if (nextNumBuckets == maxNumBuckets) {
assert currentEnd == buffer.capacity();
nextTableStart = 0;
} else {
nextTableStart = currentEnd;
}
// Sanity check on buffer capacity. If this triggers then it is a bug in this class.
final long nextEnd = ((long) nextTableStart) + MemoryOpenHashTable.memoryNeeded(nextNumBuckets, bucketSize);
if (nextEnd > buffer.capacity()) {
throw new ISE("New table overruns buffer capacity");
}
if (nextTableStart < currentEnd && nextEnd > tableStart) {
throw new ISE("New table overruns old table");
}
return nextTableStart;
}
/**
* Compute the maximum number of elements (size) for a given number of buckets. When the table hits this size,
* we must either grow it or return a table-full error.
*/
private static int maxSizeForNumBuckets(final int numBuckets, final double maxLoadFactor, final int configuredMaxSize)
{
return Math.max(1, Math.min(configuredMaxSize, (int) (numBuckets * maxLoadFactor)));
}
/**
* Compute the initial table bucket count given a particular buffer capacity, bucket size, and user-configured
* initial bucket count.
*
* @param capacity buffer capacity, in bytes
* @param bucketSize bucket size, in bytes
* @param configuredInitialNumBuckets user-configured initial bucket count
*/
private static int computeRoundedInitialNumBuckets(
final int capacity,
final int bucketSize,
final int configuredInitialNumBuckets
)
{
final int initialNumBucketsRoundedUp = (int) Math.min(
1 << 30,
HashCommon.nextPowerOfTwo((long) configuredInitialNumBuckets)
);
if (initialNumBucketsRoundedUp < computeMaxNumBucketsAfterGrowth(capacity, bucketSize)) {
return initialNumBucketsRoundedUp;
} else {
// Special case: initialNumBucketsRoundedUp is equal to or higher than max capacity of a growable table; start out
// at max size the buffer will hold. Note that this allows the table to be larger than it could ever be as a
// result of growing, proving that biting off as much as you can chew is not always a bad strategy. (Why don't
// we always do this? Because clearing a big table is expensive.)
return HashTableUtils.previousPowerOfTwo(Math.min(capacity / bucketSize, 1 << 30));
}
}
/**
* Compute the largest possible table bucket count given a particular buffer capacity, bucket size, and initial
* bucket count. Assumes that tables are grown by allocating new tables that are twice as large and then copying
* into them.
*
* @param capacity buffer capacity, in bytes
* @param bucketSize bucket size, in bytes
*/
private static int computeMaxNumBucketsAfterGrowth(final int capacity, final int bucketSize)
{
// Tables start at some size (see computeRoundedInitialNumBuckets) and grow by doubling. The penultimate table ends
// at the end of the buffer, and then the final table starts at the beginning of the buffer. This means the largest
// possible table size 2^x is the one where x is maximized subject to:
//
// 2^(x-1) < capacity / bucketSize / 3
//
// Or:
//
// 2^x < capacity / bucketSize / 3 * 2
//
// All other smaller tables fit within the 2/3rds of the buffer preceding the penultimate table, and then the
// memory they used can be reclaimed for the final table.
return HashTableUtils.previousPowerOfTwo(Math.min(capacity / bucketSize / 3 * 2, 1 << 30));
}
private static class HashVectorGrouperBucketCopyHandler implements MemoryOpenHashTable.BucketCopyHandler
{
private final AggregatorAdapters aggregators;
private final int baseAggregatorOffset;
public HashVectorGrouperBucketCopyHandler(final AggregatorAdapters aggregators, final int bucketAggregatorOffset)
{
this.aggregators = aggregators;
this.baseAggregatorOffset = bucketAggregatorOffset;
}
@Override
public void bucketCopied(
final int oldBucket,
final int newBucket,
final MemoryOpenHashTable oldTable,
final MemoryOpenHashTable newTable
)
{
// Relocate aggregators (see https://github.com/apache/druid/pull/4071).
aggregators.relocate(
oldTable.bucketMemoryPosition(oldBucket) + baseAggregatorOffset,
newTable.bucketMemoryPosition(newBucket) + baseAggregatorOffset,
oldTable.memory().getByteBuffer(),
newTable.memory().getByteBuffer()
);
}
}
}

View File

@ -19,13 +19,14 @@
package org.apache.druid.query.groupby.epinephelinae;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import java.io.Closeable;
import java.nio.ByteBuffer;
/**
* Like a {@link Grouper}, but vectorized. Keys are always int arrays, so there is no generic type parameter KeyType.
* Like a {@link Grouper}, but vectorized. Keys are always memory regions, so there is no generic type parameter
* KeyType.
* <p>
* This interface is designed such that an implementation can implement both Grouper and VectorGrouper. Of course,
* it would generally only make sense for a particular instance to be called with one set of functionality or the
@ -39,16 +40,15 @@ public interface VectorGrouper extends Closeable
void initVectorized(int maxVectorSize);
/**
* Aggregate the current vector of rows from "startVectorOffset" to "endVectorOffset" using the provided keys.
* Aggregate the current vector of rows from "startRow" to "endRow" using the provided keys.
*
* @param keySpace array holding keys, chunked into ints. First (endVectorOffset - startVectorOffset) keys
* must be valid.
* @param keySpace array holding keys, chunked into ints. First (endRow - startRow) keys must be valid.
* @param startRow row to start at (inclusive).
* @param endRow row to end at (exclusive).
*
* @return result that indicates how many keys were aggregated (may be partial due to resource limits)
*/
AggregateResult aggregateVector(int[] keySpace, int startRow, int endRow);
AggregateResult aggregateVector(Memory keySpace, int startRow, int endRow);
/**
* Reset the grouper to its initial state.
@ -62,16 +62,14 @@ public interface VectorGrouper extends Closeable
void close();
/**
* Iterate through entries.
* Iterate through entry buckets. Each bucket's key is a {@link Memory} object in native byte order.
* <p>
* Some implementations allow writes even after this method is called. After you are done with the iterator
* returned by this method, you should either call {@link #close()} (if you are done with the VectorGrouper) or
* {@link #reset()} (if you want to reuse it).
* After you are done with the iterator returned by this method, you should either call {@link #close()} (if you are
* done with the VectorGrouper) or {@link #reset()} (if you want to reuse it).
* <p>
* Callers must process and discard the returned {@link Grouper.Entry}s immediately, because the keys may
* be reused.
* Callers must process and discard the returned {@link Grouper.Entry}s immediately, because objects may be reused.
*
* @return entry iterator
*/
CloseableIterator<Grouper.Entry<ByteBuffer>> iterator();
CloseableIterator<Grouper.Entry<Memory>> iterator();
}

View File

@ -19,11 +19,11 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.segment.vector.VectorValueSelector;
import java.nio.ByteBuffer;
public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSelector
{
private final VectorValueSelector selector;
@ -36,12 +36,12 @@ public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSel
@Override
public int getGroupingKeySize()
{
return 2;
return Double.BYTES;
}
@Override
public void writeKeys(
final int[] keySpace,
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
final int startRow,
@ -50,22 +50,23 @@ public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSel
{
final double[] vector = selector.getDoubleVector();
if (keySize == Double.BYTES) {
keySpace.putDoubleArray(keyOffset, vector, startRow, endRow - startRow);
} else {
for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
final long longValue = Double.doubleToLongBits(vector[i]);
keySpace[j] = (int) (longValue >>> 32);
keySpace[j + 1] = (int) (longValue & 0xffffffffL);
keySpace.putDouble(j, vector[i]);
}
}
}
@Override
public void writeKeyToResultRow(
final ByteBuffer keyBuffer,
final Memory keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
final double value = keyBuffer.getDouble(keyOffset * Integer.BYTES);
resultRow.set(resultRowPosition, value);
resultRow.set(resultRowPosition, keyMemory.getDouble(keyOffset));
}
}

View File

@ -19,11 +19,11 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.segment.vector.VectorValueSelector;
import java.nio.ByteBuffer;
public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSelector
{
private final VectorValueSelector selector;
@ -36,12 +36,12 @@ public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSele
@Override
public int getGroupingKeySize()
{
return 1;
return Float.BYTES;
}
@Override
public void writeKeys(
final int[] keySpace,
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
final int startRow,
@ -50,20 +50,23 @@ public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSele
{
final float[] vector = selector.getFloatVector();
if (keySize == Float.BYTES) {
keySpace.putFloatArray(keyOffset, vector, startRow, endRow - startRow);
} else {
for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
keySpace[j] = Float.floatToIntBits(vector[i]);
keySpace.putFloat(j, vector[i]);
}
}
}
@Override
public void writeKeyToResultRow(
final ByteBuffer keyBuffer,
final Memory keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
final float value = Float.intBitsToFloat(keyBuffer.getInt(keyOffset * Integer.BYTES));
resultRow.set(resultRowPosition, value);
resultRow.set(resultRowPosition, keyMemory.getFloat(keyOffset));
}
}

View File

@ -19,18 +19,44 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import java.nio.ByteBuffer;
/**
* Column processor for groupBy dimensions.
*
* @see GroupByVectorColumnProcessorFactory
*/
public interface GroupByVectorColumnSelector
{
/**
* Get the size in bytes of the key parts generated by this column.
*/
int getGroupingKeySize();
void writeKeys(int[] keySpace, int keySize, int keyOffset, int startRow, int endRow);
/**
* Write key parts for this column, from startRow (inclusive) to endRow (exclusive), into keySpace starting at
* keyOffset.
*
* @param keySpace key memory
* @param keySize size of the overall key (not just the part for this column)
* @param keyOffset starting position for the first key part within keySpace
* @param startRow starting row (inclusive) within the current vector
* @param endRow ending row (exclusive) within the current vector
*/
void writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow);
/**
* Write key parts for this column into a particular result row.
*
* @param keyMemory key memory
* @param keyOffset starting position for this key part within keyMemory
* @param resultRow result row to receive key parts
* @param resultRowPosition position within the result row for this key part
*/
void writeKeyToResultRow(
ByteBuffer keyBuffer,
Memory keyMemory,
int keyOffset,
ResultRow resultRow,
int resultRowPosition

View File

@ -19,11 +19,11 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.segment.vector.VectorValueSelector;
import java.nio.ByteBuffer;
public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelector
{
private final VectorValueSelector selector;
@ -36,12 +36,12 @@ public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelec
@Override
public int getGroupingKeySize()
{
return 2;
return Long.BYTES;
}
@Override
public void writeKeys(
final int[] keySpace,
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
final int startRow,
@ -50,21 +50,23 @@ public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelec
{
final long[] vector = selector.getLongVector();
if (keySize == Long.BYTES) {
keySpace.putLongArray(keyOffset, vector, startRow, endRow - startRow);
} else {
for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
keySpace[j] = (int) (vector[i] >>> 32);
keySpace[j + 1] = (int) (vector[i] & 0xffffffffL);
keySpace.putLong(j, vector[i]);
}
}
}
@Override
public void writeKeyToResultRow(
final ByteBuffer keyBuffer,
final Memory keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
final long value = keyBuffer.getLong(keyOffset * Integer.BYTES);
resultRow.set(resultRowPosition, value);
resultRow.set(resultRowPosition, keyMemory.getLong(keyOffset));
}
}

View File

@ -19,11 +19,11 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import java.nio.ByteBuffer;
public class SingleValueStringGroupByVectorColumnSelector implements GroupByVectorColumnSelector
{
private final SingleValueDimensionVectorSelector selector;
@ -36,34 +36,38 @@ public class SingleValueStringGroupByVectorColumnSelector implements GroupByVect
@Override
public int getGroupingKeySize()
{
return 1;
return Integer.BYTES;
}
@Override
public void writeKeys(
final int[] keySpace,
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
final int startRow,
final int endRow
)
{
final int[] rowVector = selector.getRowVector();
final int[] vector = selector.getRowVector();
if (keySize == Integer.BYTES) {
keySpace.putIntArray(keyOffset, vector, startRow, endRow - startRow);
} else {
for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
keySpace[j] = rowVector[i];
keySpace.putInt(j, vector[i]);
}
}
}
@Override
public void writeKeyToResultRow(
final ByteBuffer keyBuffer,
final Memory keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
final int id = keyBuffer.getInt(keyOffset * Integer.BYTES);
final int id = keyMemory.getInt(keyOffset);
resultRow.set(resultRowPosition, selector.lookupName(id));
}
}

View File

@ -20,6 +20,8 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import com.google.common.base.Suppliers;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
@ -34,11 +36,9 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.AggregateResult;
import org.apache.druid.query.groupby.epinephelinae.BufferArrayGrouper;
import org.apache.druid.query.groupby.epinephelinae.BufferHashGrouper;
import org.apache.druid.query.groupby.epinephelinae.ByteBufferKeySerde;
import org.apache.druid.query.groupby.epinephelinae.CloseableGrouperIterator;
import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper;
import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
import org.apache.druid.query.vector.VectorCursorGranularizer;
import org.apache.druid.segment.DimensionHandlerUtils;
@ -200,8 +200,7 @@ public class VectorGroupByEngine
private final ByteBuffer processingBuffer;
private final DateTime fudgeTimestamp;
private final int keySize;
private final int[] keySpace;
private final Grouper.KeySerde<ByteBuffer> keySerde;
private final WritableMemory keySpace;
private final VectorGrouper vectorGrouper;
@Nullable
@ -216,7 +215,7 @@ public class VectorGroupByEngine
private int partiallyAggregatedRows = -1;
@Nullable
private CloseableGrouperIterator<ByteBuffer, ResultRow> delegate = null;
private CloseableGrouperIterator<Memory, ResultRow> delegate = null;
VectorGroupByEngineIterator(
final GroupByQuery query,
@ -237,8 +236,7 @@ public class VectorGroupByEngine
this.processingBuffer = processingBuffer;
this.fudgeTimestamp = fudgeTimestamp;
this.keySize = selectors.stream().mapToInt(GroupByVectorColumnSelector::getGroupingKeySize).sum();
this.keySpace = new int[keySize * cursor.getMaxVectorSize()];
this.keySerde = new ByteBufferKeySerde(keySize * Integer.BYTES);
this.keySpace = WritableMemory.allocate(keySize * cursor.getMaxVectorSize());
this.vectorGrouper = makeGrouper();
this.granulizer = VectorCursorGranularizer.create(storageAdapter, cursor, query.getGranularity(), queryInterval);
@ -322,17 +320,16 @@ public class VectorGroupByEngine
cardinalityForArrayAggregation
);
} else {
grouper = new BufferHashGrouper<>(
grouper = new HashVectorGrouper(
Suppliers.ofInstance(processingBuffer),
keySerde,
keySize,
AggregatorAdapters.factorizeVector(
cursor.getColumnSelectorFactory(),
query.getAggregatorSpecs()
),
querySpecificConfig.getBufferGrouperMaxSize(),
querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets(),
true
querySpecificConfig.getBufferGrouperInitialBuckets()
);
}
@ -341,7 +338,7 @@ public class VectorGroupByEngine
return grouper;
}
private CloseableGrouperIterator<ByteBuffer, ResultRow> initNewDelegate()
private CloseableGrouperIterator<Memory, ResultRow> initNewDelegate()
{
// Method must not be called unless there's a current bucketInterval.
assert bucketInterval != null;