From d628bebbd7ca1f103227d3d73fc938f580e4f7c6 Mon Sep 17 00:00:00 2001 From: Surekha Date: Wed, 20 Nov 2019 10:04:41 -0800 Subject: [PATCH] Make supervisor API similar to submit task API (#8810) * accept spec or dataSchema, tuningConfig, ioConfig while submitting task json * fix test * update docs * lgtm warning * Add original constructor back to IndexTask to minimize changes * fix indentation in docs * Allow spec to be specified in supervisor schema * undo IndexTask spec changes * update docs * Add Nullable and deprecated annotations * remove deprecated configs from SeekableStreamSupervisorSpec * remove nullable annotation --- docs/tutorials/tutorial-kafka.md | 104 ++++++++-------- .../tutorial/wikipedia-kafka-supervisor.json | 113 ++++++++++-------- .../KafkaSupervisorIngestionSpec.java | 66 ++++++++++ .../kafka/supervisor/KafkaSupervisorSpec.java | 55 ++++----- .../KafkaSupervisorTuningConfig.java | 30 +++++ .../indexing/kafka/KafkaSamplerSpecTest.java | 1 + .../supervisor/KafkaSupervisorSpecTest.java | 85 +++++++++++++ .../kafka/supervisor/KafkaSupervisorTest.java | 3 + .../KinesisSupervisorIngestionSpec.java | 67 +++++++++++ .../supervisor/KinesisSupervisorSpec.java | 62 ++++------ .../KinesisSupervisorTuningConfig.java | 37 ++++++ .../kinesis/KinesisSamplerSpecTest.java | 1 + .../supervisor/KinesisSupervisorTest.java | 4 + ...SeekableStreamSupervisorIngestionSpec.java | 61 ++++++++++ .../SeekableStreamSupervisorSpec.java | 39 +++--- 15 files changed, 541 insertions(+), 187 deletions(-) create mode 100644 extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIngestionSpec.java create mode 100644 extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIngestionSpec.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIngestionSpec.java diff --git a/docs/tutorials/tutorial-kafka.md b/docs/tutorials/tutorial-kafka.md index e7502ba444c..6a7b8e30eb8 100644 --- a/docs/tutorials/tutorial-kafka.md +++ b/docs/tutorials/tutorial-kafka.md @@ -180,60 +180,62 @@ Paste in this spec and click `Submit`. ```json { "type": "kafka", - "dataSchema": { - "dataSource": "wikipedia", - "parser": { - "type": "string", - "parseSpec": { - "format": "json", - "timestampSpec": { - "column": "time", - "format": "auto" - }, - "dimensionsSpec": { - "dimensions": [ - "channel", - "cityName", - "comment", - "countryIsoCode", - "countryName", - "isAnonymous", - "isMinor", - "isNew", - "isRobot", - "isUnpatrolled", - "metroCode", - "namespace", - "page", - "regionIsoCode", - "regionName", - "user", - { "name": "added", "type": "long" }, - { "name": "deleted", "type": "long" }, - { "name": "delta", "type": "long" } - ] + "spec" : { + "dataSchema": { + "dataSource": "wikipedia", + "parser": { + "type": "string", + "parseSpec": { + "format": "json", + "timestampSpec": { + "column": "time", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [ + "channel", + "cityName", + "comment", + "countryIsoCode", + "countryName", + "isAnonymous", + "isMinor", + "isNew", + "isRobot", + "isUnpatrolled", + "metroCode", + "namespace", + "page", + "regionIsoCode", + "regionName", + "user", + { "name": "added", "type": "long" }, + { "name": "deleted", "type": "long" }, + { "name": "delta", "type": "long" } + ] + } } + }, + "metricsSpec" : [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "NONE", + "rollup": false } }, - "metricsSpec" : [], - "granularitySpec": { - "type": "uniform", - "segmentGranularity": "DAY", - "queryGranularity": "NONE", - "rollup": false - } - }, - "tuningConfig": { - "type": "kafka", - "reportParseExceptions": false - }, - "ioConfig": { - "topic": "wikipedia", - "replicas": 2, - "taskDuration": "PT10M", - "completionTimeout": "PT20M", - "consumerProperties": { - "bootstrap.servers": "localhost:9092" + "tuningConfig": { + "type": "kafka", + "reportParseExceptions": false + }, + "ioConfig": { + "topic": "wikipedia", + "replicas": 2, + "taskDuration": "PT10M", + "completionTimeout": "PT20M", + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + } } } } diff --git a/examples/quickstart/tutorial/wikipedia-kafka-supervisor.json b/examples/quickstart/tutorial/wikipedia-kafka-supervisor.json index 2a56955c1e3..7c1e62c34f9 100644 --- a/examples/quickstart/tutorial/wikipedia-kafka-supervisor.json +++ b/examples/quickstart/tutorial/wikipedia-kafka-supervisor.json @@ -1,59 +1,70 @@ { "type": "kafka", - "dataSchema": { - "dataSource": "wikipedia", - "parser": { - "type": "string", - "parseSpec": { - "format": "json", - "timestampSpec": { - "column": "time", - "format": "auto" - }, - "dimensionsSpec": { - "dimensions": [ - "channel", - "cityName", - "comment", - "countryIsoCode", - "countryName", - "isAnonymous", - "isMinor", - "isNew", - "isRobot", - "isUnpatrolled", - "metroCode", - "namespace", - "page", - "regionIsoCode", - "regionName", - "user", - { "name": "added", "type": "long" }, - { "name": "deleted", "type": "long" }, - { "name": "delta", "type": "long" } - ] + "spec" : { + "dataSchema": { + "dataSource": "wikipedia", + "parser": { + "type": "string", + "parseSpec": { + "format": "json", + "timestampSpec": { + "column": "time", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [ + "channel", + "cityName", + "comment", + "countryIsoCode", + "countryName", + "isAnonymous", + "isMinor", + "isNew", + "isRobot", + "isUnpatrolled", + "metroCode", + "namespace", + "page", + "regionIsoCode", + "regionName", + "user", + { + "name": "added", + "type": "long" + }, + { + "name": "deleted", + "type": "long" + }, + { + "name": "delta", + "type": "long" + } + ] + } } + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "NONE", + "rollup": false } }, - "metricsSpec" : [], - "granularitySpec": { - "type": "uniform", - "segmentGranularity": "DAY", - "queryGranularity": "NONE", - "rollup": false - } - }, - "tuningConfig": { - "type": "kafka", - "reportParseExceptions": false - }, - "ioConfig": { - "topic": "wikipedia", - "replicas": 1, - "taskDuration": "PT10M", - "completionTimeout": "PT20M", - "consumerProperties": { - "bootstrap.servers": "localhost:9092" + "tuningConfig": { + "type": "kafka", + "reportParseExceptions": false + }, + "ioConfig": { + "topic": "wikipedia", + "replicas": 1, + "taskDuration": "PT10M", + "completionTimeout": "PT20M", + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + } } } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIngestionSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIngestionSpec.java new file mode 100644 index 00000000000..bbc00639f83 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIngestionSpec.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka.supervisor; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; +import org.apache.druid.segment.indexing.DataSchema; + +public class KafkaSupervisorIngestionSpec extends SeekableStreamSupervisorIngestionSpec +{ + private final DataSchema dataSchema; + private final KafkaSupervisorIOConfig ioConfig; + private final KafkaSupervisorTuningConfig tuningConfig; + + @JsonCreator + public KafkaSupervisorIngestionSpec( + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig, + @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig + ) + { + super(dataSchema, ioConfig, tuningConfig); + this.dataSchema = dataSchema; + this.ioConfig = ioConfig; + this.tuningConfig = tuningConfig == null ? KafkaSupervisorTuningConfig.defaultConfig() : tuningConfig; + } + + @Override + @JsonProperty("dataSchema") + public DataSchema getDataSchema() + { + return dataSchema; + } + + @Override + @JsonProperty("ioConfig") + public KafkaSupervisorIOConfig getIOConfig() + { + return ioConfig; + } + + @Override + @JsonProperty("tuningConfig") + public KafkaSupervisorTuningConfig getTuningConfig() + { + return tuningConfig; + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 772a26ad07c..5d1f4bfad85 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import javax.annotation.Nullable; import java.util.Map; public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec @@ -44,9 +45,10 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec @JsonCreator public KafkaSupervisorSpec( - @JsonProperty("dataSchema") DataSchema dataSchema, - @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig, - @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig, + @JsonProperty("spec") @Nullable KafkaSupervisorIngestionSpec ingestionSchema, + @JsonProperty("dataSchema") @Nullable DataSchema dataSchema, + @JsonProperty("tuningConfig") @Nullable KafkaSupervisorTuningConfig tuningConfig, + @JsonProperty("ioConfig") @Nullable KafkaSupervisorIOConfig ioConfig, @JsonProperty("context") Map context, @JsonProperty("suspended") Boolean suspended, @JacksonInject TaskStorage taskStorage, @@ -61,36 +63,15 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec ) { super( - dataSchema, - tuningConfig != null - ? tuningConfig - : new KafkaSupervisorTuningConfig( - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null + ingestionSchema != null + ? ingestionSchema + : new KafkaSupervisorIngestionSpec( + dataSchema, + ioConfig, + tuningConfig != null + ? tuningConfig + : KafkaSupervisorTuningConfig.defaultConfig() ), - ioConfig, context, suspended, taskStorage, @@ -132,6 +113,7 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec } @Override + @Deprecated @JsonProperty public KafkaSupervisorTuningConfig getTuningConfig() { @@ -139,16 +121,25 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec } @Override + @Deprecated @JsonProperty public KafkaSupervisorIOConfig getIoConfig() { return (KafkaSupervisorIOConfig) super.getIoConfig(); } + @Override + @JsonProperty + public KafkaSupervisorIngestionSpec getSpec() + { + return (KafkaSupervisorIngestionSpec) super.getSpec(); + } + @Override protected KafkaSupervisorSpec toggleSuspend(boolean suspend) { return new KafkaSupervisorSpec( + getSpec(), getDataSchema(), getTuningConfig(), getIoConfig(), diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 5d5592afacf..5c00988e860 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -43,6 +43,36 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig private final Duration shutdownTimeout; private final Duration offsetFetchPeriod; + public static KafkaSupervisorTuningConfig defaultConfig() + { + return new KafkaSupervisorTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } + public KafkaSupervisorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index 629f0aa64f3..77fdd6ae99d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -137,6 +137,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest insertData(generateRecords(TOPIC)); KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec( + null, DATA_SCHEMA, null, new KafkaSupervisorIOConfig( diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index 1506d9894a2..8e12c8f650c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -146,6 +146,91 @@ public class KafkaSupervisorSpecTest Assert.assertEquals(serialized, stable); } + @Test + public void testSerdeWithSpec() throws IOException + { + String json = "{\n" + + " \"type\": \"kafka\",\n" + + " \"spec\": {\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topic\": \"metrics\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + " }\n" + + "}"; + KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class); + + Assert.assertNotNull(spec); + Assert.assertNotNull(spec.getDataSchema()); + Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); + Assert.assertNotNull(spec.getIoConfig()); + Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNotNull(spec.getTuningConfig()); + Assert.assertNull(spec.getContext()); + Assert.assertFalse(spec.isSuspended()); + String serialized = mapper.writeValueAsString(spec); + + // expect default values populated in reserialized string + Assert.assertTrue(serialized.contains("\"tuningConfig\":{")); + Assert.assertTrue(serialized.contains("\"indexSpec\":{")); + Assert.assertTrue(serialized.contains("\"suspended\":false")); + + KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); + + String stable = mapper.writeValueAsString(spec2); + + Assert.assertEquals(serialized, stable); + } + @Test public void testSuspendResume() throws IOException { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index c379f7050fa..24603132c9e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -3453,6 +3453,7 @@ public class KafkaSupervisorTest extends EasyMockSupport taskClientFactory, OBJECT_MAPPER, new KafkaSupervisorSpec( + null, dataSchema, tuningConfig, kafkaSupervisorIOConfig, @@ -3561,6 +3562,7 @@ public class KafkaSupervisorTest extends EasyMockSupport taskClientFactory, OBJECT_MAPPER, new KafkaSupervisorSpec( + null, dataSchema, tuningConfig, kafkaSupervisorIOConfig, @@ -3646,6 +3648,7 @@ public class KafkaSupervisorTest extends EasyMockSupport taskClientFactory, OBJECT_MAPPER, new KafkaSupervisorSpec( + null, dataSchema, tuningConfig, kafkaSupervisorIOConfig, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIngestionSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIngestionSpec.java new file mode 100644 index 00000000000..70658910815 --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIngestionSpec.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kinesis.supervisor; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; +import org.apache.druid.segment.indexing.DataSchema; + +public class KinesisSupervisorIngestionSpec extends SeekableStreamSupervisorIngestionSpec +{ + private final DataSchema dataSchema; + private final KinesisSupervisorIOConfig ioConfig; + private final KinesisSupervisorTuningConfig tuningConfig; + + @JsonCreator + public KinesisSupervisorIngestionSpec( + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("ioConfig") KinesisSupervisorIOConfig ioConfig, + @JsonProperty("tuningConfig") KinesisSupervisorTuningConfig tuningConfig + ) + { + super(dataSchema, ioConfig, tuningConfig); + this.dataSchema = dataSchema; + this.ioConfig = ioConfig; + this.tuningConfig = tuningConfig == null ? KinesisSupervisorTuningConfig.defaultConfig() : tuningConfig; + } + + @Override + @JsonProperty("dataSchema") + public DataSchema getDataSchema() + { + return dataSchema; + } + + @Override + @JsonProperty("ioConfig") + public KinesisSupervisorIOConfig getIOConfig() + { + return ioConfig; + } + + @Override + @JsonProperty("tuningConfig") + public KinesisSupervisorTuningConfig getTuningConfig() + { + return tuningConfig; + } +} + diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index fb5751b84c4..e9610c60970 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -38,6 +38,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import javax.annotation.Nullable; import java.util.Map; public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec @@ -47,9 +48,10 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec @JsonCreator public KinesisSupervisorSpec( - @JsonProperty("dataSchema") DataSchema dataSchema, - @JsonProperty("tuningConfig") KinesisSupervisorTuningConfig tuningConfig, - @JsonProperty("ioConfig") KinesisSupervisorIOConfig ioConfig, + @JsonProperty("spec") @Nullable KinesisSupervisorIngestionSpec ingestionSchema, + @JsonProperty("dataSchema") @Nullable DataSchema dataSchema, + @JsonProperty("tuningConfig") @Nullable KinesisSupervisorTuningConfig tuningConfig, + @JsonProperty("ioConfig") @Nullable KinesisSupervisorIOConfig ioConfig, @JsonProperty("context") Map context, @JsonProperty("suspended") Boolean suspended, @JacksonInject TaskStorage taskStorage, @@ -65,43 +67,15 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec ) { super( - dataSchema, - tuningConfig != null - ? tuningConfig - : new KinesisSupervisorTuningConfig( - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null + ingestionSchema != null + ? ingestionSchema + : new KinesisSupervisorIngestionSpec( + dataSchema, + ioConfig, + tuningConfig != null + ? tuningConfig + : KinesisSupervisorTuningConfig.defaultConfig() ), - ioConfig, context, suspended, taskStorage, @@ -158,6 +132,7 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec } @Override + @Deprecated @JsonProperty public KinesisSupervisorTuningConfig getTuningConfig() { @@ -165,16 +140,25 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec } @Override + @Deprecated @JsonProperty public KinesisSupervisorIOConfig getIoConfig() { return (KinesisSupervisorIOConfig) super.getIoConfig(); } + @Override + @JsonProperty + public KinesisSupervisorIngestionSpec getSpec() + { + return (KinesisSupervisorIngestionSpec) super.getSpec(); + } + @Override protected KinesisSupervisorSpec toggleSuspend(boolean suspend) { return new KinesisSupervisorSpec( + getSpec(), getDataSchema(), getTuningConfig(), getIoConfig(), diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 91da145b1ad..bc3bbd2314a 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -40,6 +40,43 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig private final Duration shutdownTimeout; private final Duration repartitionTransitionDuration; + public static KinesisSupervisorTuningConfig defaultConfig() + { + return new KinesisSupervisorTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } + public KinesisSupervisorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index 7f50834cf97..9ca804bea17 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -151,6 +151,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport replayAll(); KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec( + null, DATA_SCHEMA, null, new KinesisSupervisorIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index b9a1ed6e01a..c3eba4b368a 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -4677,6 +4677,7 @@ public class KinesisSupervisorTest extends EasyMockSupport taskClientFactory, OBJECT_MAPPER, new KinesisSupervisorSpec( + null, dataSchema, tuningConfig, kinesisSupervisorIOConfig, @@ -4780,6 +4781,7 @@ public class KinesisSupervisorTest extends EasyMockSupport taskClientFactory, OBJECT_MAPPER, new KinesisSupervisorSpec( + null, dataSchema, tuningConfig, kinesisSupervisorIOConfig, @@ -4865,6 +4867,7 @@ public class KinesisSupervisorTest extends EasyMockSupport taskClientFactory, OBJECT_MAPPER, new KinesisSupervisorSpec( + null, dataSchema, tuningConfig, kinesisSupervisorIOConfig, @@ -4952,6 +4955,7 @@ public class KinesisSupervisorTest extends EasyMockSupport taskClientFactory, OBJECT_MAPPER, new KinesisSupervisorSpec( + null, dataSchema, tuningConfig, kinesisSupervisorIOConfig, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIngestionSpec.java new file mode 100644 index 00000000000..6fbf917b774 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIngestionSpec.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.segment.indexing.DataSchema; + +public abstract class SeekableStreamSupervisorIngestionSpec +{ + private final DataSchema dataSchema; + private final SeekableStreamSupervisorIOConfig ioConfig; + private final SeekableStreamSupervisorTuningConfig tuningConfig; + + @JsonCreator + public SeekableStreamSupervisorIngestionSpec( + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("ioConfig") SeekableStreamSupervisorIOConfig ioConfig, + @JsonProperty("tuningConfig") SeekableStreamSupervisorTuningConfig tuningConfig + ) + { + this.dataSchema = dataSchema; + this.ioConfig = ioConfig; + this.tuningConfig = tuningConfig; + } + + @JsonProperty("dataSchema") + public DataSchema getDataSchema() + { + return dataSchema; + } + + @JsonProperty("ioConfig") + public SeekableStreamSupervisorIOConfig getIOConfig() + { + return ioConfig; + } + + @JsonProperty("tuningConfig") + public SeekableStreamSupervisorTuningConfig getTuningConfig() + { + return tuningConfig; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 469821ac8e6..ad26c209b4f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -44,15 +44,24 @@ import java.util.Map; public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec { + + private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema( + SeekableStreamSupervisorIngestionSpec ingestionSchema + ) + { + Preconditions.checkNotNull(ingestionSchema, "ingestionSchema"); + Preconditions.checkNotNull(ingestionSchema.getDataSchema(), "dataSchema"); + Preconditions.checkNotNull(ingestionSchema.getIOConfig(), "ioConfig"); + return ingestionSchema; + } + protected final TaskStorage taskStorage; protected final TaskMaster taskMaster; protected final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; protected final SeekableStreamIndexTaskClientFactory indexTaskClientFactory; protected final ObjectMapper mapper; protected final RowIngestionMetersFactory rowIngestionMetersFactory; - private final DataSchema dataSchema; - private final SeekableStreamSupervisorTuningConfig tuningConfig; - private final SeekableStreamSupervisorIOConfig ioConfig; + private final SeekableStreamSupervisorIngestionSpec ingestionSchema; @Nullable private final Map context; protected final ServiceEmitter emitter; @@ -62,9 +71,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec @JsonCreator public SeekableStreamSupervisorSpec( - @JsonProperty("dataSchema") DataSchema dataSchema, - @JsonProperty("tuningConfig") SeekableStreamSupervisorTuningConfig tuningConfig, - @JsonProperty("ioConfig") SeekableStreamSupervisorIOConfig ioConfig, + @JsonProperty("spec") final SeekableStreamSupervisorIngestionSpec ingestionSchema, @JsonProperty("context") @Nullable Map context, @JsonProperty("suspended") Boolean suspended, @JacksonInject TaskStorage taskStorage, @@ -78,9 +85,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig ) { - this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema"); - this.tuningConfig = tuningConfig; // null check done in concrete class - this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); + this.ingestionSchema = checkIngestionSchema(ingestionSchema); this.context = context; this.taskStorage = taskStorage; @@ -95,22 +100,29 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec this.supervisorStateManagerConfig = supervisorStateManagerConfig; } + @JsonProperty + public SeekableStreamSupervisorIngestionSpec getSpec() + { + return ingestionSchema; + } + + @Deprecated @JsonProperty public DataSchema getDataSchema() { - return dataSchema; + return ingestionSchema.getDataSchema(); } @JsonProperty public SeekableStreamSupervisorTuningConfig getTuningConfig() { - return tuningConfig; + return ingestionSchema.getTuningConfig(); } @JsonProperty public SeekableStreamSupervisorIOConfig getIoConfig() { - return ioConfig; + return ingestionSchema.getIOConfig(); } @Nullable @@ -128,7 +140,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec @Override public String getId() { - return dataSchema.getDataSource(); + return ingestionSchema.getDataSchema().getDataSource(); } public DruidMonitorSchedulerConfig getMonitorSchedulerConfig() @@ -171,5 +183,4 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend); - }