mirror of https://github.com/apache/druid.git
Deserialize complex dimensions in group by queries to their respective types when reading from spilled files and cached results (#16620)
Like #16511, but for keys that have been spilled or cached during the grouping process
This commit is contained in:
parent
d6c760f7ce
commit
209f8a9546
|
@ -83,6 +83,16 @@ public class AggregatorsModule extends SimpleModule
|
|||
{
|
||||
super("AggregatorFactories");
|
||||
|
||||
registerComplexMetricsAndSerde();
|
||||
|
||||
setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
|
||||
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
|
||||
|
||||
addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE);
|
||||
}
|
||||
|
||||
public static void registerComplexMetricsAndSerde()
|
||||
{
|
||||
ComplexMetrics.registerSerde(HyperUniquesSerde.TYPE_NAME, new HyperUniquesSerde());
|
||||
ComplexMetrics.registerSerde(PreComputedHyperUniquesSerde.TYPE_NAME, new PreComputedHyperUniquesSerde());
|
||||
ComplexMetrics.registerSerde(
|
||||
|
@ -102,11 +112,6 @@ public class AggregatorsModule extends SimpleModule
|
|||
SerializablePairLongLongComplexMetricSerde.TYPE_NAME,
|
||||
new SerializablePairLongLongComplexMetricSerde()
|
||||
);
|
||||
|
||||
setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
|
||||
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
|
||||
|
||||
addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE);
|
||||
}
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
|
|
|
@ -251,19 +251,36 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
|
|||
*/
|
||||
public abstract TypeReference<ResultType> getResultTypeReference();
|
||||
|
||||
/**
|
||||
* Like {@link #getCacheStrategy(Query, ObjectMapper)} but the caller doesn't supply the object mapper for deserializing
|
||||
* and converting the cached data to the desired type. It's up to the individual implementations to decide the appropriate action in that case.
|
||||
* It can either throw an exception outright or decide if the query requires the object mapper for proper downstream processing and
|
||||
* work with the generic java types if not.
|
||||
* <p>
* This base implementation returns null.
*
* @param query The query whose results might be cached
* @param <T> The type of object that will be stored in the cache
* @return A CacheStrategy for the query, or null (the value returned by this base implementation)
|
||||
* @deprecated Use {@link #getCacheStrategy(Query, ObjectMapper)} instead
|
||||
*/
|
||||
@Deprecated
|
||||
@Nullable
|
||||
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a CacheStrategy to be used to load data into the cache and remove it from the cache.
|
||||
* <p>
|
||||
* This is optional. If it returns null, caching is effectively disabled for the query.
|
||||
*
|
||||
* @param query The query whose results might be cached
|
||||
* @param mapper Object mapper to convert the deserialized generic java objects to desired types. It can be nullable
|
||||
* to preserve backward compatibility.
|
||||
* @param <T> The type of object that will be stored in the cache
|
||||
* @return A CacheStrategy that can be used to populate and read from the Cache
|
||||
*/
|
||||
@Nullable
|
||||
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
|
||||
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query, @Nullable ObjectMapper mapper)
|
||||
{
|
||||
return null;
|
||||
return getCacheStrategy(query);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.query.datasourcemetadata;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Functions;
|
||||
import com.google.inject.Inject;
|
||||
|
@ -38,6 +39,7 @@ import org.apache.druid.query.aggregation.MetricManipulationFn;
|
|||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.apache.druid.timeline.LogicalSegment;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -119,4 +121,10 @@ public class DataSourceQueryQueryToolChest
|
|||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
 * Cache strategy lookup that accepts an optional object mapper.
 * <p>
 * Always returns null regardless of the arguments; neither {@code query} nor {@code mapper} is read.
 *
 * @param query  the data source metadata query (unused)
 * @param mapper optional object mapper (unused, may be null)
 * @return always null
 */
@Override
|
||||
public CacheStrategy getCacheStrategy(DataSourceMetadataQuery query, @Nullable ObjectMapper mapper)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,8 +77,10 @@ import org.apache.druid.segment.column.ColumnType;
|
|||
import org.apache.druid.segment.column.NullableTypeStrategy;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.nested.StructuredData;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -471,7 +473,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
|
|||
// Deserializer that can deserialize either array- or map-based rows.
|
||||
final JsonDeserializer<ResultRow> deserializer = new JsonDeserializer<ResultRow>()
|
||||
{
|
||||
final Class<?>[] dimensionClasses = createDimensionClasses();
|
||||
final Class<?>[] dimensionClasses = createDimensionClasses(query);
|
||||
boolean containsComplexDimensions = query.getDimensions()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
|
@ -524,30 +526,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
|
|||
return ResultRow.of(objectArray);
|
||||
}
|
||||
}
|
||||
|
||||
private Class<?>[] createDimensionClasses()
|
||||
{
|
||||
final List<DimensionSpec> queryDimensions = query.getDimensions();
|
||||
final Class<?>[] classes = new Class[queryDimensions.size()];
|
||||
for (int i = 0; i < queryDimensions.size(); ++i) {
|
||||
final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType();
|
||||
if (dimensionOutputType.is(ValueType.COMPLEX)) {
|
||||
NullableTypeStrategy nullableTypeStrategy = dimensionOutputType.getNullableStrategy();
|
||||
if (!nullableTypeStrategy.groupable()) {
|
||||
throw DruidException.defensive(
|
||||
"Ungroupable dimension [%s] with type [%s] found in the query.",
|
||||
queryDimensions.get(i).getDimension(),
|
||||
dimensionOutputType
|
||||
);
|
||||
}
|
||||
classes[i] = nullableTypeStrategy.getClazz();
|
||||
} else {
|
||||
classes[i] = Object.class;
|
||||
}
|
||||
}
|
||||
return classes;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
class GroupByResultRowModule extends SimpleModule
|
||||
|
@ -597,9 +575,32 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
|
|||
);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(final GroupByQuery query)
|
||||
public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(GroupByQuery query)
|
||||
{
|
||||
return getCacheStrategy(query, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(
|
||||
final GroupByQuery query,
|
||||
@Nullable final ObjectMapper mapper
|
||||
)
|
||||
{
|
||||
|
||||
for (DimensionSpec dimension : query.getDimensions()) {
|
||||
if (dimension.getOutputType().is(ValueType.COMPLEX) && !dimension.getOutputType().equals(ColumnType.NESTED_DATA)) {
|
||||
if (mapper == null) {
|
||||
throw DruidException.defensive(
|
||||
"Cannot deserialize complex dimension of type[%s] from result cache if object mapper is not provided",
|
||||
dimension.getOutputType().getComplexTypeName()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
final Class<?>[] dimensionClasses = createDimensionClasses(query);
|
||||
|
||||
return new CacheStrategy<ResultRow, Object, GroupByQuery>()
|
||||
{
|
||||
private static final byte CACHE_STRATEGY_VERSION = 0x1;
|
||||
|
@ -726,13 +727,29 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
|
|||
int dimPos = 0;
|
||||
while (dimsIter.hasNext() && results.hasNext()) {
|
||||
final DimensionSpec dimensionSpec = dimsIter.next();
|
||||
final Object dimensionObject = results.next();
|
||||
final Object dimensionObjectCasted;
|
||||
|
||||
// Must convert generic Jackson-deserialized type into the proper type.
|
||||
resultRow.set(
|
||||
dimensionStart + dimPos,
|
||||
DimensionHandlerUtils.convertObjectToType(results.next(), dimensionSpec.getOutputType())
|
||||
final ColumnType outputType = dimensionSpec.getOutputType();
|
||||
|
||||
// Must convert generic Jackson-deserialized type into the proper type. The downstream functions expect the
|
||||
// dimensions to be of appropriate types for further processing like merging and comparing.
|
||||
if (outputType.is(ValueType.COMPLEX)) {
|
||||
// Json columns can interpret generic data objects appropriately, hence they are wrapped as is in StructuredData.
|
||||
// They don't need to be converted from Object.class to StructuredData.class using the object mapper, as that is an
|
||||
// expensive operation that will be wasteful.
|
||||
if (outputType.equals(ColumnType.NESTED_DATA)) {
|
||||
dimensionObjectCasted = StructuredData.wrap(dimensionObject);
|
||||
} else {
|
||||
dimensionObjectCasted = mapper.convertValue(dimensionObject, dimensionClasses[dimPos]);
|
||||
}
|
||||
} else {
|
||||
dimensionObjectCasted = DimensionHandlerUtils.convertObjectToType(
|
||||
dimensionObject,
|
||||
dimensionSpec.getOutputType()
|
||||
);
|
||||
|
||||
}
|
||||
resultRow.set(dimensionStart + dimPos, dimensionObjectCasted);
|
||||
dimPos++;
|
||||
}
|
||||
|
||||
|
@ -861,4 +878,27 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
|
|||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
/**
 * Builds an array with one Java class per dimension of the query, in the order of {@code query.getDimensions()}.
 * <p>
 * For a dimension whose output type is {@link ValueType#COMPLEX}, the entry is the class reported by the type's
 * nullable type strategy. Every other dimension gets {@code Object.class}.
 *
 * @param query the group by query whose dimensions are inspected
 * @return array of classes, the same length as the query's dimension list
 * @throws DruidException (defensive) if a complex dimension's type strategy reports that it is not groupable
 */
private static Class<?>[] createDimensionClasses(final GroupByQuery query)
|
||||
{
|
||||
final List<DimensionSpec> queryDimensions = query.getDimensions();
|
||||
final Class<?>[] classes = new Class[queryDimensions.size()];
|
||||
for (int i = 0; i < queryDimensions.size(); ++i) {
|
||||
final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType();
|
||||
if (dimensionOutputType.is(ValueType.COMPLEX)) {
|
||||
NullableTypeStrategy nullableTypeStrategy = dimensionOutputType.getNullableStrategy();
|
||||
// Check groupable() before asking for the class; getClazz() is only called on groupable strategies here.
if (!nullableTypeStrategy.groupable()) {
|
||||
throw DruidException.defensive(
|
||||
"Ungroupable dimension [%s] with type [%s] found in the query.",
|
||||
queryDimensions.get(i).getDimension(),
|
||||
dimensionOutputType
|
||||
);
|
||||
}
|
||||
classes[i] = nullableTypeStrategy.getClazz();
|
||||
} else {
|
||||
// Non-complex dimensions are left as generic objects.
classes[i] = Object.class;
|
||||
}
|
||||
}
|
||||
return classes;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.query.groupby.epinephelinae;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.java.util.common.parsers.CloseableIterator;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
|
@ -232,6 +233,16 @@ public interface Grouper<KeyType> extends Closeable
|
|||
*/
|
||||
BufferComparator bufferComparatorWithAggregators(AggregatorFactory[] aggregatorFactories, int[] aggregatorOffsets);
|
||||
|
||||
/**
|
||||
* Decorates the object mapper enabling it to read and write query results' grouping keys. It is used by the
|
||||
* {@link SpillingGrouper} to preserve the types of the dimensions after serializing and deserializing them on the
|
||||
* spilled files.
|
||||
* <p>
* The default implementation returns the supplied mapper unchanged.
*
* @param spillMapper the object mapper used for spilled files
* @return the mapper to use for reading and writing grouping keys; by default the same instance that was passed in
*/
|
||||
default ObjectMapper decorateObjectMapper(ObjectMapper spillMapper)
|
||||
{
|
||||
return spillMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the keySerde to its initial state. After this method is called, {@link #readFromByteBuffer}
|
||||
* and {@link #bufferComparator()} may no longer work properly on previously-serialized keys.
|
||||
|
|
|
@ -19,9 +19,14 @@
|
|||
|
||||
package org.apache.druid.query.groupby.epinephelinae;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonValue;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import com.fasterxml.jackson.core.ObjectCodec;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.primitives.Ints;
|
||||
|
@ -84,6 +89,7 @@ import org.joda.time.Interval;
|
|||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -666,22 +672,6 @@ public class RowBasedGrouperHelper
|
|||
this.key = key;
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public static RowBasedKey fromJsonArray(final Object[] key)
|
||||
{
|
||||
// Type info is lost during serde:
|
||||
// Floats may be deserialized as doubles, Longs may be deserialized as integers, convert them back
|
||||
for (int i = 0; i < key.length; i++) {
|
||||
if (key[i] instanceof Integer) {
|
||||
key[i] = ((Integer) key[i]).longValue();
|
||||
} else if (key[i] instanceof Double) {
|
||||
key[i] = ((Double) key[i]).floatValue();
|
||||
}
|
||||
}
|
||||
|
||||
return new RowBasedKey(key);
|
||||
}
|
||||
|
||||
@JsonValue
|
||||
public Object[] getKey()
|
||||
{
|
||||
|
@ -1371,6 +1361,65 @@ public class RowBasedGrouperHelper
|
|||
);
|
||||
}
|
||||
|
||||
/**
 * Returns a copy of {@code spillMapper} with a module registered that deserializes {@link RowBasedKey} from a JSON
 * array, reading each element as a specific class instead of a generic Jackson type.
 * <p>
 * When {@code includeTimestamp} is set, the first array element is read as a {@code Long}. Each remaining element
 * is read as the class reported by the serde helper at the matching position in {@code serdeHelpers}.
 * The supplied mapper itself is not modified; the module is registered on the copy.
 *
 * @param spillMapper the object mapper to copy and decorate
 * @return a new object mapper carrying the {@link RowBasedKey} deserializer
 */
@Override
|
||||
public ObjectMapper decorateObjectMapper(ObjectMapper spillMapper)
|
||||
{
|
||||
|
||||
final JsonDeserializer<RowBasedKey> deserializer = new JsonDeserializer<RowBasedKey>()
|
||||
{
|
||||
@Override
|
||||
public RowBasedKey deserialize(
|
||||
JsonParser jp,
|
||||
DeserializationContext deserializationContext
|
||||
) throws IOException
|
||||
{
|
||||
// The key must be encoded as a JSON array; anything else is rejected.
if (!jp.isExpectedStartArrayToken()) {
|
||||
throw DruidException.defensive("Expected array start token, received [%s]", jp.getCurrentToken());
|
||||
}
|
||||
// Step past the array start token to the first element.
jp.nextToken();
|
||||
|
||||
final ObjectCodec codec = jp.getCodec();
|
||||
// The timestamp, when present, occupies slot 0 and shifts every dimension index by one.
final int timestampAdjustment = includeTimestamp ? 1 : 0;
|
||||
final int dimsToRead = timestampAdjustment + serdeHelpers.length;
|
||||
int dimsReadSoFar = 0;
|
||||
final Object[] objects = new Object[dimsToRead];
|
||||
|
||||
if (includeTimestamp) {
|
||||
// NOTE(review): presumably conditionalDefensive throws when the condition is false - confirm.
DruidException.conditionalDefensive(
|
||||
jp.currentToken() != JsonToken.END_ARRAY,
|
||||
"Unexpected end of array when deserializing timestamp from the spilled files"
|
||||
);
|
||||
objects[dimsReadSoFar] = codec.readValue(jp, Long.class);
|
||||
|
||||
++dimsReadSoFar;
|
||||
jp.nextToken();
|
||||
}
|
||||
|
||||
// NOTE(review): the loop runs until END_ARRAY without comparing dimsReadSoFar to dimsToRead. An array with
// more elements than expected would index past serdeHelpers/objects, and a shorter one would leave trailing
// entries null - confirm the writer always emits exactly dimsToRead elements.
while (jp.currentToken() != JsonToken.END_ARRAY) {
|
||||
objects[dimsReadSoFar] =
|
||||
codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz());
|
||||
|
||||
++dimsReadSoFar;
|
||||
jp.nextToken();
|
||||
}
|
||||
|
||||
return new RowBasedKey(objects);
|
||||
}
|
||||
};
|
||||
|
||||
// Local module whose only job is to attach the deserializer above to RowBasedKey.
class SpillModule extends SimpleModule
|
||||
{
|
||||
public SpillModule()
|
||||
{
|
||||
addDeserializer(RowBasedKey.class, deserializer);
|
||||
}
|
||||
}
|
||||
|
||||
// Register on a copy so the caller's mapper is left untouched.
final ObjectMapper newObjectMapper = spillMapper.copy();
|
||||
newObjectMapper.registerModule(new SpillModule());
|
||||
return newObjectMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset()
|
||||
{
|
||||
|
@ -1588,6 +1637,7 @@ public class RowBasedGrouperHelper
|
|||
{
|
||||
final BufferComparator bufferComparator;
|
||||
final String columnTypeName;
|
||||
final Class<?> clazz;
|
||||
|
||||
final List<Object> dictionary;
|
||||
final Object2IntMap<Object> reverseDictionary;
|
||||
|
@ -1613,6 +1663,7 @@ public class RowBasedGrouperHelper
|
|||
dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)),
|
||||
dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition))
|
||||
);
|
||||
clazz = columnType.getNullableStrategy().getClazz();
|
||||
}
|
||||
|
||||
// Asserts that we don't entertain any complex types without a typename, to prevent intermixing dictionaries of
|
||||
|
@ -1645,6 +1696,12 @@ public class RowBasedGrouperHelper
|
|||
{
|
||||
return reverseDictionary;
|
||||
}
|
||||
|
||||
/**
 * Returns the class stored in this helper's {@code clazz} field, used as the target class when this key component
 * is deserialized.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return clazz;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -1726,6 +1783,14 @@ public class RowBasedGrouperHelper
|
|||
{
|
||||
return reverseDictionary;
|
||||
}
|
||||
|
||||
/**
 * Returns {@code Object.class} as the target class when this key component is deserialized.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
// Jackson deserializes Object[] containing longs to Object[] containing string if Object[].class is returned
|
||||
// Therefore we are using Object.class
|
||||
return Object.class;
|
||||
}
|
||||
}
|
||||
|
||||
private class ArrayStringRowBasedKeySerdeHelper extends DictionaryBuildingSingleValuedRowBasedKeySerdeHelper
|
||||
|
@ -1770,6 +1835,12 @@ public class RowBasedGrouperHelper
|
|||
{
|
||||
return reverseStringArrayDictionary;
|
||||
}
|
||||
|
||||
/**
 * Returns {@code Object[].class} as the target class when this key component is deserialized.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return Object[].class;
|
||||
}
|
||||
}
|
||||
|
||||
private abstract class AbstractStringRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
|
||||
|
@ -1819,6 +1890,12 @@ public class RowBasedGrouperHelper
|
|||
{
|
||||
return bufferComparator;
|
||||
}
|
||||
|
||||
/**
 * Returns {@code String.class} as the target class when this key component is deserialized.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return String.class;
|
||||
}
|
||||
}
|
||||
|
||||
private class DynamicDictionaryStringRowBasedKeySerdeHelper extends AbstractStringRowBasedKeySerdeHelper
|
||||
|
@ -1937,6 +2014,12 @@ public class RowBasedGrouperHelper
|
|||
{
|
||||
return bufferComparator;
|
||||
}
|
||||
|
||||
/**
 * Returns {@code Long.class} as the target class when this key component is deserialized.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return Long.class;
|
||||
}
|
||||
}
|
||||
|
||||
private class FloatRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
|
||||
|
@ -1982,6 +2065,12 @@ public class RowBasedGrouperHelper
|
|||
{
|
||||
return bufferComparator;
|
||||
}
|
||||
|
||||
/**
 * Returns {@code Float.class} as the target class when this key component is deserialized.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return Float.class;
|
||||
}
|
||||
}
|
||||
|
||||
private class DoubleRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
|
||||
|
@ -2027,6 +2116,12 @@ public class RowBasedGrouperHelper
|
|||
{
|
||||
return bufferComparator;
|
||||
}
|
||||
|
||||
/**
 * Returns {@code Double.class} as the target class when this key component is deserialized.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return Double.class;
|
||||
}
|
||||
}
|
||||
|
||||
// This class is only used when SQL compatible null handling is enabled.
|
||||
|
@ -2082,6 +2177,12 @@ public class RowBasedGrouperHelper
|
|||
{
|
||||
return comparator;
|
||||
}
|
||||
|
||||
/**
 * Returns whatever class the wrapped {@code delegate} helper reports; this helper adds no type of its own.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return delegate.getClazz();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,4 +65,9 @@ interface RowBasedKeySerdeHelper
|
|||
* Return a {@link BufferComparator} to compare keys stored in ByteBuffer.
|
||||
*/
|
||||
BufferComparator getBufferComparator();
|
||||
|
||||
/**
|
||||
* Returns the expected class of the key, which is used to deserialize the objects correctly from the spilled files.
|
||||
*/
|
||||
Class<?> getClazz();
|
||||
}
|
||||
|
|
|
@ -152,7 +152,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
|
|||
}
|
||||
this.aggregatorFactories = aggregatorFactories;
|
||||
this.temporaryStorage = temporaryStorage;
|
||||
this.spillMapper = spillMapper;
|
||||
this.spillMapper = keySerde.decorateObjectMapper(spillMapper);
|
||||
this.spillingAllowed = spillingAllowed;
|
||||
this.sortHasNonGroupingFields = sortHasNonGroupingFields;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.query.metadata;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Functions;
|
||||
|
@ -62,6 +63,7 @@ import org.apache.druid.utils.CollectionUtils;
|
|||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
|
@ -184,6 +186,15 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
|
|||
|
||||
/**
 * Mapper-less variant: delegates to {@code getCacheStrategy(query, null)}, i.e. the two-argument overload with no
 * object mapper.
 *
 * @param query the segment metadata query
 * @return whatever the two-argument overload returns for a null object mapper
 */
@Override
|
||||
public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> getCacheStrategy(final SegmentMetadataQuery query)
|
||||
{
|
||||
return getCacheStrategy(query, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> getCacheStrategy(
|
||||
final SegmentMetadataQuery query,
|
||||
@Nullable final ObjectMapper objectMapper
|
||||
)
|
||||
{
|
||||
return new CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>()
|
||||
{
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.query.search;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Functions;
|
||||
|
@ -124,6 +125,15 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
|
|||
|
||||
/**
 * Mapper-less variant: delegates to {@code getCacheStrategy(query, null)}, i.e. the two-argument overload with no
 * object mapper.
 *
 * @param query the search query
 * @return whatever the two-argument overload returns for a null object mapper
 */
@Override
|
||||
public CacheStrategy<Result<SearchResultValue>, Object, SearchQuery> getCacheStrategy(final SearchQuery query)
|
||||
{
|
||||
return getCacheStrategy(query, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheStrategy<Result<SearchResultValue>, Object, SearchQuery> getCacheStrategy(
|
||||
final SearchQuery query,
|
||||
@Nullable final ObjectMapper objectMapper
|
||||
)
|
||||
{
|
||||
|
||||
return new CacheStrategy<Result<SearchResultValue>, Object, SearchQuery>()
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.query.timeboundary;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Functions;
|
||||
|
@ -47,6 +48,7 @@ import org.apache.druid.segment.column.ColumnType;
|
|||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.apache.druid.timeline.LogicalSegment;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
@ -163,6 +165,16 @@ public class TimeBoundaryQueryQueryToolChest
|
|||
|
||||
/**
 * Mapper-less variant: delegates to {@code getCacheStrategy(query, null)}, i.e. the two-argument overload with no
 * object mapper.
 *
 * @param query the time boundary query
 * @return whatever the two-argument overload returns for a null object mapper
 */
@Override
|
||||
public CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery> getCacheStrategy(final TimeBoundaryQuery query)
|
||||
{
|
||||
return getCacheStrategy(query, null);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery> getCacheStrategy(
|
||||
final TimeBoundaryQuery query,
|
||||
@Nullable final ObjectMapper objectMapper
|
||||
)
|
||||
{
|
||||
return new CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>()
|
||||
{
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.query.timeseries;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -65,6 +66,7 @@ import org.apache.druid.segment.column.ColumnType;
|
|||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -276,6 +278,16 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
|
|||
|
||||
/**
 * Mapper-less variant: delegates to {@code getCacheStrategy(query, null)}, i.e. the two-argument overload with no
 * object mapper.
 *
 * @param query the timeseries query
 * @return whatever the two-argument overload returns for a null object mapper
 */
@Override
|
||||
public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query)
|
||||
{
|
||||
return getCacheStrategy(query, null);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery> getCacheStrategy(
|
||||
final TimeseriesQuery query,
|
||||
@Nullable final ObjectMapper objectMapper
|
||||
)
|
||||
{
|
||||
return new CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>()
|
||||
{
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.query.topn;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
@ -64,6 +65,7 @@ import org.apache.druid.segment.DimensionHandlerUtils;
|
|||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -268,9 +270,18 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
|||
return TYPE_REFERENCE;
|
||||
}
|
||||
|
||||
/**
 * Mapper-less variant: delegates to {@code getCacheStrategy(query, null)}, i.e. the two-argument overload with no
 * object mapper.
 *
 * @param query the topN query
 * @return whatever the two-argument overload returns for a null object mapper
 */
@Nullable
|
||||
@Override
|
||||
public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrategy(TopNQuery query)
|
||||
{
|
||||
return getCacheStrategy(query, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrategy(final TopNQuery query)
|
||||
public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrategy(
|
||||
final TopNQuery query,
|
||||
@Nullable final ObjectMapper objectMapper
|
||||
)
|
||||
{
|
||||
return new CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>()
|
||||
{
|
||||
|
|
|
@ -123,7 +123,7 @@ public class ObjectStrategyComplexTypeStrategy<T> implements TypeStrategy<T>
|
|||
public int hashCode(T o)
|
||||
{
|
||||
if (hashStrategy == null) {
|
||||
throw DruidException.defensive("hashStrategy not provided");
|
||||
throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString());
|
||||
}
|
||||
return hashStrategy.hashCode(o);
|
||||
}
|
||||
|
@ -132,7 +132,7 @@ public class ObjectStrategyComplexTypeStrategy<T> implements TypeStrategy<T>
|
|||
public boolean equals(T a, T b)
|
||||
{
|
||||
if (hashStrategy == null) {
|
||||
throw DruidException.defensive("hashStrategy not provided");
|
||||
throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString());
|
||||
}
|
||||
return hashStrategy.equals(a, b);
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ public class ObjectStrategyComplexTypeStrategy<T> implements TypeStrategy<T>
|
|||
public Class<?> getClazz()
|
||||
{
|
||||
if (clazz == null) {
|
||||
throw DruidException.defensive("hashStrategy not provided");
|
||||
throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString());
|
||||
}
|
||||
return clazz;
|
||||
}
|
||||
|
|
|
@ -299,6 +299,12 @@ public class TypeStrategies
|
|||
{
|
||||
return a.equals(b);
|
||||
}
|
||||
|
||||
/**
 * Reports {@code Long.class} as the Java class associated with this type strategy.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return Long.class;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -368,6 +374,12 @@ public class TypeStrategies
|
|||
{
|
||||
return a.equals(b);
|
||||
}
|
||||
|
||||
/**
 * Reports {@code Float.class} as the Java class associated with this type strategy.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return Float.class;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -438,6 +450,12 @@ public class TypeStrategies
|
|||
{
|
||||
return a.equals(b);
|
||||
}
|
||||
|
||||
/**
 * Reports {@code Double.class} as the Java class associated with this type strategy.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return Double.class;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -519,6 +537,12 @@ public class TypeStrategies
|
|||
{
|
||||
return a.equals(b);
|
||||
}
|
||||
|
||||
/**
 * Reports {@code String.class} as the Java class associated with this type strategy.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return String.class;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -664,5 +688,11 @@ public class TypeStrategies
|
|||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Reports {@code Object[].class} as the Java class associated with this type strategy.
 */
@Override
|
||||
public Class<?> getClazz()
|
||||
{
|
||||
return Object[].class;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -225,6 +225,6 @@ public interface TypeStrategy<T> extends Comparator<Object>, Hash.Strategy<T>
|
|||
*/
|
||||
default Class<?> getClazz()
|
||||
{
|
||||
throw DruidException.defensive("Not implemented. It is only implemented for complex dimensions which are groupable()");
|
||||
throw DruidException.defensive("Not implemented. Check groupable() first");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -766,7 +766,7 @@ public class AggregationTestHelper implements Closeable
|
|||
String resultStr = mapper.writer().writeValueAsString(yielder);
|
||||
|
||||
List<ResultRow> resultRows = Lists.transform(
|
||||
readQueryResultArrayFromString(resultStr),
|
||||
readQueryResultArrayFromString(resultStr, queryPlus.getQuery()),
|
||||
toolChest.makePreComputeManipulatorFn(
|
||||
queryPlus.getQuery(),
|
||||
MetricManipulatorFns.deserializing()
|
||||
|
@ -798,11 +798,13 @@ public class AggregationTestHelper implements Closeable
|
|||
};
|
||||
}
|
||||
|
||||
private List readQueryResultArrayFromString(String str) throws Exception
|
||||
private List readQueryResultArrayFromString(String str, Query query) throws Exception
|
||||
{
|
||||
List result = new ArrayList();
|
||||
|
||||
JsonParser jp = mapper.getFactory().createParser(str);
|
||||
ObjectMapper decoratedMapper = toolChest.decorateObjectMapper(mapper, query);
|
||||
|
||||
JsonParser jp = decoratedMapper.getFactory().createParser(str);
|
||||
|
||||
if (jp.nextToken() != JsonToken.START_ARRAY) {
|
||||
throw new IAE("not an array [%s]", str);
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.groupby;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
import org.apache.druid.query.QueryContexts;
|
||||
import org.apache.druid.query.aggregation.AggregationTestHelper;
|
||||
import org.apache.druid.query.aggregation.CountAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.SerializablePairLongString;
|
||||
import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
|
||||
import org.apache.druid.query.dimension.DefaultDimensionSpec;
|
||||
import org.apache.druid.segment.RowBasedSegment;
|
||||
import org.apache.druid.segment.Segment;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.apache.druid.timeline.SegmentId;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class ComplexDimensionGroupByQueryTest
|
||||
{
|
||||
private final QueryContexts.Vectorize vectorize;
|
||||
private final AggregationTestHelper helper;
|
||||
private final List<Segment> segments;
|
||||
|
||||
@Rule
|
||||
public final TemporaryFolder tempFolder = new TemporaryFolder();
|
||||
|
||||
public ComplexDimensionGroupByQueryTest(GroupByQueryConfig config, String vectorize)
|
||||
{
|
||||
this.vectorize = QueryContexts.Vectorize.fromString(vectorize);
|
||||
this.helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
|
||||
Collections.emptyList(),
|
||||
config,
|
||||
tempFolder
|
||||
);
|
||||
Sequence<Object[]> rows = Sequences.simple(
|
||||
ImmutableList.of(
|
||||
new Object[]{new SerializablePairLongString(1L, "abc")},
|
||||
new Object[]{new SerializablePairLongString(1L, "abc")},
|
||||
new Object[]{new SerializablePairLongString(1L, "def")},
|
||||
new Object[]{new SerializablePairLongString(1L, "abc")},
|
||||
new Object[]{new SerializablePairLongString(1L, "ghi")},
|
||||
new Object[]{new SerializablePairLongString(1L, "def")},
|
||||
new Object[]{new SerializablePairLongString(1L, "abc")},
|
||||
new Object[]{new SerializablePairLongString(1L, "pqr")},
|
||||
new Object[]{new SerializablePairLongString(1L, "xyz")},
|
||||
new Object[]{new SerializablePairLongString(1L, "foo")},
|
||||
new Object[]{new SerializablePairLongString(1L, "bar")}
|
||||
)
|
||||
);
|
||||
RowSignature rowSignature = RowSignature.builder()
|
||||
.add(
|
||||
"pair",
|
||||
ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
|
||||
)
|
||||
.build();
|
||||
|
||||
this.segments = Collections.singletonList(
|
||||
new RowBasedSegment<>(
|
||||
SegmentId.dummy("dummy"),
|
||||
rows,
|
||||
columnName -> {
|
||||
final int columnNumber = rowSignature.indexOf(columnName);
|
||||
return row -> columnNumber >= 0 ? row[columnNumber] : null;
|
||||
},
|
||||
rowSignature
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "config = {0}, vectorize = {1}")
|
||||
public static Collection<?> constructorFeeder()
|
||||
{
|
||||
final List<Object[]> constructors = new ArrayList<>();
|
||||
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
|
||||
for (String vectorize : new String[]{"false", "force"}) {
|
||||
constructors.add(new Object[]{config, vectorize});
|
||||
}
|
||||
}
|
||||
return constructors;
|
||||
}
|
||||
|
||||
public Map<String, Object> getContext()
|
||||
{
|
||||
return ImmutableMap.of(
|
||||
QueryContexts.VECTORIZE_KEY, vectorize.toString(),
|
||||
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, "true"
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupByOnPairClass()
|
||||
{
|
||||
GroupByQuery groupQuery = GroupByQuery.builder()
|
||||
.setDataSource("test_datasource")
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setInterval(Intervals.ETERNITY)
|
||||
.setDimensions(new DefaultDimensionSpec(
|
||||
"pair",
|
||||
"pair",
|
||||
ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
|
||||
))
|
||||
.setAggregatorSpecs(new CountAggregatorFactory("count"))
|
||||
.setContext(getContext())
|
||||
.build();
|
||||
|
||||
if (vectorize == QueryContexts.Vectorize.FORCE) {
|
||||
// Cannot vectorize group by on complex dimension
|
||||
Assert.assertThrows(
|
||||
RuntimeException.class,
|
||||
() -> helper.runQueryOnSegmentsObjs(segments, groupQuery).toList()
|
||||
);
|
||||
} else {
|
||||
List<ResultRow> resultRows = helper.runQueryOnSegmentsObjs(segments, groupQuery).toList();
|
||||
|
||||
Assert.assertArrayEquals(
|
||||
new ResultRow[]{
|
||||
ResultRow.of(new SerializablePairLongString(1L, "abc"), 4L),
|
||||
ResultRow.of(new SerializablePairLongString(1L, "bar"), 1L),
|
||||
ResultRow.of(new SerializablePairLongString(1L, "def"), 2L),
|
||||
ResultRow.of(new SerializablePairLongString(1L, "foo"), 1L),
|
||||
ResultRow.of(new SerializablePairLongString(1L, "ghi"), 1L),
|
||||
ResultRow.of(new SerializablePairLongString(1L, "pqr"), 1L),
|
||||
ResultRow.of(new SerializablePairLongString(1L, "xyz"), 1L)
|
||||
},
|
||||
resultRows.toArray()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -33,6 +33,7 @@ import org.apache.druid.collections.SerializablePair;
|
|||
import org.apache.druid.collections.StupidPool;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.data.input.Row;
|
||||
import org.apache.druid.jackson.AggregatorsModule;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
|
@ -97,6 +98,7 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
public static void setUpClass()
|
||||
{
|
||||
NullHandling.initializeForTests();
|
||||
AggregatorsModule.registerComplexMetricsAndSerde();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -130,11 +132,13 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
|
||||
.build();
|
||||
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
|
||||
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
|
||||
|
||||
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
|
||||
Assert.assertFalse(Arrays.equals(
|
||||
|
@ -190,11 +194,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
)
|
||||
.build();
|
||||
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
|
||||
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
|
||||
|
||||
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
|
||||
Assert.assertFalse(Arrays.equals(
|
||||
|
@ -252,11 +257,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
.setHavingSpec(new GreaterThanHavingSpec(QueryRunnerTestHelper.UNIQUE_METRIC, 10))
|
||||
.build();
|
||||
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
|
||||
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
|
||||
|
||||
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
|
||||
Assert.assertFalse(Arrays.equals(
|
||||
|
@ -336,11 +342,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
.setHavingSpec(andHavingSpec2)
|
||||
.build();
|
||||
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
|
||||
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
|
||||
|
||||
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
|
||||
Assert.assertFalse(Arrays.equals(
|
||||
|
@ -427,11 +434,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
.setHavingSpec(havingSpec2)
|
||||
.build();
|
||||
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
|
||||
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
|
||||
|
||||
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
|
||||
Assert.assertFalse(Arrays.equals(
|
||||
|
@ -490,11 +498,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
))
|
||||
.build();
|
||||
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
|
||||
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
|
||||
|
||||
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
|
||||
Assert.assertFalse(Arrays.equals(
|
||||
|
@ -512,6 +521,48 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
doTestCacheStrategy(ColumnType.LONG, 2L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testComplexDimensionCacheStrategy() throws IOException
|
||||
{
|
||||
final GroupByQuery query1 = GroupByQuery
|
||||
.builder()
|
||||
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
|
||||
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
|
||||
.setDimensions(ImmutableList.of(
|
||||
new DefaultDimensionSpec(
|
||||
"test",
|
||||
"test",
|
||||
ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
|
||||
)
|
||||
))
|
||||
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
|
||||
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
|
||||
.build();
|
||||
|
||||
ObjectMapper objectMapper = TestHelper.makeJsonMapper();
|
||||
|
||||
CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, objectMapper);
|
||||
|
||||
// test timestamps that result in integer size millis
|
||||
final ResultRow result1 = ResultRow.of(
|
||||
123L,
|
||||
new SerializablePairLongString(123L, "abc"),
|
||||
1
|
||||
);
|
||||
|
||||
Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1);
|
||||
|
||||
Object fromCacheValue = objectMapper.readValue(
|
||||
objectMapper.writeValueAsBytes(preparedValue),
|
||||
strategy.getCacheObjectClazz()
|
||||
);
|
||||
|
||||
ResultRow fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);
|
||||
|
||||
Assert.assertEquals(result1, fromCacheResult);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiColumnCacheStrategy() throws Exception
|
||||
{
|
||||
|
@ -538,8 +589,9 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
|
||||
.build();
|
||||
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
|
||||
|
||||
// test timestamps that result in integer size millis
|
||||
final ResultRow result1 = ResultRow.of(
|
||||
|
@ -1054,8 +1106,9 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
|
||||
.build();
|
||||
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
|
||||
|
||||
// test timestamps that result in integer size millis
|
||||
final ResultRow result1 = ResultRow.of(
|
||||
|
@ -1147,11 +1200,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
|
||||
.build();
|
||||
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
|
||||
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
|
||||
|
||||
Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
|
||||
Assert.assertFalse(Arrays.equals(
|
||||
|
@ -1183,11 +1237,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, "false"))
|
||||
.build();
|
||||
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
|
||||
|
||||
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
|
||||
new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
|
||||
|
||||
Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
|
||||
Assert.assertTrue(
|
||||
|
@ -1245,7 +1300,8 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
|
|||
QueryRunnerTestHelper.NOOP_QUERYWATCHER
|
||||
);
|
||||
final GroupByQueryQueryToolChest queryToolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool);
|
||||
CacheStrategy<ResultRow, Object, GroupByQuery> cacheStrategy = queryToolChest.getCacheStrategy(query);
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
CacheStrategy<ResultRow, Object, GroupByQuery> cacheStrategy = queryToolChest.getCacheStrategy(query, mapper);
|
||||
Assert.assertTrue(
|
||||
"result level cache on broker server for GroupByStrategyV2 should be enabled",
|
||||
cacheStrategy.isCacheable(query, false, false)
|
||||
|
|
|
@ -33,6 +33,7 @@ import com.google.common.collect.Sets;
|
|||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.data.input.Row;
|
||||
import org.apache.druid.data.input.Rows;
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.HumanReadableBytes;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
|
@ -9965,7 +9966,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testGroupByComplexColumn()
|
||||
{
|
||||
cannotVectorize();
|
||||
GroupByQuery query = makeQueryBuilder()
|
||||
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
|
||||
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
|
||||
|
@ -9979,7 +9979,8 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
|
|||
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
|
||||
.build();
|
||||
|
||||
expectedException.expect(RuntimeException.class);
|
||||
expectedException.expect(DruidException.class);
|
||||
expectedException.expectMessage("Type [COMPLEX<hyperUnique>] is not groupable");
|
||||
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
|
||||
}
|
||||
|
||||
|
|
|
@ -275,7 +275,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
|
|||
this.responseContext = responseContext;
|
||||
this.query = queryPlus.getQuery();
|
||||
this.toolChest = warehouse.getToolChest(query);
|
||||
this.strategy = toolChest.getCacheStrategy(query);
|
||||
this.strategy = toolChest.getCacheStrategy(query, objectMapper);
|
||||
this.dataSourceAnalysis = query.getDataSource().getAnalysis();
|
||||
|
||||
this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
|
||||
|
|
|
@ -86,7 +86,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
|
|||
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
|
||||
{
|
||||
Query<T> query = queryPlus.getQuery();
|
||||
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
|
||||
final CacheStrategy strategy = toolChest.getCacheStrategy(query, mapper);
|
||||
final boolean populateCache = canPopulateCache(query, strategy);
|
||||
final boolean useCache = canUseCache(query, strategy);
|
||||
|
||||
|
|
|
@ -73,7 +73,7 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
|
|||
this.cache = cache;
|
||||
this.cacheConfig = cacheConfig;
|
||||
this.query = query;
|
||||
this.strategy = queryToolChest.getCacheStrategy(query);
|
||||
this.strategy = queryToolChest.getCacheStrategy(query, objectMapper);
|
||||
this.populateResultCache = CacheUtil.isPopulateResultCache(
|
||||
query,
|
||||
strategy,
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.druid.query.topn.TopNQueryBuilder;
|
|||
import org.apache.druid.query.topn.TopNQueryConfig;
|
||||
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
|
||||
import org.apache.druid.query.topn.TopNResultValue;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.easymock.EasyMock;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Assert;
|
||||
|
@ -90,7 +91,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class CachingQueryRunnerTest
|
||||
public class CachingQueryRunnerTest extends InitializedNullHandlingTest
|
||||
{
|
||||
@Parameterized.Parameters(name = "numBackgroundThreads={0}")
|
||||
public static Iterable<Object[]> constructorFeeder()
|
||||
|
@ -222,8 +223,8 @@ public class CachingQueryRunnerTest
|
|||
Cache cache = EasyMock.mock(Cache.class);
|
||||
EasyMock.replay(cache);
|
||||
CachingQueryRunner queryRunner = makeCachingQueryRunner(null, cache, toolchest, Sequences.empty());
|
||||
Assert.assertFalse(queryRunner.canPopulateCache(query, toolchest.getCacheStrategy(query)));
|
||||
Assert.assertFalse(queryRunner.canUseCache(query, toolchest.getCacheStrategy(query)));
|
||||
Assert.assertFalse(queryRunner.canPopulateCache(query, toolchest.getCacheStrategy(query, null)));
|
||||
Assert.assertFalse(queryRunner.canUseCache(query, toolchest.getCacheStrategy(query, null)));
|
||||
queryRunner.run(QueryPlus.wrap(query));
|
||||
EasyMock.verifyUnexpectedCalls(cache);
|
||||
}
|
||||
|
@ -243,7 +244,7 @@ public class CachingQueryRunnerTest
|
|||
|
||||
QueryToolChest toolchest = EasyMock.mock(QueryToolChest.class);
|
||||
Cache cache = EasyMock.mock(Cache.class);
|
||||
EasyMock.expect(toolchest.getCacheStrategy(query)).andReturn(null);
|
||||
EasyMock.expect(toolchest.getCacheStrategy(EasyMock.eq(query), EasyMock.anyObject())).andReturn(null);
|
||||
EasyMock.replay(cache, toolchest);
|
||||
CachingQueryRunner queryRunner = makeCachingQueryRunner(new byte[0], cache, toolchest, Sequences.empty());
|
||||
Assert.assertFalse(queryRunner.canPopulateCache(query, null));
|
||||
|
@ -339,7 +340,7 @@ public class CachingQueryRunnerTest
|
|||
resultSeq
|
||||
);
|
||||
|
||||
CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query);
|
||||
CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query, null);
|
||||
Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
|
||||
CACHE_ID,
|
||||
SEGMENT_DESCRIPTOR,
|
||||
|
@ -383,7 +384,7 @@ public class CachingQueryRunnerTest
|
|||
|
||||
byte[] cacheKeyPrefix = RandomUtils.nextBytes(10);
|
||||
|
||||
CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query);
|
||||
CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query, null);
|
||||
Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
|
||||
CACHE_ID,
|
||||
SEGMENT_DESCRIPTOR,
|
||||
|
@ -399,7 +400,7 @@ public class CachingQueryRunnerTest
|
|||
toolchest,
|
||||
Sequences.empty()
|
||||
);
|
||||
Assert.assertTrue(runner.canUseCache(query, toolchest.getCacheStrategy(query)));
|
||||
Assert.assertTrue(runner.canUseCache(query, toolchest.getCacheStrategy(query, null)));
|
||||
List<Result> results = runner.run(QueryPlus.wrap(query)).toList();
|
||||
Assert.assertEquals(expectedResults.toString(), results.toString());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue