mirror of https://github.com/apache/druid.git
fail complex type 'serde' registration when registered type does not match expected type (#7985)
* make ComplexMetrics.registerSerde type check on register, resolves #7959 * add test * simplify * unused imports :/ * simplify * burned by imports yet again * remove unused constructor * switch to getName * heh oops
This commit is contained in:
parent
da3d141dd2
commit
abf9843e2a
|
@ -29,7 +29,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
|||
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
|
@ -149,7 +148,7 @@ public class FilterPartitionBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
|||
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
|
@ -145,7 +144,7 @@ public class FilteredAggregatorBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
|
||||
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.druid.collections.NonBlockingPool;
|
|||
import org.apache.druid.collections.StupidPool;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.data.input.Row;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||
|
@ -265,7 +264,7 @@ public class GroupByTypeInterfaceBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT %d", System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
setupQueries();
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
|||
import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
|
||||
import org.apache.druid.collections.StupidPool;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
|
@ -233,7 +232,7 @@ public class TopNTypeInterfaceBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
setupQueries();
|
||||
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.druid.data.input.impl.DoubleDimensionSchema;
|
|||
import org.apache.druid.data.input.impl.FloatDimensionSchema;
|
||||
import org.apache.druid.data.input.impl.LongDimensionSchema;
|
||||
import org.apache.druid.data.input.impl.StringDimensionSchema;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
|
@ -79,7 +78,7 @@ public class SegmentGenerator implements Closeable
|
|||
)
|
||||
{
|
||||
// In case we need to generate hyperUniques.
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
final BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(
|
||||
schemaInfo.getColumnSchemas(),
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
|
|||
import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
||||
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
|
@ -93,7 +92,7 @@ public class IncrementalIndexReadBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
|
|||
import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
||||
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
||||
import org.apache.druid.segment.incremental.IncrementalIndex;
|
||||
|
@ -71,7 +70,7 @@ public class IndexIngestionBenchmark
|
|||
@Setup
|
||||
public void setup()
|
||||
{
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
rows = new ArrayList<InputRow>();
|
||||
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
|
|||
import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
||||
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.math.expr.ExprMacroTable;
|
||||
|
@ -113,7 +112,7 @@ public class IndexMergeBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
indexesToMerge = new ArrayList<>();
|
||||
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
|
|||
import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
||||
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
||||
|
@ -99,7 +98,7 @@ public class IndexPersistBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
rows = new ArrayList<InputRow>();
|
||||
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.druid.collections.NonBlockingPool;
|
|||
import org.apache.druid.collections.StupidPool;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.data.input.Row;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
|
@ -353,7 +352,7 @@ public class GroupByBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
executorService = Execs.multiThreaded(numProcessingThreads, "GroupByThreadPool[%d]");
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
|||
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.data.input.Row;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
|
@ -244,7 +243,7 @@ public class ScanBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
executorService = Execs.multiThreaded(numProcessingThreads, "ScanThreadPool");
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
|||
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.data.input.Row;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
|
@ -314,7 +313,7 @@ public class SearchBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
executorService = Execs.multiThreaded(numSegments, "SearchThreadPool");
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
|||
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.data.input.Row;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
|
@ -175,7 +174,7 @@ public class SelectBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
executorService = Execs.multiThreaded(numSegments, "SelectThreadPool");
|
||||
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
|
|||
import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
||||
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
|
@ -240,7 +239,7 @@ public class TimeseriesBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
executorService = Execs.multiThreaded(numSegments, "TimeseriesThreadPool");
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
|||
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import org.apache.druid.collections.StupidPool;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
|
@ -215,7 +214,7 @@ public class TopNBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
|
|||
import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
|
||||
import org.apache.druid.collections.StupidPool;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
|
@ -286,7 +285,7 @@ public class TimeCompareBenchmark
|
|||
{
|
||||
log.info("SETUP CALLED AT " + System.currentTimeMillis());
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
|
||||
|
||||
|
|
|
@ -81,6 +81,6 @@ public class MomentSketchModule implements DruidModule
|
|||
@VisibleForTesting
|
||||
public static void registerSerde()
|
||||
{
|
||||
ComplexMetrics.registerSerde(MomentSketchAggregatorFactory.TYPE_NAME, MomentSketchComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(MomentSketchAggregatorFactory.TYPE_NAME, new MomentSketchComplexMetricSerde());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,6 @@ public class TDigestSketchModule implements DruidModule
|
|||
@VisibleForTesting
|
||||
static void registerSerde()
|
||||
{
|
||||
ComplexMetrics.registerSerde(TDigestBuildSketchAggregatorFactory.TYPE_NAME, TDigestSketchComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(TDigestBuildSketchAggregatorFactory.TYPE_NAME, new TDigestSketchComplexMetricSerde());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -72,8 +72,8 @@ public class HllSketchModule implements DruidModule
|
|||
@VisibleForTesting
|
||||
public static void registerSerde()
|
||||
{
|
||||
ComplexMetrics.registerSerde(TYPE_NAME, HllSketchMergeComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(BUILD_TYPE_NAME, HllSketchBuildComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(MERGE_TYPE_NAME, HllSketchMergeComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(TYPE_NAME, new HllSketchMergeComplexMetricSerde());
|
||||
ComplexMetrics.registerSerde(BUILD_TYPE_NAME, new HllSketchBuildComplexMetricSerde());
|
||||
ComplexMetrics.registerSerde(MERGE_TYPE_NAME, new HllSketchMergeComplexMetricSerde());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,6 +74,6 @@ public class DoublesSketchModule implements DruidModule
|
|||
@VisibleForTesting
|
||||
public static void registerSerde()
|
||||
{
|
||||
ComplexMetrics.registerSerde(DOUBLES_SKETCH, DoublesSketchComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(DOUBLES_SKETCH, new DoublesSketchComplexMetricSerde());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,8 +70,8 @@ public class SketchModule implements DruidModule
|
|||
@VisibleForTesting
|
||||
public static void registerSerde()
|
||||
{
|
||||
ComplexMetrics.registerSerde(THETA_SKETCH, SketchMergeComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(THETA_SKETCH_MERGE_AGG, SketchMergeComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(THETA_SKETCH_BUILD_AGG, SketchBuildComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(THETA_SKETCH, new SketchMergeComplexMetricSerde());
|
||||
ComplexMetrics.registerSerde(THETA_SKETCH_MERGE_AGG, new SketchMergeComplexMetricSerde());
|
||||
ComplexMetrics.registerSerde(THETA_SKETCH_BUILD_AGG, new SketchBuildComplexMetricSerde());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,12 +44,12 @@ public class OldApiSketchModule implements DruidModule
|
|||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
ComplexMetrics.registerSerde(SKETCH_BUILD, SketchBuildComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(SET_SKETCH, SketchMergeComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(SKETCH_MERGE, SketchMergeComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(SketchModule.THETA_SKETCH, SketchMergeComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(SketchModule.THETA_SKETCH_MERGE_AGG, SketchMergeComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(SketchModule.THETA_SKETCH_BUILD_AGG, SketchBuildComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(SKETCH_BUILD, new SketchBuildComplexMetricSerde());
|
||||
ComplexMetrics.registerSerde(SET_SKETCH, new SketchMergeComplexMetricSerde());
|
||||
ComplexMetrics.registerSerde(SKETCH_MERGE, new SketchMergeComplexMetricSerde());
|
||||
ComplexMetrics.registerSerde(SketchModule.THETA_SKETCH, new SketchMergeComplexMetricSerde());
|
||||
ComplexMetrics.registerSerde(SketchModule.THETA_SKETCH_MERGE_AGG, new SketchMergeComplexMetricSerde());
|
||||
ComplexMetrics.registerSerde(SketchModule.THETA_SKETCH_BUILD_AGG, new SketchBuildComplexMetricSerde());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -47,9 +47,9 @@ public class ArrayOfDoublesSketchModule implements DruidModule
|
|||
@Override
|
||||
public void configure(final Binder binder)
|
||||
{
|
||||
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH, ArrayOfDoublesSketchMergeComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG, ArrayOfDoublesSketchMergeComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG, ArrayOfDoublesSketchBuildComplexMetricSerde::new);
|
||||
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH, new ArrayOfDoublesSketchMergeComplexMetricSerde());
|
||||
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG, new ArrayOfDoublesSketchMergeComplexMetricSerde());
|
||||
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG, new ArrayOfDoublesSketchBuildComplexMetricSerde());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -52,7 +52,7 @@ public class BloomFilterSerializersModule extends SimpleModule
|
|||
addDeserializer(BloomKFilter.class, new BloomKFilterDeserializer());
|
||||
addDeserializer(BloomKFilterHolder.class, new BloomKFilterHolderDeserializer());
|
||||
|
||||
ComplexMetrics.registerSerde(BLOOM_FILTER_TYPE_NAME, BloomFilterSerde::new);
|
||||
ComplexMetrics.registerSerde(BLOOM_FILTER_TYPE_NAME, new BloomFilterSerde());
|
||||
}
|
||||
|
||||
private static class BloomKFilterSerializer extends StdSerializer<BloomKFilter>
|
||||
|
|
|
@ -66,7 +66,7 @@ public class ApproximateHistogramDruidModule implements DruidModule
|
|||
@VisibleForTesting
|
||||
public static void registerSerde()
|
||||
{
|
||||
ComplexMetrics.registerSerde("approximateHistogram", ApproximateHistogramFoldingSerde::new);
|
||||
ComplexMetrics.registerSerde(FixedBucketsHistogramAggregator.TYPE_NAME, FixedBucketsHistogramSerde::new);
|
||||
ComplexMetrics.registerSerde("approximateHistogram", new ApproximateHistogramFoldingSerde());
|
||||
ComplexMetrics.registerSerde(FixedBucketsHistogramAggregator.TYPE_NAME, new FixedBucketsHistogramSerde());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,6 +65,6 @@ public class DruidStatsModule implements DruidModule
|
|||
SqlBindings.addAggregator(binder, BaseVarianceSqlAggregator.StdDevSampSqlAggregator.class);
|
||||
SqlBindings.addAggregator(binder, BaseVarianceSqlAggregator.StdDevSqlAggregator.class);
|
||||
}
|
||||
ComplexMetrics.registerSerde("variance", VarianceSerde::new);
|
||||
ComplexMetrics.registerSerde("variance", new VarianceSerde());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.apache.druid.jackson;
|
|||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.CountAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
|
||||
|
@ -74,12 +73,9 @@ public class AggregatorsModule extends SimpleModule
|
|||
{
|
||||
super("AggregatorFactories");
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde(
|
||||
"preComputedHyperUnique",
|
||||
() -> new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault())
|
||||
);
|
||||
ComplexMetrics.registerSerde("serializablePairLongString", SerializablePairLongStringSerde::new);
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
ComplexMetrics.registerSerde("preComputedHyperUnique", new PreComputedHyperUniquesSerde());
|
||||
ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairLongStringSerde());
|
||||
|
||||
setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
|
||||
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
|
||||
|
|
|
@ -43,6 +43,11 @@ public class HyperUniquesSerde extends ComplexMetricSerde
|
|||
|
||||
private final HyperLogLogHash hyperLogLogHash;
|
||||
|
||||
public HyperUniquesSerde()
|
||||
{
|
||||
this(HyperLogLogHash.getDefault());
|
||||
}
|
||||
|
||||
public HyperUniquesSerde(HyperLogLogHash hyperLogLogHash)
|
||||
{
|
||||
this.hyperLogLogHash = hyperLogLogHash;
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.druid.query.aggregation.hyperloglog;
|
|||
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.hll.HyperLogLogCollector;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.segment.serde.ComplexMetricExtractor;
|
||||
|
@ -30,9 +29,9 @@ import java.nio.ByteBuffer;
|
|||
|
||||
public class PreComputedHyperUniquesSerde extends HyperUniquesSerde
|
||||
{
|
||||
public PreComputedHyperUniquesSerde(HyperLogLogHash hyperLogLogHash)
|
||||
public PreComputedHyperUniquesSerde()
|
||||
{
|
||||
super(hyperLogLogHash);
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.druid.java.util.common.ISE;
|
|||
import javax.annotation.Nullable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -38,13 +37,19 @@ public class ComplexMetrics
|
|||
return complexSerializers.get(type);
|
||||
}
|
||||
|
||||
public static void registerSerde(String type, Supplier<ComplexMetricSerde> serdeSupplier)
|
||||
public static void registerSerde(String type, ComplexMetricSerde serde)
|
||||
{
|
||||
if (ComplexMetrics.getSerdeForType(type) == null) {
|
||||
if (complexSerializers.containsKey(type)) {
|
||||
throw new ISE("Serializer for type[%s] already exists.", type);
|
||||
if (complexSerializers.containsKey(type)) {
|
||||
if (!complexSerializers.get(type).getClass().getName().equals(serde.getClass().getName())) {
|
||||
throw new ISE(
|
||||
"Incompatible serializer for type[%s] already exists. Expected [%s], found [%s].",
|
||||
type,
|
||||
serde.getClass().getName(),
|
||||
complexSerializers.get(type).getClass().getName()
|
||||
);
|
||||
}
|
||||
complexSerializers.put(type, serdeSupplier.get());
|
||||
} else {
|
||||
complexSerializers.put(type, serde);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.google.common.base.Function;
|
|||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.druid.data.input.MapBasedInputRow;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
|
@ -94,7 +93,7 @@ public class SchemalessIndexTest
|
|||
private static QueryableIndex mergedIndex = null;
|
||||
|
||||
static {
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
}
|
||||
|
||||
private final IndexMerger indexMerger;
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
|
|||
import org.apache.druid.data.input.impl.StringDimensionSchema;
|
||||
import org.apache.druid.data.input.impl.StringInputRowParser;
|
||||
import org.apache.druid.data.input.impl.TimestampSpec;
|
||||
import org.apache.druid.hll.HyperLogLogHash;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
|
@ -161,7 +160,7 @@ public class TestIndex
|
|||
private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
|
||||
|
||||
static {
|
||||
ComplexMetrics.registerSerde("hyperUnique", () -> new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
}
|
||||
|
||||
private static Supplier<IncrementalIndex> realtimeIndex = Suppliers.memoize(
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.segment.serde;
|
||||
|
||||
import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
|
||||
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
public class ComplexMetricsTest
|
||||
{
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testRegister()
|
||||
{
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType("hyperUnique");
|
||||
Assert.assertNotNull(serde);
|
||||
Assert.assertTrue(serde instanceof HyperUniquesSerde);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegisterDuplicate()
|
||||
{
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType("hyperUnique");
|
||||
Assert.assertNotNull(serde);
|
||||
Assert.assertTrue(serde instanceof HyperUniquesSerde);
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
serde = ComplexMetrics.getSerdeForType("hyperUnique");
|
||||
Assert.assertNotNull(serde);
|
||||
Assert.assertTrue(serde instanceof HyperUniquesSerde);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConflicting()
|
||||
{
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
|
||||
|
||||
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType("hyperUnique");
|
||||
Assert.assertNotNull(serde);
|
||||
Assert.assertTrue(serde instanceof HyperUniquesSerde);
|
||||
|
||||
expectedException.expect(IllegalStateException.class);
|
||||
expectedException.expectMessage("Incompatible serializer for type[hyperUnique] already exists. Expected [org.apache.druid.query.aggregation.SerializablePairLongStringSerde], found [org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde");
|
||||
|
||||
ComplexMetrics.registerSerde("hyperUnique", new SerializablePairLongStringSerde());
|
||||
|
||||
serde = ComplexMetrics.getSerdeForType("hyperUnique");
|
||||
Assert.assertNotNull(serde);
|
||||
Assert.assertTrue(serde instanceof HyperUniquesSerde);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue