rowMap)
{
for (DimensionSpec dimSpec : dimensionSpecs) {
final ValueType outputType = dimSpec.getOutputType();
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java
index 05664f516d1..596254ecd23 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -36,10 +37,12 @@ import java.util.function.ToIntFunction;
* Groupers aggregate metrics from rows that they typically get from a ColumnSelectorFactory, under
* grouping keys that some outside driver is passing in. They can also iterate over the grouped
* rows after the aggregation is done.
- *
+ *
* They work sort of like a map of KeyType to aggregated values, except they don't support
* random lookups.
*
+ * See {@link VectorGrouper} for a vectorized version.
+ *
* @param <KeyType> type of the key that will be passed in
*/
public interface Grouper extends Closeable
@@ -89,7 +92,7 @@ public interface Grouper extends Closeable
default ToIntFunction hashFunction()
{
- return Groupers::hash;
+ return Groupers::hashObject;
}
/**
@@ -247,6 +250,7 @@ public interface Grouper extends Closeable
*
* @return serialized key, or null if we are unable to serialize more keys due to resource limits
*/
+ @Nullable
ByteBuffer toByteBuffer(T key);
/**
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java
index d173b04f518..a1d8dbf816e 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java
@@ -19,6 +19,7 @@
package org.apache.druid.query.groupby.epinephelinae;
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
public class Groupers
@@ -28,17 +29,22 @@ public class Groupers
// No instantiation
}
- static final AggregateResult DICTIONARY_FULL = AggregateResult.failure(
+ private static final AggregateResult DICTIONARY_FULL_ZERO_COUNT = AggregateResult.partial(
+ 0,
"Not enough dictionary space to execute this query. Try increasing "
+ "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting "
+ "druid.query.groupBy.maxOnDiskStorage to a positive number."
);
- static final AggregateResult HASH_TABLE_FULL = AggregateResult.failure(
+
+ private static final AggregateResult HASH_TABLE_FULL_ZERO_COUNT = AggregateResult.partial(
+ 0,
"Not enough aggregation buffer space to execute this query. Try increasing "
+ "druid.processing.buffer.sizeBytes or enable disk spilling by setting "
+ "druid.query.groupBy.maxOnDiskStorage to a positive number."
);
+ private static final int USED_FLAG_MASK = 0x7fffffff;
+
private static final int C1 = 0xcc9e2d51;
private static final int C2 = 0x1b873593;
@@ -50,18 +56,46 @@ public class Groupers
* MurmurHash3 was written by Austin Appleby, and is placed in the public domain. The author
* hereby disclaims copyright to this source code.
*/
- static int smear(int hashCode)
+ private static int smear(int hashCode)
{
return C2 * Integer.rotateLeft(hashCode * C1, 15);
}
- public static int hash(final Object obj)
+ public static AggregateResult dictionaryFull(final int count)
+ {
+ if (count == 0) {
+ return DICTIONARY_FULL_ZERO_COUNT;
+ } else {
+ return AggregateResult.partial(count, DICTIONARY_FULL_ZERO_COUNT.getReason());
+ }
+ }
+
+ public static AggregateResult hashTableFull(final int count)
+ {
+ if (count == 0) {
+ return HASH_TABLE_FULL_ZERO_COUNT;
+ } else {
+ return AggregateResult.partial(count, HASH_TABLE_FULL_ZERO_COUNT.getReason());
+ }
+ }
+
+ public static int hashObject(final Object obj)
{
// Mask off the high bit so we can use that to determine if a bucket is used or not.
- // Also apply the smear function, to improve distribution.
- final int code = obj.hashCode();
- return smear(code) & 0x7fffffff;
+ // Also apply the "smear" function, to improve distribution.
+ return smear(obj.hashCode()) & USED_FLAG_MASK;
+ }
+ public static int hashIntArray(final int[] ints, final int start, final int length)
+ {
+ // Similar to what Arrays.hashCode would do.
+ // Also apply the "smear" function, to improve distribution.
+ int hashCode = 1;
+ for (int i = 0; i < length; i++) {
+ hashCode = 31 * hashCode + ints[start + i];
+ }
+
+ return smear(hashCode) & USED_FLAG_MASK;
}
static int getUsedFlag(int keyHash)
@@ -76,4 +110,22 @@ public class Groupers
slice.limit(slice.position() + sliceSize);
return slice.slice();
}
+
+ /**
+ * Write ints from "start" (inclusive) to "end" (exclusive) into "scratch", if start != 0. Otherwise, return null.
+ */
+ @Nullable
+ public static int[] writeAggregationRows(final int[] scratch, final int start, final int end)
+ {
+ if (start == 0) {
+ return null;
+ } else {
+ final int numRows = end - start;
+ for (int i = 0; i < numRows; i++) {
+ scratch[i] = start + i;
+ }
+
+ return scratch;
+ }
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
index c8d97eafc1b..4c71c31167a 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
@@ -24,8 +24,8 @@ import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.AbstractList;
@@ -40,8 +40,6 @@ public class LimitedBufferHashGrouper extends AbstractBufferHashGrouper
private static final int DEFAULT_INITIAL_BUCKETS = 1024;
private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;
- private final AggregatorFactory[] aggregatorFactories;
-
// Limit to apply to results.
private int limit;
@@ -66,8 +64,7 @@ public class LimitedBufferHashGrouper extends AbstractBufferHashGrouper
public LimitedBufferHashGrouper(
final Supplier bufferSupplier,
final Grouper.KeySerde keySerde,
- final ColumnSelectorFactory columnSelectorFactory,
- final AggregatorFactory[] aggregatorFactories,
+ final AggregatorAdapters aggregators,
final int bufferGrouperMaxSize,
final float maxLoadFactor,
final int initialBuckets,
@@ -75,7 +72,7 @@ public class LimitedBufferHashGrouper extends AbstractBufferHashGrouper
final boolean sortHasNonGroupingFields
)
{
- super(bufferSupplier, keySerde, aggregatorFactories, bufferGrouperMaxSize);
+ super(bufferSupplier, keySerde, aggregators, HASH_SIZE + keySerde.keySize(), bufferGrouperMaxSize);
this.maxLoadFactor = maxLoadFactor > 0 ? maxLoadFactor : DEFAULT_MAX_LOAD_FACTOR;
this.initialBuckets = initialBuckets > 0 ? Math.max(MIN_INITIAL_BUCKETS, initialBuckets) : DEFAULT_INITIAL_BUCKETS;
this.limit = limit;
@@ -85,18 +82,9 @@ public class LimitedBufferHashGrouper extends AbstractBufferHashGrouper
throw new IAE("Invalid maxLoadFactor[%f], must be < 1.0", maxLoadFactor);
}
- int offset = HASH_SIZE + keySize;
- this.aggregatorFactories = aggregatorFactories;
- for (int i = 0; i < aggregatorFactories.length; i++) {
- aggregators[i] = aggregatorFactories[i].factorizeBuffered(columnSelectorFactory);
- aggregatorOffsets[i] = offset;
- offset += aggregatorFactories[i].getMaxIntermediateSizeWithNulls();
- }
-
// For each bucket, store an extra field indicating the bucket's current index within the heap when
- // pushing down limits
- offset += Integer.BYTES;
- this.bucketSize = offset;
+ // pushing down limits (size Integer.BYTES).
+ this.bucketSize = HASH_SIZE + keySerde.keySize() + Integer.BYTES + aggregators.spaceNeeded();
}
@Override
@@ -374,8 +362,8 @@ public class LimitedBufferHashGrouper extends AbstractBufferHashGrouper
return new Comparator()
{
final BufferComparator bufferComparator = keySerde.bufferComparatorWithAggregators(
- aggregatorFactories,
- aggregatorOffsets
+ aggregators.factories().toArray(new AggregatorFactory[0]),
+ aggregators.aggregatorPositions()
);
@Override
@@ -511,14 +499,12 @@ public class LimitedBufferHashGrouper extends AbstractBufferHashGrouper
offsetHeap.setAt(i, newBucketOffset);
// relocate aggregators (see https://github.com/apache/incubator-druid/pull/4071)
- for (int j = 0; j < aggregators.length; j++) {
- aggregators[j].relocate(
- oldBucketOffset + aggregatorOffsets[j],
- newBucketOffset + aggregatorOffsets[j],
- tableBuffer,
- newTableBuffer
- );
- }
+ aggregators.relocate(
+ oldBucketOffset + baseAggregatorOffset,
+ newBucketOffset + baseAggregatorOffset,
+ tableBuffer,
+ newTableBuffer
+ );
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index bd41d08eedb..de5da6222fb 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -436,8 +436,7 @@ public class RowBasedGrouperHelper
final boolean includeTimestamp = GroupByStrategyV2.getUniversalTimestamp(query) == null;
return new CloseableGrouperIterator<>(
- grouper,
- true,
+ grouper.iterator(true),
new Function, Row>()
{
@Override
@@ -833,7 +832,10 @@ public class RowBasedGrouperHelper
@Override
public int compare(Grouper.Entry entry1, Grouper.Entry entry2)
{
- final int timeCompare = Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]);
+ final int timeCompare = Longs.compare(
+ (long) entry1.getKey().getKey()[0],
+ (long) entry2.getKey().getKey()[0]
+ );
if (timeCompare != 0) {
return timeCompare;
@@ -930,8 +932,10 @@ public class RowBasedGrouperHelper
// use natural comparison
cmp = Comparators.naturalNullsFirst().compare(lhs, rhs);
} else {
- cmp = comparator.compare(DimensionHandlerUtils.convertObjectToString(lhs),
- DimensionHandlerUtils.convertObjectToString(rhs));
+ cmp = comparator.compare(
+ DimensionHandlerUtils.convertObjectToString(lhs),
+ DimensionHandlerUtils.convertObjectToString(rhs)
+ );
}
if (cmp != 0) {
@@ -1637,7 +1641,8 @@ public class RowBasedGrouperHelper
FloatRowBasedKeySerdeHelper(
int keyBufferPosition,
boolean pushLimitDown,
- @Nullable StringComparator stringComparator)
+ @Nullable StringComparator stringComparator
+ )
{
this.keyBufferPosition = keyBufferPosition;
if (isPrimitiveComparable(pushLimitDown, stringComparator)) {
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 2ef2eb9cfc5..249060a5b8c 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
@@ -41,6 +42,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
@@ -55,11 +57,12 @@ import java.util.Set;
public class SpillingGrouper implements Grouper
{
private static final Logger log = new Logger(SpillingGrouper.class);
-
- private final Grouper grouper;
- private static final AggregateResult DISK_FULL = AggregateResult.failure(
+ private static final AggregateResult DISK_FULL = AggregateResult.partial(
+ 0,
"Not enough disk space to execute this query. Try raising druid.query.groupBy.maxOnDiskStorage."
);
+
+ private final Grouper grouper;
private final KeySerde keySerde;
private final LimitedTemporaryStorage temporaryStorage;
private final ObjectMapper spillMapper;
@@ -96,8 +99,7 @@ public class SpillingGrouper implements Grouper
LimitedBufferHashGrouper limitGrouper = new LimitedBufferHashGrouper<>(
bufferSupplier,
keySerde,
- columnSelectorFactory,
- aggregatorFactories,
+ AggregatorAdapters.factorizeBuffered(columnSelectorFactory, Arrays.asList(aggregatorFactories)),
bufferGrouperMaxSize,
bufferGrouperMaxLoadFactor,
bufferGrouperInitialBuckets,
@@ -119,8 +121,7 @@ public class SpillingGrouper implements Grouper
this.grouper = new BufferHashGrouper<>(
bufferSupplier,
keySerde,
- columnSelectorFactory,
- aggregatorFactories,
+ AggregatorAdapters.factorizeBuffered(columnSelectorFactory, Arrays.asList(aggregatorFactories)),
bufferGrouperMaxSize,
bufferGrouperMaxLoadFactor,
bufferGrouperInitialBuckets,
@@ -133,8 +134,7 @@ public class SpillingGrouper implements Grouper
this.grouper = new BufferHashGrouper<>(
bufferSupplier,
keySerde,
- columnSelectorFactory,
- aggregatorFactories,
+ AggregatorAdapters.factorizeBuffered(columnSelectorFactory, Arrays.asList(aggregatorFactories)),
bufferGrouperMaxSize,
bufferGrouperMaxLoadFactor,
bufferGrouperInitialBuckets,
@@ -168,6 +168,9 @@ public class SpillingGrouper implements Grouper
if (result.isOk() || !spillingAllowed || temporaryStorage.maxSize() <= 0) {
return result;
} else {
+ // Expecting all-or-nothing behavior.
+ assert result.getCount() == 0;
+
// Warning: this can potentially block up a processing thread for a while.
try {
spill();
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/VectorGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/VectorGrouper.java
new file mode 100644
index 00000000000..1da43f8a99a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/VectorGrouper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+
+/**
+ * Like a {@link Grouper}, but vectorized. Keys are always int arrays, so there is no generic type parameter KeyType.
+ *
+ * This interface is designed such that an implementation can implement both Grouper and VectorGrouper. Of course,
+ * it would generally only make sense for a particular instance to be called with one set of functionality or the
+ * other.
+ */
+public interface VectorGrouper extends Closeable
+{
+ /**
+ * Initialize the grouper. This method needs to be called before calling {@link #aggregateVector}.
+ */
+ void initVectorized(int maxVectorSize);
+
+ /**
+ * Aggregate the current vector of rows from "startRow" to "endRow" using the provided keys.
+ *
+ * @param keySpace array holding keys, chunked into ints. First (endRow - startRow) keys
+ * must be valid.
+ * @param startRow row to start at (inclusive).
+ * @param endRow row to end at (exclusive).
+ *
+ * @return result that indicates how many keys were aggregated (may be partial due to resource limits)
+ */
+ AggregateResult aggregateVector(int[] keySpace, int startRow, int endRow);
+
+ /**
+ * Reset the grouper to its initial state.
+ */
+ void reset();
+
+ /**
+ * Close the grouper and release associated resources.
+ */
+ @Override
+ void close();
+
+ /**
+ * Iterate through entries.
+ *
+ * Some implementations allow writes even after this method is called. After you are done with the iterator
+ * returned by this method, you should either call {@link #close()} (if you are done with the VectorGrouper) or
+ * {@link #reset()} (if you want to reuse it).
+ *
+ * Callers must process and discard the returned {@link Grouper.Entry}s immediately, because the keys may
+ * be reused.
+ *
+ * @return entry iterator
+ */
+ CloseableIterator<Grouper.Entry<ByteBuffer>> iterator();
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DoubleGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DoubleGroupByVectorColumnSelector.java
new file mode 100644
index 00000000000..2802e3a8aec
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DoubleGroupByVectorColumnSelector.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.vector;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSelector
+{
+ private final VectorValueSelector selector;
+
+ DoubleGroupByVectorColumnSelector(final VectorValueSelector selector)
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public int getGroupingKeySize()
+ {
+ return 2;
+ }
+
+ @Override
+ public void writeKeys(
+ final int[] keySpace,
+ final int keySize,
+ final int keyOffset,
+ final int startRow,
+ final int endRow
+ )
+ {
+ final double[] vector = selector.getDoubleVector();
+
+ for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
+ final long longValue = Double.doubleToLongBits(vector[i]);
+ keySpace[j] = (int) (longValue >>> 32);
+ keySpace[j + 1] = (int) (longValue & 0xffffffffL);
+ }
+ }
+
+ @Override
+ public void writeKeyToResultRow(
+ final String outputName,
+ final ByteBuffer keyBuffer,
+ final int keyOffset,
+ final Map resultMap
+ )
+ {
+ final double value = keyBuffer.getDouble(keyOffset * Integer.BYTES);
+ resultMap.put(outputName, value);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/FloatGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/FloatGroupByVectorColumnSelector.java
new file mode 100644
index 00000000000..5adbdb1f14f
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/FloatGroupByVectorColumnSelector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.vector;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSelector
+{
+ private final VectorValueSelector selector;
+
+ FloatGroupByVectorColumnSelector(final VectorValueSelector selector)
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public int getGroupingKeySize()
+ {
+ return 1;
+ }
+
+ @Override
+ public void writeKeys(
+ final int[] keySpace,
+ final int keySize,
+ final int keyOffset,
+ final int startRow,
+ final int endRow
+ )
+ {
+ final float[] vector = selector.getFloatVector();
+
+ for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
+ keySpace[j] = Float.floatToIntBits(vector[i]);
+ }
+ }
+
+ @Override
+ public void writeKeyToResultRow(
+ final String outputName,
+ final ByteBuffer keyBuffer,
+ final int keyOffset,
+ final Map resultMap
+ )
+ {
+ final float value = Float.intBitsToFloat(keyBuffer.getInt(keyOffset * Integer.BYTES));
+ resultMap.put(outputName, value);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java
new file mode 100644
index 00000000000..3cc415368eb
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.vector;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public interface GroupByVectorColumnSelector
+{
+ int getGroupingKeySize();
+
+ void writeKeys(int[] keySpace, int keySize, int keyOffset, int startRow, int endRow);
+
+ void writeKeyToResultRow(
+ String outputName,
+ ByteBuffer keyBuffer,
+ int keyOffset,
+ Map resultMap
+ );
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnStrategizer.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnStrategizer.java
new file mode 100644
index 00000000000..c14041cdeae
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnStrategizer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.vector;
+
+import org.apache.druid.query.dimension.VectorColumnStrategizer;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+public class GroupByVectorColumnStrategizer implements VectorColumnStrategizer
+{
+ private static final GroupByVectorColumnStrategizer INSTANCE = new GroupByVectorColumnStrategizer();
+
+ private GroupByVectorColumnStrategizer()
+ {
+ // Singleton.
+ }
+
+ public static GroupByVectorColumnStrategizer instance()
+ {
+ return INSTANCE;
+ }
+
+ @Override
+ public GroupByVectorColumnSelector makeSingleValueDimensionStrategy(final SingleValueDimensionVectorSelector selector)
+ {
+ return new SingleValueStringGroupByVectorColumnSelector(selector);
+ }
+
+ @Override
+ public GroupByVectorColumnSelector makeMultiValueDimensionStrategy(final MultiValueDimensionVectorSelector selector)
+ {
+ throw new UnsupportedOperationException("Multi-value dimensions not yet implemented for vectorized groupBys");
+ }
+
+ @Override
+ public GroupByVectorColumnSelector makeFloatStrategy(final VectorValueSelector selector)
+ {
+ return new FloatGroupByVectorColumnSelector(selector);
+ }
+
+ @Override
+ public GroupByVectorColumnSelector makeDoubleStrategy(final VectorValueSelector selector)
+ {
+ return new DoubleGroupByVectorColumnSelector(selector);
+ }
+
+ @Override
+ public GroupByVectorColumnSelector makeLongStrategy(final VectorValueSelector selector)
+ {
+ return new LongGroupByVectorColumnSelector(selector);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/LongGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/LongGroupByVectorColumnSelector.java
new file mode 100644
index 00000000000..6ddbd99b4e8
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/LongGroupByVectorColumnSelector.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.vector;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelector
+{
+ private final VectorValueSelector selector;
+
+ LongGroupByVectorColumnSelector(final VectorValueSelector selector)
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public int getGroupingKeySize()
+ {
+ return 2;
+ }
+
+ @Override
+ public void writeKeys(
+ final int[] keySpace,
+ final int keySize,
+ final int keyOffset,
+ final int startRow,
+ final int endRow
+ )
+ {
+ final long[] vector = selector.getLongVector();
+
+ for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
+ keySpace[j] = (int) (vector[i] >>> 32);
+ keySpace[j + 1] = (int) (vector[i] & 0xffffffffL);
+ }
+ }
+
+ @Override
+ public void writeKeyToResultRow(
+ final String outputName,
+ final ByteBuffer keyBuffer,
+ final int keyOffset,
+ final Map resultMap
+ )
+ {
+ final long value = keyBuffer.getLong(keyOffset * Integer.BYTES);
+ resultMap.put(outputName, value);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/SingleValueStringGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/SingleValueStringGroupByVectorColumnSelector.java
new file mode 100644
index 00000000000..6a9b4289821
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/SingleValueStringGroupByVectorColumnSelector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.vector;
+
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class SingleValueStringGroupByVectorColumnSelector implements GroupByVectorColumnSelector
+{
+ private final SingleValueDimensionVectorSelector selector;
+
+ SingleValueStringGroupByVectorColumnSelector(final SingleValueDimensionVectorSelector selector)
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public int getGroupingKeySize()
+ {
+ return 1;
+ }
+
+ @Override
+ public void writeKeys(
+ final int[] keySpace,
+ final int keySize,
+ final int keyOffset,
+ final int startRow,
+ final int endRow
+ )
+ {
+ final int[] rowVector = selector.getRowVector();
+
+ for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
+ keySpace[j] = rowVector[i];
+ }
+ }
+
+ @Override
+ public void writeKeyToResultRow(
+ final String outputName,
+ final ByteBuffer keyBuffer,
+ final int keyOffset,
+ final Map resultMap
+ )
+ {
+ final int id = keyBuffer.getInt(keyOffset * Integer.BYTES);
+ resultMap.put(outputName, selector.lookupName(id));
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
new file mode 100644
index 00000000000..96b9988e5ec
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.vector;
+
+import com.google.common.base.Suppliers;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.aggregation.AggregatorAdapters;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.epinephelinae.AggregateResult;
+import org.apache.druid.query.groupby.epinephelinae.BufferArrayGrouper;
+import org.apache.druid.query.groupby.epinephelinae.BufferHashGrouper;
+import org.apache.druid.query.groupby.epinephelinae.ByteBufferKeySerde;
+import org.apache.druid.query.groupby.epinephelinae.CloseableGrouperIterator;
+import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
+import org.apache.druid.query.groupby.epinephelinae.Grouper;
+import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
+import org.apache.druid.query.vector.VectorCursorGranularizer;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+public class VectorGroupByEngine
+{
+ private VectorGroupByEngine()
+ {
+ // No instantiation: the engine is used only through its static canVectorize and process methods.
+ }
+
+ /**
+  * Decides whether the vectorized group-by engine can run {@code query} against {@code adapter}.
+  *
+  * All of the following must hold: every grouping dimension is single-valued, every dimension spec and every
+  * aggregator factory reports that it can vectorize, and the adapter can vectorize the given filter and virtual
+  * columns in non-descending order.
+  */
+ public static boolean canVectorize(
+     final GroupByQuery query,
+     final StorageAdapter adapter,
+     @Nullable final Filter filter
+ )
+ {
+   // Multi-value dimensions are not supported yet, hence the all-single-value check below.
+   //
+   // Two remarks on how that check is done:
+   // 1) Once multi-value dimensions are supported, "GroupByQueryEngineV2.isAllSingleValueDims" could take a
+   //    ColumnSelectorFactory instead, which is a better fit than a StorageAdapter (see #8013).
+   // 2) Strictly speaking a StorageAdapter is the wrong thing to ask, because it only knows about real columns
+   //    and those may be shadowed by virtual columns (again, see #8013). That is harmless for now, since
+   //    adapter.canVectorize always returns false when any virtual columns are present.
+   //
+   // Both points should resolve themselves once this engine handles multi-valued columns, at which point the
+   // all-single-value-dims check can go away.
+
+   if (!GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, query.getDimensions())) {
+     return false;
+   }
+
+   if (!query.getDimensions().stream().allMatch(DimensionSpec::canVectorize)) {
+     return false;
+   }
+
+   if (!query.getAggregatorSpecs().stream().allMatch(AggregatorFactory::canVectorize)) {
+     return false;
+   }
+
+   return adapter.canVectorize(filter, query.getVirtualColumns(), false);
+ }
+
+ /**
+  * Creates a lazy sequence of grouped result rows for one adapter and interval using the vectorized engine.
+  * The cursor and the grouping iterator are created only when the sequence is iterated; the vectorizability
+  * check runs immediately.
+  *
+  * @param query            group-by query to run
+  * @param storageAdapter   adapter the vector cursor is created from
+  * @param processingBuffer buffer supplied to the grouper
+  * @param fudgeTimestamp   if non-null, used as the timestamp of every emitted row instead of the bucket start
+  * @param filter           filter to apply; the same instance is checked by {@link #canVectorize} and used to
+  *                         build the cursor
+  * @param interval         interval passed to the cursor and to the granularizer
+  * @param config           group-by config used when constructing the grouper
+  *
+  * @return sequence of rows; it yields nothing if the adapter returns a null cursor
+  *
+  * @throws ISE if {@link #canVectorize} rejects the query
+  */
+ public static Sequence<Row> process(
+     final GroupByQuery query,
+     final StorageAdapter storageAdapter,
+     final ByteBuffer processingBuffer,
+     @Nullable final DateTime fudgeTimestamp,
+     @Nullable final Filter filter,
+     final Interval interval,
+     final GroupByQueryConfig config
+ )
+ {
+   if (!canVectorize(query, storageAdapter, filter)) {
+     throw new ISE("Cannot vectorize");
+   }
+
+   return new BaseSequence<>(
+       new BaseSequence.IteratorMaker<Row, CloseableIterator<Row>>()
+       {
+         @Override
+         public CloseableIterator<Row> make()
+         {
+           // Build the cursor from the very filter that canVectorize validated above. Re-deriving a filter from
+           // the query here would let the check and the cursor disagree whenever the caller passed a
+           // transformed filter.
+           final VectorCursor cursor = storageAdapter.makeVectorCursor(
+               filter,
+               interval,
+               query.getVirtualColumns(),
+               false,
+               QueryContexts.getVectorSize(query),
+               null
+           );
+
+           if (cursor == null) {
+             // Return empty iterator.
+             return new CloseableIterator<Row>()
+             {
+               @Override
+               public boolean hasNext()
+               {
+                 return false;
+               }
+
+               @Override
+               public Row next()
+               {
+                 throw new NoSuchElementException();
+               }
+
+               @Override
+               public void close()
+               {
+                 // Nothing to do.
+               }
+             };
+           }
+
+           try {
+             final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
+             final List<GroupByVectorColumnSelector> dimensions = query.getDimensions().stream().map(
+                 dimensionSpec ->
+                     DimensionHandlerUtils.makeVectorProcessor(
+                         dimensionSpec,
+                         GroupByVectorColumnStrategizer.instance(),
+                         columnSelectorFactory
+                     )
+             ).collect(Collectors.toList());
+
+             return new VectorGroupByEngineIterator(
+                 query,
+                 config,
+                 storageAdapter,
+                 cursor,
+                 interval,
+                 dimensions,
+                 processingBuffer,
+                 fudgeTimestamp
+             );
+           }
+           catch (Throwable e) {
+             // The iterator never took ownership of the cursor, so close it here and keep the original failure
+             // as the primary exception.
+             try {
+               cursor.close();
+             }
+             catch (Throwable e2) {
+               e.addSuppressed(e2);
+             }
+             throw e;
+           }
+         }
+
+         @Override
+         public void cleanup(CloseableIterator<Row> iterFromMake)
+         {
+           try {
+             iterFromMake.close();
+           }
+           catch (IOException e) {
+             throw new RuntimeException(e);
+           }
+         }
+       }
+   );
+ }
+
+ private static class VectorGroupByEngineIterator implements CloseableIterator
+ {
+ private final GroupByQuery query;
+ private final GroupByQueryConfig querySpecificConfig;
+ private final StorageAdapter storageAdapter;
+ private final VectorCursor cursor;
+ private final List selectors; // one GroupByVectorColumnSelector per query dimension, in dimension order
+ private final ByteBuffer processingBuffer; // supplied to whichever grouper makeGrouper() builds
+ private final DateTime fudgeTimestamp; // when non-null, replaces the bucket start as the row timestamp
+ private final int keySize; // sum of getGroupingKeySize() over all selectors, i.e. ints per grouping key
+ private final int[] keySpace; // keySize * cursor.getMaxVectorSize() ints, refilled for every vector
+ private final Grouper.KeySerde keySerde; // ByteBufferKeySerde sized keySize * Integer.BYTES
+ private final VectorGrouper vectorGrouper;
+
+ @Nullable
+ private final VectorCursorGranularizer granulizer; // null when VectorCursorGranularizer.create returns null
+
+ // Granularity-bucket iterator and current bucket. The iterator is empty when granulizer is null.
+ private final Iterator bucketIterator;
+
+ @Nullable
+ private Interval bucketInterval; // null once bucketIterator has no more buckets
+
+ private int partiallyAggregatedRows = -1; // -1 = no partial vector pending; otherwise rows already aggregated
+
+ @Nullable
+ private CloseableGrouperIterator delegate = null; // iterator over the grouper's current contents
+
+ VectorGroupByEngineIterator(
+ final GroupByQuery query,
+ final GroupByQueryConfig config,
+ final StorageAdapter storageAdapter,
+ final VectorCursor cursor,
+ final Interval queryInterval,
+ final List selectors,
+ final ByteBuffer processingBuffer,
+ @Nullable final DateTime fudgeTimestamp
+ )
+ {
+ this.query = query;
+ this.querySpecificConfig = config;
+ this.storageAdapter = storageAdapter;
+ this.cursor = cursor;
+ this.selectors = selectors;
+ this.processingBuffer = processingBuffer;
+ this.fudgeTimestamp = fudgeTimestamp;
+ this.keySize = selectors.stream().mapToInt(GroupByVectorColumnSelector::getGroupingKeySize).sum();
+ this.keySpace = new int[keySize * cursor.getMaxVectorSize()];
+ this.keySerde = new ByteBufferKeySerde(keySize * Integer.BYTES);
+ this.vectorGrouper = makeGrouper();
+ this.granulizer = VectorCursorGranularizer.create(storageAdapter, cursor, query.getGranularity(), queryInterval);
+
+ if (granulizer != null) {
+ this.bucketIterator = granulizer.getBucketIterable().iterator();
+ } else {
+ this.bucketIterator = Collections.emptyIterator();
+ }
+
+ this.bucketInterval = this.bucketIterator.hasNext() ? this.bucketIterator.next() : null;
+ }
+
+ /**
+  * Returns the next row of the current delegate.
+  *
+  * @throws NoSuchElementException if there is no delegate or the delegate has no more rows
+  */
+ @Override
+ public Row next()
+ {
+   if (delegate != null && delegate.hasNext()) {
+     return delegate.next();
+   }
+
+   throw new NoSuchElementException();
+ }
+
+ /**
+  * Reports whether another row is available, building new delegates as needed.
+  *
+  * If the current delegate is exhausted and there is still a bucket plus either unread cursor data or a
+  * partially aggregated vector, delegates are created until one of them has rows.
+  */
+ @Override
+ public boolean hasNext()
+ {
+   if (delegate != null && delegate.hasNext()) {
+     return true;
+   }
+
+   final boolean moreToRead = !cursor.isDone() || partiallyAggregatedRows >= 0;
+
+   if (bucketInterval == null || !moreToRead) {
+     return false;
+   }
+
+   while (delegate == null || !delegate.hasNext()) {
+     if (delegate != null) {
+       // The previous delegate is drained: close it and reset the grouper before refilling it.
+       delegate.close();
+       vectorGrouper.reset();
+     }
+
+     delegate = initNewDelegate();
+   }
+
+   return true;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException(); // removal is not supported by this iterator
+ }
+
+ @Override
+ public void close()
+ {
+ cursor.close();
+
+ if (delegate != null) {
+ delegate.close();
+ }
+ }
+
+ /**
+  * Builds and initializes the grouper for this iterator.
+  *
+  * A non-negative result from {@code GroupByQueryEngineV2.getCardinalityForArrayAggregation} selects a
+  * {@link BufferArrayGrouper} of that cardinality; a negative result selects a {@link BufferHashGrouper}
+  * configured from the query-specific config. Either way the grouper is initialized for the cursor's maximum
+  * vector size before it is returned.
+  */
+ private VectorGrouper makeGrouper()
+ {
+   final int arrayCardinality = GroupByQueryEngineV2.getCardinalityForArrayAggregation(
+       querySpecificConfig,
+       query,
+       storageAdapter,
+       processingBuffer
+   );
+
+   final VectorGrouper newGrouper;
+
+   if (arrayCardinality < 0) {
+     newGrouper = new BufferHashGrouper<>(
+         Suppliers.ofInstance(processingBuffer),
+         keySerde,
+         AggregatorAdapters.factorizeVector(
+             cursor.getColumnSelectorFactory(),
+             query.getAggregatorSpecs()
+         ),
+         querySpecificConfig.getBufferGrouperMaxSize(),
+         querySpecificConfig.getBufferGrouperMaxLoadFactor(),
+         querySpecificConfig.getBufferGrouperInitialBuckets(),
+         true
+     );
+   } else {
+     newGrouper = new BufferArrayGrouper(
+         Suppliers.ofInstance(processingBuffer),
+         AggregatorAdapters.factorizeVector(
+             cursor.getColumnSelectorFactory(),
+             query.getAggregatorSpecs()
+         ),
+         arrayCardinality
+     );
+   }
+
+   newGrouper.initVectorized(cursor.getMaxVectorSize());
+
+   return newGrouper;
+ }
+
+ /**
+  * Feeds cursor vectors for the current bucket into the grouper and returns an iterator over what the grouper
+  * holds afterwards.
+  *
+  * Aggregation stops when a vector could only be partially aggregated (the number of rows already handled is
+  * remembered in {@code partiallyAggregatedRows} so the next call resumes after them), when the cursor cannot be
+  * advanced within the bucket (the next bucket is then selected), or when the cursor is done.
+  *
+  * NOTE(review): a non-OK {@code AggregateResult} presumably means the grouper ran out of room; confirm against
+  * {@code VectorGrouper.aggregateVector}.
+  */
+ private CloseableGrouperIterator initNewDelegate()
+ {
+   // Method must not be called unless there's a current bucketInterval.
+   assert bucketInterval != null;
+
+   final DateTime timestamp;
+   if (fudgeTimestamp == null) {
+     timestamp = query.getGranularity().toDateTime(bucketInterval.getStartMillis());
+   } else {
+     timestamp = fudgeTimestamp;
+   }
+
+   while (!cursor.isDone()) {
+     final int startOffset;
+
+     if (partiallyAggregatedRows >= 0) {
+       // Resume inside the current vector, right after the rows aggregated on the previous call.
+       startOffset = granulizer.getStartOffset() + partiallyAggregatedRows;
+     } else {
+       granulizer.setCurrentOffsets(bucketInterval);
+       startOffset = granulizer.getStartOffset();
+     }
+
+     if (granulizer.getEndOffset() <= startOffset) {
+       // No rows of this vector fall into the range to aggregate.
+       partiallyAggregatedRows = -1;
+     } else {
+       // Write keys to the keySpace, one selector after the other.
+       int selectorKeyOffset = 0;
+       for (final GroupByVectorColumnSelector keyWriter : selectors) {
+         keyWriter.writeKeys(keySpace, keySize, selectorKeyOffset, startOffset, granulizer.getEndOffset());
+         selectorKeyOffset += keyWriter.getGroupingKeySize();
+       }
+
+       // Aggregate this vector.
+       final AggregateResult result = vectorGrouper.aggregateVector(
+           keySpace,
+           startOffset,
+           granulizer.getEndOffset()
+       );
+
+       if (result.isOk()) {
+         partiallyAggregatedRows = -1;
+       } else if (partiallyAggregatedRows < 0) {
+         partiallyAggregatedRows = result.getCount();
+       } else {
+         partiallyAggregatedRows += result.getCount();
+       }
+     }
+
+     if (partiallyAggregatedRows >= 0) {
+       // Hand out what has been grouped so far; the rest of this vector is picked up on the next call.
+       break;
+     }
+
+     if (!granulizer.advanceCursorWithinBucket()) {
+       // Advance bucketInterval.
+       bucketInterval = bucketIterator.hasNext() ? bucketIterator.next() : null;
+       break;
+     }
+   }
+
+   return new CloseableGrouperIterator<>(
+       vectorGrouper.iterator(),
+       entry -> {
+         Map event = new LinkedHashMap<>();
+
+         // Add dimensions, each selector reading its own slice of the key.
+         int dimKeyOffset = 0;
+         for (int dimIndex = 0; dimIndex < selectors.size(); dimIndex++) {
+           final GroupByVectorColumnSelector dimSelector = selectors.get(dimIndex);
+
+           dimSelector.writeKeyToResultRow(
+               query.getDimensions().get(dimIndex).getOutputName(),
+               entry.getKey(),
+               dimKeyOffset,
+               event
+           );
+
+           dimKeyOffset += dimSelector.getGroupingKeySize();
+         }
+
+         // Convert dimension values to desired output types, possibly.
+         GroupByQueryEngineV2.convertRowTypesToOutputTypes(query.getDimensions(), event);
+
+         // Add aggregations, named after the aggregator spec at the same position.
+         for (int aggIndex = 0; aggIndex < entry.getValues().length; aggIndex++) {
+           event.put(query.getAggregatorSpecs().get(aggIndex).getName(), entry.getValues()[aggIndex]);
+         }
+
+         return new MapBasedRow(timestamp, event);
+       },
+       vectorGrouper
+   );
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
index 494c261133c..ca1fe18c583 100644
--- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
@@ -160,6 +160,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
delegateQueryMetrics.identity(identity);
}
+ @Override
+ public void vectorized(final boolean vectorized)
+ {
+ delegateQueryMetrics.vectorized(vectorized); // pass-through: the wrapped query metrics records the flag
+ }
+
@Override
public BitmapResultFactory> makeBitmapResultFactory(BitmapFactory factory)
{
diff --git a/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
index c522987080b..80d92455a7d 100644
--- a/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
@@ -159,6 +159,12 @@ public class DefaultSelectQueryMetrics implements SelectQueryMetrics
delegateQueryMetrics.identity(identity);
}
+ @Override
+ public void vectorized(final boolean vectorized)
+ {
+ delegateQueryMetrics.vectorized(vectorized); // pass-through: the wrapped query metrics records the flag
+ }
+
@Override
public BitmapResultFactory> makeBitmapResultFactory(BitmapFactory factory)
{
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
index 0de74bbcbbf..87c24ffb11f 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
@@ -19,24 +19,65 @@
package org.apache.druid.query.timeseries;
-import com.google.common.base.Function;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.inject.Inject;
+import org.apache.druid.collections.NonBlockingPool;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.collections.StupidPool;
+import org.apache.druid.guice.annotations.Global;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.filter.Filter;
-import org.apache.druid.segment.Cursor;
+import org.apache.druid.query.vector.VectorCursorGranularizer;
import org.apache.druid.segment.SegmentMissingException;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.joda.time.Interval;
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
/**
+ *
*/
public class TimeseriesQueryEngine
{
+ private final NonBlockingPool bufferPool;
+
+ /**
+ * Constructor for tests. In production, the @Inject constructor is used instead.
+ *
+ * The engine gets its own {@link StupidPool} named "dummy" whose generator allocates heap buffers of
+ * 1,000,000 bytes via {@link ByteBuffer#allocate}.
+ */
+ @VisibleForTesting
+ public TimeseriesQueryEngine()
+ {
+ this.bufferPool = new StupidPool<>("dummy", () -> ByteBuffer.allocate(1000000));
+ }
+
+ @Inject
+ public TimeseriesQueryEngine(final @Global NonBlockingPool bufferPool)
+ {
+ this.bufferPool = bufferPool; // pool comes from the injector (the @Global binding); it is only stored here
+ }
+
+ /**
+ * Run a single-segment, single-interval timeseries query on a particular adapter. The query must have been
+ * scoped down to a single interval before calling this method.
+ */
public Sequence> process(final TimeseriesQuery query, final StorageAdapter adapter)
{
if (adapter == null) {
@@ -45,65 +86,210 @@ public class TimeseriesQueryEngine
);
}
- final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter()));
+ final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter()));
+ final Interval interval = Iterables.getOnlyElement(query.getIntervals());
+ final Granularity gran = query.getGranularity();
+ final boolean descending = query.isDescending();
+
+ final boolean doVectorize = QueryContexts.getVectorize(query).shouldVectorize(
+ adapter.canVectorize(filter, query.getVirtualColumns(), descending)
+ && query.getAggregatorSpecs().stream().allMatch(AggregatorFactory::canVectorize)
+ );
+
+ final Sequence> result;
+
+ if (doVectorize) {
+ result = processVectorized(query, adapter, filter, interval, gran, descending);
+ } else {
+ result = processNonVectorized(query, adapter, filter, interval, gran, descending);
+ }
+
final int limit = query.getLimit();
- Sequence> result = generateTimeseriesResult(adapter, query, filter);
if (limit < Integer.MAX_VALUE) {
return result.limit(limit);
+ } else {
+ return result;
}
- return result;
}
- private Sequence> generateTimeseriesResult(StorageAdapter adapter, TimeseriesQuery query, Filter filter)
+ private Sequence> processVectorized(
+ final TimeseriesQuery query,
+ final StorageAdapter adapter,
+ @Nullable final Filter filter,
+ final Interval queryInterval,
+ final Granularity gran,
+ final boolean descending
+ )
{
+ final boolean skipEmptyBuckets = query.isSkipEmptyBuckets();
+ final List aggregatorSpecs = query.getAggregatorSpecs();
+
+ final VectorCursor cursor = adapter.makeVectorCursor(
+ filter,
+ queryInterval,
+ query.getVirtualColumns(),
+ descending,
+ QueryContexts.getVectorSize(query),
+ null
+ );
+
+ if (cursor == null) {
+ return Sequences.empty();
+ }
+
+ final Closer closer = Closer.create();
+ closer.register(cursor);
+
+ try {
+ final VectorCursorGranularizer granularizer = VectorCursorGranularizer.create(
+ adapter,
+ cursor,
+ gran,
+ queryInterval
+ );
+
+ if (granularizer == null) {
+ return Sequences.empty();
+ }
+
+ final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
+ final AggregatorAdapters aggregators = closer.register(
+ AggregatorAdapters.factorizeVector(columnSelectorFactory, query.getAggregatorSpecs())
+ );
+
+ final ResourceHolder