From 209f8a95468e8450e3d1365d7f0eda98a35732df Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 15 Jul 2024 15:00:17 +0530 Subject: [PATCH] Deserialize complex dimensions in group by queries to their respective types when reading from spilled files and cached results (#16620) Like #16511, but for keys that have been spilled or cached during the grouping process --- .../druid/jackson/AggregatorsModule.java | 15 +- .../apache/druid/query/QueryToolChest.java | 21 ++- .../DataSourceQueryQueryToolChest.java | 8 + .../groupby/GroupByQueryQueryToolChest.java | 102 +++++++---- .../query/groupby/epinephelinae/Grouper.java | 11 ++ .../epinephelinae/RowBasedGrouperHelper.java | 135 ++++++++++++-- .../epinephelinae/RowBasedKeySerdeHelper.java | 5 + .../epinephelinae/SpillingGrouper.java | 2 +- .../SegmentMetadataQueryQueryToolChest.java | 11 ++ .../search/SearchQueryQueryToolChest.java | 10 ++ .../TimeBoundaryQueryQueryToolChest.java | 12 ++ .../TimeseriesQueryQueryToolChest.java | 12 ++ .../query/topn/TopNQueryQueryToolChest.java | 13 +- .../ObjectStrategyComplexTypeStrategy.java | 6 +- .../druid/segment/column/TypeStrategies.java | 30 ++++ .../druid/segment/column/TypeStrategy.java | 2 +- .../aggregation/AggregationTestHelper.java | 8 +- .../ComplexDimensionGroupByQueryTest.java | 164 ++++++++++++++++++ .../GroupByQueryQueryToolChestTest.java | 94 ++++++++-- .../query/groupby/GroupByQueryRunnerTest.java | 5 +- .../druid/client/CachingClusteredClient.java | 2 +- .../druid/client/CachingQueryRunner.java | 2 +- .../query/ResultLevelCachingQueryRunner.java | 2 +- .../druid/client/CachingQueryRunnerTest.java | 15 +- 24 files changed, 592 insertions(+), 95 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java diff --git a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java index f7aca511e17..200e6fcb139 100644 --- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java @@ -83,6 +83,16 @@ public class AggregatorsModule extends SimpleModule { super("AggregatorFactories"); + registerComplexMetricsAndSerde(); + + setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class); + setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class); + + addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE); + } + + public static void registerComplexMetricsAndSerde() + { ComplexMetrics.registerSerde(HyperUniquesSerde.TYPE_NAME, new HyperUniquesSerde()); ComplexMetrics.registerSerde(PreComputedHyperUniquesSerde.TYPE_NAME, new PreComputedHyperUniquesSerde()); ComplexMetrics.registerSerde( @@ -102,11 +112,6 @@ public class AggregatorsModule extends SimpleModule SerializablePairLongLongComplexMetricSerde.TYPE_NAME, new SerializablePairLongLongComplexMetricSerde() ); - - setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class); - setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class); - - addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE); } @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index b0678f247c9..fa394beec43 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ 
b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -251,19 +251,36 @@ public abstract class QueryToolChest getResultTypeReference(); + /** + * Like {@link #getCacheStrategy(Query, ObjectMapper)}, but the caller doesn't supply the object mapper for deserializing + * and converting the cached data to the desired type. It is up to the individual implementations to decide the appropriate action in that case. + * They can either throw an exception outright, or decide whether the query requires the object mapper for proper downstream processing and + * work with the generic Java types if not. + *

+ * @deprecated Use {@link #getCacheStrategy(Query, ObjectMapper)} instead + */ + @Deprecated + @Nullable + public CacheStrategy getCacheStrategy(QueryType query) + { + return null; + } + /** * Returns a CacheStrategy to be used to load data into the cache and remove it from the cache. *

* This is optional. If it returns null, caching is effectively disabled for the query. * * @param query The query whose results might be cached + * @param mapper Object mapper to convert the deserialized generic Java objects to the desired types. It can be null + * to preserve backward compatibility. * @param <T> The type of object that will be stored in the cache * @return A CacheStrategy that can be used to populate and read from the Cache */ @Nullable - public CacheStrategy getCacheStrategy(QueryType query) + public CacheStrategy getCacheStrategy(QueryType query, @Nullable ObjectMapper mapper) { - return null; + return getCacheStrategy(query); } /** diff --git a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java index dbe8922f2e9..21fb5c53afc 100644 --- a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.datasourcemetadata; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.inject.Inject; @@ -38,6 +39,7 @@ import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.timeline.LogicalSegment; +import javax.annotation.Nullable; import java.util.List; import java.util.stream.Collectors; @@ -119,4 +121,10 @@ public class DataSourceQueryQueryToolChest { return null; } + + @Override + public CacheStrategy getCacheStrategy(DataSourceMetadataQuery query, @Nullable ObjectMapper mapper) + { + return null; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index b19b479c26d..d69e09c9ff0 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -77,8 +77,10 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.NullableTypeStrategy; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.nested.StructuredData; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -471,7 +473,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest deserializer = new JsonDeserializer() { - final Class[] dimensionClasses = createDimensionClasses(); + final Class[] dimensionClasses = createDimensionClasses(query); boolean containsComplexDimensions = query.getDimensions() .stream() .anyMatch( @@ -524,30 +526,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest[] createDimensionClasses() - { - final List queryDimensions = query.getDimensions(); - final Class[] classes = new Class[queryDimensions.size()]; - for (int i = 0; i < queryDimensions.size(); ++i) { - final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType(); - if (dimensionOutputType.is(ValueType.COMPLEX)) { - NullableTypeStrategy 
nullableTypeStrategy = dimensionOutputType.getNullableStrategy(); - if (!nullableTypeStrategy.groupable()) { - throw DruidException.defensive( - "Ungroupable dimension [%s] with type [%s] found in the query.", - queryDimensions.get(i).getDimension(), - dimensionOutputType - ); - } - classes[i] = nullableTypeStrategy.getClazz(); - } else { - classes[i] = Object.class; - } - } - return classes; - } - }; class GroupByResultRowModule extends SimpleModule @@ -597,9 +575,32 @@ public class GroupByQueryQueryToolChest extends QueryToolChest getCacheStrategy(final GroupByQuery query) + public CacheStrategy getCacheStrategy(GroupByQuery query) { + return getCacheStrategy(query, null); + } + + @Override + public CacheStrategy getCacheStrategy( + final GroupByQuery query, + @Nullable final ObjectMapper mapper + ) + { + + for (DimensionSpec dimension : query.getDimensions()) { + if (dimension.getOutputType().is(ValueType.COMPLEX) && !dimension.getOutputType().equals(ColumnType.NESTED_DATA)) { + if (mapper == null) { + throw DruidException.defensive( + "Cannot deserialize complex dimension of type[%s] from result cache if object mapper is not provided", + dimension.getOutputType().getComplexTypeName() + ); + } + } + } + final Class[] dimensionClasses = createDimensionClasses(query); + return new CacheStrategy() { private static final byte CACHE_STRATEGY_VERSION = 0x1; @@ -726,13 +727,29 @@ public class GroupByQueryQueryToolChest extends QueryToolChest[] createDimensionClasses(final GroupByQuery query) + { + final List queryDimensions = query.getDimensions(); + final Class[] classes = new Class[queryDimensions.size()]; + for (int i = 0; i < queryDimensions.size(); ++i) { + final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType(); + if (dimensionOutputType.is(ValueType.COMPLEX)) { + NullableTypeStrategy nullableTypeStrategy = dimensionOutputType.getNullableStrategy(); + if (!nullableTypeStrategy.groupable()) { + throw DruidException.defensive( + "Ungroupable dimension [%s] with type [%s] found in the query.", + queryDimensions.get(i).getDimension(), + dimensionOutputType + ); + } + classes[i] = nullableTypeStrategy.getClazz(); + } else { + classes[i] = Object.class; + } + } + return classes; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java index 591624f1ab8..0f3faedb707 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java @@ -19,6 +19,7 @@ package org.apache.druid.query.groupby.epinephelinae; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -232,6 +233,16 @@ public interface Grouper extends Closeable */ BufferComparator bufferComparatorWithAggregators(AggregatorFactory[] aggregatorFactories, int[] aggregatorOffsets); + /** + * Decorates the object mapper enabling it to read and write query results' grouping keys. It is used by the + * {@link SpillingGrouper} to preserve the types of the dimensions after serializing and deserializing them on the + * spilled files. + */ + default ObjectMapper decorateObjectMapper(ObjectMapper spillMapper) + { + return spillMapper; + } + /** * Reset the keySerde to its initial state. 
After this method is called, {@link #readFromByteBuffer} * and {@link #bufferComparator()} may no longer work properly on previously-serialized keys. diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 491c28d4142..da8a0e04623 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -19,9 +19,14 @@ package org.apache.druid.query.groupby.epinephelinae; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.ObjectCodec; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.primitives.Ints; @@ -84,6 +89,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.Closeable; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -666,22 +672,6 @@ public class RowBasedGrouperHelper this.key = key; } - @JsonCreator - public static RowBasedKey fromJsonArray(final Object[] key) - { - // Type info is lost during serde: - // Floats may be deserialized as doubles, Longs may be deserialized as integers, convert them back - for (int i = 0; i < key.length; i++) { - if (key[i] instanceof Integer) { - key[i] = ((Integer) key[i]).longValue(); - } else if (key[i] instanceof Double) { - key[i] = ((Double) key[i]).floatValue(); - } - } - - return new RowBasedKey(key); - } - @JsonValue public Object[] getKey() { @@ -1371,6 +1361,65 @@ public class RowBasedGrouperHelper ); } + @Override + public ObjectMapper decorateObjectMapper(ObjectMapper spillMapper) + { + + final JsonDeserializer deserializer = new JsonDeserializer() + { + @Override + public RowBasedKey deserialize( + JsonParser jp, + DeserializationContext deserializationContext + ) throws IOException + { + if (!jp.isExpectedStartArrayToken()) { + throw DruidException.defensive("Expected array start token, received [%s]", jp.getCurrentToken()); + } + jp.nextToken(); + + final ObjectCodec codec = jp.getCodec(); + final int timestampAdjustment = includeTimestamp ? 
1 : 0; + final int dimsToRead = timestampAdjustment + serdeHelpers.length; + int dimsReadSoFar = 0; + final Object[] objects = new Object[dimsToRead]; + + if (includeTimestamp) { + DruidException.conditionalDefensive( + jp.currentToken() != JsonToken.END_ARRAY, + "Unexpected end of array when deserializing timestamp from the spilled files" + ); + objects[dimsReadSoFar] = codec.readValue(jp, Long.class); + + ++dimsReadSoFar; + jp.nextToken(); + } + + while (jp.currentToken() != JsonToken.END_ARRAY) { + objects[dimsReadSoFar] = + codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz()); + + ++dimsReadSoFar; + jp.nextToken(); + } + + return new RowBasedKey(objects); + } + }; + + class SpillModule extends SimpleModule + { + public SpillModule() + { + addDeserializer(RowBasedKey.class, deserializer); + } + } + + final ObjectMapper newObjectMapper = spillMapper.copy(); + newObjectMapper.registerModule(new SpillModule()); + return newObjectMapper; + } + @Override public void reset() { @@ -1588,6 +1637,7 @@ public class RowBasedGrouperHelper { final BufferComparator bufferComparator; final String columnTypeName; + final Class clazz; final List dictionary; final Object2IntMap reverseDictionary; @@ -1613,6 +1663,7 @@ public class RowBasedGrouperHelper dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)), dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition)) ); + clazz = columnType.getNullableStrategy().getClazz(); } // Asserts that we don't entertain any complex types without a typename, to prevent intermixing dictionaries of @@ -1645,6 +1696,12 @@ public class RowBasedGrouperHelper { return reverseDictionary; } + + @Override + public Class getClazz() + { + return clazz; + } } @@ -1726,6 +1783,14 @@ public class RowBasedGrouperHelper { return reverseDictionary; } + + @Override + public Class getClazz() + { + // Jackson deserializes Object[] containing longs to Object[] containing string if Object[].class is returned + // Therefore we are using Object.class + return Object.class; + } } private class ArrayStringRowBasedKeySerdeHelper extends DictionaryBuildingSingleValuedRowBasedKeySerdeHelper @@ -1770,6 +1835,12 @@ public class RowBasedGrouperHelper { return reverseStringArrayDictionary; } + + @Override + public Class getClazz() + { + return Object[].class; + } } private abstract class AbstractStringRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper @@ -1819,6 +1890,12 @@ public class RowBasedGrouperHelper { return bufferComparator; } + + @Override + public Class getClazz() + { + return String.class; + } } private class DynamicDictionaryStringRowBasedKeySerdeHelper extends AbstractStringRowBasedKeySerdeHelper @@ -1937,6 +2014,12 @@ public class RowBasedGrouperHelper { return bufferComparator; } + + @Override + public Class getClazz() + { + return Long.class; + } } private class FloatRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper @@ -1982,6 +2065,12 @@ public class RowBasedGrouperHelper { return bufferComparator; } + + @Override + public Class getClazz() + { + return Float.class; + } } private class DoubleRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper @@ -2027,6 +2116,12 @@ public class RowBasedGrouperHelper { return bufferComparator; } + + @Override + public Class getClazz() + { + return Double.class; + } } // This class is only used when SQL compatible null handling is enabled. 
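The getClazz() implementations above exist because Jackson, given no target class, maps JSON numbers to the narrowest Java type: a spilled Long key comes back as an Integer and a Float key as a Double. That is exactly the mismatch the deleted RowBasedKey.fromJsonArray used to repair by hand, and the per-dimension classes let the new deserializer avoid it entirely. A standalone sketch (plain Jackson, not part of this patch) that reproduces the effect:

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SpillTypeDemo
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        // Untyped read: a JSON integer deserializes as Integer, not Long.
        Object untyped = mapper.readValue("123", Object.class);
        System.out.println(untyped.getClass()); // class java.lang.Integer
        // Typed read, the way the new RowBasedKey deserializer uses getClazz():
        Long typed = mapper.readValue("123", Long.class);
        System.out.println(typed.getClass()); // class java.lang.Long
        // Floats have the same problem: untyped reads produce Double.
        Object f = mapper.readValue("1.5", Object.class);
        System.out.println(f.getClass()); // class java.lang.Double
      }
    }
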
@@ -2082,6 +2177,12 @@ { return comparator; } + + @Override + public Class getClazz() + { + return delegate.getClazz(); + } } } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java index 1cb29d23bc0..71372ca238b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java @@ -65,4 +65,9 @@ * Return a {@link BufferComparator} to compare keys stored in ByteBuffer. */ BufferComparator getBufferComparator(); + + /** + * Returns the expected class of the key, which is used to deserialize the objects correctly from the spilled files. + */ + Class getClazz(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 4e9b96102a1..d8a7760c11d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -152,7 +152,7 @@ public class SpillingGrouper implements Grouper } this.aggregatorFactories = aggregatorFactories; this.temporaryStorage = temporaryStorage; - this.spillMapper = spillMapper; + this.spillMapper = keySerde.decorateObjectMapper(spillMapper); this.spillingAllowed = spillingAllowed; this.sortHasNonGroupingFields = sortHasNonGroupingFields; } diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 912ecb1ac32..fd8d7e7009c 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.metadata; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; @@ -62,6 +63,7 @@ import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -184,6 +186,15 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest getCacheStrategy(final SegmentMetadataQuery query) + { + return getCacheStrategy(query, null); + } + + @Override + public CacheStrategy getCacheStrategy( + final SegmentMetadataQuery query, + @Nullable final ObjectMapper objectMapper + ) { return new CacheStrategy() { diff --git a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java index b390cd83a58..c15e1d0d99c 100644 --- a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.search; 
import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; @@ -124,6 +125,15 @@ public class SearchQueryQueryToolChest extends QueryToolChest, Object, SearchQuery> getCacheStrategy(final SearchQuery query) + { + return getCacheStrategy(query, null); + } + + @Override + public CacheStrategy, Object, SearchQuery> getCacheStrategy( + final SearchQuery query, + @Nullable final ObjectMapper objectMapper + ) { return new CacheStrategy, Object, SearchQuery>() diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 9087dd26a88..eab5e0f5abc 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.timeboundary; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; @@ -47,6 +48,7 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.timeline.LogicalSegment; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Comparator; import java.util.List; @@ -163,6 +165,16 @@ public class TimeBoundaryQueryQueryToolChest @Override public CacheStrategy, Object, TimeBoundaryQuery> getCacheStrategy(final TimeBoundaryQuery query) + { + return getCacheStrategy(query, null); + } + + + @Override + public CacheStrategy, Object, TimeBoundaryQuery> getCacheStrategy( + final TimeBoundaryQuery query, + @Nullable final ObjectMapper objectMapper + ) { return new CacheStrategy, Object, TimeBoundaryQuery>() { diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 17a2f8be956..67c36fe7603 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.timeseries; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -65,6 +66,7 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; @@ -276,6 +278,16 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest, Object, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query) + { + return getCacheStrategy(query, null); + } + + + @Override + public CacheStrategy, Object, TimeseriesQuery> getCacheStrategy( + final TimeseriesQuery query, + @Nullable final ObjectMapper objectMapper + ) { return new 
CacheStrategy, Object, TimeseriesQuery>() { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 25a4284aa42..21bc336438a 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.topn; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.Iterables; @@ -64,6 +65,7 @@ import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.column.RowSignature; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; @@ -268,9 +270,18 @@ public class TopNQueryQueryToolChest extends QueryToolChest, Object, TopNQuery> getCacheStrategy(TopNQuery query) + { + return getCacheStrategy(query, null); + } @Override - public CacheStrategy, Object, TopNQuery> getCacheStrategy(final TopNQuery query) + public CacheStrategy, Object, TopNQuery> getCacheStrategy( + final TopNQuery query, + @Nullable final ObjectMapper objectMapper + ) { return new CacheStrategy, Object, TopNQuery>() { diff --git a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java index b274e55282e..f80a1cdcf8d 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java @@ -123,7 +123,7 @@ public class ObjectStrategyComplexTypeStrategy implements TypeStrategy public int hashCode(T o) { if (hashStrategy == null) { - throw DruidException.defensive("hashStrategy not provided"); + throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString()); } return hashStrategy.hashCode(o); } @@ -132,7 +132,7 @@ public class ObjectStrategyComplexTypeStrategy implements TypeStrategy public boolean equals(T a, T b) { if (hashStrategy == null) { - throw DruidException.defensive("hashStrategy not provided"); + throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString()); } return hashStrategy.equals(a, b); } @@ -141,7 +141,7 @@ public class ObjectStrategyComplexTypeStrategy implements TypeStrategy public Class getClazz() { if (clazz == null) { - throw DruidException.defensive("hashStrategy not provided"); + throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString()); } return clazz; } diff --git a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java index bae29179b4d..7ac8def99ec 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java +++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java @@ -299,6 +299,12 @@ public class TypeStrategies { return a.equals(b); } + + @Override + public Class getClazz() + { + return Long.class; + } } /** @@ -368,6 +374,12 @@ public class TypeStrategies { return a.equals(b); } + + @Override + public Class 
getClazz() + { + return Float.class; + } } /** @@ -438,6 +450,12 @@ public class TypeStrategies { return a.equals(b); } + + @Override + public Class getClazz() + { + return Double.class; + } } /** @@ -519,6 +537,12 @@ public class TypeStrategies { return a.equals(b); } + + @Override + public Class getClazz() + { + return String.class; + } } /** @@ -664,5 +688,11 @@ public class TypeStrategies return false; } } + + @Override + public Class getClazz() + { + return Object[].class; + } } } diff --git a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java index c5cff1a0b2f..075fceca473 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java @@ -225,6 +225,6 @@ public interface TypeStrategy extends Comparator, Hash.Strategy */ default Class getClazz() { - throw DruidException.defensive("Not implemented. It is only implemented for complex dimensions which are groupable()"); + throw DruidException.defensive("Not implemented. Check groupable() first"); } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 526a62c813f..2ad9f90148a 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -766,7 +766,7 @@ public class AggregationTestHelper implements Closeable String resultStr = mapper.writer().writeValueAsString(yielder); List resultRows = Lists.transform( - readQueryResultArrayFromString(resultStr), + readQueryResultArrayFromString(resultStr, queryPlus.getQuery()), toolChest.makePreComputeManipulatorFn( queryPlus.getQuery(), MetricManipulatorFns.deserializing() @@ -798,11 +798,13 @@ public class AggregationTestHelper implements Closeable }; } - private List readQueryResultArrayFromString(String str) throws Exception + private List readQueryResultArrayFromString(String str, Query query) throws Exception { List result = new ArrayList(); - JsonParser jp = mapper.getFactory().createParser(str); + ObjectMapper decoratedMapper = toolChest.decorateObjectMapper(mapper, query); + + JsonParser jp = decoratedMapper.getFactory().createParser(str); if (jp.nextToken() != JsonToken.START_ARRAY) { throw new IAE("not an array [%s]", str); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java b/processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java new file mode 100644 index 00000000000..bc1ecbb0ddc --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.aggregation.AggregationTestHelper; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.segment.RowBasedSegment; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.timeline.SegmentId; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@RunWith(Parameterized.class) +public class ComplexDimensionGroupByQueryTest +{ + private final QueryContexts.Vectorize vectorize; + private final AggregationTestHelper helper; + private final List segments; + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + public ComplexDimensionGroupByQueryTest(GroupByQueryConfig config, String vectorize) + { + this.vectorize = QueryContexts.Vectorize.fromString(vectorize); + this.helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + Collections.emptyList(), + config, + tempFolder + ); + Sequence rows = Sequences.simple( + ImmutableList.of( + new Object[]{new SerializablePairLongString(1L, "abc")}, + new Object[]{new SerializablePairLongString(1L, "abc")}, + new Object[]{new SerializablePairLongString(1L, "def")}, + new Object[]{new SerializablePairLongString(1L, "abc")}, + new Object[]{new SerializablePairLongString(1L, "ghi")}, + new Object[]{new SerializablePairLongString(1L, "def")}, + new Object[]{new SerializablePairLongString(1L, "abc")}, + new Object[]{new SerializablePairLongString(1L, "pqr")}, + new Object[]{new SerializablePairLongString(1L, "xyz")}, + new Object[]{new SerializablePairLongString(1L, "foo")}, + new Object[]{new SerializablePairLongString(1L, "bar")} + ) + ); + RowSignature rowSignature = RowSignature.builder() + .add( + "pair", + ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME) + ) + .build(); + + this.segments = Collections.singletonList( + new RowBasedSegment<>( + SegmentId.dummy("dummy"), + rows, + columnName -> { + final int columnNumber = rowSignature.indexOf(columnName); + return row -> columnNumber >= 0 ? 
row[columnNumber] : null; + }, + rowSignature + ) + ); + } + + @Parameterized.Parameters(name = "config = {0}, vectorize = {1}") + public static Collection constructorFeeder() + { + final List constructors = new ArrayList<>(); + for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { + for (String vectorize : new String[]{"false", "force"}) { + constructors.add(new Object[]{config, vectorize}); + } + } + return constructors; + } + + public Map getContext() + { + return ImmutableMap.of( + QueryContexts.VECTORIZE_KEY, vectorize.toString(), + QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, "true" + ); + } + + @Test + public void testGroupByOnPairClass() + { + GroupByQuery groupQuery = GroupByQuery.builder() + .setDataSource("test_datasource") + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ETERNITY) + .setDimensions(new DefaultDimensionSpec( + "pair", + "pair", + ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME) + )) + .setAggregatorSpecs(new CountAggregatorFactory("count")) + .setContext(getContext()) + .build(); + + if (vectorize == QueryContexts.Vectorize.FORCE) { + // Cannot vectorize group by on complex dimension + Assert.assertThrows( + RuntimeException.class, + () -> helper.runQueryOnSegmentsObjs(segments, groupQuery).toList() + ); + } else { + List resultRows = helper.runQueryOnSegmentsObjs(segments, groupQuery).toList(); + + Assert.assertArrayEquals( + new ResultRow[]{ + ResultRow.of(new SerializablePairLongString(1L, "abc"), 4L), + ResultRow.of(new SerializablePairLongString(1L, "bar"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "def"), 2L), + ResultRow.of(new SerializablePairLongString(1L, "foo"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "ghi"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "pqr"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "xyz"), 1L) + }, + resultRows.toArray() + ); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index f43bbce9d97..7279ca938bd 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -33,6 +33,7 @@ import org.apache.druid.collections.SerializablePair; import org.apache.druid.collections.StupidPool; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.Row; +import org.apache.druid.jackson.AggregatorsModule; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; @@ -97,6 +98,7 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest public static void setUpClass() { NullHandling.initializeForTests(); + AggregatorsModule.registerComplexMetricsAndSerde(); } @Test @@ -130,11 +132,13 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest .setGranularity(QueryRunnerTestHelper.DAY_GRAN) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); + final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new 
GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -190,11 +194,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest ) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -252,11 +257,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest .setHavingSpec(new GreaterThanHavingSpec(QueryRunnerTestHelper.UNIQUE_METRIC, 10)) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -336,11 +342,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest .setHavingSpec(andHavingSpec2) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -427,11 +434,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest .setHavingSpec(havingSpec2) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -490,11 +498,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest )) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, 
null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -512,6 +521,48 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest doTestCacheStrategy(ColumnType.LONG, 2L); } + @Test + public void testComplexDimensionCacheStrategy() throws IOException + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(ImmutableList.of( + new DefaultDimensionSpec( + "test", + "test", + ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME) + ) + )) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .build(); + + ObjectMapper objectMapper = TestHelper.makeJsonMapper(); + + CacheStrategy strategy = + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, objectMapper); + + // test timestamps that result in integer size millis + final ResultRow result1 = ResultRow.of( + 123L, + new SerializablePairLongString(123L, "abc"), + 1 + ); + + Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1); + + Object fromCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedValue), + strategy.getCacheObjectClazz() + ); + + ResultRow fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); + + Assert.assertEquals(result1, fromCacheResult); + } + @Test public void testMultiColumnCacheStrategy() throws Exception { @@ -538,8 +589,9 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest .setGranularity(QueryRunnerTestHelper.DAY_GRAN) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); CacheStrategy strategy = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); // test timestamps that result in integer size millis final ResultRow result1 = ResultRow.of( @@ -1054,8 +1106,9 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest .setGranularity(QueryRunnerTestHelper.DAY_GRAN) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); CacheStrategy strategy = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); // test timestamps that result in integer size millis final ResultRow result1 = ResultRow.of( @@ -1147,11 +1200,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest .setGranularity(QueryRunnerTestHelper.DAY_GRAN) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -1183,11 +1237,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, "false")) .build(); + final 
ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertTrue( @@ -1245,7 +1300,8 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final GroupByQueryQueryToolChest queryToolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool); - CacheStrategy cacheStrategy = queryToolChest.getCacheStrategy(query); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); + CacheStrategy cacheStrategy = queryToolChest.getCacheStrategy(query, mapper); Assert.assertTrue( "result level cache on broker server for GroupByStrategyV2 should be enabled", cacheStrategy.isCacheable(query, false, false) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index d4dc8734130..a5dbb49bca5 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -33,6 +33,7 @@ import com.google.common.collect.Sets; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.Row; import org.apache.druid.data.input.Rows; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.IAE; @@ -9965,7 +9966,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest @Test public void testGroupByComplexColumn() { - cannotVectorize(); GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) @@ -9979,7 +9979,8 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest .setGranularity(QueryRunnerTestHelper.ALL_GRAN) .build(); - expectedException.expect(RuntimeException.class); + expectedException.expect(DruidException.class); + expectedException.expectMessage("Type [COMPLEX] is not groupable"); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 7bcb4c2ce03..5fa34d6699d 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -275,7 +275,7 @@ public class CachingClusteredClient implements QuerySegmentWalker this.responseContext = responseContext; this.query = queryPlus.getQuery(); this.toolChest = warehouse.getToolChest(query); - this.strategy = toolChest.getCacheStrategy(query); + this.strategy = toolChest.getCacheStrategy(query, objectMapper); this.dataSourceAnalysis = query.getDataSource().getAnalysis(); this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER); diff --git 
a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java index 9bb9f474dd9..41d4bb4ea63 100644 --- a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java @@ -86,7 +86,7 @@ public class CachingQueryRunner implements QueryRunner public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { Query query = queryPlus.getQuery(); - final CacheStrategy strategy = toolChest.getCacheStrategy(query); + final CacheStrategy strategy = toolChest.getCacheStrategy(query, mapper); final boolean populateCache = canPopulateCache(query, strategy); final boolean useCache = canUseCache(query, strategy); diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index 182faba7a09..0af6ebca3ed 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -73,7 +73,7 @@ public class ResultLevelCachingQueryRunner implements QueryRunner this.cache = cache; this.cacheConfig = cacheConfig; this.query = query; - this.strategy = queryToolChest.getCacheStrategy(query); + this.strategy = queryToolChest.getCacheStrategy(query, objectMapper); this.populateResultCache = CacheUtil.isPopulateResultCache( query, strategy, diff --git a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java index a4375a61900..7208ab2fc4b 100644 --- a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java @@ -68,6 +68,7 @@ import org.apache.druid.query.topn.TopNQueryBuilder; import org.apache.druid.query.topn.TopNQueryConfig; import org.apache.druid.query.topn.TopNQueryQueryToolChest; import org.apache.druid.query.topn.TopNResultValue; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.joda.time.DateTime; import org.junit.Assert; @@ -90,7 +91,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @RunWith(Parameterized.class) -public class CachingQueryRunnerTest +public class CachingQueryRunnerTest extends InitializedNullHandlingTest { @Parameterized.Parameters(name = "numBackgroundThreads={0}") public static Iterable constructorFeeder() @@ -222,8 +223,8 @@ public class CachingQueryRunnerTest Cache cache = EasyMock.mock(Cache.class); EasyMock.replay(cache); CachingQueryRunner queryRunner = makeCachingQueryRunner(null, cache, toolchest, Sequences.empty()); - Assert.assertFalse(queryRunner.canPopulateCache(query, toolchest.getCacheStrategy(query))); - Assert.assertFalse(queryRunner.canUseCache(query, toolchest.getCacheStrategy(query))); + Assert.assertFalse(queryRunner.canPopulateCache(query, toolchest.getCacheStrategy(query, null))); + Assert.assertFalse(queryRunner.canUseCache(query, toolchest.getCacheStrategy(query, null))); queryRunner.run(QueryPlus.wrap(query)); EasyMock.verifyUnexpectedCalls(cache); } @@ -243,7 +244,7 @@ public class CachingQueryRunnerTest QueryToolChest toolchest = EasyMock.mock(QueryToolChest.class); Cache cache = EasyMock.mock(Cache.class); - EasyMock.expect(toolchest.getCacheStrategy(query)).andReturn(null); + 
EasyMock.expect(toolchest.getCacheStrategy(EasyMock.eq(query), EasyMock.anyObject())).andReturn(null); EasyMock.replay(cache, toolchest); CachingQueryRunner queryRunner = makeCachingQueryRunner(new byte[0], cache, toolchest, Sequences.empty()); Assert.assertFalse(queryRunner.canPopulateCache(query, null)); @@ -339,7 +340,7 @@ public class CachingQueryRunnerTest resultSeq ); - CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query); + CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query, null); Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey( CACHE_ID, SEGMENT_DESCRIPTOR, @@ -383,7 +384,7 @@ public class CachingQueryRunnerTest byte[] cacheKeyPrefix = RandomUtils.nextBytes(10); - CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query); + CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query, null); Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey( CACHE_ID, SEGMENT_DESCRIPTOR, @@ -399,7 +400,7 @@ public class CachingQueryRunnerTest toolchest, Sequences.empty() ); - Assert.assertTrue(runner.canUseCache(query, toolchest.getCacheStrategy(query))); + Assert.assertTrue(runner.canUseCache(query, toolchest.getCacheStrategy(query, null))); List results = runner.run(QueryPlus.wrap(query)).toList(); Assert.assertEquals(expectedResults.toString(), results.toString()); }
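The QueryToolChest API change in this patch follows a standard deprecation bridge: each getCacheStrategy overload delegates to the other by default, so a toolchest that only overrides the old single-argument method keeps working, while new implementations override the two-argument form and may insist on a non-null mapper (as the group-by toolchest does for complex dimensions). The shape, reduced to a self-contained sketch with Object standing in for CacheStrategy and an illustrative class name:

    import com.fasterxml.jackson.databind.ObjectMapper;

    abstract class ToolChestSketch<QueryType>
    {
      // Old entry point; kept so existing overrides continue to compile.
      @Deprecated
      public Object getCacheStrategy(QueryType query)
      {
        return null; // null means caching is disabled for the query
      }

      // New entry point; the mapper may be null for backward compatibility.
      public Object getCacheStrategy(QueryType query, ObjectMapper mapper)
      {
        // Default falls back to the deprecated method, so callers can always
        // invoke the two-argument form regardless of which one is overridden.
        return getCacheStrategy(query);
      }
    }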
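The decorateObjectMapper() change is easier to see outside the grouper machinery. The pattern is: copy the shared spill mapper so other users are unaffected, register a SimpleModule whose deserializer walks the array-encoded key, and read each element with its declared class so longs stay longs and complex values go through their registered serde. A minimal sketch of that pattern — TypedKey and keyClasses are illustrative stand-ins for the patch's RowBasedKey and serde helpers, not names from the patch:

    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.core.JsonToken;
    import com.fasterxml.jackson.databind.DeserializationContext;
    import com.fasterxml.jackson.databind.JsonDeserializer;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.module.SimpleModule;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class TypedKeyDemo
    {
      // Array-encoded grouping key, mirroring RowBasedKey's @JsonValue Object[] form.
      static class TypedKey
      {
        final Object[] key;
        TypedKey(Object[] key) { this.key = key; }
      }

      static ObjectMapper decorate(ObjectMapper base, Class<?>[] keyClasses)
      {
        SimpleModule module = new SimpleModule();
        module.addDeserializer(TypedKey.class, new JsonDeserializer<TypedKey>()
        {
          @Override
          public TypedKey deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException
          {
            // Expect a JSON array; read each element with its declared class.
            List<Object> out = new ArrayList<>();
            jp.nextToken(); // step past START_ARRAY
            int i = 0;
            while (jp.currentToken() != JsonToken.END_ARRAY) {
              out.add(jp.getCodec().readValue(jp, keyClasses[i++]));
              jp.nextToken();
            }
            return new TypedKey(out.toArray());
          }
        });
        // Copy before registering, as the patch does, so the shared mapper is untouched.
        ObjectMapper decorated = base.copy();
        decorated.registerModule(module);
        return decorated;
      }

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = decorate(new ObjectMapper(), new Class<?>[]{Long.class, String.class});
        TypedKey key = mapper.readValue("[123, \"abc\"]", TypedKey.class);
        System.out.println(key.key[0].getClass()); // class java.lang.Long
      }
    }

SpillingGrouper then only has to swap in the decorated copy, which is the one-line constructor change above: this.spillMapper = keySerde.decorateObjectMapper(spillMapper).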