DoublesSketchModule: Fix serde for DoublesSketchMergeAggregatorFactory. (#5587)

Fixes #5580.
This commit is contained in:
Gian Merlino 2018-04-07 06:45:55 -07:00 committed by Slim
parent ad6f234e1e
commit 685f4063d4
2 changed files with 72 additions and 27 deletions

View File

@ -35,6 +35,7 @@ public class DoublesSketchModule implements DruidModule
{ {
public static final String DOUBLES_SKETCH = "quantilesDoublesSketch"; public static final String DOUBLES_SKETCH = "quantilesDoublesSketch";
public static final String DOUBLES_SKETCH_MERGE = "quantilesDoublesSketchMerge";
public static final String DOUBLES_SKETCH_HISTOGRAM_POST_AGG = "quantilesDoublesSketchToHistogram"; public static final String DOUBLES_SKETCH_HISTOGRAM_POST_AGG = "quantilesDoublesSketchToHistogram";
public static final String DOUBLES_SKETCH_QUANTILE_POST_AGG = "quantilesDoublesSketchToQuantile"; public static final String DOUBLES_SKETCH_QUANTILE_POST_AGG = "quantilesDoublesSketchToQuantile";
@ -55,6 +56,7 @@ public class DoublesSketchModule implements DruidModule
return Arrays.<Module> asList( return Arrays.<Module> asList(
new SimpleModule("DoublesQuantilesSketchModule").registerSubtypes( new SimpleModule("DoublesQuantilesSketchModule").registerSubtypes(
new NamedType(DoublesSketchAggregatorFactory.class, DOUBLES_SKETCH), new NamedType(DoublesSketchAggregatorFactory.class, DOUBLES_SKETCH),
new NamedType(DoublesSketchMergeAggregatorFactory.class, DOUBLES_SKETCH_MERGE),
new NamedType(DoublesSketchToHistogramPostAggregator.class, DOUBLES_SKETCH_HISTOGRAM_POST_AGG), new NamedType(DoublesSketchToHistogramPostAggregator.class, DOUBLES_SKETCH_HISTOGRAM_POST_AGG),
new NamedType(DoublesSketchToQuantilePostAggregator.class, DOUBLES_SKETCH_QUANTILE_POST_AGG), new NamedType(DoublesSketchToQuantilePostAggregator.class, DOUBLES_SKETCH_QUANTILE_POST_AGG),
new NamedType(DoublesSketchToQuantilesPostAggregator.class, DOUBLES_SKETCH_QUANTILES_POST_AGG), new NamedType(DoublesSketchToQuantilesPostAggregator.class, DOUBLES_SKETCH_QUANTILES_POST_AGG),

View File

@ -27,6 +27,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequence;
import io.druid.query.aggregation.AggregationTestHelper; import io.druid.query.aggregation.AggregationTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryRunnerTest; import io.druid.query.groupby.GroupByQueryRunnerTest;
import org.junit.Assert; import org.junit.Assert;
@ -58,7 +59,8 @@ public class DoublesSketchAggregatorTest
module.getJacksonModules(), config, tempFolder); module.getJacksonModules(), config, tempFolder);
timeSeriesHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper( timeSeriesHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
module.getJacksonModules(), module.getJacksonModules(),
tempFolder); tempFolder
);
} }
@Parameterized.Parameters(name = "{0}") @Parameterized.Parameters(name = "{0}")
@ -66,7 +68,7 @@ public class DoublesSketchAggregatorTest
{ {
final List<Object[]> constructors = Lists.newArrayList(); final List<Object[]> constructors = Lists.newArrayList();
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
constructors.add(new Object[] {config}); constructors.add(new Object[]{config});
} }
return constructors; return constructors;
} }
@ -76,11 +78,29 @@ public class DoublesSketchAggregatorTest
public void serializeDeserializeFactoryWithFieldName() throws Exception public void serializeDeserializeFactoryWithFieldName() throws Exception
{ {
ObjectMapper objectMapper = new DefaultObjectMapper(); ObjectMapper objectMapper = new DefaultObjectMapper();
new DoublesSketchModule().getJacksonModules().forEach(objectMapper::registerModule);
DoublesSketchAggregatorFactory factory = new DoublesSketchAggregatorFactory("name", "filedName", 128); DoublesSketchAggregatorFactory factory = new DoublesSketchAggregatorFactory("name", "filedName", 128);
DoublesSketchAggregatorFactory other = objectMapper.readValue( AggregatorFactory other = objectMapper.readValue(
objectMapper.writeValueAsString(factory), objectMapper.writeValueAsString(factory),
DoublesSketchAggregatorFactory.class); AggregatorFactory.class
);
Assert.assertEquals(factory, other);
}
// this is to test Json properties and equals for the combining factory
@Test
public void serializeDeserializeCombiningFactoryWithFieldName() throws Exception
{
ObjectMapper objectMapper = new DefaultObjectMapper();
new DoublesSketchModule().getJacksonModules().forEach(objectMapper::registerModule);
DoublesSketchAggregatorFactory factory = new DoublesSketchMergeAggregatorFactory("name", 128);
AggregatorFactory other = objectMapper.readValue(
objectMapper.writeValueAsString(factory),
AggregatorFactory.class
);
Assert.assertEquals(factory, other); Assert.assertEquals(factory, other);
} }
@ -90,7 +110,8 @@ public class DoublesSketchAggregatorTest
{ {
Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment( Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("quantiles/doubles_sketch_data.tsv").getFile()), new File(this.getClass().getClassLoader().getResource("quantiles/doubles_sketch_data.tsv").getFile()),
String.join("\n", String.join(
"\n",
"{", "{",
" \"type\": \"string\",", " \"type\": \"string\",",
" \"parseSpec\": {", " \"parseSpec\": {",
@ -103,16 +124,20 @@ public class DoublesSketchAggregatorTest
" },", " },",
" \"columns\": [\"timestamp\", \"product\", \"sketch\"]", " \"columns\": [\"timestamp\", \"product\", \"sketch\"]",
" }", " }",
"}"), "}"
String.join("\n", ),
String.join(
"\n",
"[", "[",
" {\"type\": \"quantilesDoublesSketch\", \"name\": \"sketch\", \"fieldName\": \"sketch\", \"k\": 128},", " {\"type\": \"quantilesDoublesSketch\", \"name\": \"sketch\", \"fieldName\": \"sketch\", \"k\": 128},",
" {\"type\": \"quantilesDoublesSketch\", \"name\": \"non_existent_sketch\", \"fieldName\": \"non_existent_sketch\", \"k\": 128}", " {\"type\": \"quantilesDoublesSketch\", \"name\": \"non_existent_sketch\", \"fieldName\": \"non_existent_sketch\", \"k\": 128}",
"]"), "]"
),
0, // minTimestamp 0, // minTimestamp
Granularities.NONE, Granularities.NONE,
10, // maxRowCount 10, // maxRowCount
String.join("\n", String.join(
"\n",
"{", "{",
" \"queryType\": \"groupBy\",", " \"queryType\": \"groupBy\",",
" \"dataSource\": \"test_datasource\",", " \"dataSource\": \"test_datasource\",",
@ -127,7 +152,9 @@ public class DoublesSketchAggregatorTest
" {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}", " {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}",
" ],", " ],",
" \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]", " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
"}")); "}"
)
);
List<Row> results = seq.toList(); List<Row> results = seq.toList();
Assert.assertEquals(1, results.size()); Assert.assertEquals(1, results.size());
Row row = results.get(0); Row row = results.get(0);
@ -155,8 +182,8 @@ public class DoublesSketchAggregatorTest
Assert.assertTrue(histogramObject instanceof double[]); Assert.assertTrue(histogramObject instanceof double[]);
double[] histogram = (double[]) histogramObject; double[] histogram = (double[]) histogramObject;
for (final double bin : histogram) { for (final double bin : histogram) {
Assert.assertEquals(100, bin, 100 * 0.2); // 400 items uniformly // 400 items uniformly distributed into 4 bins
// distributed into 4 bins Assert.assertEquals(100, bin, 100 * 0.2);
} }
} }
@ -165,7 +192,8 @@ public class DoublesSketchAggregatorTest
{ {
Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment( Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()), new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()),
String.join("\n", String.join(
"\n",
"{", "{",
" \"type\": \"string\",", " \"type\": \"string\",",
" \"parseSpec\": {", " \"parseSpec\": {",
@ -178,12 +206,14 @@ public class DoublesSketchAggregatorTest
" },", " },",
" \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]", " \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]",
" }", " }",
"}"), "}"
),
"[{\"type\": \"quantilesDoublesSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"k\": 128}]", "[{\"type\": \"quantilesDoublesSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"k\": 128}]",
0, // minTimestamp 0, // minTimestamp
Granularities.NONE, Granularities.NONE,
10, // maxRowCount 10, // maxRowCount
String.join("\n", String.join(
"\n",
"{", "{",
" \"queryType\": \"groupBy\",", " \"queryType\": \"groupBy\",",
" \"dataSource\": \"test_datasource\",", " \"dataSource\": \"test_datasource\",",
@ -198,7 +228,9 @@ public class DoublesSketchAggregatorTest
" {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}", " {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}",
" ],", " ],",
" \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]", " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
"}")); "}"
)
);
List<Row> results = seq.toList(); List<Row> results = seq.toList();
Assert.assertEquals(1, results.size()); Assert.assertEquals(1, results.size());
Row row = results.get(0); Row row = results.get(0);
@ -223,7 +255,7 @@ public class DoublesSketchAggregatorTest
Assert.assertEquals(4, histogram.length); Assert.assertEquals(4, histogram.length);
for (final double bin : histogram) { for (final double bin : histogram) {
Assert.assertEquals(100, bin, 100 * 0.2); // 400 items uniformly Assert.assertEquals(100, bin, 100 * 0.2); // 400 items uniformly
// distributed into 4 bins // distributed into 4 bins
} }
} }
@ -232,7 +264,8 @@ public class DoublesSketchAggregatorTest
{ {
Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment( Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()), new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()),
String.join("\n", String.join(
"\n",
"{", "{",
" \"type\": \"string\",", " \"type\": \"string\",",
" \"parseSpec\": {", " \"parseSpec\": {",
@ -245,12 +278,14 @@ public class DoublesSketchAggregatorTest
" },", " },",
" \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]", " \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]",
" }", " }",
"}"), "}"
),
"[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": \"value\"}]", "[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": \"value\"}]",
0, // minTimestamp 0, // minTimestamp
Granularities.NONE, Granularities.NONE,
10, // maxRowCount 10, // maxRowCount
String.join("\n", String.join(
"\n",
"{", "{",
" \"queryType\": \"groupBy\",", " \"queryType\": \"groupBy\",",
" \"dataSource\": \"test_datasource\",", " \"dataSource\": \"test_datasource\",",
@ -265,7 +300,9 @@ public class DoublesSketchAggregatorTest
" {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}", " {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}",
" ],", " ],",
" \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]", " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
"}")); "}"
)
);
List<Row> results = seq.toList(); List<Row> results = seq.toList();
Assert.assertEquals(1, results.size()); Assert.assertEquals(1, results.size());
Row row = results.get(0); Row row = results.get(0);
@ -294,7 +331,7 @@ public class DoublesSketchAggregatorTest
double[] histogram = (double[]) histogramObject; double[] histogram = (double[]) histogramObject;
for (final double bin : histogram) { for (final double bin : histogram) {
Assert.assertEquals(100, bin, 100 * 0.2); // 400 items uniformly Assert.assertEquals(100, bin, 100 * 0.2); // 400 items uniformly
// distributed into 4 bins // distributed into 4 bins
} }
} }
@ -317,7 +354,8 @@ public class DoublesSketchAggregatorTest
" },", " },",
" \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]", " \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]",
" }", " }",
"}"), "}"
),
"[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": \"value\"}]", "[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": \"value\"}]",
0, // minTimestamp 0, // minTimestamp
Granularities.NONE, Granularities.NONE,
@ -338,7 +376,9 @@ public class DoublesSketchAggregatorTest
" {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}", " {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}",
" ],", " ],",
" \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]", " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
"}")); "}"
)
);
List<Row> results = seq.toList(); List<Row> results = seq.toList();
Assert.assertEquals(1, results.size()); Assert.assertEquals(1, results.size());
Row row = results.get(0); Row row = results.get(0);
@ -367,7 +407,7 @@ public class DoublesSketchAggregatorTest
double[] histogram = (double[]) histogramObject; double[] histogram = (double[]) histogramObject;
for (final double bin : histogram) { for (final double bin : histogram) {
Assert.assertEquals(100, bin, 100 * 0.2); // 400 items uniformly Assert.assertEquals(100, bin, 100 * 0.2); // 400 items uniformly
// distributed into 4 bins // distributed into 4 bins
} }
} }
@ -390,7 +430,8 @@ public class DoublesSketchAggregatorTest
" },", " },",
" \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]", " \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]",
" }", " }",
"}"), "}"
),
"[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": \"value\"}]", "[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": \"value\"}]",
0, // minTimestamp 0, // minTimestamp
Granularities.NONE, Granularities.NONE,
@ -410,7 +451,9 @@ public class DoublesSketchAggregatorTest
" {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram1\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}", " {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram1\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}",
" ],", " ],",
" \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]", " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
"}")); "}"
)
);
List<Row> results = seq.toList(); List<Row> results = seq.toList();
Assert.assertEquals(1, results.size()); Assert.assertEquals(1, results.size());
} }