From f5402169319737381d005ab9a69a84c3857d4ba1 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 7 Jan 2020 14:18:54 -0800 Subject: [PATCH] fix InputFormat serde issue with SeekableStream based supervisors (#9136) --- .../supervisor/KafkaSupervisorSpecTest.java | 176 ++++++++++++++++++ .../SeekableStreamSupervisorIOConfig.java | 2 +- 2 files changed, 177 insertions(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index 8e12c8f650c..40ec97a08ef 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -138,6 +138,93 @@ public class KafkaSupervisorSpecTest Assert.assertTrue(serialized.contains("\"tuningConfig\":{")); Assert.assertTrue(serialized.contains("\"indexSpec\":{")); Assert.assertTrue(serialized.contains("\"suspended\":false")); + Assert.assertTrue(serialized.contains("\"parser\":{")); + + KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); + + String stable = mapper.writeValueAsString(spec2); + + Assert.assertEquals(serialized, stable); + } + + @Test + public void testSerdeWithInputFormat() throws IOException + { + String json = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topic\": \"metrics\",\n" + + " \"inputFormat\": {\n" + + " \"type\": \"json\",\n" + + " \"flattenSpec\": {\n" + + " \"useFieldDiscovery\": true,\n" + + " \"fields\": []\n" + + " },\n" + + " \"featureSpec\": {}\n" + + " }," + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class); + + Assert.assertNotNull(spec); + Assert.assertNotNull(spec.getDataSchema()); + Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); + Assert.assertNotNull(spec.getIoConfig()); + Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNotNull(spec.getTuningConfig()); + Assert.assertNull(spec.getContext()); + Assert.assertFalse(spec.isSuspended()); + String serialized = mapper.writeValueAsString(spec); + + // expect default values populated in reserialized string + Assert.assertTrue(serialized.contains("\"tuningConfig\":{")); + Assert.assertTrue(serialized.contains("\"indexSpec\":{")); + Assert.assertTrue(serialized.contains("\"suspended\":false")); + Assert.assertTrue(serialized.contains("\"inputFormat\":{")); KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); @@ -223,6 +310,95 @@ public class KafkaSupervisorSpecTest Assert.assertTrue(serialized.contains("\"tuningConfig\":{")); Assert.assertTrue(serialized.contains("\"indexSpec\":{")); Assert.assertTrue(serialized.contains("\"suspended\":false")); + Assert.assertTrue(serialized.contains("\"parser\":{")); + + KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); + + String stable = mapper.writeValueAsString(spec2); + + Assert.assertEquals(serialized, stable); + } + + @Test + public void testSerdeWithSpecAndInputFormat() throws IOException + { + String json = "{\n" + + " \"type\": \"kafka\",\n" + + " \"spec\": {\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topic\": \"metrics\",\n" + + " \"inputFormat\": {\n" + + " \"type\": \"json\",\n" + + " \"flattenSpec\": {\n" + + " \"useFieldDiscovery\": true,\n" + + " \"fields\": []\n" + + " },\n" + + " \"featureSpec\": {}\n" + + " }," + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + " }\n" + + "}"; + KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class); + + Assert.assertNotNull(spec); + Assert.assertNotNull(spec.getDataSchema()); + Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); + Assert.assertNotNull(spec.getIoConfig()); + Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNotNull(spec.getTuningConfig()); + Assert.assertNull(spec.getContext()); + Assert.assertFalse(spec.isSuspended()); + String serialized = mapper.writeValueAsString(spec); + + // expect default values populated in reserialized string + Assert.assertTrue(serialized.contains("\"tuningConfig\":{")); + Assert.assertTrue(serialized.contains("\"indexSpec\":{")); + Assert.assertTrue(serialized.contains("\"suspended\":false")); + Assert.assertTrue(serialized.contains("\"inputFormat\":{")); KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index caa0abfd41e..8693d684a72 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -102,7 +102,7 @@ public abstract class SeekableStreamSupervisorIOConfig } @Nullable - @JsonProperty + @JsonProperty("inputFormat") private InputFormat getGivenInputFormat() { return inputFormat;