Fixes, adjustments to numeric null handling and string first/last aggregators. (#8834)

There is a class of bugs due to the fact that BaseObjectColumnValueSelector
has both "getObject" and "isNull" methods, but in most selector implementations
and most call sites, it is clear that the intent of "isNull" is only to apply
to the primitive getters, not the object getter. This makes sense, because the
purpose of isNull is to enable detection of nulls in otherwise-primitive columns.
Imagine a string column with a numeric selector built on top of it. You would
want it to return isNull = true, so numeric aggregators don't treat it as
all zeroes.

Sometimes this design leads people to accidentally guard non-primitive get
methods with "selector.isNull" checks, which is improper.

This patch has three goals:

1) Fix null-handling bugs that already exist in this class.
2) Make interface and doc changes that reduce the probability of future bugs.
3) Fix other, unrelated bugs I noticed in the stringFirst and stringLast
   aggregators while fixing null-handling bugs. I thought about splitting this
   into its own patch, but it ended up being tough to split from the
   null-handling fixes.

For (1) the fixes are,

- Fix StringFirst and StringLastAggregatorFactory to stop guarding getObject
  calls on isNull, by no longer extending NullableAggregatorFactory. Now uses
  -1 as a sigil value for null, to differentiate nulls and empty strings.
- Fix ExpressionFilter to stop guarding getObject calls on isNull. Also, use
  eval.asBoolean() to avoid calling getLong on the selector after already
  calling getObject.
- Fix ObjectBloomFilterAggregator to stop guarding DimensionSelector calls
  on isNull. Also, refactored slightly to avoid the overhead of calling
  getObject followed by another getter (see BloomFilterAggregatorFactory for
  part of this).

For (2) the main changes are,

- Remove the "isNull" method from BaseObjectColumnValueSelector.
- Clarify "isNull" doc on BaseNullableColumnValueSelector.
- Rename NullableAggregatorFactory -> NullbleNumericAggregatorFactory to emphasize
  that it only works on aggregators that take numbers as input.
- Similar naming changes to the Aggregator, BufferAggregator, and AggregateCombiner.
- Similar naming changes to helper methods for groupBy, ValueMatchers, etc.

For (3) the other fixes for StringFirst and StringLastAggregatorFactory are,

- Fixed buffer overrun in the buffer aggregators when some characters in the string
  code into more than one byte (the old code used "substring" to apply a byte limit,
  which is bad). I did this by introducing a new StringUtils.toUtf8WithLimit method.
- Fixed weird IncrementalIndex logic that led to reading nulls for the timestamp.
- Adjusted weird StringFirst/Last logic that worked around the weird IncrementalIndex
  behavior.
- Refactored to share code between the four aggregators.
- Improved test coverage.
- Made the base stringFirst, stringLast aggregators adaptive, and streamlined the
  xFold versions into aliases. The adaptiveness is similar to how other aggregators
  like hyperUnique work.
This commit is contained in:
Gian Merlino 2019-11-07 17:46:59 -08:00 committed by Fangjin Yang
parent b03aa060bd
commit c204d68376
52 changed files with 709 additions and 676 deletions

View File

@ -26,7 +26,10 @@ import java.io.UnsupportedEncodingException;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Base64; import java.util.Base64;
@ -107,6 +110,44 @@ public class StringUtils
} }
} }
/**
* Encodes "string" into the buffer "byteBuffer", using no more than the number of bytes remaining in the buffer.
* Will only encode whole characters. The byteBuffer's position and limit be changed during operation, but will
* be reset before this method call ends.
*
* @return the number of bytes written, which may be shorter than the full encoded string length if there
* is not enough room in the output buffer.
*/
public static int toUtf8WithLimit(final String string, final ByteBuffer byteBuffer)
{
final CharsetEncoder encoder = StandardCharsets.UTF_8
.newEncoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE);
final int originalPosition = byteBuffer.position();
final int originalLimit = byteBuffer.limit();
final int maxBytes = byteBuffer.remaining();
try {
final char[] chars = string.toCharArray();
final CharBuffer charBuffer = CharBuffer.wrap(chars);
// No reason to look at the CoderResult from the "encode" call; we can tell the number of transferred characters
// by looking at the output buffer's position.
encoder.encode(charBuffer, byteBuffer, true);
final int bytesWritten = byteBuffer.position() - originalPosition;
assert bytesWritten <= maxBytes;
return bytesWritten;
}
finally {
byteBuffer.position(originalPosition);
byteBuffer.limit(originalLimit);
}
}
@Nullable @Nullable
public static byte[] toUtf8Nullable(@Nullable final String string) public static byte[] toUtf8Nullable(@Nullable final String string)
{ {
@ -163,6 +204,7 @@ public class StringUtils
* application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form data as well. * application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form data as well.
* *
* @param s String to be encoded * @param s String to be encoded
*
* @return application/x-www-form-urlencoded format encoded String, but with "+" replaced with "%20". * @return application/x-www-form-urlencoded format encoded String, but with "+" replaced with "%20".
*/ */
@Nullable @Nullable
@ -311,6 +353,7 @@ public class StringUtils
* Convert an input to base 64 and return the utf8 string of that byte array * Convert an input to base 64 and return the utf8 string of that byte array
* *
* @param input The string to convert to base64 * @param input The string to convert to base64
*
* @return the base64 of the input in string form * @return the base64 of the input in string form
*/ */
public static String utf8Base64(String input) public static String utf8Base64(String input)
@ -322,6 +365,7 @@ public class StringUtils
* Convert an input byte array into a newly-allocated byte array using the {@link Base64} encoding scheme * Convert an input byte array into a newly-allocated byte array using the {@link Base64} encoding scheme
* *
* @param input The byte array to convert to base64 * @param input The byte array to convert to base64
*
* @return the base64 of the input in byte array form * @return the base64 of the input in byte array form
*/ */
public static byte[] encodeBase64(byte[] input) public static byte[] encodeBase64(byte[] input)
@ -333,6 +377,7 @@ public class StringUtils
* Convert an input byte array into a string using the {@link Base64} encoding scheme * Convert an input byte array into a string using the {@link Base64} encoding scheme
* *
* @param input The byte array to convert to base64 * @param input The byte array to convert to base64
*
* @return the base64 of the input in string form * @return the base64 of the input in string form
*/ */
public static String encodeBase64String(byte[] input) public static String encodeBase64String(byte[] input)
@ -344,6 +389,7 @@ public class StringUtils
* Decode an input byte array using the {@link Base64} encoding scheme and return a newly-allocated byte array * Decode an input byte array using the {@link Base64} encoding scheme and return a newly-allocated byte array
* *
* @param input The byte array to decode from base64 * @param input The byte array to decode from base64
*
* @return a newly-allocated byte array * @return a newly-allocated byte array
*/ */
public static byte[] decodeBase64(byte[] input) public static byte[] decodeBase64(byte[] input)
@ -355,6 +401,7 @@ public class StringUtils
* Decode an input string using the {@link Base64} encoding scheme and return a newly-allocated byte array * Decode an input string using the {@link Base64} encoding scheme and return a newly-allocated byte array
* *
* @param input The string to decode from base64 * @param input The string to decode from base64
*
* @return a newly-allocated byte array * @return a newly-allocated byte array
*/ */
public static byte[] decodeBase64String(String input) public static byte[] decodeBase64String(String input)
@ -411,7 +458,7 @@ public class StringUtils
System.arraycopy(multiple, 0, multiple, copied, limit - copied); System.arraycopy(multiple, 0, multiple, copied, limit - copied);
return new String(multiple, StandardCharsets.UTF_8); return new String(multiple, StandardCharsets.UTF_8);
} }
/** /**
* Returns the string left-padded with the string pad to a length of len characters. * Returns the string left-padded with the string pad to a length of len characters.
* If str is longer than len, the return value is shortened to len characters. * If str is longer than len, the return value is shortened to len characters.
@ -419,8 +466,9 @@ public class StringUtils
* https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala * https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
* *
* @param base The base string to be padded * @param base The base string to be padded
* @param len The length of padded string * @param len The length of padded string
* @param pad The pad string * @param pad The pad string
*
* @return the string left-padded with pad to a length of len * @return the string left-padded with pad to a length of len
*/ */
public static String lpad(String base, Integer len, String pad) public static String lpad(String base, Integer len, String pad)
@ -453,11 +501,12 @@ public class StringUtils
/** /**
* Returns the string right-padded with the string pad to a length of len characters. * Returns the string right-padded with the string pad to a length of len characters.
* If str is longer than len, the return value is shortened to len characters. * If str is longer than len, the return value is shortened to len characters.
* *
* @param base The base string to be padded * @param base The base string to be padded
* @param len The length of padded string * @param len The length of padded string
* @param pad The pad string * @param pad The pad string
*
* @return the string right-padded with pad to a length of len * @return the string right-padded with pad to a length of len
*/ */
public static String rpad(String base, Integer len, String pad) public static String rpad(String base, Integer len, String pad)
@ -473,12 +522,12 @@ public class StringUtils
int pos = 0; int pos = 0;
// Copy the base // Copy the base
for ( ; pos < base.length() && pos < len; pos++) { for (; pos < base.length() && pos < len; pos++) {
data[pos] = base.charAt(pos); data[pos] = base.charAt(pos);
} }
// Copy the padding // Copy the padding
for ( ; pos < len; pos += pad.length()) { for (; pos < len; pos += pad.length()) {
for (int i = 0; i < pad.length() && i < len - pos; i++) { for (int i = 0; i < pad.length() && i < len - pos; i++) {
data[pos + i] = pad.charAt(i); data[pos + i] = pad.charAt(i);
} }

View File

@ -22,6 +22,7 @@ package org.apache.druid.math.expr;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -46,21 +47,6 @@ public class Evals
return true; return true;
} }
// for binary operator not providing constructor of form <init>(String, Expr, Expr),
// you should create it explicitly in here
public static Expr binaryOp(BinaryOpExprBase binary, Expr left, Expr right)
{
try {
return binary.getClass()
.getDeclaredConstructor(String.class, Expr.class, Expr.class)
.newInstance(binary.op, left, right);
}
catch (Exception e) {
log.warn(e, "failed to rewrite expression " + binary);
return binary; // best effort.. keep it working
}
}
public static long asLong(boolean x) public static long asLong(boolean x)
{ {
return x ? 1L : 0L; return x ? 1L : 0L;
@ -81,8 +67,8 @@ public class Evals
return x > 0; return x > 0;
} }
public static boolean asBoolean(String x) public static boolean asBoolean(@Nullable String x)
{ {
return !NullHandling.isNullOrEquivalent(x) && Boolean.valueOf(x); return !NullHandling.isNullOrEquivalent(x) && Boolean.parseBoolean(x);
} }
} }

View File

@ -381,6 +381,7 @@ public abstract class ExprEval<T>
private static final StringExprEval OF_NULL = new StringExprEval(null); private static final StringExprEval OF_NULL = new StringExprEval(null);
@Nullable
private Number numericVal; private Number numericVal;
private StringExprEval(@Nullable String value) private StringExprEval(@Nullable String value)

View File

@ -56,6 +56,32 @@ public class StringUtilsTest
} }
} }
@Test
public void toUtf8WithLimitTest()
{
final ByteBuffer smallBuffer = ByteBuffer.allocate(4);
final ByteBuffer mediumBuffer = ByteBuffer.allocate(6);
final ByteBuffer bigBuffer = ByteBuffer.allocate(8);
final int smallBufferResult = StringUtils.toUtf8WithLimit("🚀🌔", smallBuffer);
Assert.assertEquals(4, smallBufferResult);
final byte[] smallBufferByteArray = new byte[smallBufferResult];
smallBuffer.get(smallBufferByteArray);
Assert.assertEquals("🚀", StringUtils.fromUtf8(smallBufferByteArray));
final int mediumBufferResult = StringUtils.toUtf8WithLimit("🚀🌔", mediumBuffer);
Assert.assertEquals(4, mediumBufferResult);
final byte[] mediumBufferByteArray = new byte[mediumBufferResult];
mediumBuffer.get(mediumBufferByteArray);
Assert.assertEquals("🚀", StringUtils.fromUtf8(mediumBufferByteArray));
final int bigBufferResult = StringUtils.toUtf8WithLimit("🚀🌔", bigBuffer);
Assert.assertEquals(8, bigBufferResult);
final byte[] bigBufferByteArray = new byte[bigBufferResult];
bigBuffer.get(bigBufferByteArray);
Assert.assertEquals("🚀🌔", StringUtils.fromUtf8(bigBufferByteArray));
}
@Test @Test
public void fromUtf8ByteBufferHeap() public void fromUtf8ByteBufferHeap()
{ {
@ -181,7 +207,7 @@ public class StringUtilsTest
expectedException.expectMessage("count is negative, -1"); expectedException.expectMessage("count is negative, -1");
Assert.assertEquals("", StringUtils.repeat("foo", -1)); Assert.assertEquals("", StringUtils.repeat("foo", -1));
} }
@Test @Test
public void testLpad() public void testLpad()
{ {

View File

@ -23,7 +23,6 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseNullableColumnValueSelector;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -44,10 +43,10 @@ import java.nio.ByteBuffer;
* {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize} and * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize} and
* {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered} * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered}
* *
* @param <TSelector> type of {@link BaseNullableColumnValueSelector} that feeds this aggregator, likely either values * @param <TSelector> type of selector that feeds this aggregator, likely either values to add to a bloom filter,
* to add to a bloom filter, or other bloom filters to merge into this bloom filter. * or other bloom filters to merge into this bloom filter.
*/ */
public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector> public abstract class BaseBloomFilterAggregator<TSelector>
implements BufferAggregator, Aggregator implements BufferAggregator, Aggregator
{ {
@Nullable @Nullable

View File

@ -35,6 +35,7 @@ import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.BaseNullableColumnValueSelector; import org.apache.druid.segment.BaseNullableColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.NilColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
@ -279,16 +280,21 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
); );
} }
} else { } else {
// No column capabilities, try to guess based on selector type.
BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
if (selector instanceof NilColumnValueSelector) { if (selector instanceof NilColumnValueSelector) {
return new NoopBloomFilterAggregator(maxNumEntries, onHeap); return new NoopBloomFilterAggregator(maxNumEntries, onHeap);
} else if (selector instanceof DimensionSelector) {
return new StringBloomFilterAggregator((DimensionSelector) selector, maxNumEntries, onHeap);
} else {
// Use fallback 'object' aggregator.
return new ObjectBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
onHeap
);
} }
// no column capabilities, use fallback 'object' aggregator
return new ObjectBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
onHeap
);
} }
} }
} }

View File

@ -21,13 +21,14 @@ package org.apache.druid.query.aggregation.bloom;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator<ColumnValueSelector<ByteBuffer>> public final class BloomFilterMergeAggregator
extends BaseBloomFilterAggregator<BaseObjectColumnValueSelector<ByteBuffer>>
{ {
BloomFilterMergeAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap) BloomFilterMergeAggregator(BaseObjectColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap)
{ {
super(selector, maxNumEntries, onHeap); super(selector, maxNumEntries, onHeap);
} }

View File

@ -19,20 +19,18 @@
package org.apache.druid.query.aggregation.bloom; package org.apache.druid.query.aggregation.bloom;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/** /**
* Handles "unknown" columns by examining what comes out of the selector * Handles "unknown" columns by examining what comes out of the selector
*/ */
class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator<ColumnValueSelector> class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator<BaseObjectColumnValueSelector<Object>>
{ {
ObjectBloomFilterAggregator( ObjectBloomFilterAggregator(
ColumnValueSelector selector, BaseObjectColumnValueSelector<Object> selector,
int maxNumEntries, int maxNumEntries,
boolean onHeap boolean onHeap
) )
@ -48,16 +46,14 @@ class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator<ColumnValueS
final ByteBuffer other = (ByteBuffer) object; final ByteBuffer other = (ByteBuffer) object;
BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position()); BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position());
} else { } else {
if (NullHandling.replaceWithDefault() || !selector.isNull()) { if (object instanceof Long) {
if (object instanceof Long) { BloomKFilter.addLong(buf, (long) object);
BloomKFilter.addLong(buf, selector.getLong()); } else if (object instanceof Double) {
} else if (object instanceof Double) { BloomKFilter.addDouble(buf, (double) object);
BloomKFilter.addDouble(buf, selector.getDouble()); } else if (object instanceof Float) {
} else if (object instanceof Float) { BloomKFilter.addFloat(buf, (float) object);
BloomKFilter.addFloat(buf, selector.getFloat()); } else if (object instanceof String) {
} else { BloomKFilter.addString(buf, (String) object);
StringBloomFilterAggregator.stringBufferAdd(buf, (DimensionSelector) selector);
}
} else { } else {
BloomKFilter.addBytes(buf, null, 0, 0); BloomKFilter.addBytes(buf, null, 0, 0);
} }

View File

@ -67,8 +67,6 @@ import org.apache.druid.query.aggregation.post.LongGreatestPostAggregator;
import org.apache.druid.query.aggregation.post.LongLeastPostAggregator; import org.apache.druid.query.aggregation.post.LongLeastPostAggregator;
import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.segment.serde.ComplexMetrics;
/**
*/
public class AggregatorsModule extends SimpleModule public class AggregatorsModule extends SimpleModule
{ {
public AggregatorsModule() public AggregatorsModule()

View File

@ -39,7 +39,7 @@ import java.util.Map;
* max, sum of metric columns, or cardinality of dimension columns (see {@link * max, sum of metric columns, or cardinality of dimension columns (see {@link
* org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory}). * org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory}).
* Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged * Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged
* to extend {@link NullableAggregatorFactory}. * to extend {@link NullableNumericAggregatorFactory}.
* *
* Implementations are also expected to correctly handle single/multi value string type columns as it makes sense * Implementations are also expected to correctly handle single/multi value string type columns as it makes sense
* for them e.g. doubleSum aggregator tries to parse the string value as double and assumes it to be zero if parsing * for them e.g. doubleSum aggregator tries to parse the string value as double and assumes it to be zero if parsing
@ -106,11 +106,11 @@ public abstract class AggregatorFactory implements Cacheable
/** /**
* Creates an {@link AggregateCombiner} which supports nullability. * Creates an {@link AggregateCombiner} which supports nullability.
* Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged * Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged
* to extend {@link NullableAggregatorFactory} instead of overriding this method. * to extend {@link NullableNumericAggregatorFactory} instead of overriding this method.
* Default implementation calls {@link #makeAggregateCombiner()} for backwards compatibility. * Default implementation calls {@link #makeAggregateCombiner()} for backwards compatibility.
* *
* @see AggregateCombiner * @see AggregateCombiner
* @see NullableAggregatorFactory * @see NullableNumericAggregatorFactory
*/ */
public AggregateCombiner makeNullableAggregateCombiner() public AggregateCombiner makeNullableAggregateCombiner()
{ {
@ -221,7 +221,7 @@ public abstract class AggregatorFactory implements Cacheable
/** /**
* Returns the maximum size that this aggregator will require in bytes for intermediate storage of results. * Returns the maximum size that this aggregator will require in bytes for intermediate storage of results.
* Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged * Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged
* to extend {@link NullableAggregatorFactory} instead of overriding this method. * to extend {@link NullableNumericAggregatorFactory} instead of overriding this method.
* Default implementation calls {@link #makeAggregateCombiner()} for backwards compatibility. * Default implementation calls {@link #makeAggregateCombiner()} for backwards compatibility.
* *
* @return the maximum number of bytes that an aggregator of this type will require for intermediate result storage. * @return the maximum number of bytes that an aggregator of this type will require for intermediate result storage.

View File

@ -20,26 +20,26 @@
package org.apache.druid.query.aggregation; package org.apache.druid.query.aggregation;
import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.segment.BaseNullableColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
* The result of a NullableAggregateCombiner will be null if all the values to be combined are null values or no values * Null-aware numeric {@link AggregateCombiner}.
* are combined at all. If any of the value is non-null, the result would be the value of the delegate combiner. *
* Note that the delegate combiner is not required to perform check for {@link BaseNullableColumnValueSelector#isNull()} * Used by {@link NullableNumericAggregatorFactory#makeAggregateCombiner()} to wrap non-null aware combiners. This
* on the selector as only non-null values will be passed to the delegate combiner. * class is only used when SQL compatible null handling is enabled.
* This class is only used when SQL compatible null handling is enabled. *
* @see NullableNumericAggregatorFactory#makeAggregateCombiner()
*/ */
@PublicApi @PublicApi
public final class NullableAggregateCombiner<T> implements AggregateCombiner<T> public final class NullableNumericAggregateCombiner<T> implements AggregateCombiner<T>
{ {
private boolean isNullResult = true; private boolean isNullResult = true;
private final AggregateCombiner<T> delegate; private final AggregateCombiner<T> delegate;
public NullableAggregateCombiner(AggregateCombiner<T> delegate) public NullableNumericAggregateCombiner(AggregateCombiner<T> delegate)
{ {
this.delegate = delegate; this.delegate = delegate;
} }

View File

@ -21,24 +21,34 @@ package org.apache.druid.query.aggregation;
import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.segment.BaseNullableColumnValueSelector; import org.apache.druid.segment.BaseNullableColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
* The result of a NullableAggregator will be null if all the values to be aggregated are null values * Null-aware numeric {@link Aggregator}.
* or no values are aggregated at all. If any of the value is non-null, the result would be the aggregated *
* value of the delegate aggregator. Note that the delegate aggregator is not required to perform check for * The result of this aggregator will be null if all the values to be aggregated are null values or no values are
* {@link BaseNullableColumnValueSelector#isNull()} on the selector as only non-null values will be passed * aggregated at all. If any of the values are non-null, the result will be the aggregated value of the delegate
* to the delegate aggregator. This class is only used when SQL compatible null handling is enabled. * aggregator.
*
* When wrapped by this class, the underlying aggregator's required storage space is increased by one byte. The extra
* byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before
* the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes])
*
* Used by {@link NullableNumericAggregatorFactory#factorize(ColumnSelectorFactory)} to wrap non-null aware
* aggregators. This class is only used when SQL compatible null handling is enabled.
*
* @see NullableNumericAggregatorFactory#factorize(ColumnSelectorFactory)
*/ */
@PublicApi @PublicApi
public final class NullableAggregator implements Aggregator public final class NullableNumericAggregator implements Aggregator
{ {
private final Aggregator delegate; private final Aggregator delegate;
private final BaseNullableColumnValueSelector selector; private final BaseNullableColumnValueSelector selector;
private boolean isNullResult = true; private boolean isNullResult = true;
public NullableAggregator(Aggregator delegate, BaseNullableColumnValueSelector selector) public NullableNumericAggregator(Aggregator delegate, BaseNullableColumnValueSelector selector)
{ {
this.delegate = delegate; this.delegate = delegate;
this.selector = selector; this.selector = selector;

View File

@ -30,19 +30,28 @@ import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector; import org.apache.druid.segment.vector.VectorValueSelector;
/** /**
* Abstract class with functionality to wrap {@link Aggregator}, {@link BufferAggregator} and {@link AggregateCombiner} * Abstract superclass for null-aware numeric aggregators.
* to support nullable aggregations for SQL compatibility. Implementations of {@link AggregatorFactory} which need to *
* Support Nullable Aggregations are encouraged to extend this class. * Includes functionality to wrap {@link Aggregator}, {@link BufferAggregator}, {@link VectorAggregator}, and
* {@link AggregateCombiner} to support nullable aggregations. The result of this aggregator will be null if all the
* values to be aggregated are null values, or if no values are aggregated at all. If any of the values are non-null,
* the result will be the aggregated value of the non-null values.
*
* This superclass should only be extended by aggregators that read primitive numbers. It implements logic that is
* not valid for non-numeric selector methods such as {@link ColumnValueSelector#getObject()}.
*
* @see BaseNullableColumnValueSelector#isNull() for why this only works in the numeric case
*/ */
@ExtensionPoint @ExtensionPoint
public abstract class NullableAggregatorFactory<T extends BaseNullableColumnValueSelector> extends AggregatorFactory public abstract class NullableNumericAggregatorFactory<T extends BaseNullableColumnValueSelector>
extends AggregatorFactory
{ {
@Override @Override
public final Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) public final Aggregator factorize(ColumnSelectorFactory columnSelectorFactory)
{ {
T selector = selector(columnSelectorFactory); T selector = selector(columnSelectorFactory);
Aggregator aggregator = factorize(columnSelectorFactory, selector); Aggregator aggregator = factorize(columnSelectorFactory, selector);
return NullHandling.replaceWithDefault() ? aggregator : new NullableAggregator(aggregator, selector); return NullHandling.replaceWithDefault() ? aggregator : new NullableNumericAggregator(aggregator, selector);
} }
@Override @Override
@ -50,7 +59,7 @@ public abstract class NullableAggregatorFactory<T extends BaseNullableColumnValu
{ {
T selector = selector(columnSelectorFactory); T selector = selector(columnSelectorFactory);
BufferAggregator aggregator = factorizeBuffered(columnSelectorFactory, selector); BufferAggregator aggregator = factorizeBuffered(columnSelectorFactory, selector);
return NullHandling.replaceWithDefault() ? aggregator : new NullableBufferAggregator(aggregator, selector); return NullHandling.replaceWithDefault() ? aggregator : new NullableNumericBufferAggregator(aggregator, selector);
} }
@Override @Override
@ -59,14 +68,14 @@ public abstract class NullableAggregatorFactory<T extends BaseNullableColumnValu
Preconditions.checkState(canVectorize(), "Cannot vectorize"); Preconditions.checkState(canVectorize(), "Cannot vectorize");
VectorValueSelector selector = vectorSelector(columnSelectorFactory); VectorValueSelector selector = vectorSelector(columnSelectorFactory);
VectorAggregator aggregator = factorizeVector(columnSelectorFactory, selector); VectorAggregator aggregator = factorizeVector(columnSelectorFactory, selector);
return NullHandling.replaceWithDefault() ? aggregator : new NullableVectorAggregator(aggregator, selector); return NullHandling.replaceWithDefault() ? aggregator : new NullableNumericVectorAggregator(aggregator, selector);
} }
@Override @Override
public final AggregateCombiner makeNullableAggregateCombiner() public final AggregateCombiner makeNullableAggregateCombiner()
{ {
AggregateCombiner combiner = makeAggregateCombiner(); AggregateCombiner combiner = makeAggregateCombiner();
return NullHandling.replaceWithDefault() ? combiner : new NullableAggregateCombiner(combiner); return NullHandling.replaceWithDefault() ? combiner : new NullableNumericAggregateCombiner(combiner);
} }
@Override @Override

View File

@ -23,13 +23,13 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseNullableColumnValueSelector; import org.apache.druid.segment.BaseNullableColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/** /**
* A wrapper around a non-null-aware BufferAggregator that makes it null-aware. This removes the need for each * Null-aware numeric {@link BufferAggregator}.
* aggregator class to handle nulls on its own.
* *
* The result of this aggregator will be null if all the values to be aggregated are null values or no values are * The result of this aggregator will be null if all the values to be aggregated are null values or no values are
* aggregated at all. If any of the values are non-null, the result will be the aggregated value of the delegate * aggregated at all. If any of the values are non-null, the result will be the aggregated value of the delegate
@ -39,15 +39,19 @@ import java.nio.ByteBuffer;
* byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before
* the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes])
* *
* @see NullableVectorAggregator, the vectorized version. * Used by {@link NullableNumericAggregatorFactory#factorizeBuffered(ColumnSelectorFactory)} to wrap non-null aware
* aggregators. This class is only used when SQL compatible null handling is enabled.
*
* @see NullableNumericAggregatorFactory#factorizeBuffered(ColumnSelectorFactory)
* @see NullableNumericVectorAggregator the vectorized version.
*/ */
@PublicApi @PublicApi
public final class NullableBufferAggregator implements BufferAggregator public final class NullableNumericBufferAggregator implements BufferAggregator
{ {
private final BufferAggregator delegate; private final BufferAggregator delegate;
private final BaseNullableColumnValueSelector nullSelector; private final BaseNullableColumnValueSelector nullSelector;
public NullableBufferAggregator(BufferAggregator delegate, BaseNullableColumnValueSelector nullSelector) public NullableNumericBufferAggregator(BufferAggregator delegate, BaseNullableColumnValueSelector nullSelector)
{ {
this.delegate = delegate; this.delegate = delegate;
this.nullSelector = nullSelector; this.nullSelector = nullSelector;

View File

@ -40,9 +40,15 @@ import java.util.Arrays;
* byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before
* the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes])
* *
* @see NullableBufferAggregator, the vectorized version. * The result of a NullableAggregator will be null if all the values to be aggregated are null values
* or no values are aggregated at all. If any of the value is non-null, the result would be the aggregated
* value of the delegate aggregator. Note that the delegate aggregator is not required to perform check for
* {@link VectorValueSelector#getNullVector()} on the selector as only non-null values will be passed
* to the delegate aggregator. This class is only used when SQL compatible null handling is enabled.
*
* @see NullableNumericBufferAggregator , the vectorized version.
*/ */
public class NullableVectorAggregator implements VectorAggregator public class NullableNumericVectorAggregator implements VectorAggregator
{ {
private final VectorAggregator delegate; private final VectorAggregator delegate;
private final VectorValueSelector selector; private final VectorValueSelector selector;
@ -53,7 +59,7 @@ public class NullableVectorAggregator implements VectorAggregator
@Nullable @Nullable
private int[] vAggregationRows = null; private int[] vAggregationRows = null;
NullableVectorAggregator(VectorAggregator delegate, VectorValueSelector selector) NullableNumericVectorAggregator(VectorAggregator delegate, VectorValueSelector selector)
{ {
this.delegate = delegate; this.delegate = delegate;
this.selector = selector; this.selector = selector;

View File

@ -46,7 +46,7 @@ import java.util.Objects;
* It extends "NullableAggregatorFactory<ColumnValueSelector>" instead of "NullableAggregatorFactory<BaseDoubleColumnValueSelector>" * It extends "NullableAggregatorFactory<ColumnValueSelector>" instead of "NullableAggregatorFactory<BaseDoubleColumnValueSelector>"
* to additionally support aggregation on single/multi value string column types. * to additionally support aggregation on single/multi value string column types.
*/ */
public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector> public abstract class SimpleDoubleAggregatorFactory extends NullableNumericAggregatorFactory<ColumnValueSelector>
{ {
protected final String name; protected final String name;
@Nullable @Nullable

View File

@ -39,7 +39,7 @@ import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector> public abstract class SimpleFloatAggregatorFactory extends NullableNumericAggregatorFactory<ColumnValueSelector>
{ {
protected final String name; protected final String name;
@Nullable @Nullable

View File

@ -45,7 +45,7 @@ import java.util.Objects;
* It extends "NullableAggregatorFactory<ColumnValueSelector>" instead of "NullableAggregatorFactory<BaseLongColumnValueSelector>" * It extends "NullableAggregatorFactory<ColumnValueSelector>" instead of "NullableAggregatorFactory<BaseLongColumnValueSelector>"
* to additionally support aggregation on single/multi value string column types. * to additionally support aggregation on single/multi value string column types.
*/ */
public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector> public abstract class SimpleLongAggregatorFactory extends NullableNumericAggregatorFactory<ColumnValueSelector>
{ {
protected final String name; protected final String name;
@Nullable @Nullable

View File

@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableAggregatorFactory; import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
@ -45,7 +45,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
public class DoubleFirstAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector> public class DoubleFirstAggregatorFactory extends NullableNumericAggregatorFactory<ColumnValueSelector>
{ {
public static final Comparator<SerializablePair<Long, Double>> VALUE_COMPARATOR = public static final Comparator<SerializablePair<Long, Double>> VALUE_COMPARATOR =
Comparator.comparingDouble(o -> o.rhs); Comparator.comparingDouble(o -> o.rhs);

View File

@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableAggregatorFactory; import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
@ -45,7 +45,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
public class FloatFirstAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector> public class FloatFirstAggregatorFactory extends NullableNumericAggregatorFactory<ColumnValueSelector>
{ {
public static final Comparator<SerializablePair<Long, Float>> VALUE_COMPARATOR = public static final Comparator<SerializablePair<Long, Float>> VALUE_COMPARATOR =
Comparator.comparingDouble(o -> o.rhs); Comparator.comparingDouble(o -> o.rhs);

View File

@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableAggregatorFactory; import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
@ -44,7 +44,7 @@ import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
public class LongFirstAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector> public class LongFirstAggregatorFactory extends NullableNumericAggregatorFactory<ColumnValueSelector>
{ {
public static final Comparator<SerializablePair<Long, Long>> VALUE_COMPARATOR = public static final Comparator<SerializablePair<Long, Long>> VALUE_COMPARATOR =
Comparator.comparingLong(o -> o.rhs); Comparator.comparingLong(o -> o.rhs);

View File

@ -19,24 +19,26 @@
package org.apache.druid.query.aggregation.first; package org.apache.druid.query.aggregation.first;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector;
import javax.annotation.Nullable;
public class StringFirstAggregator implements Aggregator public class StringFirstAggregator implements Aggregator
{ {
@Nullable
private final BaseObjectColumnValueSelector valueSelector;
private final BaseLongColumnValueSelector timeSelector; private final BaseLongColumnValueSelector timeSelector;
private final BaseObjectColumnValueSelector valueSelector;
private final int maxStringBytes; private final int maxStringBytes;
protected long firstTime; protected long firstTime;
protected String firstValue; protected String firstValue;
public StringFirstAggregator( public StringFirstAggregator(
BaseLongColumnValueSelector timeSelector, @Nullable BaseLongColumnValueSelector timeSelector,
BaseObjectColumnValueSelector valueSelector, BaseObjectColumnValueSelector valueSelector,
int maxStringBytes int maxStringBytes
) )
@ -45,35 +47,24 @@ public class StringFirstAggregator implements Aggregator
this.timeSelector = timeSelector; this.timeSelector = timeSelector;
this.maxStringBytes = maxStringBytes; this.maxStringBytes = maxStringBytes;
firstTime = Long.MAX_VALUE; firstTime = DateTimes.MAX.getMillis();
firstValue = null; firstValue = null;
} }
@Override @Override
public void aggregate() public void aggregate()
{ {
long time = timeSelector.getLong(); final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
if (time < firstTime) { timeSelector,
firstTime = time; valueSelector
Object value = valueSelector.getObject(); );
if (value != null) { if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) {
if (value instanceof String) { firstTime = inPair.lhs;
firstValue = (String) value; firstValue = inPair.rhs;
} else if (value instanceof SerializablePairLongString) {
firstValue = ((SerializablePairLongString) value).rhs;
} else {
throw new ISE(
"Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
value.getClass().getName()
);
}
if (firstValue != null && firstValue.length() > maxStringBytes) { if (firstValue.length() > maxStringBytes) {
firstValue = firstValue.substring(0, maxStringBytes); firstValue = firstValue.substring(0, maxStringBytes);
}
} else {
firstValue = null;
} }
} }
} }
@ -81,7 +72,7 @@ public class StringFirstAggregator implements Aggregator
@Override @Override
public Object get() public Object get()
{ {
return new SerializablePairLongString(firstTime, firstValue); return new SerializablePairLongString(firstTime, StringFirstLastUtils.chop(firstValue, maxStringBytes));
} }
@Override @Override

View File

@ -29,10 +29,8 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableAggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnHolder;
@ -45,7 +43,7 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
@JsonTypeName("stringFirst") @JsonTypeName("stringFirst")
public class StringFirstAggregatorFactory extends NullableAggregatorFactory<BaseObjectColumnValueSelector> public class StringFirstAggregatorFactory extends AggregatorFactory
{ {
public static final int DEFAULT_MAX_STRING_SIZE = 1024; public static final int DEFAULT_MAX_STRING_SIZE = 1024;
@ -106,31 +104,27 @@ public class StringFirstAggregatorFactory extends NullableAggregatorFactory<Base
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name; this.name = name;
this.fieldName = fieldName; this.fieldName = fieldName;
this.maxStringBytes = maxStringBytes == null ? DEFAULT_MAX_STRING_SIZE : maxStringBytes; this.maxStringBytes = maxStringBytes == null
? StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE
: maxStringBytes;
} }
@Override @Override
protected BaseObjectColumnValueSelector selector(ColumnSelectorFactory metricFactory) public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return metricFactory.makeColumnValueSelector(fieldName);
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector)
{ {
return new StringFirstAggregator( return new StringFirstAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
selector, metricFactory.makeColumnValueSelector(fieldName),
maxStringBytes maxStringBytes
); );
} }
@Override @Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{ {
return new StringFirstBufferAggregator( return new StringFirstBufferAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
selector, metricFactory.makeColumnValueSelector(fieldName),
maxStringBytes maxStringBytes
); );
} }
@ -156,7 +150,7 @@ public class StringFirstAggregatorFactory extends NullableAggregatorFactory<Base
@Override @Override
public AggregatorFactory getCombiningFactory() public AggregatorFactory getCombiningFactory()
{ {
return new StringFirstFoldingAggregatorFactory(name, name, maxStringBytes); return new StringFirstAggregatorFactory(name, name, maxStringBytes);
} }
@Override @Override
@ -234,25 +228,25 @@ public class StringFirstAggregatorFactory extends NullableAggregatorFactory<Base
if (o == null || getClass() != o.getClass()) { if (o == null || getClass() != o.getClass()) {
return false; return false;
} }
StringFirstAggregatorFactory that = (StringFirstAggregatorFactory) o; StringFirstAggregatorFactory that = (StringFirstAggregatorFactory) o;
return maxStringBytes == that.maxStringBytes &&
return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes; Objects.equals(fieldName, that.fieldName) &&
Objects.equals(name, that.name);
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
return Objects.hash(name, fieldName, maxStringBytes); return Objects.hash(fieldName, name, maxStringBytes);
} }
@Override @Override
public String toString() public String toString()
{ {
return "StringFirstAggregatorFactory{" + return "StringFirstAggregatorFactory{" +
"name='" + name + '\'' + "fieldName='" + fieldName + '\'' +
", fieldName='" + fieldName + '\'' + ", name='" + name + '\'' +
", maxStringBytes=" + maxStringBytes + '\'' + ", maxStringBytes=" + maxStringBytes +
'}'; '}';
} }
} }

View File

@ -19,8 +19,7 @@
package org.apache.druid.query.aggregation.first; package org.apache.druid.query.aggregation.first;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
@ -31,6 +30,11 @@ import java.nio.ByteBuffer;
public class StringFirstBufferAggregator implements BufferAggregator public class StringFirstBufferAggregator implements BufferAggregator
{ {
private static final SerializablePairLongString INIT = new SerializablePairLongString(
DateTimes.MAX.getMillis(),
null
);
private final BaseLongColumnValueSelector timeSelector; private final BaseLongColumnValueSelector timeSelector;
private final BaseObjectColumnValueSelector valueSelector; private final BaseObjectColumnValueSelector valueSelector;
private final int maxStringBytes; private final int maxStringBytes;
@ -49,79 +53,34 @@ public class StringFirstBufferAggregator implements BufferAggregator
@Override @Override
public void init(ByteBuffer buf, int position) public void init(ByteBuffer buf, int position)
{ {
buf.putLong(position, Long.MAX_VALUE); StringFirstLastUtils.writePair(buf, position, INIT, maxStringBytes);
buf.putInt(position + Long.BYTES, 0);
} }
@Override @Override
public void aggregate(ByteBuffer buf, int position) public void aggregate(ByteBuffer buf, int position)
{ {
ByteBuffer mutationBuffer = buf.duplicate(); final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
mutationBuffer.position(position); timeSelector,
valueSelector
);
Object value = valueSelector.getObject(); if (inPair != null && inPair.rhs != null) {
final long firstTime = buf.getLong(position);
long time = timeSelector.getLong(); if (inPair.lhs < firstTime) {
String firstString = null; StringFirstLastUtils.writePair(
buf,
if (value != null) { position,
if (value instanceof SerializablePairLongString) { new SerializablePairLongString(inPair.lhs, inPair.rhs),
SerializablePairLongString serializablePair = (SerializablePairLongString) value; maxStringBytes
time = serializablePair.lhs;
firstString = serializablePair.rhs;
} else if (value instanceof String) {
firstString = (String) value;
} else {
throw new ISE(
"Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
value.getClass().getName()
); );
} }
} }
long lastTime = mutationBuffer.getLong(position);
if (time < lastTime) {
if (firstString != null) {
if (firstString.length() > maxStringBytes) {
firstString = firstString.substring(0, maxStringBytes);
}
byte[] valueBytes = StringUtils.toUtf8(firstString);
mutationBuffer.putLong(position, time);
mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
mutationBuffer.put(valueBytes);
} else {
mutationBuffer.putLong(position, time);
mutationBuffer.putInt(position + Long.BYTES, 0);
}
}
} }
@Override @Override
public Object get(ByteBuffer buf, int position) public Object get(ByteBuffer buf, int position)
{ {
ByteBuffer mutationBuffer = buf.duplicate(); return StringFirstLastUtils.readPair(buf, position);
mutationBuffer.position(position);
Long timeValue = mutationBuffer.getLong(position);
int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES);
SerializablePairLongString serializablePair;
if (stringSizeBytes > 0) {
byte[] valueBytes = new byte[stringSizeBytes];
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
mutationBuffer.get(valueBytes, 0, stringSizeBytes);
serializablePair = new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes));
} else {
serializablePair = new SerializablePairLongString(timeValue, null);
}
return serializablePair;
} }
@Override @Override

View File

@ -19,85 +19,16 @@
package org.apache.druid.query.aggregation.first; package org.apache.druid.query.aggregation.first;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer; /**
* For backwards compatibility; equivalent to a regular StringFirstAggregatorFactory.
@JsonTypeName("stringFirstFold") */
public class StringFirstFoldingAggregatorFactory extends StringFirstAggregatorFactory public class StringFirstFoldingAggregatorFactory extends StringFirstAggregatorFactory
{ {
public StringFirstFoldingAggregatorFactory( @JsonCreator
@JsonProperty("name") String name, public StringFirstFoldingAggregatorFactory(String name, String fieldName, Integer maxStringBytes)
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("maxStringBytes") Integer maxStringBytes
)
{ {
super(name, fieldName, maxStringBytes); super(name, fieldName, maxStringBytes);
} }
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector)
{
return new StringFirstAggregator(null, null, maxStringBytes)
{
@Override
public void aggregate()
{
SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
if (pair != null && pair.lhs < firstTime) {
firstTime = pair.lhs;
firstValue = pair.rhs;
}
}
};
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector)
{
return new StringFirstBufferAggregator(null, null, maxStringBytes)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
if (pair != null && pair.lhs != null) {
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
long lastTime = mutationBuffer.getLong(position);
if (pair.lhs < lastTime) {
mutationBuffer.putLong(position, pair.lhs);
if (pair.rhs != null) {
byte[] valueBytes = StringUtils.toUtf8(pair.rhs);
mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
mutationBuffer.put(valueBytes);
} else {
mutationBuffer.putInt(position + Long.BYTES, 0);
}
}
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
} }

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.first;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class StringFirstLastUtils
{
private static final int NULL_VALUE = -1;
@Nullable
public static String chop(@Nullable final String s, final int maxBytes)
{
if (s == null) {
return null;
} else {
// Shorten firstValue to what could fit in maxBytes as UTF-8.
final byte[] bytes = new byte[maxBytes];
final int len = StringUtils.toUtf8WithLimit(s, ByteBuffer.wrap(bytes));
return new String(bytes, 0, len, StandardCharsets.UTF_8);
}
}
@Nullable
public static SerializablePairLongString readPairFromSelectors(
final BaseLongColumnValueSelector timeSelector,
final BaseObjectColumnValueSelector valueSelector
)
{
final long time;
final String string;
// Need to read this first (before time), just in case it's a SerializablePairLongString (we don't know; it's
// detected at query time).
final Object object = valueSelector.getObject();
if (object instanceof SerializablePairLongString) {
final SerializablePairLongString pair = (SerializablePairLongString) object;
time = pair.lhs;
string = pair.rhs;
} else if (object != null) {
time = timeSelector.getLong();
string = DimensionHandlerUtils.convertObjectToString(object);
} else {
// Don't aggregate nulls.
return null;
}
return new SerializablePairLongString(time, string);
}
public static void writePair(
final ByteBuffer buf,
final int position,
final SerializablePairLongString pair,
final int maxStringBytes
)
{
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
mutationBuffer.putLong(pair.lhs);
if (pair.rhs != null) {
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
mutationBuffer.limit(maxStringBytes);
final int len = StringUtils.toUtf8WithLimit(pair.rhs, mutationBuffer);
mutationBuffer.putInt(position + Long.BYTES, len);
} else {
mutationBuffer.putInt(NULL_VALUE);
}
}
public static SerializablePairLongString readPair(final ByteBuffer buf, final int position)
{
ByteBuffer copyBuffer = buf.duplicate();
copyBuffer.position(position);
Long timeValue = copyBuffer.getLong();
int stringSizeBytes = copyBuffer.getInt();
if (stringSizeBytes >= 0) {
byte[] valueBytes = new byte[stringSizeBytes];
copyBuffer.get(valueBytes, 0, stringSizeBytes);
return new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes));
} else {
return new SerializablePairLongString(timeValue, null);
}
}
}

View File

@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableAggregatorFactory; import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
@ -47,7 +47,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
public class DoubleLastAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector> public class DoubleLastAggregatorFactory extends NullableNumericAggregatorFactory<ColumnValueSelector>
{ {
private final String fieldName; private final String fieldName;

View File

@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableAggregatorFactory; import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
@ -47,7 +47,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
public class FloatLastAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector> public class FloatLastAggregatorFactory extends NullableNumericAggregatorFactory<ColumnValueSelector>
{ {
private final String fieldName; private final String fieldName;

View File

@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableAggregatorFactory; import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
@ -46,7 +46,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
public class LongLastAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector> public class LongLastAggregatorFactory extends NullableNumericAggregatorFactory<ColumnValueSelector>
{ {
private final String fieldName; private final String fieldName;
private final String name; private final String name;

View File

@ -19,17 +19,17 @@
package org.apache.druid.query.aggregation.last; package org.apache.druid.query.aggregation.last;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector;
public class StringLastAggregator implements Aggregator public class StringLastAggregator implements Aggregator
{ {
private final BaseObjectColumnValueSelector valueSelector;
private final BaseLongColumnValueSelector timeSelector; private final BaseLongColumnValueSelector timeSelector;
private final BaseObjectColumnValueSelector valueSelector;
private final int maxStringBytes; private final int maxStringBytes;
protected long lastTime; protected long lastTime;
@ -45,35 +45,29 @@ public class StringLastAggregator implements Aggregator
this.timeSelector = timeSelector; this.timeSelector = timeSelector;
this.maxStringBytes = maxStringBytes; this.maxStringBytes = maxStringBytes;
lastTime = Long.MIN_VALUE; lastTime = DateTimes.MIN.getMillis();
lastValue = null; lastValue = null;
} }
@Override @Override
public void aggregate() public void aggregate()
{ {
long time = timeSelector.getLong(); final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
if (time >= lastTime) { timeSelector,
lastTime = time; valueSelector
Object value = valueSelector.getObject(); );
if (value != null) { if (inPair == null) {
if (value instanceof String) { // Don't aggregate nulls.
lastValue = (String) value; return;
} else if (value instanceof SerializablePairLongString) { }
lastValue = ((SerializablePairLongString) value).rhs;
} else {
throw new ISE(
"Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
value.getClass().getName()
);
}
if (lastValue != null && lastValue.length() > maxStringBytes) { if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) {
lastValue = lastValue.substring(0, maxStringBytes); lastTime = inPair.lhs;
} lastValue = inPair.rhs;
} else {
lastValue = null; if (lastValue.length() > maxStringBytes) {
lastValue = lastValue.substring(0, maxStringBytes);
} }
} }
} }
@ -81,25 +75,25 @@ public class StringLastAggregator implements Aggregator
@Override @Override
public Object get() public Object get()
{ {
return new SerializablePairLongString(lastTime, lastValue); return new SerializablePairLongString(lastTime, StringFirstLastUtils.chop(lastValue, maxStringBytes));
} }
@Override @Override
public float getFloat() public float getFloat()
{ {
throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()"); throw new UnsupportedOperationException("StringLastAggregator does not support getFloat()");
} }
@Override @Override
public long getLong() public long getLong()
{ {
throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()"); throw new UnsupportedOperationException("StringLastAggregator does not support getLong()");
} }
@Override @Override
public double getDouble() public double getDouble()
{ {
throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()"); throw new UnsupportedOperationException("StringLastAggregator does not support getDouble()");
} }
@Override @Override

View File

@ -23,21 +23,19 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableAggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
@ -45,7 +43,7 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
@JsonTypeName("stringLast") @JsonTypeName("stringLast")
public class StringLastAggregatorFactory extends NullableAggregatorFactory<BaseObjectColumnValueSelector> public class StringLastAggregatorFactory extends AggregatorFactory
{ {
private final String fieldName; private final String fieldName;
private final String name; private final String name;
@ -68,27 +66,21 @@ public class StringLastAggregatorFactory extends NullableAggregatorFactory<BaseO
} }
@Override @Override
protected BaseObjectColumnValueSelector selector(ColumnSelectorFactory metricFactory) public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return metricFactory.makeColumnValueSelector(fieldName);
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector)
{ {
return new StringLastAggregator( return new StringLastAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
selector, metricFactory.makeColumnValueSelector(fieldName),
maxStringBytes maxStringBytes
); );
} }
@Override @Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{ {
return new StringLastBufferAggregator( return new StringLastBufferAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
selector, metricFactory.makeColumnValueSelector(fieldName),
maxStringBytes maxStringBytes
); );
} }
@ -114,7 +106,7 @@ public class StringLastAggregatorFactory extends NullableAggregatorFactory<BaseO
@Override @Override
public AggregatorFactory getCombiningFactory() public AggregatorFactory getCombiningFactory()
{ {
return new StringLastFoldingAggregatorFactory(name, name, maxStringBytes); return new StringLastAggregatorFactory(name, name, maxStringBytes);
} }
@Override @Override
@ -159,7 +151,7 @@ public class StringLastAggregatorFactory extends NullableAggregatorFactory<BaseO
@Override @Override
public List<String> requiredFields() public List<String> requiredFields()
{ {
return Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, fieldName); return ImmutableList.of(ColumnHolder.TIME_COLUMN_NAME, fieldName);
} }
@Override @Override
@ -192,25 +184,25 @@ public class StringLastAggregatorFactory extends NullableAggregatorFactory<BaseO
if (o == null || getClass() != o.getClass()) { if (o == null || getClass() != o.getClass()) {
return false; return false;
} }
StringLastAggregatorFactory that = (StringLastAggregatorFactory) o; StringLastAggregatorFactory that = (StringLastAggregatorFactory) o;
return maxStringBytes == that.maxStringBytes &&
return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes; Objects.equals(fieldName, that.fieldName) &&
Objects.equals(name, that.name);
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
return Objects.hash(name, fieldName, maxStringBytes); return Objects.hash(fieldName, name, maxStringBytes);
} }
@Override @Override
public String toString() public String toString()
{ {
return "StringFirstAggregatorFactory{" + return "StringLastAggregatorFactory{" +
"name='" + name + '\'' + "fieldName='" + fieldName + '\'' +
", fieldName='" + fieldName + '\'' + ", name='" + name + '\'' +
", maxStringBytes=" + maxStringBytes + '\'' + ", maxStringBytes=" + maxStringBytes +
'}'; '}';
} }
} }

View File

@ -19,10 +19,10 @@
package org.apache.druid.query.aggregation.last; package org.apache.druid.query.aggregation.last;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector;
@ -31,6 +31,11 @@ import java.nio.ByteBuffer;
public class StringLastBufferAggregator implements BufferAggregator public class StringLastBufferAggregator implements BufferAggregator
{ {
private static final SerializablePairLongString INIT = new SerializablePairLongString(
DateTimes.MIN.getMillis(),
null
);
private final BaseLongColumnValueSelector timeSelector; private final BaseLongColumnValueSelector timeSelector;
private final BaseObjectColumnValueSelector valueSelector; private final BaseObjectColumnValueSelector valueSelector;
private final int maxStringBytes; private final int maxStringBytes;
@ -49,79 +54,34 @@ public class StringLastBufferAggregator implements BufferAggregator
@Override @Override
public void init(ByteBuffer buf, int position) public void init(ByteBuffer buf, int position)
{ {
buf.putLong(position, Long.MIN_VALUE); StringFirstLastUtils.writePair(buf, position, INIT, maxStringBytes);
buf.putInt(position + Long.BYTES, 0);
} }
@Override @Override
public void aggregate(ByteBuffer buf, int position) public void aggregate(ByteBuffer buf, int position)
{ {
ByteBuffer mutationBuffer = buf.duplicate(); final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
mutationBuffer.position(position); timeSelector,
valueSelector
);
Object value = valueSelector.getObject(); if (inPair != null && inPair.rhs != null) {
final long lastTime = buf.getLong(position);
long time = timeSelector.getLong(); if (inPair.lhs >= lastTime) {
String lastString = null; StringFirstLastUtils.writePair(
buf,
if (value != null) { position,
if (value instanceof SerializablePairLongString) { new SerializablePairLongString(inPair.lhs, inPair.rhs),
SerializablePairLongString serializablePair = (SerializablePairLongString) value; maxStringBytes
time = serializablePair.lhs;
lastString = serializablePair.rhs;
} else if (value instanceof String) {
lastString = (String) value;
} else {
throw new ISE(
"Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
value.getClass().getName()
); );
} }
} }
long lastTime = mutationBuffer.getLong(position);
if (time >= lastTime) {
if (lastString != null) {
if (lastString.length() > maxStringBytes) {
lastString = lastString.substring(0, maxStringBytes);
}
byte[] valueBytes = StringUtils.toUtf8(lastString);
mutationBuffer.putLong(position, time);
mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
mutationBuffer.put(valueBytes);
} else {
mutationBuffer.putLong(position, time);
mutationBuffer.putInt(position + Long.BYTES, 0);
}
}
} }
@Override @Override
public Object get(ByteBuffer buf, int position) public Object get(ByteBuffer buf, int position)
{ {
ByteBuffer mutationBuffer = buf.duplicate(); return StringFirstLastUtils.readPair(buf, position);
mutationBuffer.position(position);
Long timeValue = mutationBuffer.getLong(position);
int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES);
SerializablePairLongString serializablePair;
if (stringSizeBytes > 0) {
byte[] valueBytes = new byte[stringSizeBytes];
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
mutationBuffer.get(valueBytes, 0, stringSizeBytes);
serializablePair = new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes));
} else {
serializablePair = new SerializablePairLongString(timeValue, null);
}
return serializablePair;
} }
@Override @Override

View File

@ -19,82 +19,16 @@
package org.apache.druid.query.aggregation.last; package org.apache.druid.query.aggregation.last;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer; /**
* For backwards compatibility; equivalent to a regular StringLastAggregatorFactory.
@JsonTypeName("stringLastFold") */
public class StringLastFoldingAggregatorFactory extends StringLastAggregatorFactory public class StringLastFoldingAggregatorFactory extends StringLastAggregatorFactory
{ {
public StringLastFoldingAggregatorFactory( @JsonCreator
@JsonProperty("name") String name, public StringLastFoldingAggregatorFactory(String name, String fieldName, Integer maxStringBytes)
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("maxStringBytes") Integer maxStringBytes
)
{ {
super(name, fieldName, maxStringBytes); super(name, fieldName, maxStringBytes);
} }
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector)
{
return new StringLastAggregator(null, null, maxStringBytes)
{
@Override
public void aggregate()
{
SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
if (pair != null && pair.lhs >= lastTime) {
lastTime = pair.lhs;
lastValue = pair.rhs;
}
}
};
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector)
{
return new StringLastBufferAggregator(null, null, maxStringBytes)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
if (pair != null && pair.lhs != null) {
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
long lastTime = mutationBuffer.getLong(position);
if (pair.lhs >= lastTime) {
mutationBuffer.putLong(position, pair.lhs);
if (pair.rhs != null) {
byte[] valueBytes = StringUtils.toUtf8(pair.rhs);
mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
mutationBuffer.put(valueBytes);
} else {
mutationBuffer.putInt(position + Long.BYTES, 0);
}
}
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
} }

View File

@ -32,7 +32,7 @@ public class DoubleValueMatcherColumnSelectorStrategy
{ {
final Double matchVal = DimensionHandlerUtils.convertObjectToDouble(value); final Double matchVal = DimensionHandlerUtils.convertObjectToDouble(value);
if (matchVal == null) { if (matchVal == null) {
return ValueMatcher.nullValueMatcher(selector); return ValueMatcher.primitiveNullValueMatcher(selector);
} }
final long matchValLongBits = Double.doubleToLongBits(matchVal); final long matchValLongBits = Double.doubleToLongBits(matchVal);

View File

@ -31,7 +31,7 @@ public class FloatValueMatcherColumnSelectorStrategy
{ {
final Float matchVal = DimensionHandlerUtils.convertObjectToFloat(value); final Float matchVal = DimensionHandlerUtils.convertObjectToFloat(value);
if (matchVal == null) { if (matchVal == null) {
return ValueMatcher.nullValueMatcher(selector); return ValueMatcher.primitiveNullValueMatcher(selector);
} }
final int matchValIntBits = Float.floatToIntBits(matchVal); final int matchValIntBits = Float.floatToIntBits(matchVal);

View File

@ -31,7 +31,7 @@ public class LongValueMatcherColumnSelectorStrategy
{ {
final Long matchVal = DimensionHandlerUtils.convertObjectToLong(value); final Long matchVal = DimensionHandlerUtils.convertObjectToLong(value);
if (matchVal == null) { if (matchVal == null) {
return ValueMatcher.nullValueMatcher(selector); return ValueMatcher.primitiveNullValueMatcher(selector);
} }
final long matchValLong = matchVal; final long matchValLong = matchVal;
return new ValueMatcher() return new ValueMatcher()

View File

@ -37,7 +37,12 @@ public interface ValueMatcher extends HotLoopCallee
boolean matches(); boolean matches();
// Utility method to match null values. // Utility method to match null values.
static ValueMatcher nullValueMatcher(BaseNullableColumnValueSelector selector)
/**
* Returns a ValueMatcher that matches when the primitive long, double, or float value from {@code selector}
* should be treated as null.
*/
static ValueMatcher primitiveNullValueMatcher(BaseNullableColumnValueSelector selector)
{ {
return new ValueMatcher() return new ValueMatcher()
{ {

View File

@ -48,7 +48,7 @@ import org.apache.druid.query.groupby.epinephelinae.column.FloatGroupByColumnSel
import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorPlus; import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorPlus;
import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.LongGroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.column.LongGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.NullableValueGroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.column.NullableNumericGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.StringGroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.column.StringGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine; import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
@ -350,20 +350,20 @@ public class GroupByQueryEngineV2
return new DictionaryBuildingStringGroupByColumnSelectorStrategy(); return new DictionaryBuildingStringGroupByColumnSelectorStrategy();
} }
case LONG: case LONG:
return makeNullableStrategy(new LongGroupByColumnSelectorStrategy()); return makeNullableNumericStrategy(new LongGroupByColumnSelectorStrategy());
case FLOAT: case FLOAT:
return makeNullableStrategy(new FloatGroupByColumnSelectorStrategy()); return makeNullableNumericStrategy(new FloatGroupByColumnSelectorStrategy());
case DOUBLE: case DOUBLE:
return makeNullableStrategy(new DoubleGroupByColumnSelectorStrategy()); return makeNullableNumericStrategy(new DoubleGroupByColumnSelectorStrategy());
default: default:
throw new IAE("Cannot create query type helper from invalid type [%s]", type); throw new IAE("Cannot create query type helper from invalid type [%s]", type);
} }
} }
private GroupByColumnSelectorStrategy makeNullableStrategy(GroupByColumnSelectorStrategy delegate) private GroupByColumnSelectorStrategy makeNullableNumericStrategy(GroupByColumnSelectorStrategy delegate)
{ {
if (NullHandling.sqlCompatible()) { if (NullHandling.sqlCompatible()) {
return new NullableValueGroupByColumnSelectorStrategy(delegate); return new NullableNumericGroupByColumnSelectorStrategy(delegate);
} else { } else {
return delegate; return delegate;
} }

View File

@ -29,11 +29,17 @@ import org.apache.druid.segment.ColumnValueSelector;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
public class NullableValueGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy /**
* A wrapper around a numeric {@link GroupByColumnSelectorStrategy} that makes it null-aware. Should only be used
* for numeric strategies, not for string strategies.
*
* @see org.apache.druid.segment.BaseNullableColumnValueSelector#isNull() for why this only works in the numeric case
*/
public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
{ {
private final GroupByColumnSelectorStrategy delegate; private final GroupByColumnSelectorStrategy delegate;
public NullableValueGroupByColumnSelectorStrategy(GroupByColumnSelectorStrategy delegate) public NullableNumericGroupByColumnSelectorStrategy(GroupByColumnSelectorStrategy delegate)
{ {
this.delegate = delegate; this.delegate = delegate;
} }

View File

@ -24,16 +24,19 @@ import org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop;
/** /**
* Null value checking polymorphic "part" of the {@link ColumnValueSelector} interface for primitive values. * Null value checking polymorphic "part" of the {@link ColumnValueSelector} interface for primitive values.
* Users of {@link BaseLongColumnValueSelector#getLong()}, {@link BaseDoubleColumnValueSelector#getDouble()}
* and {@link BaseFloatColumnValueSelector#getFloat()} are required to check the nullability of the primitive
* types returned.
*/ */
@PublicApi @PublicApi
public interface BaseNullableColumnValueSelector public interface BaseNullableColumnValueSelector
{ {
/** /**
* Returns true if selected primitive value is null for {@link BaseFloatColumnValueSelector}, * Returns true if the primitive long, double, or float value returned by this selector should be treated as null.
* {@link BaseLongColumnValueSelector} and {@link BaseDoubleColumnValueSelector} otherwise false. *
* Users of {@link BaseLongColumnValueSelector#getLong()}, {@link BaseDoubleColumnValueSelector#getDouble()}
* and {@link BaseFloatColumnValueSelector#getFloat()} must check this method first, or else they may improperly
* use placeholder values returned by the primitive get methods.
*
* Users of {@link BaseObjectColumnValueSelector#getObject()} should not call this method. Instead, call "getObject"
* and check if it is null.
*/ */
@CalledFromHotLoop @CalledFromHotLoop
boolean isNull(); boolean isNull();

View File

@ -31,7 +31,7 @@ import javax.annotation.Nullable;
* All implementations of this interface MUST also implement {@link ColumnValueSelector}. * All implementations of this interface MUST also implement {@link ColumnValueSelector}.
*/ */
@ExtensionPoint @ExtensionPoint
public interface BaseObjectColumnValueSelector<T> extends BaseNullableColumnValueSelector public interface BaseObjectColumnValueSelector<T>
{ {
@Nullable @Nullable
T getObject(); T getObject();

View File

@ -63,25 +63,35 @@ public class ExpressionFilter implements Filter
@Override @Override
public boolean matches() public boolean matches()
{ {
if (NullHandling.sqlCompatible() && selector.isNull()) { final ExprEval eval = selector.getObject();
return false;
}
ExprEval eval = selector.getObject();
if (eval == null) {
return false;
}
switch (eval.type()) { switch (eval.type()) {
case LONG_ARRAY: case LONG_ARRAY:
Long[] lResult = eval.asLongArray(); final Long[] lResult = eval.asLongArray();
if (lResult == null) {
return false;
}
return Arrays.stream(lResult).anyMatch(Evals::asBoolean); return Arrays.stream(lResult).anyMatch(Evals::asBoolean);
case STRING_ARRAY: case STRING_ARRAY:
String[] sResult = eval.asStringArray(); final String[] sResult = eval.asStringArray();
if (sResult == null) {
return false;
}
return Arrays.stream(sResult).anyMatch(Evals::asBoolean); return Arrays.stream(sResult).anyMatch(Evals::asBoolean);
case DOUBLE_ARRAY: case DOUBLE_ARRAY:
Double[] dResult = eval.asDoubleArray(); final Double[] dResult = eval.asDoubleArray();
if (dResult == null) {
return false;
}
return Arrays.stream(dResult).anyMatch(Evals::asBoolean); return Arrays.stream(dResult).anyMatch(Evals::asBoolean);
default: default:
return Evals.asBoolean(selector.getLong()); return eval.asBoolean();
} }
} }

View File

@ -138,12 +138,19 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
public ColumnValueSelector<?> makeColumnValueSelector(final String column) public ColumnValueSelector<?> makeColumnValueSelector(final String column)
{ {
final String typeName = agg.getTypeName(); final String typeName = agg.getTypeName();
boolean isComplexMetric = final boolean isComplexMetric =
GuavaUtils.getEnumIfPresent(ValueType.class, StringUtils.toUpperCase(typeName)) == null || GuavaUtils.getEnumIfPresent(ValueType.class, StringUtils.toUpperCase(typeName)) == null ||
typeName.equalsIgnoreCase(ValueType.COMPLEX.name()); typeName.equalsIgnoreCase(ValueType.COMPLEX.name());
final ColumnValueSelector selector = baseSelectorFactory.makeColumnValueSelector(column);
if (!isComplexMetric || !deserializeComplexMetrics) { if (!isComplexMetric || !deserializeComplexMetrics) {
return baseSelectorFactory.makeColumnValueSelector(column); return selector;
} else { } else {
// Wrap selector in a special one that uses ComplexMetricSerde to modify incoming objects.
// For complex aggregators that read from multiple columns, we wrap all of them. This is not ideal but it
// has worked so far.
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) { if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName); throw new ISE("Don't know how to handle type[%s]", typeName);
@ -155,31 +162,25 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
@Override @Override
public boolean isNull() public boolean isNull()
{ {
return in.get().getMetric(column) == null; return selector.isNull();
} }
@Override @Override
public long getLong() public long getLong()
{ {
Number metric = in.get().getMetric(column); return selector.getLong();
assert NullHandling.replaceWithDefault() || metric != null;
return DimensionHandlerUtils.nullToZero(metric).longValue();
} }
@Override @Override
public float getFloat() public float getFloat()
{ {
Number metric = in.get().getMetric(column); return selector.getFloat();
assert NullHandling.replaceWithDefault() || metric != null;
return DimensionHandlerUtils.nullToZero(metric).floatValue();
} }
@Override @Override
public double getDouble() public double getDouble()
{ {
Number metric = in.get().getMetric(column); return selector.getDouble();
assert NullHandling.replaceWithDefault() || metric != null;
return DimensionHandlerUtils.nullToZero(metric).doubleValue();
} }
@Override @Override
@ -192,6 +193,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
@Override @Override
public Object getObject() public Object getObject()
{ {
// Here is where the magic happens: read from "in" directly, don't go through the normal "selector".
return extractor.extractValue(in.get(), column, agg); return extractor.extractValue(in.get(), column, agg);
} }
@ -199,6 +201,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
public void inspectRuntimeShape(RuntimeShapeInspector inspector) public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{ {
inspector.visit("in", in); inspector.visit("in", in);
inspector.visit("selector", selector);
inspector.visit("extractor", extractor); inspector.visit("extractor", extractor);
} }
}; };
@ -997,7 +1000,10 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
return iterableWithPostAggregations(null, false).iterator(); return iterableWithPostAggregations(null, false).iterator();
} }
public Iterable<Row> iterableWithPostAggregations(@Nullable final List<PostAggregator> postAggs, final boolean descending) public Iterable<Row> iterableWithPostAggregations(
@Nullable final List<PostAggregator> postAggs,
final boolean descending
)
{ {
return () -> { return () -> {
final List<DimensionDesc> dimensions = getDimensions(); final List<DimensionDesc> dimensions = getDimensions();
@ -1237,6 +1243,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
/** /**
* Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator<IncrementalIndexRow>} * Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator<IncrementalIndexRow>}
*
* @return * @return
*/ */
Iterable<IncrementalIndexRow> persistIterable(); Iterable<IncrementalIndexRow> persistIterable();

View File

@ -110,8 +110,8 @@ public class ExpressionSelectors
ExprEval eval = baseSelector.getObject(); ExprEval eval = baseSelector.getObject();
if (eval.isArray()) { if (eval.isArray()) {
return Arrays.stream(eval.asStringArray()) return Arrays.stream(eval.asStringArray())
.map(NullHandling::emptyToNullIfNeeded) .map(NullHandling::emptyToNullIfNeeded)
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
return eval.value(); return eval.value();
} }
@ -374,15 +374,15 @@ public class ExpressionSelectors
if (nativeType == ValueType.FLOAT) { if (nativeType == ValueType.FLOAT) {
ColumnValueSelector selector = columnSelectorFactory ColumnValueSelector selector = columnSelectorFactory
.makeColumnValueSelector(columnName); .makeColumnValueSelector(columnName);
supplier = makeNullableSupplier(selector, selector::getFloat); supplier = makeNullableNumericSupplier(selector, selector::getFloat);
} else if (nativeType == ValueType.LONG) { } else if (nativeType == ValueType.LONG) {
ColumnValueSelector selector = columnSelectorFactory ColumnValueSelector selector = columnSelectorFactory
.makeColumnValueSelector(columnName); .makeColumnValueSelector(columnName);
supplier = makeNullableSupplier(selector, selector::getLong); supplier = makeNullableNumericSupplier(selector, selector::getLong);
} else if (nativeType == ValueType.DOUBLE) { } else if (nativeType == ValueType.DOUBLE) {
ColumnValueSelector selector = columnSelectorFactory ColumnValueSelector selector = columnSelectorFactory
.makeColumnValueSelector(columnName); .makeColumnValueSelector(columnName);
supplier = makeNullableSupplier(selector, selector::getDouble); supplier = makeNullableNumericSupplier(selector, selector::getDouble);
} else if (nativeType == ValueType.STRING) { } else if (nativeType == ValueType.STRING) {
supplier = supplierFromDimensionSelector( supplier = supplierFromDimensionSelector(
columnSelectorFactory.makeDimensionSelector(new DefaultDimensionSpec(columnName, columnName)), columnSelectorFactory.makeDimensionSelector(new DefaultDimensionSpec(columnName, columnName)),
@ -419,7 +419,12 @@ public class ExpressionSelectors
} }
} }
private static <T> Supplier<T> makeNullableSupplier( /**
* Wraps a {@link ColumnValueSelector} and uses it to supply numeric values in a null-aware way.
*
* @see org.apache.druid.segment.BaseNullableColumnValueSelector#isNull() for why this only works in the numeric case
*/
private static <T> Supplier<T> makeNullableNumericSupplier(
ColumnValueSelector selector, ColumnValueSelector selector,
Supplier<T> supplier Supplier<T> supplier
) )

View File

@ -39,7 +39,7 @@ import java.nio.ByteBuffer;
public class StringFirstAggregationTest public class StringFirstAggregationTest
{ {
private final Integer MAX_STRING_SIZE = 1024; private final Integer MAX_STRING_SIZE = 1024;
private AggregatorFactory stringLastAggFactory; private AggregatorFactory stringFirstAggFactory;
private AggregatorFactory combiningAggFactory; private AggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory; private ColumnSelectorFactory colSelectorFactory;
private TestLongColumnSelector timeSelector; private TestLongColumnSelector timeSelector;
@ -59,8 +59,8 @@ public class StringFirstAggregationTest
@Before @Before
public void setup() public void setup()
{ {
stringLastAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE); stringFirstAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
combiningAggFactory = stringLastAggFactory.getCombiningFactory(); combiningAggFactory = stringFirstAggFactory.getCombiningFactory();
timeSelector = new TestLongColumnSelector(times); timeSelector = new TestLongColumnSelector(times);
valueSelector = new TestObjectColumnSelector<>(strings); valueSelector = new TestObjectColumnSelector<>(strings);
objectSelector = new TestObjectColumnSelector<>(pairs); objectSelector = new TestObjectColumnSelector<>(pairs);
@ -72,9 +72,9 @@ public class StringFirstAggregationTest
} }
@Test @Test
public void testStringLastAggregator() public void testStringFirstAggregator()
{ {
Aggregator agg = stringLastAggFactory.factorize(colSelectorFactory); Aggregator agg = stringFirstAggFactory.factorize(colSelectorFactory);
aggregate(agg); aggregate(agg);
aggregate(agg); aggregate(agg);
@ -87,12 +87,12 @@ public class StringFirstAggregationTest
} }
@Test @Test
public void testStringLastBufferAggregator() public void testStringFirstBufferAggregator()
{ {
BufferAggregator agg = stringLastAggFactory.factorizeBuffered( BufferAggregator agg = stringFirstAggFactory.factorizeBuffered(
colSelectorFactory); colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]); ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0); agg.init(buffer, 0);
aggregate(agg, buffer, 0); aggregate(agg, buffer, 0);
@ -110,11 +110,11 @@ public class StringFirstAggregationTest
{ {
SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA"); SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA");
SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB"); SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB");
Assert.assertEquals(pair2, stringLastAggFactory.combine(pair1, pair2)); Assert.assertEquals(pair2, stringFirstAggFactory.combine(pair1, pair2));
} }
@Test @Test
public void testStringLastCombiningAggregator() public void testStringFirstCombiningAggregator()
{ {
Aggregator agg = combiningAggFactory.factorize(colSelectorFactory); Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
@ -136,7 +136,7 @@ public class StringFirstAggregationTest
BufferAggregator agg = combiningAggFactory.factorizeBuffered( BufferAggregator agg = combiningAggFactory.factorizeBuffered(
colSelectorFactory); colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]); ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0); agg.init(buffer, 0);
aggregate(agg, buffer, 0); aggregate(agg, buffer, 0);

View File

@ -27,7 +27,6 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class StringFirstBufferAggregatorTest public class StringFirstBufferAggregatorTest
{ {
@ -65,13 +64,7 @@ public class StringFirstBufferAggregatorTest
maxStringBytes maxStringBytes
); );
String testString = "ZZZZ";
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
buf.putLong(1526728500L);
buf.putInt(testString.length());
buf.put(testString.getBytes(StandardCharsets.UTF_8));
int position = 0; int position = 0;
agg.init(buf, position); agg.init(buf, position);
@ -83,7 +76,7 @@ public class StringFirstBufferAggregatorTest
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
Assert.assertEquals("expectec last string value", strings[0], sp.rhs); Assert.assertEquals("expected last string value", strings[0], sp.rhs);
Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[0]), new Long(sp.lhs)); Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[0]), new Long(sp.lhs));
} }
@ -109,13 +102,7 @@ public class StringFirstBufferAggregatorTest
maxStringBytes maxStringBytes
); );
String testString = "ZZZZ";
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
buf.putLong(1526728500L);
buf.putInt(testString.length());
buf.put(testString.getBytes(StandardCharsets.UTF_8));
int position = 0; int position = 0;
agg.init(buf, position); agg.init(buf, position);
@ -127,12 +114,12 @@ public class StringFirstBufferAggregatorTest
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
Assert.assertEquals("expectec last string value", strings[1], sp.rhs); Assert.assertEquals("expected last string value", strings[1], sp.rhs);
Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[1]), new Long(sp.lhs)); Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[1]), new Long(sp.lhs));
} }
@Test(expected = IllegalStateException.class) @Test
public void testNoStringValue() public void testNoStringValue()
{ {
@ -153,13 +140,7 @@ public class StringFirstBufferAggregatorTest
maxStringBytes maxStringBytes
); );
String testString = "ZZZZ";
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
buf.putLong(1526728500L);
buf.putInt(testString.length());
buf.put(testString.getBytes(StandardCharsets.UTF_8));
int position = 0; int position = 0;
agg.init(buf, position); agg.init(buf, position);
@ -167,5 +148,10 @@ public class StringFirstBufferAggregatorTest
for (int i = 0; i < timestamps.length; i++) { for (int i = 0; i < timestamps.length; i++) {
aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position); aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
} }
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
Assert.assertEquals(1526724600L, (long) sp.lhs);
Assert.assertEquals("2.0", sp.rhs);
} }
} }

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.aggregation.first; package org.apache.druid.query.aggregation.first;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.MapBasedInputRow;
@ -29,14 +30,21 @@ import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result; import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Collections; import java.util.Collections;
@ -44,80 +52,99 @@ import java.util.List;
public class StringFirstTimeseriesQueryTest public class StringFirstTimeseriesQueryTest
{ {
private static final String VISITOR_ID = "visitor_id";
private static final String CLIENT_TYPE = "client_type";
private static final String FIRST_CLIENT_TYPE = "first_client_type";
@Test private static final DateTime TIME1 = DateTimes.of("2016-03-04T00:00:00.000Z");
public void testTopNWithDistinctCountAgg() throws Exception private static final DateTime TIME2 = DateTimes.of("2016-03-04T01:00:00.000Z");
private IncrementalIndex incrementalIndex;
private QueryableIndex queryableIndex;
@Before
public void setUp() throws IndexSizeExceededException
{ {
TimeseriesQueryEngine engine = new TimeseriesQueryEngine(); final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde();
ComplexMetrics.registerSerde(serde.getTypeName(), serde);
String visitor_id = "visitor_id"; incrementalIndex = new IncrementalIndex.Builder()
String client_type = "client_type";
IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema( .setIndexSchema(
new IncrementalIndexSchema.Builder() new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.SECOND) .withQueryGranularity(Granularities.SECOND)
.withMetrics(new CountAggregatorFactory("cnt")) .withMetrics(new CountAggregatorFactory("cnt"))
.withMetrics(new StringFirstAggregatorFactory( .withMetrics(new StringFirstAggregatorFactory(FIRST_CLIENT_TYPE, CLIENT_TYPE, 1024))
"last_client_type", "client_type", 1024)
)
.build() .build()
) )
.setMaxRowCount(1000) .setMaxRowCount(1000)
.buildOnheap(); .buildOnheap();
incrementalIndex.add(
new MapBasedInputRow(
TIME1,
Lists.newArrayList(VISITOR_ID, CLIENT_TYPE),
ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "iphone")
)
);
incrementalIndex.add(
new MapBasedInputRow(
TIME1,
Lists.newArrayList(VISITOR_ID, CLIENT_TYPE),
ImmutableMap.of(VISITOR_ID, "1", CLIENT_TYPE, "iphone")
)
);
incrementalIndex.add(
new MapBasedInputRow(
TIME2,
Lists.newArrayList(VISITOR_ID, CLIENT_TYPE),
ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "android")
)
);
DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z"); queryableIndex = TestIndex.persistRealtimeAndLoadMMapped(incrementalIndex);
long timestamp = time.getMillis(); }
@Test
public void testTimeseriesQuery()
{
TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
long timestamp1 = time1.getMillis();
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList(visitor_id, client_type),
ImmutableMap.of(visitor_id, "0", client_type, "iphone")
)
);
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList(visitor_id, client_type),
ImmutableMap.of(visitor_id, "1", client_type, "iphone")
)
);
index.add(
new MapBasedInputRow(
timestamp1,
Lists.newArrayList(visitor_id, client_type),
ImmutableMap.of(visitor_id, "0", client_type, "android")
)
);
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE) .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.ALL_GRAN) .granularity(QueryRunnerTestHelper.ALL_GRAN)
.intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.aggregators( .aggregators(
Collections.singletonList( ImmutableList.of(
new StringFirstAggregatorFactory( new StringFirstAggregatorFactory("nonfolding", CLIENT_TYPE, 1024),
"last_client_type", client_type, 1024 new StringFirstAggregatorFactory("folding", FIRST_CLIENT_TYPE, 1024),
) new StringFirstAggregatorFactory("nonexistent", "nonexistent", 1024),
new StringFirstAggregatorFactory("numeric", "cnt", 1024)
) )
) )
.build(); .build();
final Iterable<Result<TimeseriesResultValue>> results =
engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList( List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
new Result<>( new Result<>(
time, TIME1,
new TimeseriesResultValue( new TimeseriesResultValue(
ImmutableMap.of("last_client_type", new SerializablePairLongString(timestamp, "iphone")) ImmutableMap.<String, Object>builder()
.put("nonfolding", new SerializablePairLongString(TIME1.getMillis(), "iphone"))
.put("folding", new SerializablePairLongString(TIME1.getMillis(), "iphone"))
.put("nonexistent", new SerializablePairLongString(DateTimes.MAX.getMillis(), null))
.put("numeric", new SerializablePairLongString(DateTimes.MAX.getMillis(), null))
.build()
) )
) )
); );
TestHelper.assertExpectedResults(expectedResults, results);
final Iterable<Result<TimeseriesResultValue>> iiResults =
engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex)).toList();
final Iterable<Result<TimeseriesResultValue>> qiResults =
engine.process(query, new QueryableIndexStorageAdapter(queryableIndex)).toList();
TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental index");
TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable index");
} }
} }

View File

@ -27,7 +27,6 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class StringLastBufferAggregatorTest public class StringLastBufferAggregatorTest
{ {
@ -65,13 +64,7 @@ public class StringLastBufferAggregatorTest
maxStringBytes maxStringBytes
); );
String testString = "ZZZZ";
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
buf.putLong(1526728500L);
buf.putInt(testString.length());
buf.put(testString.getBytes(StandardCharsets.UTF_8));
int position = 0; int position = 0;
agg.init(buf, position); agg.init(buf, position);
@ -109,13 +102,7 @@ public class StringLastBufferAggregatorTest
maxStringBytes maxStringBytes
); );
String testString = "ZZZZ";
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
buf.putLong(1526728500L);
buf.putInt(testString.length());
buf.put(testString.getBytes(StandardCharsets.UTF_8));
int position = 0; int position = 0;
agg.init(buf, position); agg.init(buf, position);
@ -132,8 +119,8 @@ public class StringLastBufferAggregatorTest
} }
@Test(expected = IllegalStateException.class) @Test
public void testNoStringValue() public void testNonStringValue()
{ {
final long[] timestamps = {1526724000L, 1526724600L}; final long[] timestamps = {1526724000L, 1526724600L};
@ -153,13 +140,7 @@ public class StringLastBufferAggregatorTest
maxStringBytes maxStringBytes
); );
String testString = "ZZZZ";
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
buf.putLong(1526728500L);
buf.putInt(testString.length());
buf.put(testString.getBytes(StandardCharsets.UTF_8));
int position = 0; int position = 0;
agg.init(buf, position); agg.init(buf, position);
@ -167,5 +148,10 @@ public class StringLastBufferAggregatorTest
for (int i = 0; i < timestamps.length; i++) { for (int i = 0; i < timestamps.length; i++) {
aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position); aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
} }
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
Assert.assertEquals(1526724600L, (long) sp.lhs);
Assert.assertEquals("2.0", sp.rhs);
} }
} }

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.aggregation.last; package org.apache.druid.query.aggregation.last;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.MapBasedInputRow;
@ -29,14 +30,21 @@ import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result; import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Collections; import java.util.Collections;
@ -44,83 +52,99 @@ import java.util.List;
public class StringLastTimeseriesQueryTest public class StringLastTimeseriesQueryTest
{ {
private static final String VISITOR_ID = "visitor_id";
private static final String CLIENT_TYPE = "client_type";
private static final String LAST_CLIENT_TYPE = "last_client_type";
@Test private static final DateTime TIME1 = DateTimes.of("2016-03-04T00:00:00.000Z");
public void testTopNWithDistinctCountAgg() throws Exception private static final DateTime TIME2 = DateTimes.of("2016-03-04T01:00:00.000Z");
private IncrementalIndex incrementalIndex;
private QueryableIndex queryableIndex;
@Before
public void setUp() throws IndexSizeExceededException
{ {
TimeseriesQueryEngine engine = new TimeseriesQueryEngine(); final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde();
ComplexMetrics.registerSerde(serde.getTypeName(), serde);
String visitor_id = "visitor_id"; incrementalIndex = new IncrementalIndex.Builder()
String client_type = "client_type";
IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema( .setIndexSchema(
new IncrementalIndexSchema.Builder() new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.SECOND) .withQueryGranularity(Granularities.SECOND)
.withMetrics(new CountAggregatorFactory("cnt")) .withMetrics(new CountAggregatorFactory("cnt"))
.withMetrics(new StringLastAggregatorFactory( .withMetrics(new StringLastAggregatorFactory(LAST_CLIENT_TYPE, CLIENT_TYPE, 1024))
"last_client_type", "client_type", 1024)
)
.build() .build()
) )
.setMaxRowCount(1000) .setMaxRowCount(1000)
.buildOnheap(); .buildOnheap();
incrementalIndex.add(
new MapBasedInputRow(
TIME1,
Lists.newArrayList(VISITOR_ID, CLIENT_TYPE),
ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "iphone")
)
);
incrementalIndex.add(
new MapBasedInputRow(
TIME1,
Lists.newArrayList(VISITOR_ID, CLIENT_TYPE),
ImmutableMap.of(VISITOR_ID, "1", CLIENT_TYPE, "iphone")
)
);
incrementalIndex.add(
new MapBasedInputRow(
TIME2,
Lists.newArrayList(VISITOR_ID, CLIENT_TYPE),
ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "android")
)
);
DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z"); queryableIndex = TestIndex.persistRealtimeAndLoadMMapped(incrementalIndex);
long timestamp = time.getMillis(); }
@Test
public void testTimeseriesQuery()
{
TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
long timestamp1 = time1.getMillis();
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList(visitor_id, client_type),
ImmutableMap.of(visitor_id, "0", client_type, "iphone")
)
);
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList(visitor_id, client_type),
ImmutableMap.of(visitor_id, "1", client_type, "iphone")
)
);
index.add(
new MapBasedInputRow(
timestamp1,
Lists.newArrayList(visitor_id, client_type),
ImmutableMap.of(visitor_id, "0", client_type, "android")
)
);
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE) .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.ALL_GRAN) .granularity(QueryRunnerTestHelper.ALL_GRAN)
.intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.aggregators( .aggregators(
Collections.singletonList( ImmutableList.of(
new StringLastAggregatorFactory( new StringLastAggregatorFactory("nonfolding", CLIENT_TYPE, 1024),
"last_client_type", client_type, 1024 new StringLastAggregatorFactory("folding", LAST_CLIENT_TYPE, 1024),
) new StringLastAggregatorFactory("nonexistent", "nonexistent", 1024),
new StringLastAggregatorFactory("numeric", "cnt", 1024)
) )
) )
.build(); .build();
final Iterable<Result<TimeseriesResultValue>> results =
engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList( List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
new Result<>( new Result<>(
time, TIME1,
new TimeseriesResultValue( new TimeseriesResultValue(
ImmutableMap.of( ImmutableMap.<String, Object>builder()
"last_client_type", .put("nonfolding", new SerializablePairLongString(TIME2.getMillis(), "android"))
new SerializablePairLongString(timestamp1, "android") .put("folding", new SerializablePairLongString(TIME2.getMillis(), "android"))
) .put("nonexistent", new SerializablePairLongString(DateTimes.MIN.getMillis(), null))
.put("numeric", new SerializablePairLongString(DateTimes.MIN.getMillis(), null))
.build()
) )
) )
); );
TestHelper.assertExpectedResults(expectedResults, results);
final Iterable<Result<TimeseriesResultValue>> iiResults =
engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex)).toList();
final Iterable<Result<TimeseriesResultValue>> qiResults =
engine.process(query, new QueryableIndexStorageAdapter(queryableIndex)).toList();
TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental index");
TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable index");
} }
} }

View File

@ -540,7 +540,13 @@ public class GroupByQueryQueryToolChestTest
); );
// test timestamps that result in integer size millis // test timestamps that result in integer size millis
final ResultRow result1 = ResultRow.of(123L, "val1", "fooval1", 1, getIntermediateComplexValue(ValueType.STRING, "val1")); final ResultRow result1 = ResultRow.of(
123L,
"val1",
"fooval1",
1,
getIntermediateComplexValue(ValueType.STRING, "val1")
);
Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1); Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1);

View File

@ -150,8 +150,10 @@ public class TopNQueryQueryToolChestTest
).getCacheStrategy(query2); ).getCacheStrategy(query2);
Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), Assert.assertFalse(Arrays.equals(
strategy2.computeResultLevelCacheKey(query2))); strategy1.computeResultLevelCacheKey(query1),
strategy2.computeResultLevelCacheKey(query2)
));
} }
@Test @Test
@ -234,8 +236,10 @@ public class TopNQueryQueryToolChestTest
//segment level cache key excludes postaggregates in topn //segment level cache key excludes postaggregates in topn
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy1.computeResultLevelCacheKey(query1))); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy1.computeResultLevelCacheKey(query1)));
Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), Assert.assertFalse(Arrays.equals(
strategy2.computeResultLevelCacheKey(query2))); strategy1.computeResultLevelCacheKey(query1),
strategy2.computeResultLevelCacheKey(query2)
));
} }
@Test @Test
@ -323,7 +327,8 @@ public class TopNQueryQueryToolChestTest
collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong((Long) dimValue).asBytes()); collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong((Long) dimValue).asBytes());
break; break;
case DOUBLE: case DOUBLE:
collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits((Double) dimValue)).asBytes()); collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits((Double) dimValue))
.asBytes());
break; break;
case FLOAT: case FLOAT:
collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits((Float) dimValue)).asBytes()); collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits((Float) dimValue)).asBytes());

View File

@ -188,7 +188,10 @@ public class ExpressionFilterTest extends BaseFilterTest
public void testConstantExpression() public void testConstantExpression()
{ {
assertFilterMatchesSkipVectorize(edf("1 + 1"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")); assertFilterMatchesSkipVectorize(edf("1 + 1"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
assertFilterMatchesSkipVectorize(edf("'true'"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
assertFilterMatchesSkipVectorize(edf("0 + 0"), ImmutableList.of()); assertFilterMatchesSkipVectorize(edf("0 + 0"), ImmutableList.of());
assertFilterMatchesSkipVectorize(edf("'false'"), ImmutableList.of());
} }
@Test @Test