diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index a0fc9378441..d3c2a7ee39a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -21,6 +21,8 @@ package org.apache.druid.query.groupby.epinephelinae; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.parsers.CloseableIterator; @@ -29,9 +31,9 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy; import javax.annotation.Nullable; - import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Arrays; import java.util.NoSuchElementException; @@ -57,7 +59,7 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper private final int recordSize; // size of all aggregated values private boolean initialized = false; - private ByteBuffer usedFlagBuffer; + private WritableMemory usedFlagMemory; private ByteBuffer valBuffer; // Scratch objects used by aggregateVector(). Only set if initVectorized() is called. @@ -127,7 +129,7 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper // Slice up the buffer. buffer.position(0); buffer.limit(usedFlagBufferEnd); - usedFlagBuffer = buffer.slice(); + usedFlagMemory = WritableMemory.wrap(buffer.slice(), ByteOrder.nativeOrder()); buffer.position(usedFlagBufferEnd); buffer.limit(buffer.capacity()); @@ -169,9 +171,21 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper } @Override - public AggregateResult aggregateVector(int[] keySpace, int startRow, int endRow) + public AggregateResult aggregateVector(Memory keySpace, int startRow, int endRow) { - if (keySpace.length == 0) { + final int numRows = endRow - startRow; + + // Hoisted bounds check on keySpace. + if (keySpace.getCapacity() < (long) numRows * Integer.BYTES) { + throw new IAE("Not enough keySpace capacity for the provided start/end rows"); + } + + // We use integer indexes into the keySpace. + if (keySpace.getCapacity() > Integer.MAX_VALUE) { + throw new ISE("keySpace too large to handle"); + } + + if (keySpace.getCapacity() == 0) { // Empty key space, assume keys are all zeroes. final int dimIndex = 1; @@ -184,11 +198,9 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper endRow ); } else { - final int numRows = endRow - startRow; - for (int i = 0; i < numRows; i++) { // +1 matches what hashFunction() would do. 
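// Illustrative sketch of the mapping this loop applies (assumes, as in
// GroupByColumnSelectorStrategy, that the missing-value marker is -1): array-based
// grouping reserves slot 0 for the null/missing value, so a dictionary ID d maps to
// dimIndex d + 1. The keySpace is now a Memory region of packed ints rather than an
// int[], so row i's ID is read at byte offset i * Integer.BYTES:
//
//   final int id = keySpace.getInt((long) i * Integer.BYTES); // was: keySpace[i]
//   final int dimIndex = id + 1;                              // slot 0 = missing value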
- final int dimIndex = keySpace[i] + 1; + final int dimIndex = keySpace.getInt(i * Integer.BYTES) + 1; if (dimIndex < 0 || dimIndex >= cardinalityWithMissingValue) { throw new IAE("Invalid dimIndex[%s]", dimIndex); @@ -214,10 +226,12 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper { final int index = dimIndex / Byte.SIZE; final int extraIndex = dimIndex % Byte.SIZE; - final int usedFlagByte = 1 << extraIndex; + final int usedFlagMask = 1 << extraIndex; - if ((usedFlagBuffer.get(index) & usedFlagByte) == 0) { - usedFlagBuffer.put(index, (byte) (usedFlagBuffer.get(index) | (1 << extraIndex))); + final byte currentByte = usedFlagMemory.getByte(index); + + if ((currentByte & usedFlagMask) == 0) { + usedFlagMemory.putByte(index, (byte) (currentByte | usedFlagMask)); aggregators.init(valBuffer, dimIndex * recordSize); } } @@ -226,26 +240,16 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper { final int index = dimIndex / Byte.SIZE; final int extraIndex = dimIndex % Byte.SIZE; - final int usedFlagByte = 1 << extraIndex; + final int usedFlagMask = 1 << extraIndex; - return (usedFlagBuffer.get(index) & usedFlagByte) != 0; + return (usedFlagMemory.getByte(index) & usedFlagMask) != 0; } @Override public void reset() { // Clear the entire usedFlagBuffer - final int usedFlagBufferCapacity = usedFlagBuffer.capacity(); - - // putLong() instead of put() can boost the performance of clearing the buffer - final int n = (usedFlagBufferCapacity / Long.BYTES) * Long.BYTES; - for (int i = 0; i < n; i += Long.BYTES) { - usedFlagBuffer.putLong(i, 0L); - } - - for (int i = n; i < usedFlagBufferCapacity; i++) { - usedFlagBuffer.put(i, (byte) 0); - } + usedFlagMemory.clear(); } @Override @@ -261,11 +265,11 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper } @Override - public CloseableIterator> iterator() + public CloseableIterator> iterator() { final CloseableIterator> iterator = iterator(false); - final ByteBuffer keyBuffer = ByteBuffer.allocate(Integer.BYTES); - return new CloseableIterator>() + final WritableMemory keyMemory = WritableMemory.allocate(Integer.BYTES); + return new CloseableIterator>() { @Override public boolean hasNext() @@ -274,11 +278,11 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper } @Override - public Entry next() + public Entry next() { final Entry integerEntry = iterator.next(); - keyBuffer.putInt(0, integerEntry.getKey()); - return new Entry<>(keyBuffer, integerEntry.getValues()); + keyMemory.putInt(0, integerEntry.getKey()); + return new Entry<>(keyMemory, integerEntry.getValues()); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java index bc4fb8a09d7..2b286019b0b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java @@ -20,16 +20,13 @@ package org.apache.druid.query.groupby.epinephelinae; import com.google.common.base.Supplier; -import org.apache.commons.lang.mutable.MutableInt; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.query.aggregation.AggregatorAdapters; import 
org.apache.druid.query.aggregation.AggregatorFactory; import javax.annotation.Nullable; - import java.nio.ByteBuffer; import java.util.AbstractList; import java.util.Collections; @@ -37,7 +34,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.function.ToIntFunction; -public class BufferHashGrouper extends AbstractBufferHashGrouper implements VectorGrouper +public class BufferHashGrouper extends AbstractBufferHashGrouper { private static final int MIN_INITIAL_BUCKETS = 4; private static final int DEFAULT_INITIAL_BUCKETS = 1024; @@ -57,16 +54,6 @@ public class BufferHashGrouper extends AbstractBufferHashGrouper bufferSupplier, final KeySerde keySerde, @@ -135,96 +122,6 @@ public class BufferHashGrouper extends AbstractBufferHashGrouper { - if (aggregationNumRows.intValue() > 0) { - doAggregateVector(aggregationStartRow.intValue(), aggregationNumRows.intValue()); - aggregationStartRow.setValue(aggregationStartRow.intValue() + aggregationNumRows.intValue()); - aggregationNumRows.setValue(0); - } - } - ); - - if (bucket < 0) { - // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will - // be correct. - - // Aggregate any remaining rows. - if (aggregationNumRows.intValue() > 0) { - doAggregateVector(aggregationStartRow.intValue(), aggregationNumRows.intValue()); - } - - return Groupers.hashTableFull(rowNum); - } - - final int bucketStartOffset = hashTable.getOffsetForBucket(bucket); - final boolean bucketWasUsed = hashTable.isBucketUsed(bucket); - - // Set up key and initialize the aggs if this is a new bucket. - if (!bucketWasUsed) { - hashTable.initializeNewBucketKey(bucket, vKeyBuffer, vKeyHashCodes[rowNum]); - aggregators.init(hashTable.getTableBuffer(), bucketStartOffset + baseAggregatorOffset); - } - - // Schedule the current row for aggregation. - vAggregationPositions[aggregationNumRows.intValue()] = bucketStartOffset + Integer.BYTES + keySize; - aggregationNumRows.increment(); - } - - // Aggregate any remaining rows. - if (aggregationNumRows.intValue() > 0) { - doAggregateVector(aggregationStartRow.intValue(), aggregationNumRows.intValue()); - } - - return AggregateResult.ok(); - } - @Override public boolean isInitialized() { @@ -263,15 +160,6 @@ public class BufferHashGrouper extends AbstractBufferHashGrouper> iterator() - { - // Unchecked cast, since this method is only called through the VectorGrouper interface, which uses - // ByteBuffer keys (and this is verified in initVectorized). - return (CloseableIterator) iterator(false); - } - @Override public CloseableIterator> iterator(boolean sorted) { @@ -403,16 +291,6 @@ public class BufferHashGrouper extends AbstractBufferHashGrouper -{ - private final int keySize; - - public ByteBufferKeySerde(final int keySize) - { - this.keySize = keySize; - } - - @Override - public int keySize() - { - return keySize; - } - - @Override - public Class keyClazz() - { - return ByteBuffer.class; - } - - @Override - public List getDictionary() - { - return ImmutableList.of(); - } - - @Override - public ByteBuffer toByteBuffer(ByteBuffer key) - { - return key; - } - - @Override - public ByteBuffer fromByteBuffer(ByteBuffer buffer, int position) - { - final ByteBuffer dup = buffer.duplicate(); - dup.position(position).limit(position + keySize); - return dup.slice(); - } - - @Override - public Grouper.BufferComparator bufferComparator() - { - // This class is used by segment processing engines, where bufferComparator will not be called. 
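// Illustrative note on deleting ByteBufferKeySerde as a whole: it existed so that
// vectorized callers could expose fixed-width keys as ByteBuffer slices via the
// duplicate()/position()/limit()/slice() dance in fromByteBuffer() above. Its Memory-based
// replacement is a zero-copy region view (assuming the datasketches signature
// Memory.region(offsetBytes, capacityBytes)):
//
//   final Memory keyView = tableMemory.region(position, keySize);
//
// which is how HashVectorGrouper's iterator() exposes bucket keys further down.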
- throw new UnsupportedOperationException(); - } - - @Override - public Grouper.BufferComparator bufferComparatorWithAggregators( - AggregatorFactory[] aggregatorFactories, - int[] aggregatorOffsets - ) - { - // This class is used by segment processing engines, where bufferComparatorWithAggregators will not be called. - throw new UnsupportedOperationException(); - } - - @Override - public void reset() - { - // No state, nothing to reset - } -} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java index 2ed89161819..bb351826e54 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java @@ -86,18 +86,6 @@ public class Groupers return smear(obj.hashCode()) & USED_FLAG_MASK; } - public static int hashIntArray(final int[] ints, final int start, final int length) - { - // Similar to what Arrays.hashCode would do. - // Also apply the "smear" function, to improve distribution. - int hashCode = 1; - for (int i = 0; i < length; i++) { - hashCode = 31 * hashCode + ints[start + i]; - } - - return smear(hashCode) & USED_FLAG_MASK; - } - static int getUsedFlag(int keyHash) { return keyHash | 0x80000000; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java new file mode 100644 index 00000000000..b05ccb5f358 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby.epinephelinae; + +import com.google.common.base.Supplier; +import it.unimi.dsi.fastutil.HashCommon; +import it.unimi.dsi.fastutil.ints.IntIterator; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.aggregation.AggregatorAdapters; +import org.apache.druid.query.groupby.epinephelinae.collection.HashTableUtils; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryOpenHashTable; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Collections; + +/** + * An implementation of {@link VectorGrouper} backed by a growable {@link MemoryOpenHashTable}. 
Growability is + * implemented in this class because {@link MemoryOpenHashTable} is not innately growable. + */ +public class HashVectorGrouper implements VectorGrouper +{ + private static final int MIN_BUCKETS = 4; + private static final int DEFAULT_INITIAL_BUCKETS = 1024; + private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f; + + private boolean initialized = false; + private int maxNumBuckets; + + private final Supplier bufferSupplier; + private final AggregatorAdapters aggregators; + private final int keySize; + private final int bufferGrouperMaxSize; + private final int configuredInitialNumBuckets; + private final int bucketSize; + private final float maxLoadFactor; + + private ByteBuffer buffer; + private int tableStart = 0; + + @Nullable + private MemoryOpenHashTable hashTable; + + // Scratch objects used by aggregateVector(). Set by initVectorized(). + @Nullable + private int[] vKeyHashCodes = null; + @Nullable + private int[] vAggregationPositions = null; + @Nullable + private int[] vAggregationRows = null; + + public HashVectorGrouper( + final Supplier bufferSupplier, + final int keySize, + final AggregatorAdapters aggregators, + final int bufferGrouperMaxSize, + final float maxLoadFactor, + final int configuredInitialNumBuckets + ) + { + this.bufferSupplier = bufferSupplier; + this.keySize = keySize; + this.aggregators = aggregators; + this.bufferGrouperMaxSize = bufferGrouperMaxSize; + this.maxLoadFactor = maxLoadFactor > 0 ? maxLoadFactor : DEFAULT_MAX_LOAD_FACTOR; + this.configuredInitialNumBuckets = configuredInitialNumBuckets >= MIN_BUCKETS + ? configuredInitialNumBuckets + : DEFAULT_INITIAL_BUCKETS; + this.bucketSize = MemoryOpenHashTable.bucketSize(keySize, aggregators.spaceNeeded()); + + if (this.maxLoadFactor >= 1.0f) { + throw new IAE("Invalid maxLoadFactor[%f], must be < 1.0", maxLoadFactor); + } + } + + @Override + public void initVectorized(final int maxVectorSize) + { + if (!initialized) { + this.buffer = bufferSupplier.get(); + this.maxNumBuckets = Math.max( + computeRoundedInitialNumBuckets(buffer.capacity(), bucketSize, configuredInitialNumBuckets), + computeMaxNumBucketsAfterGrowth(buffer.capacity(), bucketSize) + ); + + reset(); + + this.vKeyHashCodes = new int[maxVectorSize]; + this.vAggregationPositions = new int[maxVectorSize]; + this.vAggregationRows = new int[maxVectorSize]; + + initialized = true; + } + } + + @Override + public AggregateResult aggregateVector(final Memory keySpace, final int startRow, final int endRow) + { + final int numRows = endRow - startRow; + + // Hoisted bounds check on keySpace. + if (keySpace.getCapacity() < (long) numRows * keySize) { + throw new IAE("Not enough keySpace capacity for the provided start/end rows"); + } + + // We use integer indexes into the keySpace. + if (keySpace.getCapacity() > Integer.MAX_VALUE) { + throw new ISE("keySpace too large to handle"); + } + + // Initialize vKeyHashCodes: one int per key. + // Does *not* use hashFunction(). This is okay because the API of VectorGrouper does not expose any way of messing + // about with hash codes. 
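// Illustrative sketch of the mixing step referenced above (assumes the Murmur3-style
// constants in Groupers, C1 = 0xcc9e2d51 and C2 = 0x1b873593, the same ones Guava's
// smearing uses):
//
//   static int smear(int hashCode)
//   {
//     return C2 * Integer.rotateLeft(hashCode * C1, 15);
//   }
//
// Applying smear() to the raw hashMemory() output spreads clustered hash codes across
// buckets before they are reduced modulo the table size.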
+ for (int rowNum = 0, keySpacePosition = 0; rowNum < numRows; rowNum++, keySpacePosition += keySize) { + vKeyHashCodes[rowNum] = Groupers.smear(HashTableUtils.hashMemory(keySpace, keySpacePosition, keySize)); + } + + int aggregationStartRow = startRow; + int aggregationNumRows = 0; + + final int aggregatorStartOffset = hashTable.bucketValueOffset(); + + for (int rowNum = 0, keySpacePosition = 0; rowNum < numRows; rowNum++, keySpacePosition += keySize) { + // Find, and if the table is full, expand and find again. + int bucket = hashTable.findBucket(vKeyHashCodes[rowNum], keySpace, keySpacePosition); + + if (bucket < 0) { + // Bucket not yet initialized. + if (hashTable.canInsertNewBucket()) { + // There's space, initialize it and move on. + bucket = -(bucket + 1); + initBucket(bucket, keySpace, keySpacePosition); + } else { + // Out of space. Finish up unfinished aggregations, then try to grow. + if (aggregationNumRows > 0) { + doAggregateVector(aggregationStartRow, aggregationNumRows); + aggregationStartRow = aggregationStartRow + aggregationNumRows; + aggregationNumRows = 0; + } + + if (grow() && hashTable.canInsertNewBucket()) { + bucket = hashTable.findBucket(vKeyHashCodes[rowNum], keySpace, keySpacePosition); + bucket = -(bucket + 1); + initBucket(bucket, keySpace, keySpacePosition); + } else { + // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message + // will be correct. + return Groupers.hashTableFull(rowNum); + } + } + } + + // Schedule the current row for aggregation. + vAggregationPositions[aggregationNumRows] = bucket * bucketSize + aggregatorStartOffset; + aggregationNumRows++; + } + + // Aggregate any remaining rows. + if (aggregationNumRows > 0) { + doAggregateVector(aggregationStartRow, aggregationNumRows); + } + + return AggregateResult.ok(); + } + + @Override + public void reset() + { + // Compute initial hash table size (numBuckets). + final int numBuckets = computeRoundedInitialNumBuckets(buffer.capacity(), bucketSize, configuredInitialNumBuckets); + assert numBuckets <= maxNumBuckets; + + if (numBuckets == maxNumBuckets) { + // Maximum-sized tables start at zero. + tableStart = 0; + } else { + // The first table, if not maximum-sized, starts at the latest possible position (where the penultimate + // table ends at the end of the buffer). 
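// Worked example of the placement rule above (hypothetical sizes): with bucketSize = 16,
// maxNumBuckets = 8192, and an initial table of numBuckets = 1024, the statement below
// yields
//
//   tableStart = capacity - 16 * (8192 - 1024) = capacity - 114688
//
// so the 1024-, 2048-, and 4096-bucket generations occupy the last 7168 buckets' worth of
// space, the penultimate (4096-bucket) table ends flush with the end of the buffer, and
// the maximum-sized 8192-bucket table wraps back around to offset 0, reclaiming the space
// the earlier generations used.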
+ tableStart = buffer.capacity() - bucketSize * (maxNumBuckets - numBuckets); + } + + final ByteBuffer tableBuffer = buffer.duplicate(); + tableBuffer.position(0); + tableBuffer.limit(MemoryOpenHashTable.memoryNeeded(numBuckets, bucketSize)); + + this.hashTable = new MemoryOpenHashTable( + WritableMemory.wrap(tableBuffer.slice(), ByteOrder.nativeOrder()), + numBuckets, + Math.max(1, Math.min(bufferGrouperMaxSize, (int) (numBuckets * maxLoadFactor))), + keySize, + aggregators.spaceNeeded() + ); + } + + @Override + public CloseableIterator> iterator() + { + if (!initialized) { + // it's possible for iterator() to be called before initialization when + // a nested groupBy's subquery has an empty result set (see testEmptySubquery() in GroupByQueryRunnerTest) + return CloseableIterators.withEmptyBaggage(Collections.emptyIterator()); + } + + final IntIterator baseIterator = hashTable.bucketIterator(); + + return new CloseableIterator>() + { + @Override + public boolean hasNext() + { + return baseIterator.hasNext(); + } + + @Override + public Grouper.Entry next() + { + final int bucket = baseIterator.nextInt(); + final int bucketPosition = hashTable.bucketMemoryPosition(bucket); + + final Memory keyMemory = hashTable.memory().region( + bucketPosition + hashTable.bucketKeyOffset(), + hashTable.keySize() + ); + + final Object[] values = new Object[aggregators.size()]; + final int aggregatorsOffset = bucketPosition + hashTable.bucketValueOffset(); + for (int i = 0; i < aggregators.size(); i++) { + values[i] = aggregators.get(hashTable.memory().getByteBuffer(), aggregatorsOffset, i); + } + + return new Grouper.Entry<>(keyMemory, values); + } + + @Override + public void close() + { + // Do nothing. + } + }; + } + + @Override + public void close() + { + // Nothing to do. + } + + + /** + * Initializes the given bucket with the given key and fresh, empty aggregation state. Must only be called if + * {@code hashTable.canInsertNewBucket()} returns true and if this bucket is currently unused. + */ + private void initBucket(final int bucket, final Memory keySpace, final int keySpacePosition) + { + assert bucket >= 0 && bucket < maxNumBuckets && hashTable != null && hashTable.canInsertNewBucket(); + hashTable.initBucket(bucket, keySpace, keySpacePosition); + aggregators.init(hashTable.memory().getByteBuffer(), bucket * bucketSize + hashTable.bucketValueOffset()); + } + + /** + * Aggregate the current vector from "startRow" (inclusive) to "endRow" (exclusive) into aggregation positions + * given by {@link #vAggregationPositions}. + */ + private void doAggregateVector(final int startRow, final int numRows) + { + aggregators.aggregateVector( + hashTable.memory().getByteBuffer(), + numRows, + vAggregationPositions, + Groupers.writeAggregationRows(vAggregationRows, startRow, startRow + numRows) + ); + } + + /** + * Attempts to grow the table and returns whether or not it was possible. Each growth doubles the number of buckets + * in the table. 
+ */ + private boolean grow() + { + if (hashTable.numBuckets() >= maxNumBuckets) { + return false; + } + + final int newNumBuckets = nextTableNumBuckets(); + final int newTableStart = nextTableStart(); + + final ByteBuffer newTableBuffer = buffer.duplicate(); + newTableBuffer.position(newTableStart); + newTableBuffer.limit(newTableStart + MemoryOpenHashTable.memoryNeeded(newNumBuckets, bucketSize)); + + final MemoryOpenHashTable newHashTable = new MemoryOpenHashTable( + WritableMemory.wrap(newTableBuffer.slice(), ByteOrder.nativeOrder()), + newNumBuckets, + maxSizeForNumBuckets(newNumBuckets, maxLoadFactor, bufferGrouperMaxSize), + keySize, + aggregators.spaceNeeded() + ); + + hashTable.copyTo(newHashTable, new HashVectorGrouperBucketCopyHandler(aggregators, hashTable.bucketValueOffset())); + hashTable = newHashTable; + tableStart = newTableStart; + return true; + } + + /** + * Returns the table size after the next growth. Each growth doubles the number of buckets, so this will be + * double the current number of buckets. + * + * @throws IllegalStateException if not initialized or if growing is not possible + */ + private int nextTableNumBuckets() + { + if (!initialized) { + throw new ISE("Must be initialized"); + } + + if (hashTable.numBuckets() >= maxNumBuckets) { + throw new ISE("No room left to grow"); + } + + return hashTable.numBuckets() * 2; + } + + /** + * Returns the start of the table within {@link #buffer} after the next growth. Each growth starts from the end of + * the previous table. + * + * @throws IllegalStateException if not initialized or if growing is not possible + */ + private int nextTableStart() + { + if (!initialized) { + throw new ISE("Must be initialized"); + } + + final int nextNumBuckets = nextTableNumBuckets(); + final int currentEnd = tableStart + MemoryOpenHashTable.memoryNeeded(hashTable.numBuckets(), bucketSize); + + final int nextTableStart; + + if (nextNumBuckets == maxNumBuckets) { + assert currentEnd == buffer.capacity(); + nextTableStart = 0; + } else { + nextTableStart = currentEnd; + } + + // Sanity check on buffer capacity. If this triggers then it is a bug in this class. + final long nextEnd = ((long) nextTableStart) + MemoryOpenHashTable.memoryNeeded(nextNumBuckets, bucketSize); + + if (nextEnd > buffer.capacity()) { + throw new ISE("New table overruns buffer capacity"); + } + + if (nextTableStart < currentEnd && nextEnd > tableStart) { + throw new ISE("New table overruns old table"); + } + + return nextTableStart; + } + + /** + * Compute the maximum number of elements (size) for a given number of buckets. When the table hits this size, + * we must either grow it or return a table-full error. + */ + private static int maxSizeForNumBuckets(final int numBuckets, final double maxLoadFactor, final int configuredMaxSize) + { + return Math.max(1, Math.min(configuredMaxSize, (int) (numBuckets * maxLoadFactor))); + } + + /** + * Compute the initial table bucket count given a particular buffer capacity, bucket size, and user-configured + * initial bucket count. 
+ * + * @param capacity buffer capacity, in bytes + * @param bucketSize bucket size, in bytes + * @param configuredInitialNumBuckets user-configured initial bucket count + */ + private static int computeRoundedInitialNumBuckets( + final int capacity, + final int bucketSize, + final int configuredInitialNumBuckets + ) + { + final int initialNumBucketsRoundedUp = (int) Math.min( + 1 << 30, + HashCommon.nextPowerOfTwo((long) configuredInitialNumBuckets) + ); + + if (initialNumBucketsRoundedUp < computeMaxNumBucketsAfterGrowth(capacity, bucketSize)) { + return initialNumBucketsRoundedUp; + } else { + // Special case: initialNumBucketsRoundedUp is equal to or higher than max capacity of a growable table; start out + // at max size the buffer will hold. Note that this allows the table to be larger than it could ever be as a + // result of growing, proving that biting off as much as you can chew is not always a bad strategy. (Why don't + // we always do this? Because clearing a big table is expensive.) + return HashTableUtils.previousPowerOfTwo(Math.min(capacity / bucketSize, 1 << 30)); + } + } + + /** + * Compute the largest possible table bucket count given a particular buffer capacity, bucket size, and initial + * bucket count. Assumes that tables are grown by allocating new tables that are twice as large and then copying + * into them. + * + * @param capacity buffer capacity, in bytes + * @param bucketSize bucket size, in bytes + */ + private static int computeMaxNumBucketsAfterGrowth(final int capacity, final int bucketSize) + { + // Tables start at some size (see computeRoundedInitialNumBuckets) and grow by doubling. The penultimate table ends + // at the end of the buffer, and then the final table starts at the beginning of the buffer. This means the largest + // possible table size 2^x is the one where x is maximized subject to: + // + // 2^(x-1) < capacity / bucketSize / 3 + // + // Or: + // + // 2^x < capacity / bucketSize / 3 * 2 + // + // All other smaller tables fit within the 2/3rds of the buffer preceding the penultimate table, and then the + // memory they used can be reclaimed for the final table. + return HashTableUtils.previousPowerOfTwo(Math.min(capacity / bucketSize / 3 * 2, 1 << 30)); + } + + private static class HashVectorGrouperBucketCopyHandler implements MemoryOpenHashTable.BucketCopyHandler + { + private final AggregatorAdapters aggregators; + private final int baseAggregatorOffset; + + public HashVectorGrouperBucketCopyHandler(final AggregatorAdapters aggregators, final int bucketAggregatorOffset) + { + this.aggregators = aggregators; + this.baseAggregatorOffset = bucketAggregatorOffset; + } + + @Override + public void bucketCopied( + final int oldBucket, + final int newBucket, + final MemoryOpenHashTable oldTable, + final MemoryOpenHashTable newTable + ) + { + // Relocate aggregators (see https://github.com/apache/druid/pull/4071). 
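// Illustrative note: relocate() is needed because some buffer aggregators (sketch
// aggregators in particular) keep on-heap state keyed by buffer position, so copying the
// raw bucket bytes alone would orphan that state. Assuming BucketCopyHandler remains a
// single-method interface, the handler could equivalently be written as a lambda around
// the call that follows:
//
//   (oldBucket, newBucket, oldTable, newTable) ->
//       aggregators.relocate(
//           oldTable.bucketMemoryPosition(oldBucket) + baseAggregatorOffset,
//           newTable.bucketMemoryPosition(newBucket) + baseAggregatorOffset,
//           oldTable.memory().getByteBuffer(),
//           newTable.memory().getByteBuffer()
//       );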
+ aggregators.relocate( + oldTable.bucketMemoryPosition(oldBucket) + baseAggregatorOffset, + newTable.bucketMemoryPosition(newBucket) + baseAggregatorOffset, + oldTable.memory().getByteBuffer(), + newTable.memory().getByteBuffer() + ); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/VectorGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/VectorGrouper.java index 1da43f8a99a..66cd4fb4fec 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/VectorGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/VectorGrouper.java @@ -19,13 +19,14 @@ package org.apache.druid.query.groupby.epinephelinae; +import org.apache.datasketches.memory.Memory; import org.apache.druid.java.util.common.parsers.CloseableIterator; import java.io.Closeable; -import java.nio.ByteBuffer; /** - * Like a {@link Grouper}, but vectorized. Keys are always int arrays, so there is no generic type parameter KeyType. + * Like a {@link Grouper}, but vectorized. Keys are always memory regions, so there is no generic type parameter + * KeyType. *

* This interface is designed such that an implementation can implement both Grouper and VectorGrouper. Of course, * it would generally only make sense for a particular instance to be called with one set of functionality or the * other. @@ -39,16 +40,15 @@ public interface VectorGrouper extends Closeable void initVectorized(int maxVectorSize); /** - * Aggregate the current vector of rows from "startVectorOffset" to "endVectorOffset" using the provided keys. + * Aggregate the current vector of rows from "startRow" to "endRow" using the provided keys. * - * @param keySpace array holding keys, chunked into ints. First (endVectorOffset - startVectorOffset) keys - * must be valid. + * @param keySpace memory holding keys, chunked in units of keySize bytes. First (endRow - startRow) keys must be valid. * @param startRow row to start at (inclusive). * @param endRow row to end at (exclusive). * * @return result that indicates how many keys were aggregated (may be partial due to resource limits) */ - AggregateResult aggregateVector(int[] keySpace, int startRow, int endRow); + AggregateResult aggregateVector(Memory keySpace, int startRow, int endRow); /** * Reset the grouper to its initial state. @@ -62,16 +62,14 @@ public interface VectorGrouper extends Closeable void close(); /** - * Iterate through entries. + * Iterate through entry buckets. Each bucket's key is a {@link Memory} object in native byte order. *

- * Some implementations allow writes even after this method is called. After you are done with the iterator - * returned by this method, you should either call {@link #close()} (if you are done with the VectorGrouper) or - * {@link #reset()} (if you want to reuse it). + * After you are done with the iterator returned by this method, you should either call {@link #close()} (if you are + * done with the VectorGrouper) or {@link #reset()} (if you want to reuse it). *

- * Callers must process and discard the returned {@link Grouper.Entry}s immediately, because the keys may - * be reused. + * Callers must process and discard the returned {@link Grouper.Entry}s immediately, because objects may be reused. * * @return entry iterator */ - CloseableIterator> iterator(); + CloseableIterator> iterator(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DoubleGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DoubleGroupByVectorColumnSelector.java index 69f6c868b3f..437d2f82eb6 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DoubleGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DoubleGroupByVectorColumnSelector.java @@ -19,11 +19,11 @@ package org.apache.druid.query.groupby.epinephelinae.vector; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.segment.vector.VectorValueSelector; -import java.nio.ByteBuffer; - public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSelector { private final VectorValueSelector selector; @@ -36,12 +36,12 @@ public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSel @Override public int getGroupingKeySize() { - return 2; + return Double.BYTES; } @Override public void writeKeys( - final int[] keySpace, + final WritableMemory keySpace, final int keySize, final int keyOffset, final int startRow, @@ -50,22 +50,23 @@ public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSel { final double[] vector = selector.getDoubleVector(); - for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { - final long longValue = Double.doubleToLongBits(vector[i]); - keySpace[j] = (int) (longValue >>> 32); - keySpace[j + 1] = (int) (longValue & 0xffffffffL); + if (keySize == Double.BYTES) { + keySpace.putDoubleArray(keyOffset, vector, startRow, endRow - startRow); + } else { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putDouble(j, vector[i]); + } } } @Override public void writeKeyToResultRow( - final ByteBuffer keyBuffer, + final Memory keyMemory, final int keyOffset, final ResultRow resultRow, final int resultRowPosition ) { - final double value = keyBuffer.getDouble(keyOffset * Integer.BYTES); - resultRow.set(resultRowPosition, value); + resultRow.set(resultRowPosition, keyMemory.getDouble(keyOffset)); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/FloatGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/FloatGroupByVectorColumnSelector.java index 9b9d53a8495..c404932746e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/FloatGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/FloatGroupByVectorColumnSelector.java @@ -19,11 +19,11 @@ package org.apache.druid.query.groupby.epinephelinae.vector; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.segment.vector.VectorValueSelector; -import java.nio.ByteBuffer; - public class FloatGroupByVectorColumnSelector implements 
GroupByVectorColumnSelector { private final VectorValueSelector selector; @@ -36,12 +36,12 @@ public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSele @Override public int getGroupingKeySize() { - return 1; + return Float.BYTES; } @Override public void writeKeys( - final int[] keySpace, + final WritableMemory keySpace, final int keySize, final int keyOffset, final int startRow, @@ -50,20 +50,23 @@ public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSele { final float[] vector = selector.getFloatVector(); - for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { - keySpace[j] = Float.floatToIntBits(vector[i]); + if (keySize == Float.BYTES) { + keySpace.putFloatArray(keyOffset, vector, startRow, endRow - startRow); + } else { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putFloat(j, vector[i]); + } } } @Override public void writeKeyToResultRow( - final ByteBuffer keyBuffer, + final Memory keyMemory, final int keyOffset, final ResultRow resultRow, final int resultRowPosition ) { - final float value = Float.intBitsToFloat(keyBuffer.getInt(keyOffset * Integer.BYTES)); - resultRow.set(resultRowPosition, value); + resultRow.set(resultRowPosition, keyMemory.getFloat(keyOffset)); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java index 087c778b18e..f95aaf71b13 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java @@ -19,18 +19,44 @@ package org.apache.druid.query.groupby.epinephelinae.vector; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.query.groupby.ResultRow; -import java.nio.ByteBuffer; - +/** + * Column processor for groupBy dimensions. + * + * @see GroupByVectorColumnProcessorFactory + */ public interface GroupByVectorColumnSelector { + /** + * Get the size in bytes of the key parts generated by this column. + */ int getGroupingKeySize(); - void writeKeys(int[] keySpace, int keySize, int keyOffset, int startRow, int endRow); + /** + * Write key parts for this column, from startRow (inclusive) to endRow (exclusive), into keySpace starting at + * keyOffset. + * + * @param keySpace key memory + * @param keySize size of the overall key (not just the part for this column) + * @param keyOffset starting position for the first key part within keySpace + * @param startRow starting row (inclusive) within the current vector + * @param endRow ending row (exclusive) within the current vector + */ + void writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow); + /** + * Write key parts for this column into a particular result row. 
+ * + * @param keyMemory key memory + * @param keyOffset starting position for this key part within keyMemory + * @param resultRow result row to receive key parts + * @param resultRowPosition position within the result row for this key part + */ void writeKeyToResultRow( - ByteBuffer keyBuffer, + Memory keyMemory, int keyOffset, ResultRow resultRow, int resultRowPosition diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/LongGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/LongGroupByVectorColumnSelector.java index afacd224d65..f806ef8481b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/LongGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/LongGroupByVectorColumnSelector.java @@ -19,11 +19,11 @@ package org.apache.druid.query.groupby.epinephelinae.vector; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.segment.vector.VectorValueSelector; -import java.nio.ByteBuffer; - public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelector { private final VectorValueSelector selector; @@ -36,12 +36,12 @@ public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelec @Override public int getGroupingKeySize() { - return 2; + return Long.BYTES; } @Override public void writeKeys( - final int[] keySpace, + final WritableMemory keySpace, final int keySize, final int keyOffset, final int startRow, @@ -50,21 +50,23 @@ public class LongGroupByVectorColumnSelec { final long[] vector = selector.getLongVector(); - for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { - keySpace[j] = (int) (vector[i] >>> 32); - keySpace[j + 1] = (int) (vector[i] & 0xffffffffL); + if (keySize == Long.BYTES) { + keySpace.putLongArray(keyOffset, vector, startRow, endRow - startRow); + } else { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putLong(j, vector[i]); + } } } @Override public void writeKeyToResultRow( - final ByteBuffer keyBuffer, + final Memory keyMemory, final int keyOffset, final ResultRow resultRow, final int resultRowPosition ) { - final long value = keyBuffer.getLong(keyOffset * Integer.BYTES); - resultRow.set(resultRowPosition, value); + resultRow.set(resultRowPosition, keyMemory.getLong(keyOffset)); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/SingleValueStringGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/SingleValueStringGroupByVectorColumnSelector.java index f63f9cc6895..2dfdb0e25cf 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/SingleValueStringGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/SingleValueStringGroupByVectorColumnSelector.java @@ -19,11 +19,11 @@ package org.apache.druid.query.groupby.epinephelinae.vector; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; -import java.nio.ByteBuffer; - public class 
SingleValueStringGroupByVectorColumnSelector implements GroupByVectorColumnSelector { private final SingleValueDimensionVectorSelector selector; @@ -36,34 +36,38 @@ public class SingleValueStringGroupByVectorColumnSelector implements GroupByVect @Override public int getGroupingKeySize() { - return 1; + return Integer.BYTES; } @Override public void writeKeys( - final int[] keySpace, + final WritableMemory keySpace, final int keySize, final int keyOffset, final int startRow, final int endRow ) { - final int[] rowVector = selector.getRowVector(); + final int[] vector = selector.getRowVector(); - for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { - keySpace[j] = rowVector[i]; + if (keySize == Integer.BYTES) { + keySpace.putIntArray(keyOffset, vector, startRow, endRow - startRow); + } else { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putInt(j, vector[i]); + } } } @Override public void writeKeyToResultRow( - final ByteBuffer keyBuffer, + final Memory keyMemory, final int keyOffset, final ResultRow resultRow, final int resultRowPosition ) { - final int id = keyBuffer.getInt(keyOffset * Integer.BYTES); + final int id = keyMemory.getInt(keyOffset); resultRow.set(resultRowPosition, selector.lookupName(id)); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java index aab0412529d..f8a3a34f167 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java @@ -20,6 +20,8 @@ package org.apache.druid.query.groupby.epinephelinae.vector; import com.google.common.base.Suppliers; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.Sequence; @@ -34,11 +36,9 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.AggregateResult; import org.apache.druid.query.groupby.epinephelinae.BufferArrayGrouper; -import org.apache.druid.query.groupby.epinephelinae.BufferHashGrouper; -import org.apache.druid.query.groupby.epinephelinae.ByteBufferKeySerde; import org.apache.druid.query.groupby.epinephelinae.CloseableGrouperIterator; import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; -import org.apache.druid.query.groupby.epinephelinae.Grouper; +import org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper; import org.apache.druid.query.groupby.epinephelinae.VectorGrouper; import org.apache.druid.query.vector.VectorCursorGranularizer; import org.apache.druid.segment.DimensionHandlerUtils; @@ -200,8 +200,7 @@ public class VectorGroupByEngine private final ByteBuffer processingBuffer; private final DateTime fudgeTimestamp; private final int keySize; - private final int[] keySpace; - private final Grouper.KeySerde keySerde; + private final WritableMemory keySpace; private final VectorGrouper vectorGrouper; @Nullable @@ -216,7 +215,7 @@ public class VectorGroupByEngine private int partiallyAggregatedRows = -1; @Nullable - private CloseableGrouperIterator delegate = null; + private CloseableGrouperIterator 
delegate = null; VectorGroupByEngineIterator( final GroupByQuery query, @@ -237,8 +236,7 @@ public class VectorGroupByEngine this.processingBuffer = processingBuffer; this.fudgeTimestamp = fudgeTimestamp; this.keySize = selectors.stream().mapToInt(GroupByVectorColumnSelector::getGroupingKeySize).sum(); - this.keySpace = new int[keySize * cursor.getMaxVectorSize()]; - this.keySerde = new ByteBufferKeySerde(keySize * Integer.BYTES); + this.keySpace = WritableMemory.allocate(keySize * cursor.getMaxVectorSize()); this.vectorGrouper = makeGrouper(); this.granulizer = VectorCursorGranularizer.create(storageAdapter, cursor, query.getGranularity(), queryInterval); @@ -322,17 +320,16 @@ public class VectorGroupByEngine cardinalityForArrayAggregation ); } else { - grouper = new BufferHashGrouper<>( + grouper = new HashVectorGrouper( Suppliers.ofInstance(processingBuffer), - keySerde, + keySize, AggregatorAdapters.factorizeVector( cursor.getColumnSelectorFactory(), query.getAggregatorSpecs() ), querySpecificConfig.getBufferGrouperMaxSize(), querySpecificConfig.getBufferGrouperMaxLoadFactor(), - querySpecificConfig.getBufferGrouperInitialBuckets(), - true + querySpecificConfig.getBufferGrouperInitialBuckets() ); } @@ -341,7 +338,7 @@ public class VectorGroupByEngine return grouper; } - private CloseableGrouperIterator initNewDelegate() + private CloseableGrouperIterator initNewDelegate() { // Method must not be called unless there's a current bucketInterval. assert bucketInterval != null;
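End-to-end, the new key path fits together as follows (an illustrative sketch, not part of
the patch: the driver loop is simplified from VectorGroupByEngineIterator, and keySize,
selectors, startRow, endRow, maxVectorSize, and vectorGrouper are assumed to be set up as
in that class):

    // Each column selector writes its fixed-width key part into a shared WritableMemory,
    // packed row-major: the key for the i-th processed row occupies bytes
    // [i * keySize, (i + 1) * keySize).
    final WritableMemory keySpace = WritableMemory.allocate(keySize * maxVectorSize);

    int keyOffset = 0;
    for (final GroupByVectorColumnSelector selector : selectors) {
      selector.writeKeys(keySpace, keySize, keyOffset, startRow, endRow);
      keyOffset += selector.getGroupingKeySize();
    }

    // The grouper hashes and probes directly against the Memory region; there is no
    // per-vector int[] key array or ByteBuffer key serde left on this path.
    final AggregateResult result = vectorGrouper.aggregateVector(keySpace, startRow, endRow);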