fix InputFormat serde issue with SeekableStream based supervisors (#9136)

This commit is contained in:
Clint Wylie 2020-01-07 14:18:54 -08:00 committed by Jonathan Wei
parent c248e00984
commit f540216931
2 changed files with 177 additions and 1 deletion

View File

@@ -138,6 +138,93 @@ public class KafkaSupervisorSpecTest
Assert.assertTrue(serialized.contains("\"tuningConfig\":{"));
Assert.assertTrue(serialized.contains("\"indexSpec\":{"));
Assert.assertTrue(serialized.contains("\"suspended\":false"));
Assert.assertTrue(serialized.contains("\"parser\":{"));
KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class);
String stable = mapper.writeValueAsString(spec2);
Assert.assertEquals(serialized, stable);
}
@Test
public void testSerdeWithInputFormat() throws IOException
{
String json = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"metrics-kafka\",\n"
+ " \"timestampSpec\": {\n"
+ " \"column\": \"timestamp\",\n"
+ " \"format\": \"auto\"\n"
+ " },\n"
+ " \"dimensionsSpec\": {\n"
+ " \"dimensions\": [],\n"
+ " \"dimensionExclusions\": [\n"
+ " \"timestamp\",\n"
+ " \"value\"\n"
+ " ]\n"
+ " },\n"
+ " \"metricsSpec\": [\n"
+ " {\n"
+ " \"name\": \"count\",\n"
+ " \"type\": \"count\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_sum\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleSum\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_min\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMin\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_max\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMax\"\n"
+ " }\n"
+ " ],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"HOUR\",\n"
+ " \"queryGranularity\": \"NONE\"\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"topic\": \"metrics\",\n"
+ " \"inputFormat\": {\n"
+ " \"type\": \"json\",\n"
+ " \"flattenSpec\": {\n"
+ " \"useFieldDiscovery\": true,\n"
+ " \"fields\": []\n"
+ " },\n"
+ " \"featureSpec\": {}\n"
+ " },"
+ " \"consumerProperties\": {\n"
+ " \"bootstrap.servers\": \"localhost:9092\"\n"
+ " },\n"
+ " \"taskCount\": 1\n"
+ " }\n"
+ "}";
KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class);
Assert.assertNotNull(spec);
Assert.assertNotNull(spec.getDataSchema());
Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
Assert.assertNotNull(spec.getIoConfig());
Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
Assert.assertNotNull(spec.getTuningConfig());
Assert.assertNull(spec.getContext());
Assert.assertFalse(spec.isSuspended());
String serialized = mapper.writeValueAsString(spec);
// expect default values populated in reserialized string
Assert.assertTrue(serialized.contains("\"tuningConfig\":{"));
Assert.assertTrue(serialized.contains("\"indexSpec\":{"));
Assert.assertTrue(serialized.contains("\"suspended\":false"));
Assert.assertTrue(serialized.contains("\"inputFormat\":{"));
KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class);
@ -223,6 +310,95 @@ public class KafkaSupervisorSpecTest
Assert.assertTrue(serialized.contains("\"tuningConfig\":{"));
Assert.assertTrue(serialized.contains("\"indexSpec\":{"));
Assert.assertTrue(serialized.contains("\"suspended\":false"));
Assert.assertTrue(serialized.contains("\"parser\":{"));
KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class);
String stable = mapper.writeValueAsString(spec2);
Assert.assertEquals(serialized, stable);
}
@Test
public void testSerdeWithSpecAndInputFormat() throws IOException
{
String json = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"spec\": {\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"metrics-kafka\",\n"
+ " \"timestampSpec\": {\n"
+ " \"column\": \"timestamp\",\n"
+ " \"format\": \"auto\"\n"
+ " },\n"
+ " \"dimensionsSpec\": {\n"
+ " \"dimensions\": [],\n"
+ " \"dimensionExclusions\": [\n"
+ " \"timestamp\",\n"
+ " \"value\"\n"
+ " ]\n"
+ " },\n"
+ " \"metricsSpec\": [\n"
+ " {\n"
+ " \"name\": \"count\",\n"
+ " \"type\": \"count\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_sum\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleSum\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_min\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMin\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_max\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMax\"\n"
+ " }\n"
+ " ],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"HOUR\",\n"
+ " \"queryGranularity\": \"NONE\"\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"topic\": \"metrics\",\n"
+ " \"inputFormat\": {\n"
+ " \"type\": \"json\",\n"
+ " \"flattenSpec\": {\n"
+ " \"useFieldDiscovery\": true,\n"
+ " \"fields\": []\n"
+ " },\n"
+ " \"featureSpec\": {}\n"
+ " },"
+ " \"consumerProperties\": {\n"
+ " \"bootstrap.servers\": \"localhost:9092\"\n"
+ " },\n"
+ " \"taskCount\": 1\n"
+ " }\n"
+ " }\n"
+ "}";
KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class);
Assert.assertNotNull(spec);
Assert.assertNotNull(spec.getDataSchema());
Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
Assert.assertNotNull(spec.getIoConfig());
Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
Assert.assertNotNull(spec.getTuningConfig());
Assert.assertNull(spec.getContext());
Assert.assertFalse(spec.isSuspended());
String serialized = mapper.writeValueAsString(spec);
// expect default values populated in reserialized string
Assert.assertTrue(serialized.contains("\"tuningConfig\":{"));
Assert.assertTrue(serialized.contains("\"indexSpec\":{"));
Assert.assertTrue(serialized.contains("\"suspended\":false"));
Assert.assertTrue(serialized.contains("\"inputFormat\":{"));
KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class);

View File

@@ -102,7 +102,7 @@ public abstract class SeekableStreamSupervisorIOConfig
}
@Nullable
@JsonProperty
@JsonProperty("inputFormat")
private InputFormat getGivenInputFormat()
{
return inputFormat;