Make supervisor API similar to submit task API (#8810)

* accept spec or dataSchema, tuningConfig, ioConfig while submitting task json

* fix test

* update docs

* lgtm warning

* Add original constructor back to IndexTask to minimize changes

* fix indentation in docs

* Allow spec to be specified in supervisor schema

* undo IndexTask spec changes

* update docs

* Add Nullable and deprecated annotations

* remove deprecated configs from SeekableStreamSupervisorSpec

* remove nullable annotation
This commit is contained in:
Surekha 2019-11-20 10:04:41 -08:00 committed by Jihoon Son
parent ee8f048381
commit d628bebbd7
15 changed files with 541 additions and 187 deletions

View File

@ -180,6 +180,7 @@ Paste in this spec and click `Submit`.
```json ```json
{ {
"type": "kafka", "type": "kafka",
"spec" : {
"dataSchema": { "dataSchema": {
"dataSource": "wikipedia", "dataSource": "wikipedia",
"parser": { "parser": {
@ -236,6 +237,7 @@ Paste in this spec and click `Submit`.
"bootstrap.servers": "localhost:9092" "bootstrap.servers": "localhost:9092"
} }
} }
}
} }
``` ```

View File

@ -1,5 +1,6 @@
{ {
"type": "kafka", "type": "kafka",
"spec" : {
"dataSchema": { "dataSchema": {
"dataSource": "wikipedia", "dataSource": "wikipedia",
"parser": { "parser": {
@ -28,14 +29,23 @@
"regionIsoCode", "regionIsoCode",
"regionName", "regionName",
"user", "user",
{ "name": "added", "type": "long" }, {
{ "name": "deleted", "type": "long" }, "name": "added",
{ "name": "delta", "type": "long" } "type": "long"
},
{
"name": "deleted",
"type": "long"
},
{
"name": "delta",
"type": "long"
}
] ]
} }
} }
}, },
"metricsSpec" : [], "metricsSpec": [],
"granularitySpec": { "granularitySpec": {
"type": "uniform", "type": "uniform",
"segmentGranularity": "DAY", "segmentGranularity": "DAY",
@ -56,4 +66,5 @@
"bootstrap.servers": "localhost:9092" "bootstrap.servers": "localhost:9092"
} }
} }
}
} }

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
import org.apache.druid.segment.indexing.DataSchema;
public class KafkaSupervisorIngestionSpec extends SeekableStreamSupervisorIngestionSpec
{
private final DataSchema dataSchema;
private final KafkaSupervisorIOConfig ioConfig;
private final KafkaSupervisorTuningConfig tuningConfig;
@JsonCreator
public KafkaSupervisorIngestionSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig,
@JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig
)
{
super(dataSchema, ioConfig, tuningConfig);
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null ? KafkaSupervisorTuningConfig.defaultConfig() : tuningConfig;
}
@Override
@JsonProperty("dataSchema")
public DataSchema getDataSchema()
{
return dataSchema;
}
@Override
@JsonProperty("ioConfig")
public KafkaSupervisorIOConfig getIOConfig()
{
return ioConfig;
}
@Override
@JsonProperty("tuningConfig")
public KafkaSupervisorTuningConfig getTuningConfig()
{
return tuningConfig;
}
}

View File

@ -36,6 +36,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import javax.annotation.Nullable;
import java.util.Map; import java.util.Map;
public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
@ -44,9 +45,10 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
@JsonCreator @JsonCreator
public KafkaSupervisorSpec( public KafkaSupervisorSpec(
@JsonProperty("dataSchema") DataSchema dataSchema, @JsonProperty("spec") @Nullable KafkaSupervisorIngestionSpec ingestionSchema,
@JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig, @JsonProperty("dataSchema") @Nullable DataSchema dataSchema,
@JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig, @JsonProperty("tuningConfig") @Nullable KafkaSupervisorTuningConfig tuningConfig,
@JsonProperty("ioConfig") @Nullable KafkaSupervisorIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context, @JsonProperty("context") Map<String, Object> context,
@JsonProperty("suspended") Boolean suspended, @JsonProperty("suspended") Boolean suspended,
@JacksonInject TaskStorage taskStorage, @JacksonInject TaskStorage taskStorage,
@ -61,36 +63,15 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
) )
{ {
super( super(
ingestionSchema != null
? ingestionSchema
: new KafkaSupervisorIngestionSpec(
dataSchema, dataSchema,
ioConfig,
tuningConfig != null tuningConfig != null
? tuningConfig ? tuningConfig
: new KafkaSupervisorTuningConfig( : KafkaSupervisorTuningConfig.defaultConfig()
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
), ),
ioConfig,
context, context,
suspended, suspended,
taskStorage, taskStorage,
@ -132,6 +113,7 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
} }
@Override @Override
@Deprecated
@JsonProperty @JsonProperty
public KafkaSupervisorTuningConfig getTuningConfig() public KafkaSupervisorTuningConfig getTuningConfig()
{ {
@ -139,16 +121,25 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
} }
@Override @Override
@Deprecated
@JsonProperty @JsonProperty
public KafkaSupervisorIOConfig getIoConfig() public KafkaSupervisorIOConfig getIoConfig()
{ {
return (KafkaSupervisorIOConfig) super.getIoConfig(); return (KafkaSupervisorIOConfig) super.getIoConfig();
} }
@Override
@JsonProperty
public KafkaSupervisorIngestionSpec getSpec()
{
return (KafkaSupervisorIngestionSpec) super.getSpec();
}
@Override @Override
protected KafkaSupervisorSpec toggleSuspend(boolean suspend) protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
{ {
return new KafkaSupervisorSpec( return new KafkaSupervisorSpec(
getSpec(),
getDataSchema(), getDataSchema(),
getTuningConfig(), getTuningConfig(),
getIoConfig(), getIoConfig(),

View File

@ -43,6 +43,36 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
private final Duration shutdownTimeout; private final Duration shutdownTimeout;
private final Duration offsetFetchPeriod; private final Duration offsetFetchPeriod;
public static KafkaSupervisorTuningConfig defaultConfig()
{
return new KafkaSupervisorTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
}
public KafkaSupervisorTuningConfig( public KafkaSupervisorTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,

View File

@ -137,6 +137,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
insertData(generateRecords(TOPIC)); insertData(generateRecords(TOPIC));
KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec( KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
null,
DATA_SCHEMA, DATA_SCHEMA,
null, null,
new KafkaSupervisorIOConfig( new KafkaSupervisorIOConfig(

View File

@ -146,6 +146,91 @@ public class KafkaSupervisorSpecTest
Assert.assertEquals(serialized, stable); Assert.assertEquals(serialized, stable);
} }
@Test
public void testSerdeWithSpec() throws IOException
{
String json = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"spec\": {\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"metrics-kafka\",\n"
+ " \"parser\": {\n"
+ " \"type\": \"string\",\n"
+ " \"parseSpec\": {\n"
+ " \"format\": \"json\",\n"
+ " \"timestampSpec\": {\n"
+ " \"column\": \"timestamp\",\n"
+ " \"format\": \"auto\"\n"
+ " },\n"
+ " \"dimensionsSpec\": {\n"
+ " \"dimensions\": [],\n"
+ " \"dimensionExclusions\": [\n"
+ " \"timestamp\",\n"
+ " \"value\"\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"metricsSpec\": [\n"
+ " {\n"
+ " \"name\": \"count\",\n"
+ " \"type\": \"count\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_sum\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleSum\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_min\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMin\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_max\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMax\"\n"
+ " }\n"
+ " ],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"HOUR\",\n"
+ " \"queryGranularity\": \"NONE\"\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"topic\": \"metrics\",\n"
+ " \"consumerProperties\": {\n"
+ " \"bootstrap.servers\": \"localhost:9092\"\n"
+ " },\n"
+ " \"taskCount\": 1\n"
+ " }\n"
+ " }\n"
+ "}";
KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class);
Assert.assertNotNull(spec);
Assert.assertNotNull(spec.getDataSchema());
Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
Assert.assertNotNull(spec.getIoConfig());
Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
Assert.assertNotNull(spec.getTuningConfig());
Assert.assertNull(spec.getContext());
Assert.assertFalse(spec.isSuspended());
String serialized = mapper.writeValueAsString(spec);
// expect default values populated in reserialized string
Assert.assertTrue(serialized.contains("\"tuningConfig\":{"));
Assert.assertTrue(serialized.contains("\"indexSpec\":{"));
Assert.assertTrue(serialized.contains("\"suspended\":false"));
KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class);
String stable = mapper.writeValueAsString(spec2);
Assert.assertEquals(serialized, stable);
}
@Test @Test
public void testSuspendResume() throws IOException public void testSuspendResume() throws IOException
{ {

View File

@ -3453,6 +3453,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskClientFactory, taskClientFactory,
OBJECT_MAPPER, OBJECT_MAPPER,
new KafkaSupervisorSpec( new KafkaSupervisorSpec(
null,
dataSchema, dataSchema,
tuningConfig, tuningConfig,
kafkaSupervisorIOConfig, kafkaSupervisorIOConfig,
@ -3561,6 +3562,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskClientFactory, taskClientFactory,
OBJECT_MAPPER, OBJECT_MAPPER,
new KafkaSupervisorSpec( new KafkaSupervisorSpec(
null,
dataSchema, dataSchema,
tuningConfig, tuningConfig,
kafkaSupervisorIOConfig, kafkaSupervisorIOConfig,
@ -3646,6 +3648,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskClientFactory, taskClientFactory,
OBJECT_MAPPER, OBJECT_MAPPER,
new KafkaSupervisorSpec( new KafkaSupervisorSpec(
null,
dataSchema, dataSchema,
tuningConfig, tuningConfig,
kafkaSupervisorIOConfig, kafkaSupervisorIOConfig,

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kinesis.supervisor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
import org.apache.druid.segment.indexing.DataSchema;
public class KinesisSupervisorIngestionSpec extends SeekableStreamSupervisorIngestionSpec
{
private final DataSchema dataSchema;
private final KinesisSupervisorIOConfig ioConfig;
private final KinesisSupervisorTuningConfig tuningConfig;
@JsonCreator
public KinesisSupervisorIngestionSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") KinesisSupervisorIOConfig ioConfig,
@JsonProperty("tuningConfig") KinesisSupervisorTuningConfig tuningConfig
)
{
super(dataSchema, ioConfig, tuningConfig);
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null ? KinesisSupervisorTuningConfig.defaultConfig() : tuningConfig;
}
@Override
@JsonProperty("dataSchema")
public DataSchema getDataSchema()
{
return dataSchema;
}
@Override
@JsonProperty("ioConfig")
public KinesisSupervisorIOConfig getIOConfig()
{
return ioConfig;
}
@Override
@JsonProperty("tuningConfig")
public KinesisSupervisorTuningConfig getTuningConfig()
{
return tuningConfig;
}
}

View File

@ -38,6 +38,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import javax.annotation.Nullable;
import java.util.Map; import java.util.Map;
public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
@ -47,9 +48,10 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
@JsonCreator @JsonCreator
public KinesisSupervisorSpec( public KinesisSupervisorSpec(
@JsonProperty("dataSchema") DataSchema dataSchema, @JsonProperty("spec") @Nullable KinesisSupervisorIngestionSpec ingestionSchema,
@JsonProperty("tuningConfig") KinesisSupervisorTuningConfig tuningConfig, @JsonProperty("dataSchema") @Nullable DataSchema dataSchema,
@JsonProperty("ioConfig") KinesisSupervisorIOConfig ioConfig, @JsonProperty("tuningConfig") @Nullable KinesisSupervisorTuningConfig tuningConfig,
@JsonProperty("ioConfig") @Nullable KinesisSupervisorIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context, @JsonProperty("context") Map<String, Object> context,
@JsonProperty("suspended") Boolean suspended, @JsonProperty("suspended") Boolean suspended,
@JacksonInject TaskStorage taskStorage, @JacksonInject TaskStorage taskStorage,
@ -65,43 +67,15 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
) )
{ {
super( super(
ingestionSchema != null
? ingestionSchema
: new KinesisSupervisorIngestionSpec(
dataSchema, dataSchema,
ioConfig,
tuningConfig != null tuningConfig != null
? tuningConfig ? tuningConfig
: new KinesisSupervisorTuningConfig( : KinesisSupervisorTuningConfig.defaultConfig()
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
), ),
ioConfig,
context, context,
suspended, suspended,
taskStorage, taskStorage,
@ -158,6 +132,7 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
} }
@Override @Override
@Deprecated
@JsonProperty @JsonProperty
public KinesisSupervisorTuningConfig getTuningConfig() public KinesisSupervisorTuningConfig getTuningConfig()
{ {
@ -165,16 +140,25 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
} }
@Override @Override
@Deprecated
@JsonProperty @JsonProperty
public KinesisSupervisorIOConfig getIoConfig() public KinesisSupervisorIOConfig getIoConfig()
{ {
return (KinesisSupervisorIOConfig) super.getIoConfig(); return (KinesisSupervisorIOConfig) super.getIoConfig();
} }
@Override
@JsonProperty
public KinesisSupervisorIngestionSpec getSpec()
{
return (KinesisSupervisorIngestionSpec) super.getSpec();
}
@Override @Override
protected KinesisSupervisorSpec toggleSuspend(boolean suspend) protected KinesisSupervisorSpec toggleSuspend(boolean suspend)
{ {
return new KinesisSupervisorSpec( return new KinesisSupervisorSpec(
getSpec(),
getDataSchema(), getDataSchema(),
getTuningConfig(), getTuningConfig(),
getIoConfig(), getIoConfig(),

View File

@ -40,6 +40,43 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
private final Duration shutdownTimeout; private final Duration shutdownTimeout;
private final Duration repartitionTransitionDuration; private final Duration repartitionTransitionDuration;
public static KinesisSupervisorTuningConfig defaultConfig()
{
return new KinesisSupervisorTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
}
public KinesisSupervisorTuningConfig( public KinesisSupervisorTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,

View File

@ -151,6 +151,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
replayAll(); replayAll();
KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec( KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
null,
DATA_SCHEMA, DATA_SCHEMA,
null, null,
new KinesisSupervisorIOConfig( new KinesisSupervisorIOConfig(

View File

@ -4677,6 +4677,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
taskClientFactory, taskClientFactory,
OBJECT_MAPPER, OBJECT_MAPPER,
new KinesisSupervisorSpec( new KinesisSupervisorSpec(
null,
dataSchema, dataSchema,
tuningConfig, tuningConfig,
kinesisSupervisorIOConfig, kinesisSupervisorIOConfig,
@ -4780,6 +4781,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
taskClientFactory, taskClientFactory,
OBJECT_MAPPER, OBJECT_MAPPER,
new KinesisSupervisorSpec( new KinesisSupervisorSpec(
null,
dataSchema, dataSchema,
tuningConfig, tuningConfig,
kinesisSupervisorIOConfig, kinesisSupervisorIOConfig,
@ -4865,6 +4867,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
taskClientFactory, taskClientFactory,
OBJECT_MAPPER, OBJECT_MAPPER,
new KinesisSupervisorSpec( new KinesisSupervisorSpec(
null,
dataSchema, dataSchema,
tuningConfig, tuningConfig,
kinesisSupervisorIOConfig, kinesisSupervisorIOConfig,
@ -4952,6 +4955,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
taskClientFactory, taskClientFactory,
OBJECT_MAPPER, OBJECT_MAPPER,
new KinesisSupervisorSpec( new KinesisSupervisorSpec(
null,
dataSchema, dataSchema,
tuningConfig, tuningConfig,
kinesisSupervisorIOConfig, kinesisSupervisorIOConfig,

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream.supervisor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.indexing.DataSchema;
public abstract class SeekableStreamSupervisorIngestionSpec
{
private final DataSchema dataSchema;
private final SeekableStreamSupervisorIOConfig ioConfig;
private final SeekableStreamSupervisorTuningConfig tuningConfig;
@JsonCreator
public SeekableStreamSupervisorIngestionSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") SeekableStreamSupervisorIOConfig ioConfig,
@JsonProperty("tuningConfig") SeekableStreamSupervisorTuningConfig tuningConfig
)
{
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig;
}
@JsonProperty("dataSchema")
public DataSchema getDataSchema()
{
return dataSchema;
}
@JsonProperty("ioConfig")
public SeekableStreamSupervisorIOConfig getIOConfig()
{
return ioConfig;
}
@JsonProperty("tuningConfig")
public SeekableStreamSupervisorTuningConfig getTuningConfig()
{
return tuningConfig;
}
}

View File

@ -44,15 +44,24 @@ import java.util.Map;
public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
{ {
private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema(
SeekableStreamSupervisorIngestionSpec ingestionSchema
)
{
Preconditions.checkNotNull(ingestionSchema, "ingestionSchema");
Preconditions.checkNotNull(ingestionSchema.getDataSchema(), "dataSchema");
Preconditions.checkNotNull(ingestionSchema.getIOConfig(), "ioConfig");
return ingestionSchema;
}
protected final TaskStorage taskStorage; protected final TaskStorage taskStorage;
protected final TaskMaster taskMaster; protected final TaskMaster taskMaster;
protected final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; protected final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
protected final SeekableStreamIndexTaskClientFactory indexTaskClientFactory; protected final SeekableStreamIndexTaskClientFactory indexTaskClientFactory;
protected final ObjectMapper mapper; protected final ObjectMapper mapper;
protected final RowIngestionMetersFactory rowIngestionMetersFactory; protected final RowIngestionMetersFactory rowIngestionMetersFactory;
private final DataSchema dataSchema; private final SeekableStreamSupervisorIngestionSpec ingestionSchema;
private final SeekableStreamSupervisorTuningConfig tuningConfig;
private final SeekableStreamSupervisorIOConfig ioConfig;
@Nullable @Nullable
private final Map<String, Object> context; private final Map<String, Object> context;
protected final ServiceEmitter emitter; protected final ServiceEmitter emitter;
@ -62,9 +71,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
@JsonCreator @JsonCreator
public SeekableStreamSupervisorSpec( public SeekableStreamSupervisorSpec(
@JsonProperty("dataSchema") DataSchema dataSchema, @JsonProperty("spec") final SeekableStreamSupervisorIngestionSpec ingestionSchema,
@JsonProperty("tuningConfig") SeekableStreamSupervisorTuningConfig tuningConfig,
@JsonProperty("ioConfig") SeekableStreamSupervisorIOConfig ioConfig,
@JsonProperty("context") @Nullable Map<String, Object> context, @JsonProperty("context") @Nullable Map<String, Object> context,
@JsonProperty("suspended") Boolean suspended, @JsonProperty("suspended") Boolean suspended,
@JacksonInject TaskStorage taskStorage, @JacksonInject TaskStorage taskStorage,
@ -78,9 +85,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
@JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig
) )
{ {
this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema"); this.ingestionSchema = checkIngestionSchema(ingestionSchema);
this.tuningConfig = tuningConfig; // null check done in concrete class
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
this.context = context; this.context = context;
this.taskStorage = taskStorage; this.taskStorage = taskStorage;
@ -95,22 +100,29 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
this.supervisorStateManagerConfig = supervisorStateManagerConfig; this.supervisorStateManagerConfig = supervisorStateManagerConfig;
} }
@JsonProperty
public SeekableStreamSupervisorIngestionSpec getSpec()
{
return ingestionSchema;
}
@Deprecated
@JsonProperty @JsonProperty
public DataSchema getDataSchema() public DataSchema getDataSchema()
{ {
return dataSchema; return ingestionSchema.getDataSchema();
} }
@JsonProperty @JsonProperty
public SeekableStreamSupervisorTuningConfig getTuningConfig() public SeekableStreamSupervisorTuningConfig getTuningConfig()
{ {
return tuningConfig; return ingestionSchema.getTuningConfig();
} }
@JsonProperty @JsonProperty
public SeekableStreamSupervisorIOConfig getIoConfig() public SeekableStreamSupervisorIOConfig getIoConfig()
{ {
return ioConfig; return ingestionSchema.getIOConfig();
} }
@Nullable @Nullable
@ -128,7 +140,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
@Override @Override
public String getId() public String getId()
{ {
return dataSchema.getDataSource(); return ingestionSchema.getDataSchema().getDataSource();
} }
public DruidMonitorSchedulerConfig getMonitorSchedulerConfig() public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
@ -171,5 +183,4 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend); protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend);
} }