mirror of https://github.com/apache/druid.git
Add PostAggregators to generator cache keys for top-n queries (#3899)
* Add PostAggregators to generator cache keys for top-n queries
* Add tests for strings
* Remove debug comments
* Add type keys and list sizes to cache key
* Make sure post aggregators used for sort are considered for cache key generation
* Use assertArrayEquals()
* Improve findPostAggregatorsForSort()
* Address comments
* fix test failure
* address comments
commit 991e2852da (parent 8e31a465ad)
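The change is mechanical but broad: the ad-hoc byte[] concatenation previously used to build query cache keys is replaced by a shared CacheKeyBuilder, and getCacheKey() is pushed down to every PostAggregator (and to QueryGranularity, DimFilter, DimensionSpec, HavingSpec, and LimitSpec) through a new one-method Cacheable interface. The recurring pattern, taken from the QuantilePostAggregator hunk below, is one type-id byte followed by type-tagged fields (fieldName and probability are fields of the enclosing class):

  @Override
  public byte[] getCacheKey()
  {
    // One unique id byte per post-aggregator type (see PostAggregatorIds below) ...
    return new CacheKeyBuilder(PostAggregatorIds.HISTOGRAM_QUANTILE)
        .appendString(fieldName)   // ... then a one-byte type key plus the encoded value per field
        .appendFloat(probability)
        .build();
  }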
@@ -23,7 +23,7 @@ package io.druid.common.utils;
  */
 public class StringUtils extends io.druid.java.util.common.StringUtils
 {
-  private static final byte[] EMPTY_BYTES = new byte[0];
+  public static final byte[] EMPTY_BYTES = new byte[0];
 
   // should be used only for estimation
   // returns the same result with StringUtils.fromUtf8(value).length for valid string values

@@ -26,6 +26,8 @@ import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Doubles;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.post.PostAggregatorIds;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Map;

@@ -147,4 +149,12 @@ public class SketchEstimatePostAggregator implements PostAggregator
     result = 31 * result + (errorBoundsStdDev != null ? errorBoundsStdDev.hashCode() : 0);
     return result;
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    final CacheKeyBuilder builder = new CacheKeyBuilder(PostAggregatorIds.DATA_SKETCHES_SKETCH_ESTIMATE)
+        .appendCacheable(field);
+    return errorBoundsStdDev == null ? builder.build() : builder.appendInt(errorBoundsStdDev).build();
+  }
 }

@@ -24,8 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Sets;
 import com.yahoo.sketches.Util;
 import io.druid.java.util.common.IAE;
-import io.druid.java.util.common.logger.Logger;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.post.PostAggregatorIds;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.List;

@@ -34,9 +35,6 @@ import java.util.Set;
 
 public class SketchSetPostAggregator implements PostAggregator
 {
-
-  private static final Logger LOG = new Logger(SketchSetPostAggregator.class);
-
   private final String name;
   private final List<PostAggregator> fields;
   private final SketchHolder.Func func;

@@ -163,4 +161,33 @@ public class SketchSetPostAggregator implements PostAggregator
     result = 31 * result + maxSketchSize;
     return result;
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    final CacheKeyBuilder builder = new CacheKeyBuilder(PostAggregatorIds.DATA_SKETCHES_SKETCH_SET)
+        .appendString(getFunc())
+        .appendInt(maxSketchSize);
+
+    if (preserveFieldOrderInCacheKey(func)) {
+      builder.appendCacheables(fields);
+    } else {
+      builder.appendCacheablesIgnoringOrder(fields);
+    }
+
+    return builder.build();
+  }
+
+  private static boolean preserveFieldOrderInCacheKey(SketchHolder.Func func)
+  {
+    switch (func) {
+      case NOT:
+        return true;
+      case UNION:
+      case INTERSECT:
+        return false;
+      default:
+        throw new IAE(func.name());
+    }
+  }
 }

@@ -29,8 +29,8 @@ public abstract class ApproximateHistogramPostAggregator implements PostAggregator
 {
   private static final Comparator COMPARATOR = ApproximateHistogramAggregator.COMPARATOR;
 
-  private final String name;
-  private final String fieldName;
+  protected final String name;
+  protected final String fieldName;
 
   public ApproximateHistogramPostAggregator(
       String name,

@@ -23,8 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Sets;
-
 import io.druid.java.util.common.IAE;
+import io.druid.query.aggregation.post.PostAggregatorIds;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Map;
 import java.util.Set;

@@ -35,8 +36,6 @@ public class BucketsPostAggregator extends ApproximateHistogramPostAggregator
   private final float bucketSize;
   private final float offset;
 
-  private String fieldName;
-
   @JsonCreator
   public BucketsPostAggregator(
       @JsonProperty("name") String name,

@@ -51,7 +50,6 @@ public class BucketsPostAggregator extends ApproximateHistogramPostAggregator
       throw new IAE("Illegal bucketSize [%s], must be > 0", this.bucketSize);
     }
     this.offset = offset;
-    this.fieldName = fieldName;
   }
 
   @Override

@@ -63,7 +61,7 @@ public class BucketsPostAggregator extends ApproximateHistogramPostAggregator
   @Override
   public Object compute(Map<String, Object> values)
   {
-    ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName());
+    ApproximateHistogram ah = (ApproximateHistogram) values.get(fieldName);
     return ah.toHistogram(bucketSize, offset);
   }
 

@@ -89,4 +87,14 @@ public class BucketsPostAggregator extends ApproximateHistogramPostAggregator
            ", offset=" + this.getOffset() +
            '}';
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.HISTOGRAM_BUCKETS)
+        .appendString(fieldName)
+        .appendFloat(bucketSize)
+        .appendFloat(offset)
+        .build();
+  }
 }

@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Sets;
+import io.druid.query.aggregation.post.PostAggregatorIds;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Arrays;
 import java.util.Map;

@@ -55,7 +57,7 @@ public class CustomBucketsPostAggregator extends ApproximateHistogramPostAggregator
   @Override
   public Object compute(Map<String, Object> values)
   {
-    ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName());
+    ApproximateHistogram ah = (ApproximateHistogram) values.get(fieldName);
     return ah.toHistogram(breaks);
   }
 

@@ -74,4 +76,13 @@ public class CustomBucketsPostAggregator extends ApproximateHistogramPostAggregator
            ", breaks=" + Arrays.toString(this.getBreaks()) +
            '}';
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.HISTOGRAM_CUSTOM_BUCKETS)
+        .appendString(fieldName)
+        .appendFloatArray(breaks)
+        .build();
+  }
 }

@@ -23,8 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Sets;
-
 import io.druid.java.util.common.IAE;
+import io.druid.query.aggregation.post.PostAggregatorIds;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Map;
 import java.util.Set;

@@ -33,7 +34,6 @@ import java.util.Set;
 public class EqualBucketsPostAggregator extends ApproximateHistogramPostAggregator
 {
   private final int numBuckets;
-  private String fieldName;
 
   @JsonCreator
   public EqualBucketsPostAggregator(

@@ -47,7 +47,6 @@ public class EqualBucketsPostAggregator extends ApproximateHistogramPostAggregator
     if (this.numBuckets <= 1) {
       throw new IAE("Illegal number of buckets[%s], must be > 1", this.numBuckets);
     }
-    this.fieldName = fieldName;
   }
 
   @Override

@@ -59,7 +58,7 @@ public class EqualBucketsPostAggregator extends ApproximateHistogramPostAggregator
   @Override
   public Object compute(Map<String, Object> values)
   {
-    ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName());
+    ApproximateHistogram ah = (ApproximateHistogram) values.get(fieldName);
     return ah.toHistogram(numBuckets);
   }
 

@@ -73,9 +72,18 @@ public class EqualBucketsPostAggregator extends ApproximateHistogramPostAggregator
   public String toString()
   {
     return "EqualBucketsPostAggregator{" +
-           "name='" + this.getName() + '\'' +
-           ", fieldName='" + this.getFieldName() + '\'' +
-           ", numBuckets=" + this.getNumBuckets() +
+           "name='" + name + '\'' +
+           ", fieldName='" + fieldName + '\'' +
+           ", numBuckets=" + numBuckets +
            '}';
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.HISTOGRAM_EQUAL_BUCKETS)
+        .appendString(fieldName)
+        .appendInt(numBuckets)
+        .build();
+  }
 }

@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Sets;
+import io.druid.query.aggregation.post.PostAggregatorIds;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Map;

@@ -40,8 +42,6 @@ public class MaxPostAggregator extends ApproximateHistogramPostAggregator
     }
   };
 
-  private String fieldName;
-
   @JsonCreator
   public MaxPostAggregator(
       @JsonProperty("name") String name,

@@ -49,7 +49,6 @@ public class MaxPostAggregator extends ApproximateHistogramPostAggregator
   )
   {
     super(name, fieldName);
-    this.fieldName = fieldName;
   }
 
   @Override

@@ -67,7 +66,7 @@ public class MaxPostAggregator extends ApproximateHistogramPostAggregator
   @Override
   public Object compute(Map<String, Object> values)
   {
-    final ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName());
+    final ApproximateHistogram ah = (ApproximateHistogram) values.get(fieldName);
     return ah.getMax();
   }
 

@@ -78,4 +77,12 @@ public class MaxPostAggregator extends ApproximateHistogramPostAggregator
            "fieldName='" + fieldName + '\'' +
            '}';
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.HISTOGRAM_MAX)
+        .appendString(fieldName)
+        .build();
+  }
 }

@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Sets;
+import io.druid.query.aggregation.post.PostAggregatorIds;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Map;

@@ -40,8 +42,6 @@ public class MinPostAggregator extends ApproximateHistogramPostAggregator
     }
   };
 
-  private String fieldName;
-
   @JsonCreator
   public MinPostAggregator(
       @JsonProperty("name") String name,

@@ -49,7 +49,6 @@ public class MinPostAggregator extends ApproximateHistogramPostAggregator
   )
   {
     super(name, fieldName);
-    this.fieldName = fieldName;
   }
 
   @Override

@@ -67,7 +66,7 @@ public class MinPostAggregator extends ApproximateHistogramPostAggregator
   @Override
   public Object compute(Map<String, Object> values)
   {
-    final ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName());
+    final ApproximateHistogram ah = (ApproximateHistogram) values.get(fieldName);
     return ah.getMin();
   }
 

@@ -78,4 +77,12 @@ public class MinPostAggregator extends ApproximateHistogramPostAggregator
            "fieldName='" + fieldName + '\'' +
            '}';
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.HISTOGRAM_MIN)
+        .appendString(fieldName)
+        .build();
+  }
 }

@@ -23,8 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Sets;
-
 import io.druid.java.util.common.IAE;
+import io.druid.query.aggregation.post.PostAggregatorIds;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Map;

@@ -43,7 +44,7 @@ public class QuantilePostAggregator extends ApproximateHistogramPostAggregator
   };
 
   private final float probability;
-  private String fieldName;
+  private final String fieldName;
 
   @JsonCreator
   public QuantilePostAggregator(

@@ -76,8 +77,8 @@ public class QuantilePostAggregator extends ApproximateHistogramPostAggregator
   @Override
   public Object compute(Map<String, Object> values)
   {
-    final ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName());
-    return ah.getQuantiles(new float[]{this.getProbability()})[0];
+    final ApproximateHistogram ah = (ApproximateHistogram) values.get(fieldName);
+    return ah.getQuantiles(new float[]{probability})[0];
   }
 
   @JsonProperty

@@ -120,4 +121,13 @@ public class QuantilePostAggregator extends ApproximateHistogramPostAggregator
            ", fieldName='" + fieldName + '\'' +
            '}';
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.HISTOGRAM_QUANTILE)
+        .appendString(fieldName)
+        .appendFloat(probability)
+        .build();
+  }
 }

@@ -23,8 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Sets;
-
 import io.druid.java.util.common.IAE;
+import io.druid.query.aggregation.post.PostAggregatorIds;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Arrays;
 import java.util.Comparator;

@@ -35,7 +36,6 @@ import java.util.Set;
 public class QuantilesPostAggregator extends ApproximateHistogramPostAggregator
 {
   private final float[] probabilities;
-  private String fieldName;
 
   @JsonCreator
   public QuantilesPostAggregator(

@@ -46,7 +46,6 @@ public class QuantilesPostAggregator extends ApproximateHistogramPostAggregator
   {
     super(name, fieldName);
     this.probabilities = probabilities;
-    this.fieldName = fieldName;
 
     for (float p : probabilities) {
       if (p < 0 | p > 1) {

@@ -70,9 +69,9 @@ public class QuantilesPostAggregator extends ApproximateHistogramPostAggregator
   @Override
   public Object compute(Map<String, Object> values)
   {
-    final ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName());
+    final ApproximateHistogram ah = (ApproximateHistogram) values.get(fieldName);
 
-    return new Quantiles(this.getProbabilities(), ah.getQuantiles(this.getProbabilities()), ah.getMin(), ah.getMax());
+    return new Quantiles(probabilities, ah.getQuantiles(probabilities), ah.getMin(), ah.getMax());
   }
 
   @JsonProperty

@@ -85,9 +84,18 @@ public class QuantilesPostAggregator extends ApproximateHistogramPostAggregator
   public String toString()
   {
     return "EqualBucketsPostAggregator{" +
-           "name='" + this.getName() + '\'' +
-           ", fieldName='" + this.getFieldName() + '\'' +
+           "name='" + name + '\'' +
+           ", fieldName='" + fieldName + '\'' +
            ", probabilities=" + Arrays.toString(this.getProbabilities()) +
            '}';
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.HISTOGRAM_QUANTILES)
+        .appendString(fieldName)
+        .appendFloatArray(probabilities)
+        .build();
+  }
 }

@@ -26,6 +26,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
+import io.druid.query.aggregation.post.PostAggregatorIds;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Map;

@@ -101,4 +103,13 @@ public class StandardDeviationPostAggregator implements PostAggregator
            ", isVariancePop='" + isVariancePop + '\'' +
            '}';
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.VARIANCE_STANDARD_DEVIATION)
+        .appendString(fieldName)
+        .appendBoolean(isVariancePop)
+        .build();
+  }
 }

@@ -37,7 +37,7 @@ public final class AllGranularity extends BaseQueryGranularity
   }
 
   @Override
-  public byte[] cacheKey()
+  public byte[] getCacheKey()
   {
     return new byte[]{0x7f};
   }

@@ -31,8 +31,6 @@ public abstract class BaseQueryGranularity extends QueryGranularity
 
   public abstract long truncate(long offset);
 
-  public abstract byte[] cacheKey();
-
   public DateTime toDateTime(long offset)
   {
     return new DateTime(offset, DateTimeZone.UTC);

@@ -83,7 +83,7 @@ public class DurationGranularity extends BaseQueryGranularity
   }
 
   @Override
-  public byte[] cacheKey()
+  public byte[] getCacheKey()
   {
     return ByteBuffer.allocate(2 * Longs.BYTES).putLong(length).putLong(origin).array();
   }

@@ -34,7 +34,7 @@ public final class NoneGranularity extends BaseQueryGranularity
   }
 
   @Override
-  public byte[] cacheKey()
+  public byte[] getCacheKey()
   {
     return new byte[]{0x0};
   }

@@ -352,7 +352,7 @@ public class PeriodGranularity extends BaseQueryGranularity
   }
 
   @Override
-  public byte[] cacheKey()
+  public byte[] getCacheKey()
   {
     return StringUtils.toUtf8(period.toString() + ":" + chronology.getZone().toString() + ":" + origin);
   }

@@ -21,20 +21,19 @@ package io.druid.granularity;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import io.druid.java.util.common.IAE;
+import io.druid.query.cache.Cacheable;
 import org.joda.time.DateTime;
 import org.joda.time.ReadableDuration;
 
 import java.util.List;
 import java.util.Objects;
 
-public abstract class QueryGranularity
+public abstract class QueryGranularity implements Cacheable
 {
   public abstract long next(long offset);
 
   public abstract long truncate(long offset);
 
-  public abstract byte[] cacheKey();
-
   public abstract DateTime toDateTime(long offset);
 
   public abstract Iterable<Long> iterable(final long start, final long end);

@@ -20,6 +20,7 @@
 package io.druid.query.aggregation;
 
 import io.druid.java.util.common.logger.Logger;
+import io.druid.query.cache.Cacheable;
 import io.druid.segment.ColumnSelectorFactory;
 
 import java.util.Comparator;

@@ -37,7 +38,7 @@ import java.util.Map;
  * provided to the Aggregator through the MetricSelector object, so whatever creates that object gets to choose how
  * the data is actually stored and accessed.
  */
-public abstract class AggregatorFactory
+public abstract class AggregatorFactory implements Cacheable
 {
   private static final Logger log = new Logger(AggregatorFactory.class);
 

@@ -115,8 +116,6 @@ public abstract class AggregatorFactory
 
   public abstract List<String> requiredFields();
 
-  public abstract byte[] getCacheKey();
-
   public abstract String getTypeName();
 
   /**

@@ -19,6 +19,8 @@
 
 package io.druid.query.aggregation;
 
+import io.druid.query.cache.Cacheable;
+
 import java.util.Comparator;
 import java.util.Map;
 import java.util.Set;

@@ -26,13 +28,13 @@ import java.util.Set;
 /**
  * Functionally similar to an Aggregator. See the Aggregator interface for more comments.
  */
-public interface PostAggregator
+public interface PostAggregator extends Cacheable
 {
-  public Set<String> getDependentFields();
+  Set<String> getDependentFields();
 
-  public Comparator getComparator();
+  Comparator getComparator();
 
-  public Object compute(Map<String, Object> combinedAggregators);
+  Object compute(Map<String, Object> combinedAggregators);
 
-  public String getName();
+  String getName();
 }

@@ -25,6 +25,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.post.PostAggregatorIds;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Map;

@@ -124,4 +126,12 @@ public class HyperUniqueFinalizingPostAggregator implements PostAggregator
            ", fieldName='" + fieldName + '\'' +
            '}';
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.HLL_HYPER_UNIQUE_FINALIZING)
+        .appendString(fieldName)
+        .build();
+  }
 }

@@ -26,6 +26,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.druid.java.util.common.IAE;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Iterator;

@@ -123,6 +124,22 @@ public class ArithmeticPostAggregator implements PostAggregator
     return name;
   }
 
+  @Override
+  public byte[] getCacheKey()
+  {
+    final CacheKeyBuilder builder = new CacheKeyBuilder(PostAggregatorIds.ARITHMETIC)
+        .appendString(fnName)
+        .appendString(ordering);
+
+    if (preserveFieldOrderInCacheKey(op)) {
+      builder.appendCacheables(fields);
+    } else {
+      builder.appendCacheablesIgnoringOrder(fields);
+    }
+
+    return builder.build();
+  }
+
   @JsonProperty("fn")
   public String getFnName()
   {

@@ -152,6 +169,21 @@ public class ArithmeticPostAggregator implements PostAggregator
            '}';
   }
 
+  private static boolean preserveFieldOrderInCacheKey(Ops op)
+  {
+    switch (op) {
+      case PLUS:
+      case MULT:
+        return false;
+      case MINUS:
+      case DIV:
+      case QUOTIENT:
+        return true;
+      default:
+        throw new IAE(op.fn);
+    }
+  }
+
   private static enum Ops
   {
     PLUS("+")

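The preserveFieldOrderInCacheKey switch above encodes commutativity: a + b equals b + a, so for PLUS and MULT the field sub-keys can be sorted before being combined and logically identical queries share a cache entry, while MINUS, DIV, and QUOTIENT keep field order significant. A minimal, self-contained sketch of the order-insensitive combination (plain Java, mirroring what appendCacheablesIgnoringOrder does with UnsignedBytes.lexicographicalComparator(); the class and method names here are illustrative, not part of the commit):

import java.util.Arrays;
import java.util.Comparator;

public class IgnoreOrderDemo
{
  // Compare byte arrays lexicographically, treating bytes as unsigned.
  static final Comparator<byte[]> LEX = (a, b) -> {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  };

  static byte[] combineIgnoringOrder(byte[]... subKeys)
  {
    byte[][] sorted = subKeys.clone();
    Arrays.sort(sorted, LEX); // canonical order: {a, b} and {b, a} combine identically
    int total = 0;
    for (byte[] k : sorted) {
      total += k.length;
    }
    byte[] out = new byte[total];
    int pos = 0;
    for (byte[] k : sorted) {
      System.arraycopy(k, 0, out, pos, k.length);
      pos += k.length;
    }
    return out;
  }

  public static void main(String[] args)
  {
    byte[] a = {1, 2};
    byte[] b = {3};
    // Same combined key regardless of field order, mirroring the PLUS/MULT handling.
    System.out.println(Arrays.equals(combineIgnoringOrder(a, b), combineIgnoringOrder(b, a))); // true
  }
}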
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Map;

@@ -106,11 +107,7 @@ public class ConstantPostAggregator implements PostAggregator
 
     ConstantPostAggregator that = (ConstantPostAggregator) o;
 
-    if (constantValue != null && that.constantValue != null) {
-      if (constantValue.doubleValue() != that.constantValue.doubleValue()) {
-        return false;
-      }
-    } else if (constantValue != that.constantValue) {
+    if (constantValue.doubleValue() != that.constantValue.doubleValue()) {
       return false;
     }
 

@@ -125,8 +122,15 @@ public class ConstantPostAggregator implements PostAggregator
   public int hashCode()
   {
     int result = name != null ? name.hashCode() : 0;
-    result = 31 * result + (constantValue != null ? constantValue.hashCode() : 0);
+    result = 31 * result + constantValue.hashCode();
     return result;
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.CONSTANT)
+        .appendDouble(constantValue.doubleValue())
+        .build();
+  }
 }

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Iterator;

@@ -142,4 +143,12 @@ public class DoubleGreatestPostAggregator implements PostAggregator
     result = 31 * result + fields.hashCode();
     return result;
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.DOUBLE_GREATEST)
+        .appendCacheablesIgnoringOrder(fields)
+        .build();
+  }
 }

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Iterator;

@@ -142,4 +143,12 @@ public class DoubleLeastPostAggregator implements PostAggregator
     result = 31 * result + fields.hashCode();
     return result;
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.DOUBLE_LEAST)
+        .appendCacheablesIgnoringOrder(fields)
+        .build();
+  }
 }

@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
 import io.druid.math.expr.Expr;
 import io.druid.math.expr.Parser;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.List;

@@ -127,6 +128,15 @@ public class ExpressionPostAggregator implements PostAggregator
            '}';
   }
 
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.EXPRESSION)
+        .appendString(expression)
+        .appendString(ordering)
+        .build();
+  }
+
   public static enum Ordering implements Comparator<Number>
   {
     // ensures the following order: numeric > NaN > Infinite

@@ -21,8 +21,10 @@ package io.druid.query.aggregation.post;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Map;

@@ -41,6 +43,7 @@ public class FieldAccessPostAggregator implements PostAggregator
       @JsonProperty("fieldName") String fieldName
   )
   {
+    Preconditions.checkNotNull(fieldName);
     this.name = name;
     this.fieldName = fieldName;
   }

@@ -70,6 +73,14 @@ public class FieldAccessPostAggregator implements PostAggregator
     return name;
   }
 
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.FIELD_ACCESS)
+        .appendString(fieldName)
+        .build();
+  }
+
   @JsonProperty
   public String getFieldName()
   {

@@ -97,7 +108,7 @@ public class FieldAccessPostAggregator implements PostAggregator
 
     FieldAccessPostAggregator that = (FieldAccessPostAggregator) o;
 
-    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
+    if (!fieldName.equals(that.fieldName)) {
       return false;
     }
     if (name != null ? !name.equals(that.name) : that.name != null) {

@@ -111,7 +122,7 @@ public class FieldAccessPostAggregator implements PostAggregator
   public int hashCode()
   {
     int result = name != null ? name.hashCode() : 0;
-    result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0);
+    result = 31 * result + fieldName.hashCode();
     return result;
   }
 }

@@ -24,9 +24,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
-import io.druid.java.util.common.ISE;
 import io.druid.js.JavaScriptConfig;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
 import org.mozilla.javascript.Context;
 import org.mozilla.javascript.ContextFactory;
 import org.mozilla.javascript.ScriptableObject;

@@ -97,16 +97,12 @@ public class JavaScriptPostAggregator implements PostAggregator
     Preconditions.checkNotNull(name, "Must have a valid, non-null post-aggregator name");
     Preconditions.checkNotNull(fieldNames, "Must have a valid, non-null fieldNames");
     Preconditions.checkNotNull(function, "Must have a valid, non-null function");
+    Preconditions.checkState(!config.isDisabled(), "JavaScript is disabled");
 
     this.name = name;
     this.fieldNames = fieldNames;
     this.function = function;
-
-    if (config.isDisabled()) {
-      this.fn = null;
-    } else {
-      this.fn = compile(function);
-    }
+    this.fn = compile(function);
   }
 
   @Override

@@ -124,10 +120,6 @@ public class JavaScriptPostAggregator implements PostAggregator
   @Override
   public Object compute(Map<String, Object> combinedAggregators)
   {
-    if (fn == null) {
-      throw new ISE("JavaScript is disabled");
-    }
-
     final Object[] args = new Object[fieldNames.size()];
     int i = 0;
     for (String field : fieldNames) {

@@ -136,6 +128,15 @@ public class JavaScriptPostAggregator implements PostAggregator
     return fn.apply(args);
   }
 
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.JAVA_SCRIPT)
+        .appendStrings(fieldNames)
+        .appendString(function)
+        .build();
+  }
+
   @JsonProperty
   @Override
   public String getName()

@@ -167,16 +168,16 @@ public class JavaScriptPostAggregator implements PostAggregator
 
     JavaScriptPostAggregator that = (JavaScriptPostAggregator) o;
 
-    if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) {
+    if (!fieldNames.equals(that.fieldNames)) {
       return false;
     }
-    if (fn != null ? !fn.equals(that.fn) : that.fn != null) {
+    if (!fn.equals(that.fn)) {
       return false;
     }
-    if (function != null ? !function.equals(that.function) : that.function != null) {
+    if (!function.equals(that.function)) {
       return false;
     }
-    if (name != null ? !name.equals(that.name) : that.name != null) {
+    if (!name.equals(that.name)) {
       return false;
     }
 

@@ -186,10 +187,10 @@ public class JavaScriptPostAggregator implements PostAggregator
   @Override
   public int hashCode()
   {
-    int result = name != null ? name.hashCode() : 0;
-    result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
-    result = 31 * result + (function != null ? function.hashCode() : 0);
-    result = 31 * result + (fn != null ? fn.hashCode() : 0);
+    int result = name.hashCode();
+    result = 31 * result + fieldNames.hashCode();
+    result = 31 * result + function.hashCode();
+    result = 31 * result + fn.hashCode();
     return result;
   }
 }

@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Iterator;

@@ -143,4 +144,12 @@ public class LongGreatestPostAggregator implements PostAggregator
     result = 31 * result + fields.hashCode();
     return result;
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.LONG_GREATEST)
+        .appendCacheablesIgnoringOrder(fields)
+        .build();
+  }
 }

@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
 
 import java.util.Comparator;
 import java.util.Iterator;

@@ -143,4 +144,12 @@ public class LongLeastPostAggregator implements PostAggregator
     result = 31 * result + fields.hashCode();
     return result;
   }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.LONG_LEAST)
+        .appendCacheablesIgnoringOrder(fields)
+        .build();
+  }
 }

@@ -0,0 +1,44 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.post;
+
+public class PostAggregatorIds
+{
+  public static final byte ARITHMETIC = 0;
+  public static final byte CONSTANT = 1;
+  public static final byte DOUBLE_GREATEST = 2;
+  public static final byte DOUBLE_LEAST = 3;
+  public static final byte EXPRESSION = 4;
+  public static final byte FIELD_ACCESS = 5;
+  public static final byte JAVA_SCRIPT = 6;
+  public static final byte LONG_GREATEST = 7;
+  public static final byte LONG_LEAST = 8;
+  public static final byte HLL_HYPER_UNIQUE_FINALIZING = 9;
+  public static final byte HISTOGRAM_BUCKETS = 10;
+  public static final byte HISTOGRAM_CUSTOM_BUCKETS = 11;
+  public static final byte HISTOGRAM_EQUAL_BUCKETS = 12;
+  public static final byte HISTOGRAM_MAX = 13;
+  public static final byte HISTOGRAM_MIN = 14;
+  public static final byte HISTOGRAM_QUANTILE = 15;
+  public static final byte HISTOGRAM_QUANTILES = 16;
+  public static final byte DATA_SKETCHES_SKETCH_ESTIMATE = 17;
+  public static final byte DATA_SKETCHES_SKETCH_SET = 18;
+  public static final byte VARIANCE_STANDARD_DEVIATION = 19;
+}

@@ -0,0 +1,268 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.cache;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Floats;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.UnsignedBytes;
+import io.druid.common.utils.StringUtils;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class CacheKeyBuilder
+{
+  static final byte BYTE_KEY = 0;
+  static final byte BYTE_ARRAY_KEY = 1;
+  static final byte BOOLEAN_KEY = 2;
+  static final byte INT_KEY = 3;
+  static final byte FLOAT_KEY = 4;
+  static final byte FLOAT_ARRAY_KEY = 5;
+  static final byte DOUBLE_KEY = 6;
+  static final byte STRING_KEY = 7;
+  static final byte STRING_LIST_KEY = 8;
+  static final byte CACHEABLE_KEY = 9;
+  static final byte CACHEABLE_LIST_KEY = 10;
+
+  static final byte[] STRING_SEPARATOR = new byte[]{(byte) 0xFF};
+  static final byte[] EMPTY_BYTES = StringUtils.EMPTY_BYTES;
+
+  private static class Item
+  {
+    private final byte typeKey;
+    private final byte[] item;
+
+    Item(byte typeKey, byte[] item)
+    {
+      this.typeKey = typeKey;
+      this.item = item;
+    }
+
+    int byteSize()
+    {
+      return 1 + item.length;
+    }
+  }
+
+  private static byte[] floatArrayToByteArray(float[] input)
+  {
+    final ByteBuffer buffer = ByteBuffer.allocate(Floats.BYTES * input.length);
+    buffer.asFloatBuffer().put(input);
+    return buffer.array();
+  }
+
+  private static byte[] cacheableToByteArray(@Nullable Cacheable cacheable)
+  {
+    if (cacheable == null) {
+      return EMPTY_BYTES;
+    } else {
+      final byte[] key = cacheable.getCacheKey();
+      Preconditions.checkArgument(!Arrays.equals(key, EMPTY_BYTES), "cache key is equal to the empty key");
+      return key;
+    }
+  }
+
+  private static byte[] stringCollectionToByteArray(Collection<String> input, boolean preserveOrder)
+  {
+    return collectionToByteArray(
+        input,
+        new Function<String, byte[]>()
+        {
+          @Override
+          public byte[] apply(@Nullable String input)
+          {
+            return StringUtils.toUtf8WithNullToEmpty(input);
+          }
+        },
+        STRING_SEPARATOR,
+        preserveOrder
+    );
+  }
+
+  private static byte[] cacheableCollectionToByteArray(Collection<? extends Cacheable> input, boolean preserveOrder)
+  {
+    return collectionToByteArray(
+        input,
+        new Function<Cacheable, byte[]>()
+        {
+          @Override
+          public byte[] apply(@Nullable Cacheable input)
+          {
+            return input == null ? EMPTY_BYTES : input.getCacheKey();
+          }
+        },
+        EMPTY_BYTES,
+        preserveOrder
+    );
+  }
+
+  private static <T> byte[] collectionToByteArray(
+      Collection<? extends T> collection,
+      Function<T, byte[]> serializeFunction,
+      byte[] separator,
+      boolean preserveOrder
+  )
+  {
+    if (collection.size() > 0) {
+      List<byte[]> byteArrayList = Lists.newArrayListWithCapacity(collection.size());
+      int totalByteLength = 0;
+      for (T eachItem : collection) {
+        final byte[] byteArray = serializeFunction.apply(eachItem);
+        totalByteLength += byteArray.length;
+        byteArrayList.add(byteArray);
+      }
+
+      if (!preserveOrder) {
+        // Sort the byte array list to guarantee that collections of same items but in different orders make the same result
+        Collections.sort(byteArrayList, UnsignedBytes.lexicographicalComparator());
+      }
+
+      final Iterator<byte[]> iterator = byteArrayList.iterator();
+      final int bufSize = Ints.BYTES + separator.length * (byteArrayList.size() - 1) + totalByteLength;
+      final ByteBuffer buffer = ByteBuffer.allocate(bufSize)
+                                          .putInt(byteArrayList.size())
+                                          .put(iterator.next());
+
+      while (iterator.hasNext()) {
+        buffer.put(separator).put(iterator.next());
+      }
+
+      return buffer.array();
+    } else {
+      return EMPTY_BYTES;
+    }
+  }
+
+  private final List<Item> items = Lists.newArrayList();
+  private final byte id;
+  private int size;
+
+  public CacheKeyBuilder(byte id)
+  {
+    this.id = id;
+    this.size = 1;
+  }
+
+  public CacheKeyBuilder appendByte(byte input)
+  {
+    appendItem(BYTE_KEY, new byte[]{input});
+    return this;
+  }
+
+  public CacheKeyBuilder appendByteArray(byte[] input)
+  {
+    appendItem(BYTE_ARRAY_KEY, input);
+    return this;
+  }
+
+  public CacheKeyBuilder appendString(@Nullable String input)
+  {
+    appendItem(STRING_KEY, StringUtils.toUtf8WithNullToEmpty(input));
+    return this;
+  }
+
+  public CacheKeyBuilder appendStrings(Collection<String> input)
+  {
+    appendItem(STRING_LIST_KEY, stringCollectionToByteArray(input, true));
+    return this;
+  }
+
+  public CacheKeyBuilder appendStringsIgnoringOrder(Collection<String> input)
+  {
+    appendItem(STRING_LIST_KEY, stringCollectionToByteArray(input, false));
+    return this;
+  }
+
+  public CacheKeyBuilder appendBoolean(boolean input)
+  {
+    appendItem(BOOLEAN_KEY, new byte[]{(byte) (input ? 1 : 0)});
+    return this;
+  }
+
+  public CacheKeyBuilder appendInt(int input)
+  {
+    appendItem(INT_KEY, Ints.toByteArray(input));
+    return this;
+  }
+
+  public CacheKeyBuilder appendFloat(float input)
+  {
+    appendItem(FLOAT_KEY, ByteBuffer.allocate(Floats.BYTES).putFloat(input).array());
+    return this;
+  }
+
+  public CacheKeyBuilder appendDouble(double input)
+  {
+    appendItem(DOUBLE_KEY, ByteBuffer.allocate(Doubles.BYTES).putDouble(input).array());
+    return this;
+  }
+
+  public CacheKeyBuilder appendFloatArray(float[] input)
+  {
+    appendItem(FLOAT_ARRAY_KEY, floatArrayToByteArray(input));
+    return this;
+  }
+
+  public CacheKeyBuilder appendCacheable(@Nullable Cacheable input)
+  {
+    appendItem(CACHEABLE_KEY, cacheableToByteArray(input));
+    return this;
+  }
+
+  public CacheKeyBuilder appendCacheables(Collection<? extends Cacheable> input)
+  {
+    appendItem(CACHEABLE_LIST_KEY, cacheableCollectionToByteArray(input, true));
+    return this;
+  }
+
+  public CacheKeyBuilder appendCacheablesIgnoringOrder(Collection<? extends Cacheable> input)
+  {
+    appendItem(CACHEABLE_LIST_KEY, cacheableCollectionToByteArray(input, false));
+    return this;
+  }
+
+  private void appendItem(byte typeKey, byte[] input)
+  {
+    final Item item = new Item(typeKey, input);
+    items.add(item);
+    size += item.byteSize();
+  }
+
+  public byte[] build()
+  {
+    final ByteBuffer buffer = ByteBuffer.allocate(size);
+    buffer.put(id);
+
+    for (Item item : items) {
+      buffer.put(item.typeKey).put(item.item);
+    }
+
+    return buffer.array();
+  }
+}

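Assuming the class above is on the classpath, a key is assembled append-by-append: one id byte up front, then a one-byte type key plus the item's encoding for each call, with collections additionally carrying an element count and (for strings) a 0xFF separator. A hypothetical usage (the type id 42 and the demo class name are made up for illustration; real callers pass a constant such as PostAggregatorIds.ARITHMETIC):

import java.util.Arrays;
import io.druid.query.cache.CacheKeyBuilder;

public class CacheKeyBuilderDemo
{
  public static void main(String[] args)
  {
    byte[] k1 = new CacheKeyBuilder((byte) 42)
        .appendString("fieldName")                            // STRING_KEY + UTF-8 bytes
        .appendInt(1024)                                      // INT_KEY + 4 big-endian bytes
        .appendStringsIgnoringOrder(Arrays.asList("b", "a"))  // STRING_LIST_KEY + count + sorted items
        .build();

    byte[] k2 = new CacheKeyBuilder((byte) 42)
        .appendString("fieldName")
        .appendInt(1024)
        .appendStringsIgnoringOrder(Arrays.asList("a", "b"))  // different order, same key
        .build();

    System.out.println(Arrays.equals(k1, k2)); // true
  }
}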
@@ -17,34 +17,14 @@
  * under the License.
  */
 
-package io.druid.query;
+package io.druid.query.cache;
 
-import com.google.common.collect.Lists;
-import io.druid.query.aggregation.AggregatorFactory;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- */
-public class QueryCacheHelper
+public interface Cacheable
 {
-  public static byte[] computeAggregatorBytes(List<AggregatorFactory> aggregatorSpecs)
-  {
-    List<byte[]> cacheKeySet = Lists.newArrayListWithCapacity(aggregatorSpecs.size());
-
-    int totalSize = 0;
-    for (AggregatorFactory spec : aggregatorSpecs) {
-      final byte[] cacheKey = spec.getCacheKey();
-      cacheKeySet.add(cacheKey);
-      totalSize += cacheKey.length;
-    }
-
-    ByteBuffer retVal = ByteBuffer.allocate(totalSize);
-    for (byte[] bytes : cacheKeySet) {
-      retVal.put(bytes);
-    }
-    return retVal.array();
-  }
-
+  /**
+   * Get a byte array used as a cache key.
+   *
+   * @return a cache key
+   */
+  byte[] getCacheKey();
 }

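Anything handed to appendCacheable(...) or appendCacheables(...) only needs this one method. For example, a hypothetical implementor (not part of the commit) could be as small as:

import io.druid.query.cache.Cacheable;

// Hypothetical example: any object with a stable byte encoding can participate in cache keys.
public class VersionTag implements Cacheable
{
  private final byte version;

  public VersionTag(byte version)
  {
    this.version = version;
  }

  @Override
  public byte[] getCacheKey()
  {
    return new byte[]{version}; // must be deterministic for equal objects
  }
}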
@@ -21,6 +21,7 @@ package io.druid.query.dimension;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.druid.query.cache.Cacheable;
 import io.druid.query.extraction.ExtractionFn;
 import io.druid.segment.DimensionSelector;
 import io.druid.segment.column.ValueType;

@@ -35,7 +36,7 @@ import io.druid.segment.column.ValueType;
     @JsonSubTypes.Type(name = "listFiltered", value = ListFilteredDimensionSpec.class),
     @JsonSubTypes.Type(name = "lookup", value = LookupDimensionSpec.class)
 })
-public interface DimensionSpec
+public interface DimensionSpec extends Cacheable
 {
   String getDimension();
 

@@ -54,7 +55,5 @@ public interface DimensionSpec
    */
   boolean mustDecorate();
 
-  byte[] getCacheKey();
-
   boolean preservesOrdering();
 }

@@ -188,7 +188,7 @@ public class LookupDimensionSpec implements DimensionSpec
         .put(DimFilterUtils.STRING_SEPARATOR)
         .put(replaceWithBytes)
         .put(DimFilterUtils.STRING_SEPARATOR)
-        .put(retainMissingValue == true ? (byte) 1 : (byte) 0)
+        .put(retainMissingValue ? (byte) 1 : (byte) 0)
         .array();
   }
 

@@ -107,7 +107,7 @@ public class TimeFormatExtractionFn implements ExtractionFn
   public byte[] getCacheKey()
   {
     final byte[] exprBytes = StringUtils.toUtf8(format + "\u0001" + tz.getID() + "\u0001" + locale.toLanguageTag());
-    final byte[] granularityCacheKey = granularity.cacheKey();
+    final byte[] granularityCacheKey = granularity.getCacheKey();
     return ByteBuffer.allocate(4 + exprBytes.length + granularityCacheKey.length)
                      .put(ExtractionCacheHelper.CACHE_TYPE_ID_TIME_FORMAT)
                      .put(exprBytes)

@@ -22,6 +22,7 @@ package io.druid.query.filter;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.collect.RangeSet;
+import io.druid.query.cache.Cacheable;
 
 /**
  */

@@ -41,15 +42,13 @@ import com.google.common.collect.RangeSet;
     @JsonSubTypes.Type(name="interval", value=IntervalDimFilter.class),
     @JsonSubTypes.Type(name="like", value=LikeDimFilter.class)
 })
-public interface DimFilter
+public interface DimFilter extends Cacheable
 {
-  public byte[] getCacheKey();
-
   /**
    * @return Returns an optimized filter.
    * returning the same filter can be a straightforward default implementation.
    */
-  public DimFilter optimize();
+  DimFilter optimize();
 
   /**
    * Returns a Filter that implements this DimFilter. This does not generally involve optimizing the DimFilter,

@@ -57,7 +56,7 @@ public interface DimFilter
    *
    * @return a Filter that implements this DimFilter, or null if this DimFilter is a no-op.
    */
-  public Filter toFilter();
+  Filter toFilter();
 
   /**
    * Returns a RangeSet that represents the possible range of the input dimension for this DimFilter.This is

@@ -72,5 +71,5 @@ public interface DimFilter
    * @return a RangeSet that represent the possible range of the input dimension, or null if it is not possible to
    * determine for this DimFilter.
    */
-  public RangeSet<String> getDimensionRangeSet(String dimension);
+  RangeSet<String> getDimensionRangeSet(String dimension);
 }

@@ -46,7 +46,6 @@ import io.druid.query.DataSource;
 import io.druid.query.DruidMetrics;
 import io.druid.query.IntervalChunkingQueryRunnerDecorator;
 import io.druid.query.Query;
-import io.druid.query.QueryCacheHelper;
 import io.druid.query.QueryDataSource;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;

@@ -54,10 +53,10 @@ import io.druid.query.SubqueryQueryRunner;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
 import io.druid.query.aggregation.MetricManipulatorFns;
+import io.druid.query.cache.CacheKeyBuilder;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.extraction.ExtractionFn;
 import io.druid.query.filter.DimFilter;
 import io.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.joda.time.DateTime;
 

@@ -356,45 +355,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
       @Override
       public byte[] computeCacheKey(GroupByQuery query)
       {
-        final DimFilter dimFilter = query.getDimFilter();
-        final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
-        final byte[] aggregatorBytes = QueryCacheHelper.computeAggregatorBytes(query.getAggregatorSpecs());
-        final byte[] granularityBytes = query.getGranularity().cacheKey();
-        final byte[][] dimensionsBytes = new byte[query.getDimensions().size()][];
-        int dimensionsBytesSize = 0;
-        int index = 0;
-        for (DimensionSpec dimension : query.getDimensions()) {
-          dimensionsBytes[index] = dimension.getCacheKey();
-          dimensionsBytesSize += dimensionsBytes[index].length;
-          ++index;
-        }
-        final byte[] havingBytes = query.getHavingSpec() == null ? new byte[]{} : query.getHavingSpec().getCacheKey();
-        final byte[] limitBytes = query.getLimitSpec().getCacheKey();
-
-        ByteBuffer buffer = ByteBuffer
-            .allocate(
-                2
-                + granularityBytes.length
-                + filterBytes.length
-                + aggregatorBytes.length
-                + dimensionsBytesSize
-                + havingBytes.length
-                + limitBytes.length
-            )
-            .put(GROUPBY_QUERY)
-            .put(CACHE_STRATEGY_VERSION)
-            .put(granularityBytes)
-            .put(filterBytes)
-            .put(aggregatorBytes);
-
-        for (byte[] dimensionsByte : dimensionsBytes) {
-          buffer.put(dimensionsByte);
-        }
-
-        return buffer
-            .put(havingBytes)
-            .put(limitBytes)
-            .array();
+        return new CacheKeyBuilder(GROUPBY_QUERY)
+            .appendByte(CACHE_STRATEGY_VERSION)
+            .appendCacheable(query.getGranularity())
+            .appendCacheable(query.getDimFilter())
+            .appendCacheablesIgnoringOrder(query.getAggregatorSpecs())
+            .appendCacheablesIgnoringOrder(query.getDimensions())
+            .build();
       }
 
       @Override

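One motivation for this rewrite (the "Add type keys and list sizes to cache key" item in the commit message) is that the old scheme concatenated raw sub-keys, which is inherently ambiguous: distinct inputs can concatenate to identical bytes and produce a false cache hit. A self-contained illustration (the class and names are illustrative only):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch: why naive concatenation of sub-keys is unsafe for cache keys.
public class ConcatCollisionDemo
{
  static byte[] concat(String... parts)
  {
    StringBuilder sb = new StringBuilder();
    for (String p : parts) {
      sb.append(p);
    }
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args)
  {
    // Two different field lists, identical concatenated key:
    byte[] a = concat("ab", "c");
    byte[] b = concat("a", "bc");
    System.out.println(Arrays.equals(a, b)); // true -> a spurious cache hit under the old scheme
    // CacheKeyBuilder avoids this by writing an element count and a 0xFF separator
    // between strings, plus a one-byte type key per appended item.
  }
}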
@@ -22,6 +22,7 @@ package io.druid.query.groupby.having;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.druid.data.input.Row;
+import io.druid.query.cache.Cacheable;
 import io.druid.segment.column.ValueType;
 
 import java.util.Map;

@@ -43,12 +44,12 @@ import java.util.Map;
     @JsonSubTypes.Type(name = "always", value = AlwaysHavingSpec.class),
     @JsonSubTypes.Type(name = "filter", value = DimFilterHavingSpec.class)
 })
-public interface HavingSpec
+public interface HavingSpec extends Cacheable
 {
   // Atoms for easy combination, but for now they are mostly useful
   // for testing.
-  public static final HavingSpec NEVER = new NeverHavingSpec();
-  public static final HavingSpec ALWAYS = new AlwaysHavingSpec();
+  HavingSpec NEVER = new NeverHavingSpec();
+  HavingSpec ALWAYS = new AlwaysHavingSpec();
 
   /**
    * Informs this HavingSpec that rows passed to "eval" will have a certain signature. Will be called

@@ -56,7 +57,7 @@ public interface HavingSpec
    *
    * @param rowSignature signature of the rows
    */
-  public void setRowSignature(Map<String, ValueType> rowSignature);
+  void setRowSignature(Map<String, ValueType> rowSignature);
 
   /**
    * Evaluates if a given row satisfies the having spec.

@@ -65,7 +66,5 @@ public interface HavingSpec
    *
    * @return true if the given row satisfies the having spec. False otherwise.
    */
-  public boolean eval(Row row);
-
-  public byte[] getCacheKey();
+  boolean eval(Row row);
 }

@@ -26,6 +26,7 @@ import io.druid.data.input.Row;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.Cacheable;
 import io.druid.query.dimension.DimensionSpec;
 
 import java.util.List;

@@ -36,7 +37,7 @@ import java.util.List;
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "default", value = DefaultLimitSpec.class)
 })
-public interface LimitSpec
+public interface LimitSpec extends Cacheable
 {
   /**
    * Returns a function that applies a limit to an input sequence that is assumed to be sorted on dimensions.

@@ -47,13 +48,11 @@ public interface LimitSpec
    *
    * @return limit function
    */
-  public Function<Sequence<Row>, Sequence<Row>> build(
+  Function<Sequence<Row>, Sequence<Row>> build(
       List<DimensionSpec> dimensions,
       List<AggregatorFactory> aggs,
       List<PostAggregator> postAggs
   );
 
-  public LimitSpec merge(LimitSpec other);
-
-  public byte[] getCacheKey();
+  LimitSpec merge(LimitSpec other);
 }

@@ -159,7 +159,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery>
       final DimFilter dimFilter = query.getDimensionsFilter();
       final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
       final byte[] querySpecBytes = query.getQuery().getCacheKey();
-      final byte[] granularityBytes = query.getGranularity().cacheKey();
+      final byte[] granularityBytes = query.getGranularity().getCacheKey();
 
       final List<DimensionSpec> dimensionSpecs =
           query.getDimensions() != null ? query.getDimensions() : Collections.<DimensionSpec>emptyList();

@@ -164,7 +164,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
       {
         final DimFilter dimFilter = query.getDimensionsFilter();
         final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
-        final byte[] granularityBytes = query.getGranularity().cacheKey();
+        final byte[] granularityBytes = query.getGranularity().getCacheKey();

         final List<DimensionSpec> dimensionSpecs =
             query.getDimensions() != null ? query.getDimensions() : Collections.<DimensionSpec>emptyList();

@@ -33,7 +33,6 @@ import io.druid.query.CacheStrategy;
 import io.druid.query.DruidMetrics;
 import io.druid.query.IntervalChunkingQueryRunnerDecorator;
 import io.druid.query.Query;
-import io.druid.query.QueryCacheHelper;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.Result;
@@ -42,11 +41,10 @@ import io.druid.query.ResultMergeQueryRunner;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
 import io.druid.query.aggregation.PostAggregator;
-import io.druid.query.filter.DimFilter;
+import io.druid.query.cache.CacheKeyBuilder;
 import org.joda.time.DateTime;

 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -132,22 +130,13 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
       @Override
       public byte[] computeCacheKey(TimeseriesQuery query)
       {
-        final DimFilter dimFilter = query.getDimensionsFilter();
-        final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
-        final byte[] aggregatorBytes = QueryCacheHelper.computeAggregatorBytes(query.getAggregatorSpecs());
-        final byte[] granularityBytes = query.getGranularity().cacheKey();
-        final byte descending = query.isDescending() ? (byte) 1 : 0;
-        final byte skipEmptyBuckets = query.isSkipEmptyBuckets() ? (byte) 1 : 0;
-
-        return ByteBuffer
-            .allocate(3 + granularityBytes.length + filterBytes.length + aggregatorBytes.length)
-            .put(TIMESERIES_QUERY)
-            .put(descending)
-            .put(skipEmptyBuckets)
-            .put(granularityBytes)
-            .put(filterBytes)
-            .put(aggregatorBytes)
-            .array();
+        return new CacheKeyBuilder(TIMESERIES_QUERY)
+            .appendBoolean(query.isDescending())
+            .appendBoolean(query.isSkipEmptyBuckets())
+            .appendCacheable(query.getGranularity())
+            .appendCacheable(query.getDimensionsFilter())
+            .appendCacheablesIgnoringOrder(query.getAggregatorSpecs())
+            .build();
       }

       @Override

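The point of the rewrite above: the old scheme concatenated raw key fragments, so two logically different queries could in principle share a cache key, whereas CacheKeyBuilder writes a type key before each fragment and a size before each list. A self-contained sketch of the failure mode naive concatenation allows (plain Java, not Druid code):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Demonstrates why plain concatenation is ambiguous: two different pairs
// of fragments produce byte-identical keys.
public class ConcatCollision
{
  static byte[] concat(String a, String b)
  {
    final byte[] ab = a.getBytes(StandardCharsets.UTF_8);
    final byte[] bb = b.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(ab.length + bb.length).put(ab).put(bb).array();
  }

  public static void main(String[] args)
  {
    // "test" + "test" and "testtest" + "" collide under naive concatenation.
    System.out.println(Arrays.equals(concat("test", "test"), concat("testtest", "")));  // true
    // CacheKeyBuilder avoids this by writing a type key before each value and a
    // separator or size for strings and lists (see CacheKeyBuilderTest below).
  }
}
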
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.Cacheable;
 import io.druid.query.dimension.DimensionSpec;
 import org.joda.time.DateTime;

@@ -39,13 +40,13 @@ import java.util.List;
     @JsonSubTypes.Type(name = "inverted", value = InvertedTopNMetricSpec.class),
     @JsonSubTypes.Type(name = "dimension", value = DimensionTopNMetricSpec.class),
 })
-public interface TopNMetricSpec
+public interface TopNMetricSpec extends Cacheable
 {
-  public void verifyPreconditions(List<AggregatorFactory> aggregatorSpecs, List<PostAggregator> postAggregatorSpecs);
+  void verifyPreconditions(List<AggregatorFactory> aggregatorSpecs, List<PostAggregator> postAggregatorSpecs);

-  public Comparator getComparator(List<AggregatorFactory> aggregatorSpecs, List<PostAggregator> postAggregatorSpecs);
+  Comparator getComparator(List<AggregatorFactory> aggregatorSpecs, List<PostAggregator> postAggregatorSpecs);

-  public TopNResultBuilder getResultBuilder(
+  TopNResultBuilder getResultBuilder(
       DateTime timestamp,
       DimensionSpec dimSpec,
       int threshold,
@@ -54,13 +55,11 @@ public interface TopNMetricSpec
       List<PostAggregator> postAggs
   );

-  public byte[] getCacheKey();
-
-  public <T> TopNMetricSpecBuilder<T> configureOptimizer(TopNMetricSpecBuilder<T> builder);
-
-  public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector);
-
-  public String getMetricName(DimensionSpec dimSpec);
-
-  public boolean canBeOptimizedUnordered();
+  <T> TopNMetricSpecBuilder<T> configureOptimizer(TopNMetricSpecBuilder<T> builder);
+
+  void initTopNAlgorithmSelector(TopNAlgorithmSelector selector);
+
+  String getMetricName(DimensionSpec dimSpec);
+
+  boolean canBeOptimizedUnordered();
 }

@@ -25,7 +25,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
-import com.google.common.primitives.Ints;
 import com.google.inject.Inject;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.granularity.QueryGranularity;
@@ -39,7 +38,6 @@ import io.druid.query.CacheStrategy;
 import io.druid.query.DruidMetrics;
 import io.druid.query.IntervalChunkingQueryRunnerDecorator;
 import io.druid.query.Query;
-import io.druid.query.QueryCacheHelper;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.Result;
@@ -49,13 +47,12 @@ import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.AggregatorUtil;
 import io.druid.query.aggregation.MetricManipulationFn;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.query.filter.DimFilter;
 import org.joda.time.DateTime;

 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -291,6 +288,8 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
     return TYPE_REFERENCE;
   }

   @Override
   public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrategy(final TopNQuery query)
   {
@@ -306,27 +305,22 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
       @Override
       public byte[] computeCacheKey(TopNQuery query)
       {
-        final byte[] dimensionSpecBytes = query.getDimensionSpec().getCacheKey();
-        final byte[] metricSpecBytes = query.getTopNMetricSpec().getCacheKey();
-
-        final DimFilter dimFilter = query.getDimensionsFilter();
-        final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
-        final byte[] aggregatorBytes = QueryCacheHelper.computeAggregatorBytes(query.getAggregatorSpecs());
-        final byte[] granularityBytes = query.getGranularity().cacheKey();
-
-        return ByteBuffer
-            .allocate(
-                1 + dimensionSpecBytes.length + metricSpecBytes.length + 4 +
-                granularityBytes.length + filterBytes.length + aggregatorBytes.length
-            )
-            .put(TOPN_QUERY)
-            .put(dimensionSpecBytes)
-            .put(metricSpecBytes)
-            .put(Ints.toByteArray(query.getThreshold()))
-            .put(granularityBytes)
-            .put(filterBytes)
-            .put(aggregatorBytes)
-            .array();
+        final CacheKeyBuilder builder = new CacheKeyBuilder(TOPN_QUERY)
+            .appendCacheable(query.getDimensionSpec())
+            .appendCacheable(query.getTopNMetricSpec())
+            .appendInt(query.getThreshold())
+            .appendCacheable(query.getGranularity())
+            .appendCacheable(query.getDimensionsFilter())
+            .appendCacheablesIgnoringOrder(query.getAggregatorSpecs());
+
+        final List<PostAggregator> postAggregators = prunePostAggregators(query);
+        if (!postAggregators.isEmpty()) {
+          // Append post aggregators only when they are used as sort keys.
+          // Note that appending an empty list produces a different cache key from not appending it.
+          builder.appendCacheablesIgnoringOrder(postAggregators);
+        }
+
+        return builder.build();
       }

       @Override

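Only the post-aggregators the TopN sort actually depends on contribute to the key; prunePostAggregators presumably delegates to AggregatorUtil to find them. A simplified sketch of the idea, assuming a plain name-match shortcut (the hypothetical helper below ignores the transitive field dependencies the real lookup would follow):

import io.druid.query.aggregation.PostAggregator;

import java.util.ArrayList;
import java.util.List;

// Illustrative only: retain the post-aggregators whose output name matches
// the TopN sort metric. This sketch only shows why an unrelated
// post-aggregator can be left out of the cache key.
public class SortPostAggPruner
{
  public static List<PostAggregator> pruneForSort(String sortMetric, List<PostAggregator> postAggs)
  {
    final List<PostAggregator> usedForSort = new ArrayList<>();
    for (PostAggregator postAgg : postAggs) {
      if (sortMetric.equals(postAgg.getName())) {
        usedForSort.add(postAgg);
      }
    }
    return usedForSort;
  }
}

The isEmpty() guard matters because appending an empty list would still write a type key and size, producing a different key; skipping the append presumably keeps queries with no sort-relevant post-aggregators on the same key shape as before this change.
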
@@ -58,19 +58,14 @@ public class JavaScriptPostAggregatorTest
   @Test
   public void testComputeJavaScriptNotAllowed()
   {
-    JavaScriptPostAggregator javaScriptPostAggregator;
-
     String absPercentFunction = "function(delta, total) { return 100 * Math.abs(delta) / total; }";
-    javaScriptPostAggregator = new JavaScriptPostAggregator(
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("JavaScript is disabled");
+    new JavaScriptPostAggregator(
         "absPercent",
         Lists.newArrayList("delta", "total"),
         absPercentFunction,
         new JavaScriptConfig(true)
     );
-
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage("JavaScript is disabled");
-    javaScriptPostAggregator.compute(Maps.<String, Object>newHashMap());
-    Assert.assertTrue(false);
   }
 }

@@ -0,0 +1,397 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.query.cache;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Floats;
import com.google.common.primitives.Ints;
import io.druid.common.utils.StringUtils;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

public class CacheKeyBuilderTest
{
  @Test
  public void testCacheKeyBuilder()
  {
    final Cacheable cacheable = new Cacheable()
    {
      @Override
      public byte[] getCacheKey()
      {
        return new byte[]{10, 20};
      }
    };

    final byte[] actual = new CacheKeyBuilder((byte) 10)
        .appendBoolean(false)
        .appendString("test")
        .appendInt(10)
        .appendFloat(0.1f)
        .appendDouble(2.3)
        .appendByteArray(CacheKeyBuilder.STRING_SEPARATOR) // test when an item is same with the separator
        .appendFloatArray(new float[]{10.0f, 11.0f})
        .appendStrings(Lists.newArrayList("test1", "test2"))
        .appendCacheable(cacheable)
        .appendCacheable(null)
        .appendCacheables(Lists.newArrayList(cacheable, null, cacheable))
        .build();

    final int expectedSize = 1                                         // id
                             + 1                                       // bool
                             + 4                                       // 'test'
                             + Ints.BYTES                              // 10
                             + Floats.BYTES                            // 0.1f
                             + Doubles.BYTES                           // 2.3
                             + CacheKeyBuilder.STRING_SEPARATOR.length // byte array
                             + Floats.BYTES * 2                        // 10.0f, 11.0f
                             + Ints.BYTES + 5 * 2 + 1                  // 'test1' 'test2'
                             + cacheable.getCacheKey().length          // cacheable
                             + Ints.BYTES + 4                          // cacheable list
                             + 11;                                     // type keys
    assertEquals(expectedSize, actual.length);

    final byte[] expected = ByteBuffer.allocate(expectedSize)
                                      .put((byte) 10)
                                      .put(CacheKeyBuilder.BOOLEAN_KEY)
                                      .put((byte) 0)
                                      .put(CacheKeyBuilder.STRING_KEY)
                                      .put(StringUtils.toUtf8("test"))
                                      .put(CacheKeyBuilder.INT_KEY)
                                      .putInt(10)
                                      .put(CacheKeyBuilder.FLOAT_KEY)
                                      .putFloat(0.1f)
                                      .put(CacheKeyBuilder.DOUBLE_KEY)
                                      .putDouble(2.3)
                                      .put(CacheKeyBuilder.BYTE_ARRAY_KEY)
                                      .put(CacheKeyBuilder.STRING_SEPARATOR)
                                      .put(CacheKeyBuilder.FLOAT_ARRAY_KEY)
                                      .putFloat(10.0f)
                                      .putFloat(11.0f)
                                      .put(CacheKeyBuilder.STRING_LIST_KEY)
                                      .putInt(2)
                                      .put(StringUtils.toUtf8("test1"))
                                      .put(CacheKeyBuilder.STRING_SEPARATOR)
                                      .put(StringUtils.toUtf8("test2"))
                                      .put(CacheKeyBuilder.CACHEABLE_KEY)
                                      .put(cacheable.getCacheKey())
                                      .put(CacheKeyBuilder.CACHEABLE_KEY)
                                      .put(CacheKeyBuilder.CACHEABLE_LIST_KEY)
                                      .putInt(3)
                                      .put(cacheable.getCacheKey())
                                      .put(cacheable.getCacheKey())
                                      .array();

    assertArrayEquals(expected, actual);
  }

  @Test
  public void testDifferentOrderList()
  {
    byte[] key1 = new CacheKeyBuilder((byte) 10)
        .appendStringsIgnoringOrder(Lists.newArrayList("AB", "BA"))
        .build();

    byte[] key2 = new CacheKeyBuilder((byte) 10)
        .appendStringsIgnoringOrder(Lists.newArrayList("BA", "AB"))
        .build();

    assertArrayEquals(key1, key2);

    final Cacheable cacheable1 = new Cacheable()
    {
      @Override
      public byte[] getCacheKey()
      {
        return new byte[]{1};
      }
    };

    final Cacheable cacheable2 = new Cacheable()
    {
      @Override
      public byte[] getCacheKey()
      {
        return new byte[]{2};
      }
    };

    key1 = new CacheKeyBuilder((byte) 10)
        .appendCacheablesIgnoringOrder(Lists.newArrayList(cacheable1, cacheable2))
        .build();

    key2 = new CacheKeyBuilder((byte) 10)
        .appendCacheablesIgnoringOrder(Lists.newArrayList(cacheable2, cacheable1))
        .build();

    assertArrayEquals(key1, key2);
  }

  @Test
  public void testNotEqualStrings()
  {
    final List<byte[]> keys = Lists.newArrayList();
    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendString("test")
            .appendString("test")
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendString("testtest")
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendString("testtest")
            .appendString("")
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendString("")
            .appendString("testtest")
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendStrings(ImmutableList.of("test", "test"))
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendStrings(ImmutableList.of("testtest"))
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendStrings(ImmutableList.of("testtest", ""))
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendStrings(ImmutableList.of("testtest"))
            .appendStrings(ImmutableList.<String>of())
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendStrings(ImmutableList.<String>of())
            .appendStrings(ImmutableList.of("testtest"))
            .build()
    );

    assertNotEqualsEachOther(keys);
  }

  @Test
  public void testNotEqualCacheables()
  {
    final Cacheable test = new Cacheable()
    {
      @Override
      public byte[] getCacheKey()
      {
        return "test".getBytes();
      }
    };

    final Cacheable testtest = new Cacheable()
    {
      @Override
      public byte[] getCacheKey()
      {
        return "testtest".getBytes();
      }
    };

    final List<byte[]> keys = Lists.newArrayList();
    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendCacheable(test)
            .appendCacheable(test)
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendCacheable(testtest)
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendCacheables(Lists.newArrayList(test, test))
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendCacheables(Lists.newArrayList(testtest))
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendCacheables(Lists.newArrayList(testtest))
            .appendCacheables(Lists.<Cacheable>newArrayList())
            .build()
    );

    keys.add(
        new CacheKeyBuilder((byte) 10)
            .appendCacheables(Lists.<Cacheable>newArrayList())
            .appendCacheables(Lists.newArrayList(testtest))
            .build()
    );

    assertNotEqualsEachOther(keys);
  }

  private static void assertNotEqualsEachOther(List<byte[]> keys)
  {
    for (byte[] k1 : keys) {
      for (byte[] k2 : keys) {
        if (k1 != k2) {
          assertFalse(Arrays.equals(k1, k2));
        }
      }
    }
  }

  @Test
  public void testEmptyOrNullStringLists()
  {
    byte[] key1 = new CacheKeyBuilder((byte) 10)
        .appendStrings(Lists.newArrayList("", ""))
        .build();

    byte[] key2 = new CacheKeyBuilder((byte) 10)
        .appendStrings(Lists.newArrayList(""))
        .build();

    assertFalse(Arrays.equals(key1, key2));

    key1 = new CacheKeyBuilder((byte) 10)
        .appendStrings(Lists.newArrayList(""))
        .build();

    key2 = new CacheKeyBuilder((byte) 10)
        .appendStrings(Lists.newArrayList((String) null))
        .build();

    assertArrayEquals(key1, key2);
  }

  @Test
  public void testEmptyOrNullCacheables()
  {
    final byte[] key1 = new CacheKeyBuilder((byte) 10)
        .appendCacheables(Lists.<Cacheable>newArrayList())
        .build();

    final byte[] key2 = new CacheKeyBuilder((byte) 10)
        .appendCacheables(Lists.newArrayList((Cacheable) null))
        .build();

    assertFalse(Arrays.equals(key1, key2));
  }

  @Test
  public void testIgnoringOrder()
  {
    byte[] actual = new CacheKeyBuilder((byte) 10)
        .appendStringsIgnoringOrder(Lists.newArrayList("test2", "test1", "te"))
        .build();

    byte[] expected = ByteBuffer.allocate(20)
                                .put((byte) 10)
                                .put(CacheKeyBuilder.STRING_LIST_KEY)
                                .putInt(3)
                                .put(StringUtils.toUtf8("te"))
                                .put(CacheKeyBuilder.STRING_SEPARATOR)
                                .put(StringUtils.toUtf8("test1"))
                                .put(CacheKeyBuilder.STRING_SEPARATOR)
                                .put(StringUtils.toUtf8("test2"))
                                .array();

    assertArrayEquals(expected, actual);

    final Cacheable c1 = new Cacheable()
    {
      @Override
      public byte[] getCacheKey()
      {
        return "te".getBytes();
      }
    };

    final Cacheable c2 = new Cacheable()
    {
      @Override
      public byte[] getCacheKey()
      {
        return "test1".getBytes();
      }
    };

    final Cacheable c3 = new Cacheable()
    {
      @Override
      public byte[] getCacheKey()
      {
        return "test2".getBytes();
      }
    };

    actual = new CacheKeyBuilder((byte) 10)
        .appendCacheablesIgnoringOrder(Lists.newArrayList(c3, c2, c1))
        .build();

    expected = ByteBuffer.allocate(18)
                         .put((byte) 10)
                         .put(CacheKeyBuilder.CACHEABLE_LIST_KEY)
                         .putInt(3)
                         .put(c1.getCacheKey())
                         .put(c2.getCacheKey())
                         .put(c3.getCacheKey())
                         .array();

    assertArrayEquals(expected, actual);
  }
}

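testDifferentOrderList and testIgnoringOrder above pin down the contract of the *IgnoringOrder variants: element order must not affect the key, and the expected bytes show the elements emitted in ascending order ("te", "test1", "test2"). A standalone sketch of the canonicalization this implies, assuming the builder sorts per-element keys lexicographically before concatenating them:

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Standalone sketch: order-insensitive concatenation of element keys.
// Sorting the per-element byte arrays lexicographically yields one
// canonical sequence for any input order, which is what the
// appendStringsIgnoringOrder / appendCacheablesIgnoringOrder tests assert.
public class IgnoreOrderSketch
{
  static final Comparator<byte[]> LEXICOGRAPHIC = (l, r) -> {
    final int n = Math.min(l.length, r.length);
    for (int i = 0; i < n; i++) {
      final int cmp = Byte.compare(l[i], r[i]);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(l.length, r.length);
  };

  static byte[] canonicalConcat(List<byte[]> elementKeys)
  {
    final byte[][] sorted = elementKeys.toArray(new byte[0][]);
    Arrays.sort(sorted, LEXICOGRAPHIC);
    int totalLength = 0;
    for (byte[] key : sorted) {
      totalLength += key.length;
    }
    final byte[] out = new byte[totalLength];
    int pos = 0;
    for (byte[] key : sorted) {
      System.arraycopy(key, 0, out, pos, key.length);
      pos += key.length;
    }
    return out;
  }

  public static void main(String[] args)
  {
    final byte[] k1 = canonicalConcat(Arrays.asList("AB".getBytes(), "BA".getBytes()));
    final byte[] k2 = canonicalConcat(Arrays.asList("BA".getBytes(), "AB".getBytes()));
    System.out.println(Arrays.equals(k1, k2));  // true
  }
}
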
@@ -36,6 +36,10 @@ import io.druid.query.TableDataSource;
 import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.post.ArithmeticPostAggregator;
+import io.druid.query.aggregation.post.ConstantPostAggregator;
+import io.druid.query.aggregation.post.FieldAccessPostAggregator;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.spec.MultipleIntervalSegmentSpec;
 import io.druid.segment.IncrementalIndexSegment;
@@ -73,7 +77,7 @@ public class TopNQueryQueryToolChestTest
             null,
             QueryGranularities.ALL,
             ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("metric1")),
-            null,
+            ImmutableList.<PostAggregator>of(new ConstantPostAggregator("post", 10)),
             null
         )
     );
@@ -106,6 +110,75 @@ public class TopNQueryQueryToolChestTest
     Assert.assertEquals(result, fromCacheResult);
   }

+  @Test
+  public void testComputeCacheKeyWithDifferentPostAgg() throws Exception
+  {
+    final TopNQuery query1 = new TopNQuery(
+        new TableDataSource("dummy"),
+        new DefaultDimensionSpec("test", "test"),
+        new NumericTopNMetricSpec("post"),
+        3,
+        new MultipleIntervalSegmentSpec(
+            ImmutableList.of(
+                new Interval("2015-01-01/2015-01-02")
+            )
+        ),
+        null,
+        QueryGranularities.ALL,
+        ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("metric1")),
+        ImmutableList.<PostAggregator>of(new ConstantPostAggregator("post", 10)),
+        null
+    );
+
+    final TopNQuery query2 = new TopNQuery(
+        new TableDataSource("dummy"),
+        new DefaultDimensionSpec("test", "test"),
+        new NumericTopNMetricSpec("post"),
+        3,
+        new MultipleIntervalSegmentSpec(
+            ImmutableList.of(
+                new Interval("2015-01-01/2015-01-02")
+            )
+        ),
+        null,
+        QueryGranularities.ALL,
+        ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("metric1")),
+        ImmutableList.<PostAggregator>of(
+            new ArithmeticPostAggregator(
+                "post",
+                "+",
+                ImmutableList.<PostAggregator>of(
+                    new FieldAccessPostAggregator(null, "metric1"),
+                    new FieldAccessPostAggregator(null, "metric1")
+                )
+            )
+        ),
+        null
+    );
+
+    final CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> strategy1 = new TopNQueryQueryToolChest(
+        null,
+        null
+    ).getCacheStrategy(query1);
+
+    final CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> strategy2 = new TopNQueryQueryToolChest(
+        null,
+        null
+    ).getCacheStrategy(query2);
+
+    Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
+  }
+
   @Test
   public void testMinTopNThreshold() throws Exception
   {

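The two queries above differ only in how the "post" metric is produced (a constant versus an arithmetic combination), so their keys must differ. For reference, a key's length is fully determined by the appends, following the layout exercised in CacheKeyBuilderTest; a hedged usage sketch (the id byte 0x7F is arbitrary, not a real Druid cache-type id):

import io.druid.query.cache.CacheKeyBuilder;

// Hedged usage sketch: one id byte, then a type key byte before each value.
public class CacheKeyLengthExample
{
  public static void main(String[] args)
  {
    final byte[] key = new CacheKeyBuilder((byte) 0x7F)
        .appendString("dim")   // 1 type key + 3 UTF-8 bytes
        .appendInt(100)        // 1 type key + 4 bytes
        .build();
    System.out.println(key.length);  // 1 + (1 + 3) + (1 + 4) = 10
  }
}
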
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.common.base.Charsets;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -204,10 +205,36 @@ public class CachingClusteredClientTest
       )
   );
   private static final List<AggregatorFactory> RENAMED_AGGS = Arrays.asList(
-      new CountAggregatorFactory("rows2"),
+      new CountAggregatorFactory("rows"),
       new LongSumAggregatorFactory("imps", "imps"),
       new LongSumAggregatorFactory("impers2", "imps")
   );
+  private static final List<PostAggregator> DIFF_ORDER_POST_AGGS = Arrays.<PostAggregator>asList(
+      new ArithmeticPostAggregator(
+          "avg_imps_per_row",
+          "/",
+          Arrays.<PostAggregator>asList(
+              new FieldAccessPostAggregator("imps", "imps"),
+              new FieldAccessPostAggregator("rows", "rows")
+          )
+      ),
+      new ArithmeticPostAggregator(
+          "avg_imps_per_row_half",
+          "/",
+          Arrays.<PostAggregator>asList(
+              new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
+              new ConstantPostAggregator("constant", 2)
+          )
+      ),
+      new ArithmeticPostAggregator(
+          "avg_imps_per_row_double",
+          "*",
+          Arrays.<PostAggregator>asList(
+              new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
+              new ConstantPostAggregator("constant", 2)
+          )
+      )
+  );
   private static final DimFilter DIM_FILTER = null;
   private static final List<PostAggregator> RENAMED_POST_AGGS = ImmutableList.of();
   private static final QueryGranularity GRANULARITY = QueryGranularities.DAY;
@@ -270,7 +297,7 @@ public class CachingClusteredClientTest
         new Function<Integer, Object[]>()
         {
           @Override
-          public Object[] apply(@Nullable Integer input)
+          public Object[] apply(Integer input)
           {
             return new Object[]{input};
           }
@@ -764,23 +791,24 @@ public class CachingClusteredClientTest
         .context(CONTEXT);

     QueryRunner runner = new FinalizeResultsQueryRunner(
-        client, new TopNQueryQueryToolChest(
-            new TopNQueryConfig(),
-            QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
-        )
+        client,
+        new TopNQueryQueryToolChest(
+            new TopNQueryConfig(),
+            QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+        )
     );

     testQueryCaching(
         runner,
         builder.build(),
         new Interval("2011-01-01/2011-01-02"),
-        makeTopNResults(new DateTime("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998),
+        makeTopNResultsWithoutRename(new DateTime("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998),

         new Interval("2011-01-02/2011-01-03"),
-        makeTopNResults(new DateTime("2011-01-02"), "a", 50, 4997, "b", 50, 4996, "c", 50, 4995),
+        makeTopNResultsWithoutRename(new DateTime("2011-01-02"), "a", 50, 4997, "b", 50, 4996, "c", 50, 4995),

         new Interval("2011-01-05/2011-01-10"),
-        makeTopNResults(
+        makeTopNResultsWithoutRename(
             new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
             new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,

@@ -789,7 +817,7 @@ public class CachingClusteredClientTest
         ),

         new Interval("2011-01-05/2011-01-10"),
-        makeTopNResults(
+        makeTopNResultsWithoutRename(
             new DateTime("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
             new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,

@@ -817,7 +845,7 @@ public class CachingClusteredClientTest
             builder.intervals("2011-01-01/2011-01-10")
                    .metric("imps")
                    .aggregators(RENAMED_AGGS)
-                   .postAggregators(RENAMED_POST_AGGS)
+                   .postAggregators(DIFF_ORDER_POST_AGGS)
                    .build(),
             context
         )

@@ -851,7 +879,7 @@ public class CachingClusteredClientTest
         runner,
         builder.build(),
         new Interval("2011-11-04/2011-11-08"),
-        makeTopNResults(
+        makeTopNResultsWithoutRename(
             new DateTime("2011-11-04", TIMEZONE), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
             new DateTime("2011-11-05", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-11-06", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,

@@ -871,7 +899,7 @@ public class CachingClusteredClientTest
             builder.intervals("2011-11-04/2011-11-08")
                    .metric("imps")
                    .aggregators(RENAMED_AGGS)
-                   .postAggregators(RENAMED_POST_AGGS)
+                   .postAggregators(DIFF_ORDER_POST_AGGS)
                    .build(),
             context
         )

@@ -884,14 +912,14 @@ public class CachingClusteredClientTest
     List<Sequence<Result<TopNResultValue>>> sequences =
         ImmutableList.of(
             Sequences.simple(
-                makeTopNResults(
+                makeTopNResultsWithoutRename(
                     new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
                     new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
                     new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
                 )
             ),
             Sequences.simple(
-                makeTopNResults(
+                makeTopNResultsWithoutRename(
                     new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
                     new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
                     new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,

@@ -901,7 +929,7 @@ public class CachingClusteredClientTest
     );

     TestHelper.assertExpectedResults(
-        makeTopNResults(
+        makeTopNResultsWithoutRename(
             new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,

@@ -951,13 +979,13 @@ public class CachingClusteredClientTest
         runner,
         builder.build(),
         new Interval("2011-01-01/2011-01-02"),
-        makeTopNResults(),
+        makeTopNResultsWithoutRename(),

         new Interval("2011-01-02/2011-01-03"),
-        makeTopNResults(),
+        makeTopNResultsWithoutRename(),

         new Interval("2011-01-05/2011-01-10"),
-        makeTopNResults(
+        makeTopNResultsWithoutRename(
             new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
             new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,

@@ -966,7 +994,7 @@ public class CachingClusteredClientTest
         ),

         new Interval("2011-01-05/2011-01-10"),
-        makeTopNResults(
+        makeTopNResultsWithoutRename(
             new DateTime("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
             new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,

@@ -993,7 +1021,7 @@ public class CachingClusteredClientTest
             builder.intervals("2011-01-01/2011-01-10")
                    .metric("imps")
                    .aggregators(RENAMED_AGGS)
-                   .postAggregators(RENAMED_POST_AGGS)
+                   .postAggregators(DIFF_ORDER_POST_AGGS)
                    .build(),
             context
         )

@@ -1025,13 +1053,13 @@ public class CachingClusteredClientTest
         runner,
         builder.build(),
         new Interval("2011-01-01/2011-01-02"),
-        makeTopNResults(),
+        makeTopNResultsWithoutRename(),

         new Interval("2011-01-02/2011-01-03"),
-        makeTopNResults(),
+        makeTopNResultsWithoutRename(),

         new Interval("2011-01-05/2011-01-10"),
-        makeTopNResults(
+        makeTopNResultsWithoutRename(
             new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
             new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,

@@ -1040,7 +1068,7 @@ public class CachingClusteredClientTest
         ),

         new Interval("2011-01-05/2011-01-10"),
-        makeTopNResults(
+        makeTopNResultsWithoutRename(
             new DateTime("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
             new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,

@@ -1051,7 +1079,7 @@ public class CachingClusteredClientTest

     HashMap<String, List> context = new HashMap<String, List>();
     TestHelper.assertExpectedResults(
-        makeTopNResults(
+        makeTopNResultsWithoutRename(
             new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
             new DateTime("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
             new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,

@@ -1067,7 +1095,7 @@ public class CachingClusteredClientTest
             builder.intervals("2011-01-01/2011-01-10")
                    .metric("avg_imps_per_row_double")
                    .aggregators(AGGS)
-                   .postAggregators(POST_AGGS)
+                   .postAggregators(DIFF_ORDER_POST_AGGS)
                    .build(),
             context
         )

@@ -2526,7 +2554,7 @@ public class CachingClusteredClientTest
             (DateTime) objects[i],
             new TimeseriesResultValue(
                 ImmutableMap.of(
-                    "rows2", objects[i + 1],
+                    "rows", objects[i + 1],
                     "imps", objects[i + 2],
                     "impers2", objects[i + 2]
                 )

@@ -2537,9 +2565,27 @@ public class CachingClusteredClientTest
     return retVal;
   }

-  private Iterable<Result<TopNResultValue>> makeTopNResults
+  private Iterable<Result<TopNResultValue>> makeTopNResultsWithoutRename
       (Object... objects)
   {
+    return makeTopNResults(
+        Lists.newArrayList(
+            TOP_DIM,
+            "rows",
+            "imps",
+            "impers",
+            "avg_imps_per_row",
+            "avg_imps_per_row_double",
+            "avg_imps_per_row_half"
+        ),
+        objects
+    );
+  }
+
+  private Iterable<Result<TopNResultValue>> makeTopNResults
+      (List<String> names, Object... objects)
+  {
+    Preconditions.checkArgument(names.size() == 7);
     List<Result<TopNResultValue>> retVal = Lists.newArrayList();
     int index = 0;
     while (index < objects.length) {

@@ -2556,14 +2602,14 @@ public class CachingClusteredClientTest
       final double rows = ((Number) objects[index + 1]).doubleValue();
       values.add(
           ImmutableMap.<String, Object>builder()
-                      .put(TOP_DIM, objects[index])
-                      .put("rows", rows)
-                      .put("imps", imps)
-                      .put("impers", imps)
-                      .put("avg_imps_per_row", imps / rows)
-                      .put("avg_imps_per_row_double", ((imps * 2) / rows))
-                      .put("avg_imps_per_row_half", (imps / (rows * 2)))
-                      .build()
+                      .put(names.get(0), objects[index])
+                      .put(names.get(1), rows)
+                      .put(names.get(2), imps)
+                      .put(names.get(3), imps)
+                      .put(names.get(4), imps / rows)
+                      .put(names.get(5), ((imps * 2) / rows))
+                      .put(names.get(6), (imps / (rows * 2)))
+                      .build()
       );
       index += 3;
     }

@@ -2576,34 +2622,18 @@ public class CachingClusteredClientTest
   private Iterable<Result<TopNResultValue>> makeRenamedTopNResults
       (Object... objects)
   {
-    List<Result<TopNResultValue>> retVal = Lists.newArrayList();
-    int index = 0;
-    while (index < objects.length) {
-      DateTime timestamp = (DateTime) objects[index++];
-
-      List<Map<String, Object>> values = Lists.newArrayList();
-      while (index < objects.length && !(objects[index] instanceof DateTime)) {
-        if (objects.length - index < 3) {
-          throw new ISE(
-              "expect 3 values for each entry in the top list, had %d values left.", objects.length - index
-          );
-        }
-        final double imps = ((Number) objects[index + 2]).doubleValue();
-        final double rows = ((Number) objects[index + 1]).doubleValue();
-        values.add(
-            ImmutableMap.of(
-                TOP_DIM, objects[index],
-                "rows2", rows,
-                "imps", imps,
-                "impers2", imps
-            )
-        );
-        index += 3;
-      }
-
-      retVal.add(new Result<>(timestamp, new TopNResultValue(values)));
-    }
-    return retVal;
+    return makeTopNResults(
+        Lists.newArrayList(
+            TOP_DIM,
+            "rows",
+            "imps",
+            "impers2",
+            "avg_imps_per_row",
+            "avg_imps_per_row_double",
+            "avg_imps_per_row_half"
+        ),
+        objects
+    );
   }

   private Iterable<Result<SearchResultValue>> makeSearchResults

@@ -3086,16 +3116,16 @@ public class CachingClusteredClientTest

     TestHelper.assertExpectedObjects(
         makeGroupByResults(
-            new DateTime("2011-01-05T"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3),
-            new DateTime("2011-01-05T01"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3),
-            new DateTime("2011-01-06T"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4),
-            new DateTime("2011-01-06T01"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4),
-            new DateTime("2011-01-07T"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5),
-            new DateTime("2011-01-07T01"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5),
-            new DateTime("2011-01-08T"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6),
-            new DateTime("2011-01-08T01"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6),
-            new DateTime("2011-01-09T"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7),
-            new DateTime("2011-01-09T01"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7)
+            new DateTime("2011-01-05T"), ImmutableMap.of("output2", "c", "rows", 3, "imps", 3, "impers2", 3),
+            new DateTime("2011-01-05T01"), ImmutableMap.of("output2", "c", "rows", 3, "imps", 3, "impers2", 3),
+            new DateTime("2011-01-06T"), ImmutableMap.of("output2", "d", "rows", 4, "imps", 4, "impers2", 4),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("output2", "d", "rows", 4, "imps", 4, "impers2", 4),
+            new DateTime("2011-01-07T"), ImmutableMap.of("output2", "e", "rows", 5, "imps", 5, "impers2", 5),
+            new DateTime("2011-01-07T01"), ImmutableMap.of("output2", "e", "rows", 5, "imps", 5, "impers2", 5),
+            new DateTime("2011-01-08T"), ImmutableMap.of("output2", "f", "rows", 6, "imps", 6, "impers2", 6),
+            new DateTime("2011-01-08T01"), ImmutableMap.of("output2", "f", "rows", 6, "imps", 6, "impers2", 6),
+            new DateTime("2011-01-09T"), ImmutableMap.of("output2", "g", "rows", 7, "imps", 7, "impers2", 7),
+            new DateTime("2011-01-09T01"), ImmutableMap.of("output2", "g", "rows", 7, "imps", 7, "impers2", 7)
         ),
         runner.run(
             builder.setInterval("2011-01-05/2011-01-10")