Handling review comments and correctness fix for latest_by when the time expression need not be in sorted order

This commit is contained in:
Soumyava Das 2023-08-16 15:43:21 -07:00
parent aa971815a8
commit f58541273b
9 changed files with 120 additions and 54 deletions

View File

@ -57,21 +57,21 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{ {
final long[] timeVector = timeSelector.getLongVector(); final long[] timeVector = timeSelector.getLongVector();
final boolean[] nullTimeVector = timeSelector.getNullVector();
final boolean[] nullValueVector = valueSelector.getNullVector(); final boolean[] nullValueVector = valueSelector.getNullVector();
firstTime = buf.getLong(position);
// check if nullVector is found or not
// the nullVector is null if no null values are found
// set the nullAbsent flag accordingly
// the time vector is already sorted so the first element would be the earliest
// traverse accordingly
int index; int index;
// Now we are iterating over the values to find the minima as the
for (int i = startRow; i < endRow; i++) { // timestamp expression in EARLIEST_BY has no established sorting order
index = i; // If we know that the time is already sorted this can be optimized
// for the general EARLIEST call which is always on __time which is sorted
for (index = startRow; index < endRow; index++) {
if (nullTimeVector != null && nullTimeVector[index]) {
continue;
}
final long earliestTime = timeVector[index]; final long earliestTime = timeVector[index];
if (earliestTime >= firstTime) { if (earliestTime >= firstTime) {
break; continue;
} }
firstTime = earliestTime; firstTime = earliestTime;
if (useDefault || nullValueVector == null || !nullValueVector[index]) { if (useDefault || nullValueVector == null || !nullValueVector[index]) {
@ -83,11 +83,10 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator
} }
/** /**
*
* Checks if the aggregated value at a position in the buffer is null or not * Checks if the aggregated value at a position in the buffer is null or not
* *
* @param buf byte buffer storing the byte array representation of the aggregate * @param buf byte buffer storing the byte array representation of the aggregate
* @param position offset within the byte buffer at which the current aggregate value is stored * @param position offset within the byte buffer at which the current aggregate value is stored
* @return * @return
*/ */
boolean isValueNull(ByteBuffer buf, int position) boolean isValueNull(ByteBuffer buf, int position)
@ -110,7 +109,7 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator
for (int i = 0; i < numRows; i++) { for (int i = 0; i < numRows; i++) {
int position = positions[i] + positionOffset; int position = positions[i] + positionOffset;
int row = rows == null ? i : rows[i]; int row = rows == null ? i : rows[i];
long firstTime = buf.getLong(position); firstTime = buf.getLong(position);
if (timeVector[row] < firstTime) { if (timeVector[row] < firstTime) {
if (useDefault || nulls == null || !nulls[row]) { if (useDefault || nulls == null || !nulls[row]) {
updateTimeWithValue(buf, position, timeVector[row], row); updateTimeWithValue(buf, position, timeVector[row], row);
@ -124,10 +123,10 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator
/** /**
* Updates the time and the non null values to the appropriate position in buffer * Updates the time and the non null values to the appropriate position in buffer
* *
* @param buf byte buffer storing the byte array representation of the aggregate * @param buf byte buffer storing the byte array representation of the aggregate
* @param position offset within the byte buffer at which the current aggregate value is stored * @param position offset within the byte buffer at which the current aggregate value is stored
* @param time the time to be updated in the buffer as the last time * @param time the time to be updated in the buffer as the last time
* @param index the index of the vectorized vector which is the last value * @param index the index of the vectorized vector which is the last value
*/ */
void updateTimeWithValue(ByteBuffer buf, int position, long time, int index) void updateTimeWithValue(ByteBuffer buf, int position, long time, int index)
{ {
@ -139,9 +138,9 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator
/** /**
* Updates the time only to the appropriate position in buffer as the value is null * Updates the time only to the appropriate position in buffer as the value is null
* *
* @param buf byte buffer storing the byte array representation of the aggregate * @param buf byte buffer storing the byte array representation of the aggregate
* @param position offset within the byte buffer at which the current aggregate value is stored * @param position offset within the byte buffer at which the current aggregate value is stored
* @param time the time to be updated in the buffer as the last time * @param time the time to be updated in the buffer as the last time
*/ */
void updateTimeWithNull(ByteBuffer buf, int position, long time) void updateTimeWithNull(ByteBuffer buf, int position, long time)
{ {
@ -150,7 +149,7 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator
} }
/** /**
*Abstract function which needs to be overridden by subclasses to set the initial value * Abstract function which needs to be overridden by subclasses to set the initial value
*/ */
abstract void initValue(ByteBuffer buf, int position); abstract void initValue(ByteBuffer buf, int position);

View File

@ -66,14 +66,21 @@ public class SingleStringFirstDimensionVectorAggregator implements VectorAggrega
final long[] timeVector = timeSelector.getLongVector(); final long[] timeVector = timeSelector.getLongVector();
final int[] valueVector = valueDimensionVectorSelector.getRowVector(); final int[] valueVector = valueDimensionVectorSelector.getRowVector();
firstTime = buf.getLong(position); firstTime = buf.getLong(position);
int index = startRow; int index;
long earliestTime;
final long earliestTime = timeVector[index]; // Now we are iterating over the values to find the minima as the
if (earliestTime < firstTime) { // timestamp expression in EARLIEST_BY has no established sorting order
firstTime = earliestTime; // If we know that the time is already sorted this can be optimized
buf.putLong(position, firstTime); // for the general EARLIEST call which is always on __time which is sorted
buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); for (index = startRow; index < endRow; index++) {
buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, valueVector[index]); earliestTime = timeVector[index];
if (earliestTime < firstTime) {
firstTime = earliestTime;
buf.putLong(position, firstTime);
buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, valueVector[index]);
}
} }
} }
@ -81,19 +88,45 @@ public class SingleStringFirstDimensionVectorAggregator implements VectorAggrega
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
{ {
long[] timeVector = timeSelector.getLongVector(); long[] timeVector = timeSelector.getLongVector();
boolean[] nullTimeVector = timeSelector.getNullVector();
int[] values = valueDimensionVectorSelector.getRowVector(); int[] values = valueDimensionVectorSelector.getRowVector();
for (int i = 0; i < numRows; i++) { // Now we are iterating over the values to find the minima as the
int position = positions[i] + positionOffset; // timestamp expression in EARLIEST_BY has no established sorting order
int row = rows == null ? i : rows[i]; // If we know that the time is already sorted this can be optimized
long firstTime = buf.getLong(position); // for the general EARLIEST call which is always on __time which is sorted
if (timeVector[row] < firstTime) {
firstTime = timeVector[row]; // The hotpath is separated out into 2 cases when nullTimeVector
buf.putLong(position, firstTime); // is null and not-null so that the check is not on every value
buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); if (nullTimeVector != null) {
buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, values[row]); for (int i = 0; i < numRows; i++) {
if (nullTimeVector[i]) {
continue;
}
int position = positions[i] + positionOffset;
int row = rows == null ? i : rows[i];
long firstTime = buf.getLong(position);
if (timeVector[row] < firstTime) {
firstTime = timeVector[row];
buf.putLong(position, firstTime);
buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, values[row]);
}
}
} else {
for (int i = 0; i < numRows; i++) {
int position = positions[i] + positionOffset;
int row = rows == null ? i : rows[i];
long firstTime = buf.getLong(position);
if (timeVector[row] < firstTime) {
firstTime = timeVector[row];
buf.putLong(position, firstTime);
buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, values[row]);
}
} }
} }
} }
@Nullable @Nullable

View File

@ -56,9 +56,6 @@ public class StringFirstAggregator implements Aggregator
@Override @Override
public void aggregate() public void aggregate()
{ {
if (timeSelector.isNull()) {
return;
}
if (needsFoldCheck) { if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case // Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object). // it's a foldable object).
@ -72,6 +69,9 @@ public class StringFirstAggregator implements Aggregator
firstValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes); firstValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes);
} }
} else { } else {
if (timeSelector.isNull()) {
return;
}
final long time = timeSelector.getLong(); final long time = timeSelector.getLong();
if (time < firstTime) { if (time < firstTime) {

View File

@ -63,9 +63,6 @@ public class StringFirstBufferAggregator implements BufferAggregator
@Override @Override
public void aggregate(ByteBuffer buf, int position) public void aggregate(ByteBuffer buf, int position)
{ {
if (timeSelector.isNull()) {
return;
}
if (needsFoldCheck) { if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case // Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object). // it's a foldable object).
@ -86,6 +83,9 @@ public class StringFirstBufferAggregator implements BufferAggregator
} }
} }
} else { } else {
if (timeSelector.isNull()) {
return;
}
final long time = timeSelector.getLong(); final long time = timeSelector.getLong();
final long firstTime = buf.getLong(position); final long firstTime = buf.getLong(position);

View File

@ -120,6 +120,9 @@ public class StringFirstLastUtils
time = pair.lhs; time = pair.lhs;
string = pair.rhs; string = pair.rhs;
} else if (object != null) { } else if (object != null) {
if (timeSelector.isNull()) {
return null;
}
time = timeSelector.getLong(); time = timeSelector.getLong();
string = DimensionHandlerUtils.convertObjectToString(object); string = DimensionHandlerUtils.convertObjectToString(object);
} else { } else {

View File

@ -68,11 +68,14 @@ public class StringFirstVectorAggregator implements VectorAggregator
Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector(); Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector();
long firstTime = buf.getLong(position); long firstTime = buf.getLong(position);
int index; int index;
for (int i = startRow; i < endRow; i++) { // Now we are iterating over the values to find the minima as the
if (times[i] > firstTime) { // timestamp expression in EARLIEST_BY has no established sorting order
break; // If we know that the time is already sorted this can be optimized
// for the general EARLIEST call which is always on __time which is sorted
for (index = startRow; index < endRow; index++) {
if (times[index] > firstTime) {
continue;
} }
index = i;
final boolean foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[index]); final boolean foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[index]);
if (foldNeeded) { if (foldNeeded) {
final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromVectorSelectorsAtIndex( final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromVectorSelectorsAtIndex(
@ -132,6 +135,7 @@ public class StringFirstVectorAggregator implements VectorAggregator
long firstTime = buf.getLong(position); long firstTime = buf.getLong(position);
if (timeVector[row] < firstTime) { if (timeVector[row] < firstTime) {
if (foldNeeded) { if (foldNeeded) {
firstTime = timeVector[row];
final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromVectorSelectorsAtIndex( final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromVectorSelectorsAtIndex(
timeSelector, timeSelector,
valueSelector, valueSelector,

View File

@ -57,9 +57,6 @@ public class StringLastAggregator implements Aggregator
@Override @Override
public void aggregate() public void aggregate()
{ {
if (timeSelector.isNull()) {
return;
}
if (needsFoldCheck) { if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case // Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object). // it's a foldable object).
@ -73,6 +70,9 @@ public class StringLastAggregator implements Aggregator
lastValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes); lastValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes);
} }
} else { } else {
if (timeSelector.isNull()) {
return;
}
final long time = timeSelector.getLong(); final long time = timeSelector.getLong();
if (time >= lastTime) { if (time >= lastTime) {

View File

@ -64,9 +64,6 @@ public class StringLastBufferAggregator implements BufferAggregator
@Override @Override
public void aggregate(ByteBuffer buf, int position) public void aggregate(ByteBuffer buf, int position)
{ {
if (timeSelector.isNull()) {
return;
}
if (needsFoldCheck) { if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case // Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object). // it's a foldable object).
@ -87,6 +84,9 @@ public class StringLastBufferAggregator implements BufferAggregator
} }
} }
} else { } else {
if (timeSelector.isNull()) {
return;
}
final long time = timeSelector.getLong(); final long time = timeSelector.getLong();
final long lastTime = buf.getLong(position); final long lastTime = buf.getLong(position);

View File

@ -707,6 +707,33 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
@Test
public void testEarliestVectorAggregators()
{
notMsqCompatible();
testQuery(
"SELECT "
+ "EARLIEST(dim1, 10) "
+ "FROM druid.numfoo",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.aggregators(
aggregators(
new StringFirstAggregatorFactory("a0", "dim1", "__time", 10)
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{""}
)
);
}
@Test @Test
public void testLatestAggregators() public void testLatestAggregators()
{ {