diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java index b4520070fe9..4880c0f6cee 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -26,7 +26,10 @@ import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CodingErrorAction; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Base64; @@ -107,6 +110,44 @@ public class StringUtils } } + /** + * Encodes "string" into the buffer "byteBuffer", using no more than the number of bytes remaining in the buffer. + * Will only encode whole characters. The byteBuffer's position and limit may be changed during operation, but will + * be reset before this method call ends. + * + * @return the number of bytes written, which may be shorter than the full encoded string length if there + * is not enough room in the output buffer. + */ + public static int toUtf8WithLimit(final String string, final ByteBuffer byteBuffer) + { + final CharsetEncoder encoder = StandardCharsets.UTF_8 + .newEncoder() + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE); + + final int originalPosition = byteBuffer.position(); + final int originalLimit = byteBuffer.limit(); + final int maxBytes = byteBuffer.remaining(); + + try { + final char[] chars = string.toCharArray(); + final CharBuffer charBuffer = CharBuffer.wrap(chars); + + // No reason to look at the CoderResult from the "encode" call; we can tell the number of transferred characters + // by looking at the output buffer's position. 
+ encoder.encode(charBuffer, byteBuffer, true); + + final int bytesWritten = byteBuffer.position() - originalPosition; + + assert bytesWritten <= maxBytes; + return bytesWritten; + } + finally { + byteBuffer.position(originalPosition); + byteBuffer.limit(originalLimit); + } + } + @Nullable public static byte[] toUtf8Nullable(@Nullable final String string) { @@ -163,6 +204,7 @@ public class StringUtils * application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form data as well. * * @param s String to be encoded + * * @return application/x-www-form-urlencoded format encoded String, but with "+" replaced with "%20". */ @Nullable @@ -311,6 +353,7 @@ public class StringUtils * Convert an input to base 64 and return the utf8 string of that byte array * * @param input The string to convert to base64 + * * @return the base64 of the input in string form */ public static String utf8Base64(String input) @@ -322,6 +365,7 @@ public class StringUtils * Convert an input byte array into a newly-allocated byte array using the {@link Base64} encoding scheme * * @param input The byte array to convert to base64 + * * @return the base64 of the input in byte array form */ public static byte[] encodeBase64(byte[] input) @@ -333,6 +377,7 @@ public class StringUtils * Convert an input byte array into a string using the {@link Base64} encoding scheme * * @param input The byte array to convert to base64 + * * @return the base64 of the input in string form */ public static String encodeBase64String(byte[] input) @@ -344,6 +389,7 @@ public class StringUtils * Decode an input byte array using the {@link Base64} encoding scheme and return a newly-allocated byte array * * @param input The byte array to decode from base64 + * * @return a newly-allocated byte array */ public static byte[] decodeBase64(byte[] input) @@ -355,6 +401,7 @@ public class StringUtils * Decode an input string using the {@link Base64} encoding scheme and return a newly-allocated byte array * * 
@param input The string to decode from base64 + * * @return a newly-allocated byte array */ public static byte[] decodeBase64String(String input) @@ -411,7 +458,7 @@ public class StringUtils System.arraycopy(multiple, 0, multiple, copied, limit - copied); return new String(multiple, StandardCharsets.UTF_8); } - + /** * Returns the string left-padded with the string pad to a length of len characters. * If str is longer than len, the return value is shortened to len characters. @@ -419,8 +466,9 @@ public class StringUtils * https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala * * @param base The base string to be padded - * @param len The length of padded string - * @param pad The pad string + * @param len The length of padded string + * @param pad The pad string + * * @return the string left-padded with pad to a length of len */ public static String lpad(String base, Integer len, String pad) @@ -453,11 +501,12 @@ public class StringUtils /** * Returns the string right-padded with the string pad to a length of len characters. - * If str is longer than len, the return value is shortened to len characters. + * If str is longer than len, the return value is shortened to len characters. 
* * @param base The base string to be padded - * @param len The length of padded string - * @param pad The pad string + * @param len The length of padded string + * @param pad The pad string + * * @return the string right-padded with pad to a length of len */ public static String rpad(String base, Integer len, String pad) @@ -473,12 +522,12 @@ public class StringUtils int pos = 0; // Copy the base - for ( ; pos < base.length() && pos < len; pos++) { + for (; pos < base.length() && pos < len; pos++) { data[pos] = base.charAt(pos); } // Copy the padding - for ( ; pos < len; pos += pad.length()) { + for (; pos < len; pos += pad.length()) { for (int i = 0; i < pad.length() && i < len - pos; i++) { data[pos + i] = pad.charAt(i); } diff --git a/core/src/main/java/org/apache/druid/math/expr/Evals.java b/core/src/main/java/org/apache/druid/math/expr/Evals.java index 88cc56a3dc8..f6e3e4f74c8 100644 --- a/core/src/main/java/org/apache/druid/math/expr/Evals.java +++ b/core/src/main/java/org/apache/druid/math/expr/Evals.java @@ -22,6 +22,7 @@ package org.apache.druid.math.expr; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.logger.Logger; +import javax.annotation.Nullable; import java.util.Arrays; import java.util.List; @@ -46,21 +47,6 @@ public class Evals return true; } - // for binary operator not providing constructor of form (String, Expr, Expr), - // you should create it explicitly in here - public static Expr binaryOp(BinaryOpExprBase binary, Expr left, Expr right) - { - try { - return binary.getClass() - .getDeclaredConstructor(String.class, Expr.class, Expr.class) - .newInstance(binary.op, left, right); - } - catch (Exception e) { - log.warn(e, "failed to rewrite expression " + binary); - return binary; // best effort.. keep it working - } - } - public static long asLong(boolean x) { return x ? 
1L : 0L; @@ -81,8 +67,8 @@ public class Evals return x > 0; } - public static boolean asBoolean(String x) + public static boolean asBoolean(@Nullable String x) { - return !NullHandling.isNullOrEquivalent(x) && Boolean.valueOf(x); + return !NullHandling.isNullOrEquivalent(x) && Boolean.parseBoolean(x); } } diff --git a/core/src/main/java/org/apache/druid/math/expr/ExprEval.java b/core/src/main/java/org/apache/druid/math/expr/ExprEval.java index de77eb1d99e..b6f1f58736a 100644 --- a/core/src/main/java/org/apache/druid/math/expr/ExprEval.java +++ b/core/src/main/java/org/apache/druid/math/expr/ExprEval.java @@ -381,6 +381,7 @@ public abstract class ExprEval private static final StringExprEval OF_NULL = new StringExprEval(null); + @Nullable private Number numericVal; private StringExprEval(@Nullable String value) diff --git a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java index 8a1748cc249..e9f5f214b08 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java @@ -56,6 +56,32 @@ public class StringUtilsTest } } + @Test + public void toUtf8WithLimitTest() + { + final ByteBuffer smallBuffer = ByteBuffer.allocate(4); + final ByteBuffer mediumBuffer = ByteBuffer.allocate(6); + final ByteBuffer bigBuffer = ByteBuffer.allocate(8); + + final int smallBufferResult = StringUtils.toUtf8WithLimit("🚀🌔", smallBuffer); + Assert.assertEquals(4, smallBufferResult); + final byte[] smallBufferByteArray = new byte[smallBufferResult]; + smallBuffer.get(smallBufferByteArray); + Assert.assertEquals("🚀", StringUtils.fromUtf8(smallBufferByteArray)); + + final int mediumBufferResult = StringUtils.toUtf8WithLimit("🚀🌔", mediumBuffer); + Assert.assertEquals(4, mediumBufferResult); + final byte[] mediumBufferByteArray = new byte[mediumBufferResult]; + 
mediumBuffer.get(mediumBufferByteArray); + Assert.assertEquals("🚀", StringUtils.fromUtf8(mediumBufferByteArray)); + + final int bigBufferResult = StringUtils.toUtf8WithLimit("🚀🌔", bigBuffer); + Assert.assertEquals(8, bigBufferResult); + final byte[] bigBufferByteArray = new byte[bigBufferResult]; + bigBuffer.get(bigBufferByteArray); + Assert.assertEquals("🚀🌔", StringUtils.fromUtf8(bigBufferByteArray)); + } + @Test public void fromUtf8ByteBufferHeap() { @@ -181,7 +207,7 @@ public class StringUtilsTest expectedException.expectMessage("count is negative, -1"); Assert.assertEquals("", StringUtils.repeat("foo", -1)); } - + @Test public void testLpad() { diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index 48ba0832753..8bc4bd323ad 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -23,7 +23,6 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -44,10 +43,10 @@ import java.nio.ByteBuffer; * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize} and * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered} * - * @param type of {@link BaseNullableColumnValueSelector} that feeds this aggregator, likely either values - * to add to a bloom filter, or other bloom filters to merge into this bloom filter. 
+ * @param type of selector that feeds this aggregator, likely either values to add to a bloom filter, + * or other bloom filters to merge into this bloom filter. */ -public abstract class BaseBloomFilterAggregator +public abstract class BaseBloomFilterAggregator implements BufferAggregator, Aggregator { @Nullable diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java index d158be45a20..28fcaffbbf5 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java @@ -35,6 +35,7 @@ import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseNullableColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.NilColumnValueSelector; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; @@ -279,16 +280,21 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory ); } } else { + // No column capabilities, try to guess based on selector type. BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); + if (selector instanceof NilColumnValueSelector) { return new NoopBloomFilterAggregator(maxNumEntries, onHeap); + } else if (selector instanceof DimensionSelector) { + return new StringBloomFilterAggregator((DimensionSelector) selector, maxNumEntries, onHeap); + } else { + // Use fallback 'object' aggregator. 
+ return new ObjectBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + onHeap + ); } - // no column capabilities, use fallback 'object' aggregator - return new ObjectBloomFilterAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), - maxNumEntries, - onHeap - ); } } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java index 6855d83a35e..7dd8abaea0b 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java @@ -21,13 +21,14 @@ package org.apache.druid.query.aggregation.bloom; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import java.nio.ByteBuffer; -public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator> +public final class BloomFilterMergeAggregator + extends BaseBloomFilterAggregator> { - BloomFilterMergeAggregator(ColumnValueSelector selector, int maxNumEntries, boolean onHeap) + BloomFilterMergeAggregator(BaseObjectColumnValueSelector selector, int maxNumEntries, boolean onHeap) { super(selector, maxNumEntries, onHeap); } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java index bc97fab800a..0ad7a179fdb 100644 --- 
a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java @@ -19,20 +19,18 @@ package org.apache.druid.query.aggregation.bloom; -import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import java.nio.ByteBuffer; /** * Handles "unknown" columns by examining what comes out of the selector */ -class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator +class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator> { ObjectBloomFilterAggregator( - ColumnValueSelector selector, + BaseObjectColumnValueSelector selector, int maxNumEntries, boolean onHeap ) @@ -48,16 +46,14 @@ class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator implements AggregateCombiner +public final class NullableNumericAggregateCombiner implements AggregateCombiner { private boolean isNullResult = true; private final AggregateCombiner delegate; - public NullableAggregateCombiner(AggregateCombiner delegate) + public NullableNumericAggregateCombiner(AggregateCombiner delegate) { this.delegate = delegate; } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregator.java similarity index 67% rename from processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregator.java rename to processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregator.java index 64658b6ca35..33c0c2438fd 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregator.java +++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregator.java @@ -21,24 +21,34 @@ package org.apache.druid.query.aggregation; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; import javax.annotation.Nullable; /** - * The result of a NullableAggregator will be null if all the values to be aggregated are null values - * or no values are aggregated at all. If any of the value is non-null, the result would be the aggregated - * value of the delegate aggregator. Note that the delegate aggregator is not required to perform check for - * {@link BaseNullableColumnValueSelector#isNull()} on the selector as only non-null values will be passed - * to the delegate aggregator. This class is only used when SQL compatible null handling is enabled. + * Null-aware numeric {@link Aggregator}. + * + * The result of this aggregator will be null if all the values to be aggregated are null values or no values are + * aggregated at all. If any of the values are non-null, the result will be the aggregated value of the delegate + * aggregator. + * + * When wrapped by this class, the underlying aggregator's required storage space is increased by one byte. The extra + * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before + * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) + * + * Used by {@link NullableNumericAggregatorFactory#factorize(ColumnSelectorFactory)} to wrap non-null aware + * aggregators. This class is only used when SQL compatible null handling is enabled. 
+ * + * @see NullableNumericAggregatorFactory#factorize(ColumnSelectorFactory) */ @PublicApi -public final class NullableAggregator implements Aggregator +public final class NullableNumericAggregator implements Aggregator { private final Aggregator delegate; private final BaseNullableColumnValueSelector selector; private boolean isNullResult = true; - public NullableAggregator(Aggregator delegate, BaseNullableColumnValueSelector selector) + public NullableNumericAggregator(Aggregator delegate, BaseNullableColumnValueSelector selector) { this.delegate = delegate; this.selector = selector; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java similarity index 81% rename from processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregatorFactory.java rename to processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java index e88eac3428e..d9d66e37599 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java @@ -30,19 +30,28 @@ import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; /** - * Abstract class with functionality to wrap {@link Aggregator}, {@link BufferAggregator} and {@link AggregateCombiner} - * to support nullable aggregations for SQL compatibility. Implementations of {@link AggregatorFactory} which need to - * Support Nullable Aggregations are encouraged to extend this class. + * Abstract superclass for null-aware numeric aggregators. + * + * Includes functionality to wrap {@link Aggregator}, {@link BufferAggregator}, {@link VectorAggregator}, and + * {@link AggregateCombiner} to support nullable aggregations. 
The result of this aggregator will be null if all the + * values to be aggregated are null values, or if no values are aggregated at all. If any of the values are non-null, + * the result will be the aggregated value of the non-null values. + * + * This superclass should only be extended by aggregators that read primitive numbers. It implements logic that is + * not valid for non-numeric selector methods such as {@link ColumnValueSelector#getObject()}. + * + * @see BaseNullableColumnValueSelector#isNull() for why this only works in the numeric case */ @ExtensionPoint -public abstract class NullableAggregatorFactory extends AggregatorFactory +public abstract class NullableNumericAggregatorFactory + extends AggregatorFactory { @Override public final Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) { T selector = selector(columnSelectorFactory); Aggregator aggregator = factorize(columnSelectorFactory, selector); - return NullHandling.replaceWithDefault() ? aggregator : new NullableAggregator(aggregator, selector); + return NullHandling.replaceWithDefault() ? aggregator : new NullableNumericAggregator(aggregator, selector); } @Override @@ -50,7 +59,7 @@ public abstract class NullableAggregatorFactory" instead of "NullableAggregatorFactory" * to additionally support aggregation on single/multi value string column types. 
*/ -public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFactory +public abstract class SimpleDoubleAggregatorFactory extends NullableNumericAggregatorFactory { protected final String name; @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java index 6c4badf1842..f82f5d3ef36 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java @@ -39,7 +39,7 @@ import java.util.Comparator; import java.util.List; import java.util.Objects; -public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFactory +public abstract class SimpleFloatAggregatorFactory extends NullableNumericAggregatorFactory { protected final String name; @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java index f0dc0476536..bbc7b43cd57 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java @@ -45,7 +45,7 @@ import java.util.Objects; * It extends "NullableAggregatorFactory" instead of "NullableAggregatorFactory" * to additionally support aggregation on single/multi value string column types. 
*/ -public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFactory +public abstract class SimpleLongAggregatorFactory extends NullableNumericAggregatorFactory { protected final String name; @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java index 8782e1235cd..eb39e35e4ef 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -45,7 +45,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -public class DoubleFirstAggregatorFactory extends NullableAggregatorFactory +public class DoubleFirstAggregatorFactory extends NullableNumericAggregatorFactory { public static final Comparator> VALUE_COMPARATOR = Comparator.comparingDouble(o -> o.rhs); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java index d7ddb4b3bf5..4a2d1c12abd 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java +++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -45,7 +45,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -public class FloatFirstAggregatorFactory extends NullableAggregatorFactory +public class FloatFirstAggregatorFactory extends NullableNumericAggregatorFactory { public static final Comparator> VALUE_COMPARATOR = Comparator.comparingDouble(o -> o.rhs); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java index 6289b8c8fbc..399f1fd8161 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import 
org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -44,7 +44,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -public class LongFirstAggregatorFactory extends NullableAggregatorFactory +public class LongFirstAggregatorFactory extends NullableNumericAggregatorFactory { public static final Comparator> VALUE_COMPARATOR = Comparator.comparingLong(o -> o.rhs); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java index 84991e9645c..02600444f67 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java @@ -19,24 +19,26 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import javax.annotation.Nullable; + public class StringFirstAggregator implements Aggregator { - - private final BaseObjectColumnValueSelector valueSelector; + @Nullable private final BaseLongColumnValueSelector timeSelector; + private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; protected long firstTime; protected String firstValue; public StringFirstAggregator( - BaseLongColumnValueSelector timeSelector, + @Nullable BaseLongColumnValueSelector timeSelector, BaseObjectColumnValueSelector valueSelector, int maxStringBytes ) @@ -45,35 +47,24 @@ public class StringFirstAggregator implements Aggregator this.timeSelector = timeSelector; this.maxStringBytes = 
maxStringBytes; - firstTime = Long.MAX_VALUE; + firstTime = DateTimes.MAX.getMillis(); firstValue = null; } @Override public void aggregate() { - long time = timeSelector.getLong(); - if (time < firstTime) { - firstTime = time; - Object value = valueSelector.getObject(); + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); - if (value != null) { - if (value instanceof String) { - firstValue = (String) value; - } else if (value instanceof SerializablePairLongString) { - firstValue = ((SerializablePairLongString) value).rhs; - } else { - throw new ISE( - "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString", - value.getClass().getName() - ); - } + if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) { + firstTime = inPair.lhs; + firstValue = inPair.rhs; - if (firstValue != null && firstValue.length() > maxStringBytes) { - firstValue = firstValue.substring(0, maxStringBytes); - } - } else { - firstValue = null; + if (firstValue.length() > maxStringBytes) { + firstValue = firstValue.substring(0, maxStringBytes); } } } @@ -81,7 +72,7 @@ public class StringFirstAggregator implements Aggregator @Override public Object get() { - return new SerializablePairLongString(firstTime, firstValue); + return new SerializablePairLongString(firstTime, StringFirstLastUtils.chop(firstValue, maxStringBytes)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java index cceebc0cd21..983ed9b2202 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java @@ -29,10 +29,8 @@ import org.apache.druid.query.aggregation.Aggregator; 
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnHolder; @@ -45,7 +43,7 @@ import java.util.Map; import java.util.Objects; @JsonTypeName("stringFirst") -public class StringFirstAggregatorFactory extends NullableAggregatorFactory +public class StringFirstAggregatorFactory extends AggregatorFactory { public static final int DEFAULT_MAX_STRING_SIZE = 1024; @@ -106,31 +104,27 @@ public class StringFirstAggregatorFactory extends NullableAggregatorFactory maxStringBytes) { - firstString = firstString.substring(0, maxStringBytes); - } - - byte[] valueBytes = StringUtils.toUtf8(firstString); - - mutationBuffer.putLong(position, time); - mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); - - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.put(valueBytes); - } else { - mutationBuffer.putLong(position, time); - mutationBuffer.putInt(position + Long.BYTES, 0); - } - } } @Override public Object get(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - Long timeValue = mutationBuffer.getLong(position); - int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES); - - SerializablePairLongString serializablePair; - - if (stringSizeBytes > 0) { - byte[] valueBytes = new byte[stringSizeBytes]; - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.get(valueBytes, 0, stringSizeBytes); - serializablePair = new SerializablePairLongString(timeValue, 
StringUtils.fromUtf8(valueBytes)); - } else { - serializablePair = new SerializablePairLongString(timeValue, null); - } - - return serializablePair; + return StringFirstLastUtils.readPair(buf, position); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java index 1c49a54f984..6bade403945 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java @@ -19,85 +19,16 @@ package org.apache.druid.query.aggregation.first; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.SerializablePairLongString; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseObjectColumnValueSelector; -import org.apache.druid.segment.ColumnSelectorFactory; +import com.fasterxml.jackson.annotation.JsonCreator; -import java.nio.ByteBuffer; - -@JsonTypeName("stringFirstFold") +/** + * For backwards compatibility; equivalent to a regular StringFirstAggregatorFactory. 
+ */ public class StringFirstFoldingAggregatorFactory extends StringFirstAggregatorFactory { - public StringFirstFoldingAggregatorFactory( - @JsonProperty("name") String name, - @JsonProperty("fieldName") final String fieldName, - @JsonProperty("maxStringBytes") Integer maxStringBytes - ) + @JsonCreator + public StringFirstFoldingAggregatorFactory(String name, String fieldName, Integer maxStringBytes) { super(name, fieldName, maxStringBytes); } - - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) - { - return new StringFirstAggregator(null, null, maxStringBytes) - { - @Override - public void aggregate() - { - SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); - if (pair != null && pair.lhs < firstTime) { - firstTime = pair.lhs; - firstValue = pair.rhs; - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) - { - return new StringFirstBufferAggregator(null, null, maxStringBytes) - { - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); - - if (pair != null && pair.lhs != null) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - long lastTime = mutationBuffer.getLong(position); - - if (pair.lhs < lastTime) { - mutationBuffer.putLong(position, pair.lhs); - - if (pair.rhs != null) { - byte[] valueBytes = StringUtils.toUtf8(pair.rhs); - - mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.put(valueBytes); - } else { - mutationBuffer.putInt(position + Long.BYTES, 0); - } - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - } diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java new file mode 100644 index 00000000000..133c4ba1ffc --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.DimensionHandlerUtils; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +public class StringFirstLastUtils +{ + private static final int NULL_VALUE = -1; + + @Nullable + public static String chop(@Nullable final String s, final int maxBytes) + { + if (s == null) { + return null; + } else { + // Shorten firstValue to what could fit in maxBytes as UTF-8. 
+ final byte[] bytes = new byte[maxBytes]; + final int len = StringUtils.toUtf8WithLimit(s, ByteBuffer.wrap(bytes)); + return new String(bytes, 0, len, StandardCharsets.UTF_8); + } + } + + @Nullable + public static SerializablePairLongString readPairFromSelectors( + final BaseLongColumnValueSelector timeSelector, + final BaseObjectColumnValueSelector valueSelector + ) + { + final long time; + final String string; + + // Need to read this first (before time), just in case it's a SerializablePairLongString (we don't know; it's + // detected at query time). + final Object object = valueSelector.getObject(); + + if (object instanceof SerializablePairLongString) { + final SerializablePairLongString pair = (SerializablePairLongString) object; + time = pair.lhs; + string = pair.rhs; + } else if (object != null) { + time = timeSelector.getLong(); + string = DimensionHandlerUtils.convertObjectToString(object); + } else { + // Don't aggregate nulls. + return null; + } + + return new SerializablePairLongString(time, string); + } + + public static void writePair( + final ByteBuffer buf, + final int position, + final SerializablePairLongString pair, + final int maxStringBytes + ) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + mutationBuffer.putLong(pair.lhs); + + if (pair.rhs != null) { + mutationBuffer.position(position + Long.BYTES + Integer.BYTES); + mutationBuffer.limit(maxStringBytes); + final int len = StringUtils.toUtf8WithLimit(pair.rhs, mutationBuffer); + mutationBuffer.putInt(position + Long.BYTES, len); + } else { + mutationBuffer.putInt(NULL_VALUE); + } + } + + public static SerializablePairLongString readPair(final ByteBuffer buf, final int position) + { + ByteBuffer copyBuffer = buf.duplicate(); + copyBuffer.position(position); + + Long timeValue = copyBuffer.getLong(); + int stringSizeBytes = copyBuffer.getInt(); + + if (stringSizeBytes >= 0) { + byte[] valueBytes = new byte[stringSizeBytes]; + copyBuffer.get(valueBytes, 
0, stringSizeBytes); + return new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes)); + } else { + return new SerializablePairLongString(timeValue, null); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java index 2b343beeb39..ad90857cc82 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -47,7 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -public class DoubleLastAggregatorFactory extends NullableAggregatorFactory +public class DoubleLastAggregatorFactory extends NullableNumericAggregatorFactory { private final String fieldName; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java index 16e521629e7..0e42f9fded7 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java +++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -47,7 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -public class FloatLastAggregatorFactory extends NullableAggregatorFactory +public class FloatLastAggregatorFactory extends NullableNumericAggregatorFactory { private final String fieldName; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java index 1b42fe1aaf4..4a00a63dce7 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import 
org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnSelectorFactory; @@ -46,7 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -public class LongLastAggregatorFactory extends NullableAggregatorFactory +public class LongLastAggregatorFactory extends NullableNumericAggregatorFactory { private final String fieldName; private final String name; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java index 9d93c9c6c1d..01be5db4992 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java @@ -19,17 +19,17 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; public class StringLastAggregator implements Aggregator { - - private final BaseObjectColumnValueSelector valueSelector; private final BaseLongColumnValueSelector timeSelector; + private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; protected long lastTime; @@ -45,35 +45,29 @@ public class StringLastAggregator implements Aggregator this.timeSelector = timeSelector; this.maxStringBytes = maxStringBytes; - lastTime = Long.MIN_VALUE; + lastTime = DateTimes.MIN.getMillis(); lastValue = null; } @Override public void aggregate() { - long time = timeSelector.getLong(); - if (time >= lastTime) { - lastTime = time; - 
Object value = valueSelector.getObject(); + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); - if (value != null) { - if (value instanceof String) { - lastValue = (String) value; - } else if (value instanceof SerializablePairLongString) { - lastValue = ((SerializablePairLongString) value).rhs; - } else { - throw new ISE( - "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString", - value.getClass().getName() - ); - } + if (inPair == null) { + // Don't aggregate nulls. + return; + } - if (lastValue != null && lastValue.length() > maxStringBytes) { - lastValue = lastValue.substring(0, maxStringBytes); - } - } else { - lastValue = null; + if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) { + lastTime = inPair.lhs; + lastValue = inPair.rhs; + + if (lastValue.length() > maxStringBytes) { + lastValue = lastValue.substring(0, maxStringBytes); } } } @@ -81,25 +75,25 @@ public class StringLastAggregator implements Aggregator @Override public Object get() { - return new SerializablePairLongString(lastTime, lastValue); + return new SerializablePairLongString(lastTime, StringFirstLastUtils.chop(lastValue, maxStringBytes)); } @Override public float getFloat() { - throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()"); + throw new UnsupportedOperationException("StringLastAggregator does not support getFloat()"); } @Override public long getLong() { - throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()"); + throw new UnsupportedOperationException("StringLastAggregator does not support getLong()"); } @Override public double getDouble() { - throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()"); + throw new UnsupportedOperationException("StringLastAggregator does not support getDouble()"); } @Override diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java index 113a1747702..b024af022db 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java @@ -23,21 +23,19 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnHolder; import javax.annotation.Nullable; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -45,7 +43,7 @@ import java.util.Map; import java.util.Objects; @JsonTypeName("stringLast") -public class StringLastAggregatorFactory extends NullableAggregatorFactory +public class StringLastAggregatorFactory extends AggregatorFactory { private final String fieldName; private final String name; @@ -68,27 +66,21 @@ public class StringLastAggregatorFactory extends NullableAggregatorFactory 
requiredFields() { - return Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, fieldName); + return ImmutableList.of(ColumnHolder.TIME_COLUMN_NAME, fieldName); } @Override @@ -192,25 +184,25 @@ public class StringLastAggregatorFactory extends NullableAggregatorFactory= lastTime) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(inPair.lhs, inPair.rhs), + maxStringBytes ); } } - - long lastTime = mutationBuffer.getLong(position); - - if (time >= lastTime) { - if (lastString != null) { - if (lastString.length() > maxStringBytes) { - lastString = lastString.substring(0, maxStringBytes); - } - - byte[] valueBytes = StringUtils.toUtf8(lastString); - - mutationBuffer.putLong(position, time); - mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); - - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.put(valueBytes); - } else { - mutationBuffer.putLong(position, time); - mutationBuffer.putInt(position + Long.BYTES, 0); - } - } } @Override public Object get(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - Long timeValue = mutationBuffer.getLong(position); - int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES); - - SerializablePairLongString serializablePair; - - if (stringSizeBytes > 0) { - byte[] valueBytes = new byte[stringSizeBytes]; - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.get(valueBytes, 0, stringSizeBytes); - serializablePair = new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes)); - } else { - serializablePair = new SerializablePairLongString(timeValue, null); - } - - return serializablePair; + return StringFirstLastUtils.readPair(buf, position); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java index 77d4ad3a94b..7f92f00b4ae 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java @@ -19,82 +19,16 @@ package org.apache.druid.query.aggregation.last; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.SerializablePairLongString; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseObjectColumnValueSelector; -import org.apache.druid.segment.ColumnSelectorFactory; +import com.fasterxml.jackson.annotation.JsonCreator; -import java.nio.ByteBuffer; - -@JsonTypeName("stringLastFold") +/** + * For backwards compatibility; equivalent to a regular StringLastAggregatorFactory. 
+ */ public class StringLastFoldingAggregatorFactory extends StringLastAggregatorFactory { - public StringLastFoldingAggregatorFactory( - @JsonProperty("name") String name, - @JsonProperty("fieldName") final String fieldName, - @JsonProperty("maxStringBytes") Integer maxStringBytes - ) + @JsonCreator + public StringLastFoldingAggregatorFactory(String name, String fieldName, Integer maxStringBytes) { super(name, fieldName, maxStringBytes); } - - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) - { - return new StringLastAggregator(null, null, maxStringBytes) - { - @Override - public void aggregate() - { - SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); - if (pair != null && pair.lhs >= lastTime) { - lastTime = pair.lhs; - lastValue = pair.rhs; - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) - { - return new StringLastBufferAggregator(null, null, maxStringBytes) - { - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); - if (pair != null && pair.lhs != null) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - long lastTime = mutationBuffer.getLong(position); - - if (pair.lhs >= lastTime) { - mutationBuffer.putLong(position, pair.lhs); - if (pair.rhs != null) { - byte[] valueBytes = StringUtils.toUtf8(pair.rhs); - - mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.put(valueBytes); - } else { - mutationBuffer.putInt(position + Long.BYTES, 0); - } - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } } diff --git 
a/processing/src/main/java/org/apache/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java index de50bc5d256..b2f19f51973 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java @@ -32,7 +32,7 @@ public class DoubleValueMatcherColumnSelectorStrategy { final Double matchVal = DimensionHandlerUtils.convertObjectToDouble(value); if (matchVal == null) { - return ValueMatcher.nullValueMatcher(selector); + return ValueMatcher.primitiveNullValueMatcher(selector); } final long matchValLongBits = Double.doubleToLongBits(matchVal); diff --git a/processing/src/main/java/org/apache/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java index a37f0d592fb..980fc32eb05 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java @@ -31,7 +31,7 @@ public class FloatValueMatcherColumnSelectorStrategy { final Float matchVal = DimensionHandlerUtils.convertObjectToFloat(value); if (matchVal == null) { - return ValueMatcher.nullValueMatcher(selector); + return ValueMatcher.primitiveNullValueMatcher(selector); } final int matchValIntBits = Float.floatToIntBits(matchVal); diff --git a/processing/src/main/java/org/apache/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java index 71a59aeae3f..8ff746c6c6a 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java +++ 
b/processing/src/main/java/org/apache/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java @@ -31,7 +31,7 @@ public class LongValueMatcherColumnSelectorStrategy { final Long matchVal = DimensionHandlerUtils.convertObjectToLong(value); if (matchVal == null) { - return ValueMatcher.nullValueMatcher(selector); + return ValueMatcher.primitiveNullValueMatcher(selector); } final long matchValLong = matchVal; return new ValueMatcher() diff --git a/processing/src/main/java/org/apache/druid/query/filter/ValueMatcher.java b/processing/src/main/java/org/apache/druid/query/filter/ValueMatcher.java index a800ec7e39e..c45e14aa1e9 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/ValueMatcher.java +++ b/processing/src/main/java/org/apache/druid/query/filter/ValueMatcher.java @@ -37,7 +37,12 @@ public interface ValueMatcher extends HotLoopCallee boolean matches(); // Utility method to match null values. - static ValueMatcher nullValueMatcher(BaseNullableColumnValueSelector selector) + + /** + * Returns a ValueMatcher that matches when the primitive long, double, or float value from {@code selector} + * should be treated as null. 
+ */ + static ValueMatcher primitiveNullValueMatcher(BaseNullableColumnValueSelector selector) { return new ValueMatcher() { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index df29bf7b316..fd3213fe9a8 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -48,7 +48,7 @@ import org.apache.druid.query.groupby.epinephelinae.column.FloatGroupByColumnSel import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorPlus; import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.column.LongGroupByColumnSelectorStrategy; -import org.apache.druid.query.groupby.epinephelinae.column.NullableValueGroupByColumnSelectorStrategy; +import org.apache.druid.query.groupby.epinephelinae.column.NullableNumericGroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.column.StringGroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; @@ -350,20 +350,20 @@ public class GroupByQueryEngineV2 return new DictionaryBuildingStringGroupByColumnSelectorStrategy(); } case LONG: - return makeNullableStrategy(new LongGroupByColumnSelectorStrategy()); + return makeNullableNumericStrategy(new LongGroupByColumnSelectorStrategy()); case FLOAT: - return makeNullableStrategy(new FloatGroupByColumnSelectorStrategy()); + return makeNullableNumericStrategy(new FloatGroupByColumnSelectorStrategy()); case DOUBLE: - return makeNullableStrategy(new DoubleGroupByColumnSelectorStrategy()); + return makeNullableNumericStrategy(new 
DoubleGroupByColumnSelectorStrategy()); default: throw new IAE("Cannot create query type helper from invalid type [%s]", type); } } - private GroupByColumnSelectorStrategy makeNullableStrategy(GroupByColumnSelectorStrategy delegate) + private GroupByColumnSelectorStrategy makeNullableNumericStrategy(GroupByColumnSelectorStrategy delegate) { if (NullHandling.sqlCompatible()) { - return new NullableValueGroupByColumnSelectorStrategy(delegate); + return new NullableNumericGroupByColumnSelectorStrategy(delegate); } else { return delegate; } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableValueGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java similarity index 88% rename from processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableValueGroupByColumnSelectorStrategy.java rename to processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java index 3cbd6b8dc60..663a0dd2b21 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableValueGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java @@ -29,11 +29,17 @@ import org.apache.druid.segment.ColumnValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; -public class NullableValueGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy +/** + * A wrapper around a numeric {@link GroupByColumnSelectorStrategy} that makes it null-aware. Should only be used + * for numeric strategies, not for string strategies. 
+ * + * @see org.apache.druid.segment.BaseNullableColumnValueSelector#isNull() for why this only works in the numeric case + */ +public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy { private final GroupByColumnSelectorStrategy delegate; - public NullableValueGroupByColumnSelectorStrategy(GroupByColumnSelectorStrategy delegate) + public NullableNumericGroupByColumnSelectorStrategy(GroupByColumnSelectorStrategy delegate) { this.delegate = delegate; } diff --git a/processing/src/main/java/org/apache/druid/segment/BaseNullableColumnValueSelector.java b/processing/src/main/java/org/apache/druid/segment/BaseNullableColumnValueSelector.java index ae1199e8ae3..dd838fed71c 100644 --- a/processing/src/main/java/org/apache/druid/segment/BaseNullableColumnValueSelector.java +++ b/processing/src/main/java/org/apache/druid/segment/BaseNullableColumnValueSelector.java @@ -24,16 +24,19 @@ import org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop; /** * Null value checking polymorphic "part" of the {@link ColumnValueSelector} interface for primitive values. - * Users of {@link BaseLongColumnValueSelector#getLong()}, {@link BaseDoubleColumnValueSelector#getDouble()} - * and {@link BaseFloatColumnValueSelector#getFloat()} are required to check the nullability of the primitive - * types returned. */ @PublicApi public interface BaseNullableColumnValueSelector { /** - * Returns true if selected primitive value is null for {@link BaseFloatColumnValueSelector}, - * {@link BaseLongColumnValueSelector} and {@link BaseDoubleColumnValueSelector} otherwise false. + * Returns true if the primitive long, double, or float value returned by this selector should be treated as null. 
+ * + * Users of {@link BaseLongColumnValueSelector#getLong()}, {@link BaseDoubleColumnValueSelector#getDouble()} + * and {@link BaseFloatColumnValueSelector#getFloat()} must check this method first, or else they may improperly + * use placeholder values returned by the primitive get methods. + * + * Users of {@link BaseObjectColumnValueSelector#getObject()} should not call this method. Instead, call "getObject" + * and check if it is null. */ @CalledFromHotLoop boolean isNull(); diff --git a/processing/src/main/java/org/apache/druid/segment/BaseObjectColumnValueSelector.java b/processing/src/main/java/org/apache/druid/segment/BaseObjectColumnValueSelector.java index 859f401b57b..6bd9557e1d1 100644 --- a/processing/src/main/java/org/apache/druid/segment/BaseObjectColumnValueSelector.java +++ b/processing/src/main/java/org/apache/druid/segment/BaseObjectColumnValueSelector.java @@ -31,7 +31,7 @@ import javax.annotation.Nullable; * All implementations of this interface MUST also implement {@link ColumnValueSelector}. 
*/ @ExtensionPoint -public interface BaseObjectColumnValueSelector extends BaseNullableColumnValueSelector +public interface BaseObjectColumnValueSelector { @Nullable T getObject(); diff --git a/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java index a8dd18fef43..8283fd728e7 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java @@ -63,25 +63,35 @@ public class ExpressionFilter implements Filter @Override public boolean matches() { - if (NullHandling.sqlCompatible() && selector.isNull()) { - return false; - } - ExprEval eval = selector.getObject(); - if (eval == null) { - return false; - } + final ExprEval eval = selector.getObject(); + switch (eval.type()) { case LONG_ARRAY: - Long[] lResult = eval.asLongArray(); + final Long[] lResult = eval.asLongArray(); + if (lResult == null) { + return false; + } + return Arrays.stream(lResult).anyMatch(Evals::asBoolean); + case STRING_ARRAY: - String[] sResult = eval.asStringArray(); + final String[] sResult = eval.asStringArray(); + if (sResult == null) { + return false; + } + return Arrays.stream(sResult).anyMatch(Evals::asBoolean); + case DOUBLE_ARRAY: - Double[] dResult = eval.asDoubleArray(); + final Double[] dResult = eval.asDoubleArray(); + if (dResult == null) { + return false; + } + return Arrays.stream(dResult).anyMatch(Evals::asBoolean); + default: - return Evals.asBoolean(selector.getLong()); + return eval.asBoolean(); } } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 7d7fcac60eb..5f6c8fce07c 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -138,12 +138,19 @@ public abstract class IncrementalIndex extends AbstractIndex imp public ColumnValueSelector makeColumnValueSelector(final String column) { final String typeName = agg.getTypeName(); - boolean isComplexMetric = + final boolean isComplexMetric = GuavaUtils.getEnumIfPresent(ValueType.class, StringUtils.toUpperCase(typeName)) == null || typeName.equalsIgnoreCase(ValueType.COMPLEX.name()); + + final ColumnValueSelector selector = baseSelectorFactory.makeColumnValueSelector(column); + if (!isComplexMetric || !deserializeComplexMetrics) { - return baseSelectorFactory.makeColumnValueSelector(column); + return selector; } else { + // Wrap selector in a special one that uses ComplexMetricSerde to modify incoming objects. + // For complex aggregators that read from multiple columns, we wrap all of them. This is not ideal but it + // has worked so far. + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); if (serde == null) { throw new ISE("Don't know how to handle type[%s]", typeName); @@ -155,31 +162,25 @@ public abstract class IncrementalIndex extends AbstractIndex imp @Override public boolean isNull() { - return in.get().getMetric(column) == null; + return selector.isNull(); } @Override public long getLong() { - Number metric = in.get().getMetric(column); - assert NullHandling.replaceWithDefault() || metric != null; - return DimensionHandlerUtils.nullToZero(metric).longValue(); + return selector.getLong(); } @Override public float getFloat() { - Number metric = in.get().getMetric(column); - assert NullHandling.replaceWithDefault() || metric != null; - return DimensionHandlerUtils.nullToZero(metric).floatValue(); + return selector.getFloat(); } @Override public double getDouble() { - Number metric = in.get().getMetric(column); - assert NullHandling.replaceWithDefault() || metric != null; - return 
DimensionHandlerUtils.nullToZero(metric).doubleValue(); + return selector.getDouble(); } @Override @@ -192,6 +193,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp @Override public Object getObject() { + // Here is where the magic happens: read from "in" directly, don't go through the normal "selector". return extractor.extractValue(in.get(), column, agg); } @@ -199,6 +201,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp public void inspectRuntimeShape(RuntimeShapeInspector inspector) { inspector.visit("in", in); + inspector.visit("selector", selector); inspector.visit("extractor", extractor); } }; @@ -997,7 +1000,10 @@ public abstract class IncrementalIndex extends AbstractIndex imp return iterableWithPostAggregations(null, false).iterator(); } - public Iterable iterableWithPostAggregations(@Nullable final List postAggs, final boolean descending) + public Iterable iterableWithPostAggregations( + @Nullable final List postAggs, + final boolean descending + ) { return () -> { final List dimensions = getDimensions(); @@ -1237,6 +1243,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp /** * Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator} + * * @return */ Iterable persistIterable(); diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java index 1348029d998..98cbe36fb15 100644 --- a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java +++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java @@ -110,8 +110,8 @@ public class ExpressionSelectors ExprEval eval = baseSelector.getObject(); if (eval.isArray()) { return Arrays.stream(eval.asStringArray()) - .map(NullHandling::emptyToNullIfNeeded) - .collect(Collectors.toList()); + .map(NullHandling::emptyToNullIfNeeded) + .collect(Collectors.toList()); } 
return eval.value(); } @@ -374,15 +374,15 @@ public class ExpressionSelectors if (nativeType == ValueType.FLOAT) { ColumnValueSelector selector = columnSelectorFactory .makeColumnValueSelector(columnName); - supplier = makeNullableSupplier(selector, selector::getFloat); + supplier = makeNullableNumericSupplier(selector, selector::getFloat); } else if (nativeType == ValueType.LONG) { ColumnValueSelector selector = columnSelectorFactory .makeColumnValueSelector(columnName); - supplier = makeNullableSupplier(selector, selector::getLong); + supplier = makeNullableNumericSupplier(selector, selector::getLong); } else if (nativeType == ValueType.DOUBLE) { ColumnValueSelector selector = columnSelectorFactory .makeColumnValueSelector(columnName); - supplier = makeNullableSupplier(selector, selector::getDouble); + supplier = makeNullableNumericSupplier(selector, selector::getDouble); } else if (nativeType == ValueType.STRING) { supplier = supplierFromDimensionSelector( columnSelectorFactory.makeDimensionSelector(new DefaultDimensionSpec(columnName, columnName)), @@ -419,7 +419,12 @@ public class ExpressionSelectors } } - private static Supplier makeNullableSupplier( + /** + * Wraps a {@link ColumnValueSelector} and uses it to supply numeric values in a null-aware way. 
+ * + * @see org.apache.druid.segment.BaseNullableColumnValueSelector#isNull() for why this only works in the numeric case + */ + private static Supplier makeNullableNumericSupplier( ColumnValueSelector selector, Supplier supplier ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java index c4ae2671bd8..190c3435885 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java @@ -39,7 +39,7 @@ import java.nio.ByteBuffer; public class StringFirstAggregationTest { private final Integer MAX_STRING_SIZE = 1024; - private AggregatorFactory stringLastAggFactory; + private AggregatorFactory stringFirstAggFactory; private AggregatorFactory combiningAggFactory; private ColumnSelectorFactory colSelectorFactory; private TestLongColumnSelector timeSelector; @@ -59,8 +59,8 @@ public class StringFirstAggregationTest @Before public void setup() { - stringLastAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE); - combiningAggFactory = stringLastAggFactory.getCombiningFactory(); + stringFirstAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE); + combiningAggFactory = stringFirstAggFactory.getCombiningFactory(); timeSelector = new TestLongColumnSelector(times); valueSelector = new TestObjectColumnSelector<>(strings); objectSelector = new TestObjectColumnSelector<>(pairs); @@ -72,9 +72,9 @@ public class StringFirstAggregationTest } @Test - public void testStringLastAggregator() + public void testStringFirstAggregator() { - Aggregator agg = stringLastAggFactory.factorize(colSelectorFactory); + Aggregator agg = stringFirstAggFactory.factorize(colSelectorFactory); aggregate(agg); aggregate(agg); @@ -87,12 +87,12 @@ public 
class StringFirstAggregationTest } @Test - public void testStringLastBufferAggregator() + public void testStringFirstBufferAggregator() { - BufferAggregator agg = stringLastAggFactory.factorizeBuffered( + BufferAggregator agg = stringFirstAggFactory.factorizeBuffered( colSelectorFactory); - ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]); + ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]); agg.init(buffer, 0); aggregate(agg, buffer, 0); @@ -110,11 +110,11 @@ public class StringFirstAggregationTest { SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA"); SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB"); - Assert.assertEquals(pair2, stringLastAggFactory.combine(pair1, pair2)); + Assert.assertEquals(pair2, stringFirstAggFactory.combine(pair1, pair2)); } @Test - public void testStringLastCombiningAggregator() + public void testStringFirstCombiningAggregator() { Aggregator agg = combiningAggFactory.factorize(colSelectorFactory); @@ -136,7 +136,7 @@ public class StringFirstAggregationTest BufferAggregator agg = combiningAggFactory.factorizeBuffered( colSelectorFactory); - ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]); + ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]); agg.init(buffer, 0); aggregate(agg, buffer, 0); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java index 7d3e0566f15..04700fae42d 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java @@ -27,7 +27,6 @@ import 
org.junit.Assert; import org.junit.Test; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; public class StringFirstBufferAggregatorTest { @@ -65,13 +64,7 @@ public class StringFirstBufferAggregatorTest maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - buf.putInt(testString.length()); - buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -83,7 +76,7 @@ public class StringFirstBufferAggregatorTest SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); - Assert.assertEquals("expectec last string value", strings[0], sp.rhs); + Assert.assertEquals("expected last string value", strings[0], sp.rhs); Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[0]), new Long(sp.lhs)); } @@ -109,13 +102,7 @@ public class StringFirstBufferAggregatorTest maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - buf.putInt(testString.length()); - buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -127,12 +114,12 @@ public class StringFirstBufferAggregatorTest SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); - Assert.assertEquals("expectec last string value", strings[1], sp.rhs); + Assert.assertEquals("expected last string value", strings[1], sp.rhs); Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[1]), new Long(sp.lhs)); } - @Test(expected = IllegalStateException.class) + @Test public void testNoStringValue() { @@ -153,13 +140,7 @@ public class StringFirstBufferAggregatorTest maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - 
buf.putInt(testString.length()); - buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -167,5 +148,10 @@ public class StringFirstBufferAggregatorTest for (int i = 0; i < timestamps.length; i++) { aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position); } + + SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); + + Assert.assertEquals(1526724600L, (long) sp.lhs); + Assert.assertEquals("2.0", sp.rhs); } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java index 6442002f971..3ae838442fb 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.aggregation.first; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.druid.data.input.MapBasedInputRow; @@ -29,14 +30,21 @@ import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.SerializablePairLongStringSerde; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.TestIndex; import 
org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; +import org.apache.druid.segment.incremental.IndexSizeExceededException; +import org.apache.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; +import org.junit.Before; import org.junit.Test; import java.util.Collections; @@ -44,80 +52,99 @@ import java.util.List; public class StringFirstTimeseriesQueryTest { + private static final String VISITOR_ID = "visitor_id"; + private static final String CLIENT_TYPE = "client_type"; + private static final String FIRST_CLIENT_TYPE = "first_client_type"; - @Test - public void testTopNWithDistinctCountAgg() throws Exception + private static final DateTime TIME1 = DateTimes.of("2016-03-04T00:00:00.000Z"); + private static final DateTime TIME2 = DateTimes.of("2016-03-04T01:00:00.000Z"); + + private IncrementalIndex incrementalIndex; + private QueryableIndex queryableIndex; + + @Before + public void setUp() throws IndexSizeExceededException { - TimeseriesQueryEngine engine = new TimeseriesQueryEngine(); + final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde(); + ComplexMetrics.registerSerde(serde.getTypeName(), serde); - String visitor_id = "visitor_id"; - String client_type = "client_type"; - - IncrementalIndex index = new IncrementalIndex.Builder() + incrementalIndex = new IncrementalIndex.Builder() .setIndexSchema( new IncrementalIndexSchema.Builder() .withQueryGranularity(Granularities.SECOND) .withMetrics(new CountAggregatorFactory("cnt")) - .withMetrics(new StringFirstAggregatorFactory( - "last_client_type", "client_type", 1024) - ) + .withMetrics(new StringFirstAggregatorFactory(FIRST_CLIENT_TYPE, CLIENT_TYPE, 1024)) .build() ) .setMaxRowCount(1000) .buildOnheap(); + incrementalIndex.add( + new MapBasedInputRow( + TIME1, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + 
ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "iphone") + ) + ); + incrementalIndex.add( + new MapBasedInputRow( + TIME1, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + ImmutableMap.of(VISITOR_ID, "1", CLIENT_TYPE, "iphone") + ) + ); + incrementalIndex.add( + new MapBasedInputRow( + TIME2, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "android") + ) + ); - DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z"); - long timestamp = time.getMillis(); + queryableIndex = TestIndex.persistRealtimeAndLoadMMapped(incrementalIndex); + } + + @Test + public void testTimeseriesQuery() + { + TimeseriesQueryEngine engine = new TimeseriesQueryEngine(); - DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z"); - long timestamp1 = time1.getMillis(); - index.add( - new MapBasedInputRow( - timestamp, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "0", client_type, "iphone") - ) - ); - index.add( - new MapBasedInputRow( - timestamp, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "1", client_type, "iphone") - ) - ); - index.add( - new MapBasedInputRow( - timestamp1, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "0", client_type, "android") - ) - ); TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) .granularity(QueryRunnerTestHelper.ALL_GRAN) .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) .aggregators( - Collections.singletonList( - new StringFirstAggregatorFactory( - "last_client_type", client_type, 1024 - ) + ImmutableList.of( + new StringFirstAggregatorFactory("nonfolding", CLIENT_TYPE, 1024), + new StringFirstAggregatorFactory("folding", FIRST_CLIENT_TYPE, 1024), + new StringFirstAggregatorFactory("nonexistent", "nonexistent", 1024), + new StringFirstAggregatorFactory("numeric", "cnt", 1024) ) ) .build(); - final Iterable> results = - engine.process(query, new 
IncrementalIndexStorageAdapter(index)).toList(); - List> expectedResults = Collections.singletonList( new Result<>( - time, + TIME1, new TimeseriesResultValue( - ImmutableMap.of("last_client_type", new SerializablePairLongString(timestamp, "iphone")) + ImmutableMap.builder() + .put("nonfolding", new SerializablePairLongString(TIME1.getMillis(), "iphone")) + .put("folding", new SerializablePairLongString(TIME1.getMillis(), "iphone")) + .put("nonexistent", new SerializablePairLongString(DateTimes.MAX.getMillis(), null)) + .put("numeric", new SerializablePairLongString(DateTimes.MAX.getMillis(), null)) + .build() ) ) ); - TestHelper.assertExpectedResults(expectedResults, results); + + final Iterable> iiResults = + engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex)).toList(); + + final Iterable> qiResults = + engine.process(query, new QueryableIndexStorageAdapter(queryableIndex)).toList(); + + TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental index"); + TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable index"); } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java index c3de5989119..18a378855a6 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java @@ -27,7 +27,6 @@ import org.junit.Assert; import org.junit.Test; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; public class StringLastBufferAggregatorTest { @@ -65,13 +64,7 @@ public class StringLastBufferAggregatorTest maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - buf.putInt(testString.length()); - 
buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -109,13 +102,7 @@ public class StringLastBufferAggregatorTest maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - buf.putInt(testString.length()); - buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -132,8 +119,8 @@ public class StringLastBufferAggregatorTest } - @Test(expected = IllegalStateException.class) - public void testNoStringValue() + @Test + public void testNonStringValue() { final long[] timestamps = {1526724000L, 1526724600L}; @@ -153,13 +140,7 @@ public class StringLastBufferAggregatorTest maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - buf.putInt(testString.length()); - buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -167,5 +148,10 @@ public class StringLastBufferAggregatorTest for (int i = 0; i < timestamps.length; i++) { aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position); } + + SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); + + Assert.assertEquals(1526724600L, (long) sp.lhs); + Assert.assertEquals("2.0", sp.rhs); } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java index 87ec83ce0e5..7765ec4335a 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.aggregation.last; +import 
com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.druid.data.input.MapBasedInputRow; @@ -29,14 +30,21 @@ import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.SerializablePairLongStringSerde; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; +import org.apache.druid.segment.incremental.IndexSizeExceededException; +import org.apache.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; +import org.junit.Before; import org.junit.Test; import java.util.Collections; @@ -44,83 +52,99 @@ import java.util.List; public class StringLastTimeseriesQueryTest { + private static final String VISITOR_ID = "visitor_id"; + private static final String CLIENT_TYPE = "client_type"; + private static final String LAST_CLIENT_TYPE = "last_client_type"; - @Test - public void testTopNWithDistinctCountAgg() throws Exception + private static final DateTime TIME1 = DateTimes.of("2016-03-04T00:00:00.000Z"); + private static final DateTime TIME2 = DateTimes.of("2016-03-04T01:00:00.000Z"); + + private IncrementalIndex incrementalIndex; + private QueryableIndex queryableIndex; + + @Before + public void setUp() throws IndexSizeExceededException { 
- TimeseriesQueryEngine engine = new TimeseriesQueryEngine(); + final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde(); + ComplexMetrics.registerSerde(serde.getTypeName(), serde); - String visitor_id = "visitor_id"; - String client_type = "client_type"; - - IncrementalIndex index = new IncrementalIndex.Builder() + incrementalIndex = new IncrementalIndex.Builder() .setIndexSchema( new IncrementalIndexSchema.Builder() .withQueryGranularity(Granularities.SECOND) .withMetrics(new CountAggregatorFactory("cnt")) - .withMetrics(new StringLastAggregatorFactory( - "last_client_type", "client_type", 1024) - ) + .withMetrics(new StringLastAggregatorFactory(LAST_CLIENT_TYPE, CLIENT_TYPE, 1024)) .build() ) .setMaxRowCount(1000) .buildOnheap(); + incrementalIndex.add( + new MapBasedInputRow( + TIME1, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "iphone") + ) + ); + incrementalIndex.add( + new MapBasedInputRow( + TIME1, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + ImmutableMap.of(VISITOR_ID, "1", CLIENT_TYPE, "iphone") + ) + ); + incrementalIndex.add( + new MapBasedInputRow( + TIME2, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "android") + ) + ); - DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z"); - long timestamp = time.getMillis(); + queryableIndex = TestIndex.persistRealtimeAndLoadMMapped(incrementalIndex); + } + + @Test + public void testTimeseriesQuery() + { + TimeseriesQueryEngine engine = new TimeseriesQueryEngine(); - DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z"); - long timestamp1 = time1.getMillis(); - index.add( - new MapBasedInputRow( - timestamp, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "0", client_type, "iphone") - ) - ); - index.add( - new MapBasedInputRow( - timestamp, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "1", client_type, "iphone") - ) - 
); - index.add( - new MapBasedInputRow( - timestamp1, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "0", client_type, "android") - ) - ); TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) .granularity(QueryRunnerTestHelper.ALL_GRAN) .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) .aggregators( - Collections.singletonList( - new StringLastAggregatorFactory( - "last_client_type", client_type, 1024 - ) + ImmutableList.of( + new StringLastAggregatorFactory("nonfolding", CLIENT_TYPE, 1024), + new StringLastAggregatorFactory("folding", LAST_CLIENT_TYPE, 1024), + new StringLastAggregatorFactory("nonexistent", "nonexistent", 1024), + new StringLastAggregatorFactory("numeric", "cnt", 1024) ) ) .build(); - final Iterable> results = - engine.process(query, new IncrementalIndexStorageAdapter(index)).toList(); - List> expectedResults = Collections.singletonList( new Result<>( - time, + TIME1, new TimeseriesResultValue( - ImmutableMap.of( - "last_client_type", - new SerializablePairLongString(timestamp1, "android") - ) + ImmutableMap.builder() + .put("nonfolding", new SerializablePairLongString(TIME2.getMillis(), "android")) + .put("folding", new SerializablePairLongString(TIME2.getMillis(), "android")) + .put("nonexistent", new SerializablePairLongString(DateTimes.MIN.getMillis(), null)) + .put("numeric", new SerializablePairLongString(DateTimes.MIN.getMillis(), null)) + .build() ) ) ); - TestHelper.assertExpectedResults(expectedResults, results); + + final Iterable> iiResults = + engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex)).toList(); + + final Iterable> qiResults = + engine.process(query, new QueryableIndexStorageAdapter(queryableIndex)).toList(); + + TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental index"); + TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable index"); } } diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index cf59d7ec4ef..c461c7e774a 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -540,7 +540,13 @@ public class GroupByQueryQueryToolChestTest ); // test timestamps that result in integer size millis - final ResultRow result1 = ResultRow.of(123L, "val1", "fooval1", 1, getIntermediateComplexValue(ValueType.STRING, "val1")); + final ResultRow result1 = ResultRow.of( + 123L, + "val1", + "fooval1", + 1, + getIntermediateComplexValue(ValueType.STRING, "val1") + ); Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1); diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java index aca6b749a1b..fc889fd325c 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -150,8 +150,10 @@ public class TopNQueryQueryToolChestTest ).getCacheStrategy(query2); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); - Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), - strategy2.computeResultLevelCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); } @Test @@ -234,8 +236,10 @@ public class TopNQueryQueryToolChestTest //segment level cache key excludes postaggregates in topn Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); 
Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy1.computeResultLevelCacheKey(query1))); - Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), - strategy2.computeResultLevelCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); } @Test @@ -323,7 +327,8 @@ public class TopNQueryQueryToolChestTest collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong((Long) dimValue).asBytes()); break; case DOUBLE: - collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits((Double) dimValue)).asBytes()); + collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits((Double) dimValue)) + .asBytes()); break; case FLOAT: collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits((Float) dimValue)).asBytes()); diff --git a/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java index 286b1b632d5..c3fb6aefc26 100644 --- a/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java @@ -188,7 +188,10 @@ public class ExpressionFilterTest extends BaseFilterTest public void testConstantExpression() { assertFilterMatchesSkipVectorize(edf("1 + 1"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")); + assertFilterMatchesSkipVectorize(edf("'true'"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")); + assertFilterMatchesSkipVectorize(edf("0 + 0"), ImmutableList.of()); + assertFilterMatchesSkipVectorize(edf("'false'"), ImmutableList.of()); } @Test