diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java index a15f8d7e824..6207b17b672 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java @@ -57,21 +57,21 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) { final long[] timeVector = timeSelector.getLongVector(); + final boolean[] nullTimeVector = timeSelector.getNullVector(); final boolean[] nullValueVector = valueSelector.getNullVector(); - firstTime = buf.getLong(position); - // check if nullVector is found or not - // the nullVector is null if no null values are found - // set the nullAbsent flag accordingly - // the time vector is already sorted so the first element would be the earliest - // traverse accordingly int index; - - for (int i = startRow; i < endRow; i++) { - index = i; + // Now we are iterating over the values to find the minima as the + // timestamp expression in EARLIEST_BY has no established sorting order + // If we know that the time is already sorted this can be optimized + // for the general EARLIEST call which is always on __time which is sorted + for (index = startRow; index < endRow; index++) { + if (nullTimeVector != null && nullTimeVector[index]) { + continue; + } final long earliestTime = timeVector[index]; if (earliestTime >= firstTime) { - break; + continue; } firstTime = earliestTime; if (useDefault || nullValueVector == null || !nullValueVector[index]) { @@ -83,11 +83,10 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator } /** - * * Checks if the aggregated value at a position in the buffer is null or not * - * @param buf byte buffer storing the byte array representation of the aggregate - * @param position offset within the byte buffer at which the current aggregate value is stored + * @param buf byte buffer storing the byte array representation of the aggregate + * @param position offset within the byte buffer at which the current aggregate value is stored * @return */ boolean isValueNull(ByteBuffer buf, int position) @@ -110,7 +109,7 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator for (int i = 0; i < numRows; i++) { int position = positions[i] + positionOffset; int row = rows == null ? i : rows[i]; - long firstTime = buf.getLong(position); + firstTime = buf.getLong(position); if (timeVector[row] < firstTime) { if (useDefault || nulls == null || !nulls[row]) { updateTimeWithValue(buf, position, timeVector[row], row); @@ -124,10 +123,10 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator /** * Updates the time and the non null values to the appropriate position in buffer * - * @param buf byte buffer storing the byte array representation of the aggregate - * @param position offset within the byte buffer at which the current aggregate value is stored - * @param time the time to be updated in the buffer as the last time - * @param index the index of the vectorized vector which is the last value + * @param buf byte buffer storing the byte array representation of the aggregate + * @param position offset within the byte buffer at which the current aggregate value is stored + * @param time the time to be updated in the buffer as the last time + * @param index the index of the vectorized vector which is the last value */ void updateTimeWithValue(ByteBuffer buf, int position, long time, int index) { @@ -139,9 +138,9 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator /** * Updates the time only to the appropriate position in buffer as the value is null * - * @param buf byte buffer storing the byte array representation of the aggregate - * @param position offset within the byte buffer at which the current aggregate value is stored - * @param time the time to be updated in the buffer as the last time + * @param buf byte buffer storing the byte array representation of the aggregate + * @param position offset within the byte buffer at which the current aggregate value is stored + * @param time the time to be updated in the buffer as the last time */ void updateTimeWithNull(ByteBuffer buf, int position, long time) { @@ -150,7 +149,7 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator } /** - *Abstract function which needs to be overridden by subclasses to set the initial value + * Abstract function which needs to be overridden by subclasses to set the initial value */ abstract void initValue(ByteBuffer buf, int position); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java index 2d876efd6f9..6428b173a1d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java @@ -66,14 +66,21 @@ public class SingleStringFirstDimensionVectorAggregator implements VectorAggrega final long[] timeVector = timeSelector.getLongVector(); final int[] valueVector = valueDimensionVectorSelector.getRowVector(); firstTime = buf.getLong(position); - int index = startRow; + int index; + long earliestTime; - final long earliestTime = timeVector[index]; - if (earliestTime < firstTime) { - firstTime = earliestTime; - buf.putLong(position, firstTime); - buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); - buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, valueVector[index]); + // Now we are iterating over the values to find the minima as the + // timestamp expression in EARLIEST_BY has no established sorting order + // If we know that the time is already sorted this can be optimized + // for the general EARLIEST call which is always on __time which is sorted + for (index = startRow; index < endRow; index++) { + earliestTime = timeVector[index]; + if (earliestTime < firstTime) { + firstTime = earliestTime; + buf.putLong(position, firstTime); + buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, valueVector[index]); + } } } @@ -81,19 +88,45 @@ public class SingleStringFirstDimensionVectorAggregator implements VectorAggrega public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) { long[] timeVector = timeSelector.getLongVector(); + boolean[] nullTimeVector = timeSelector.getNullVector(); int[] values = valueDimensionVectorSelector.getRowVector(); - for (int i = 0; i < numRows; i++) { - int position = positions[i] + positionOffset; - int row = rows == null ? i : rows[i]; - long firstTime = buf.getLong(position); - if (timeVector[row] < firstTime) { - firstTime = timeVector[row]; - buf.putLong(position, firstTime); - buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); - buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, values[row]); + // Now we are iterating over the values to find the minima as the + // timestamp expression in EARLIEST_BY has no established sorting order + // If we know that the time is already sorted this can be optimized + // for the general EARLIEST call which is always on __time which is sorted + + // The hotpath is separated out into 2 cases when nullTimeVector + // is null and not-null so that the check is not on every value + if (nullTimeVector != null) { + for (int i = 0; i < numRows; i++) { + if (nullTimeVector[i]) { + continue; + } + int position = positions[i] + positionOffset; + int row = rows == null ? i : rows[i]; + long firstTime = buf.getLong(position); + if (timeVector[row] < firstTime) { + firstTime = timeVector[row]; + buf.putLong(position, firstTime); + buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, values[row]); + } + } + } else { + for (int i = 0; i < numRows; i++) { + int position = positions[i] + positionOffset; + int row = rows == null ? i : rows[i]; + long firstTime = buf.getLong(position); + if (timeVector[row] < firstTime) { + firstTime = timeVector[row]; + buf.putLong(position, firstTime); + buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, values[row]); + } } } + } @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java index 8a6654fbfdf..0d05833378c 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java @@ -56,9 +56,6 @@ public class StringFirstAggregator implements Aggregator @Override public void aggregate() { - if (timeSelector.isNull()) { - return; - } if (needsFoldCheck) { // Less efficient code path when folding is a possibility (we must read the value selector first just in case // it's a foldable object). @@ -72,6 +69,9 @@ public class StringFirstAggregator implements Aggregator firstValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes); } } else { + if (timeSelector.isNull()) { + return; + } final long time = timeSelector.getLong(); if (time < firstTime) { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java index fbf2a4156c5..563455c9eef 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java @@ -63,9 +63,6 @@ public class StringFirstBufferAggregator implements BufferAggregator @Override public void aggregate(ByteBuffer buf, int position) { - if (timeSelector.isNull()) { - return; - } if (needsFoldCheck) { // Less efficient code path when folding is a possibility (we must read the value selector first just in case // it's a foldable object). @@ -86,6 +83,9 @@ public class StringFirstBufferAggregator implements BufferAggregator } } } else { + if (timeSelector.isNull()) { + return; + } final long time = timeSelector.getLong(); final long firstTime = buf.getLong(position); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java index 3a9b8818cd0..14538fe4712 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -120,6 +120,9 @@ public class StringFirstLastUtils time = pair.lhs; string = pair.rhs; } else if (object != null) { + if (timeSelector.isNull()) { + return null; + } time = timeSelector.getLong(); string = DimensionHandlerUtils.convertObjectToString(object); } else { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java index 1b58d6320aa..088f84b3931 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java @@ -68,11 +68,14 @@ public class StringFirstVectorAggregator implements VectorAggregator Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector(); long firstTime = buf.getLong(position); int index; - for (int i = startRow; i < endRow; i++) { - if (times[i] > firstTime) { - break; + // Now we are iterating over the values to find the minima as the + // timestamp expression in EARLIEST_BY has no established sorting order + // If we know that the time is already sorted this can be optimized + // for the general EARLIEST call which is always on __time which is sorted + for (index = startRow; index < endRow; index++) { + if (times[index] > firstTime) { + continue; } - index = i; final boolean foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[index]); if (foldNeeded) { final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromVectorSelectorsAtIndex( @@ -132,6 +135,7 @@ public class StringFirstVectorAggregator implements VectorAggregator long firstTime = buf.getLong(position); if (timeVector[row] < firstTime) { if (foldNeeded) { + firstTime = timeVector[row]; final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromVectorSelectorsAtIndex( timeSelector, valueSelector, diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java index a7c33c8ad23..f1dbab60938 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java @@ -57,9 +57,6 @@ public class StringLastAggregator implements Aggregator @Override public void aggregate() { - if (timeSelector.isNull()) { - return; - } if (needsFoldCheck) { // Less efficient code path when folding is a possibility (we must read the value selector first just in case // it's a foldable object). @@ -73,6 +70,9 @@ public class StringLastAggregator implements Aggregator lastValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes); } } else { + if (timeSelector.isNull()) { + return; + } final long time = timeSelector.getLong(); if (time >= lastTime) { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java index 8611ef72365..3f78745f5fa 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java @@ -64,9 +64,6 @@ public class StringLastBufferAggregator implements BufferAggregator @Override public void aggregate(ByteBuffer buf, int position) { - if (timeSelector.isNull()) { - return; - } if (needsFoldCheck) { // Less efficient code path when folding is a possibility (we must read the value selector first just in case // it's a foldable object). @@ -87,6 +84,9 @@ public class StringLastBufferAggregator implements BufferAggregator } } } else { + if (timeSelector.isNull()) { + return; + } final long time = timeSelector.getLong(); final long lastTime = buf.getLong(position); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 13a6afcb83c..dbb382f9270 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -707,6 +707,33 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testEarliestVectorAggregators() + { + notMsqCompatible(); + testQuery( + "SELECT " + + "EARLIEST(dim1, 10) " + + "FROM druid.numfoo", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(Granularities.ALL) + .aggregators( + aggregators( + new StringFirstAggregatorFactory("a0", "dim1", "__time", 10) + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{""} + ) + ); + } + @Test public void testLatestAggregators() {