diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 16ae15610da..4f017b96ce6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -67,6 +67,14 @@ public class HadoopIndexTask extends AbstractTask
   public static String DEFAULT_HADOOP_COORDINATES = "org.apache.hadoop:hadoop-client:2.3.0";
 
+  private static String getTheDataSource(HadoopIngestionSpec spec, HadoopIngestionSpec config)
+  {
+    if (spec != null) {
+      return spec.getDataSchema().getDataSource();
+    }
+    return config.getDataSchema().getDataSource();
+  }
+
   @JsonIgnore
   private final HadoopIngestionSpec spec;
   @JsonIgnore
   private final List<String> hadoopDependencyCoordinates;
@@ -74,36 +82,39 @@ public class HadoopIndexTask extends AbstractTask
 
   /**
    * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
-   * for creating Druid index segments. It may be modified.
-   * <p/>
-   * Here, we will ensure that the DbConnectorConfig field of the spec is set to null, such that the
-   * job does not push a list of published segments the database. Instead, we will use the method
-   * IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
-   * segments, and let the indexing service report these segments to the database.
+   *             for creating Druid index segments. It may be modified.
+   *             <p/>
+   *             Here, we will ensure that the DbConnectorConfig field of the spec is set to null, such that the
+   *             job does not push a list of published segments to the database. Instead, we will use the method
+   *             IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
+   *             segments, and let the indexing service report these segments to the database.
    */
   @JsonCreator
   public HadoopIndexTask(
       @JsonProperty("id") String id,
       @JsonProperty("spec") HadoopIngestionSpec spec,
+      @JsonProperty("config") HadoopIngestionSpec config, // backwards compat
       @JsonProperty("hadoopCoordinates") String hadoopCoordinates,
       @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
   )
   {
     super(
-        id != null ? id : String.format("index_hadoop_%s_%s", spec.getDataSchema().getDataSource(), new DateTime()),
-        spec.getDataSchema().getDataSource()
+        id != null ? id : String.format("index_hadoop_%s_%s", getTheDataSource(spec, config), new DateTime()),
+        getTheDataSource(spec, config)
     );
+
+    this.spec = spec == null ? config : spec;
+
     // Some HadoopIngestionSpec stuff doesn't make sense in the context of the indexing service
     Preconditions.checkArgument(
-        spec.getIOConfig().getSegmentOutputPath() == null,
+        this.spec.getIOConfig().getSegmentOutputPath() == null,
         "segmentOutputPath must be absent"
     );
-    Preconditions.checkArgument(spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent");
-    Preconditions.checkArgument(spec.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent");
+    Preconditions.checkArgument(this.spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent");
+    Preconditions.checkArgument(this.spec.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent");
 
-    this.spec = spec;
     this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.asList(
         hadoopCoordinates == null ?
        DEFAULT_HADOOP_COORDINATES : hadoopCoordinates
    ) : hadoopDependencyCoordinates;
@@ -131,7 +142,7 @@ public class HadoopIndexTask extends AbstractTask
     }
   }
 
-  @JsonProperty("schema")
+  @JsonProperty("spec")
   public HadoopIngestionSpec getSpec()
   {
     return spec;
@@ -194,7 +205,7 @@ public class HadoopIndexTask extends AbstractTask
       String config = (String) determineConfigurationMainMethod.invoke(null, new Object[]{determineConfigArgs});
 
       HadoopIngestionSpec indexerSchema = toolbox.getObjectMapper()
-          .readValue(config, HadoopIngestionSpec.class);
+                                                 .readValue(config, HadoopIngestionSpec.class);
 
 
       // We should have a lock from before we started running only if interval was specified
@@ -247,10 +258,10 @@ public class HadoopIndexTask extends AbstractTask
       String version = args[1];
 
       final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.jsonMapper
-          .readValue(
-              schema,
-              HadoopIngestionSpec.class
-          );
+                                                                    .readValue(
+                                                                        schema,
+                                                                        HadoopIngestionSpec.class
+                                                                    );
       final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(
           theSchema
               .withTuningConfig(theSchema.getTuningConfig().withVersion(version))
@@ -276,10 +287,10 @@ public class HadoopIndexTask extends AbstractTask
       final String segmentOutputPath = args[2];
 
       final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.jsonMapper
-          .readValue(
-              schema,
-              HadoopIngestionSpec.class
-          );
+                                                                    .readValue(
+                                                                        schema,
+                                                                        HadoopIngestionSpec.class
+                                                                    );
       final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(
           theSchema
               .withIOConfig(theSchema.getIOConfig().withSegmentOutputPath(segmentOutputPath))
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index 92cd788f077..f0feae9490a 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -396,6 +396,7 @@ public class TaskSerdeTest
   public void testHadoopIndexTaskSerde() throws Exception
   {
     final HadoopIndexTask task = new HadoopIndexTask(
+        null,
         null,
         new HadoopIngestionSpec(
             null, null, null,
diff --git a/pom.xml b/pom.xml
index b3af8857786..5d0d585e754 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
 
     <scm>
         <connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
        <developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
        <url>http://www.github.com/metamx/druid</url>
-       <tag>druid-0.6.100-SNAPSHOT</tag>
+       <tag>druid-0.6.107-SNAPSHOT</tag>
    </scm>
@@ -41,7 +41,7 @@
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <metamx.java-util.version>0.25.5</metamx.java-util.version>
        <apache.curator.version>2.4.0</apache.curator.version>
-       <druid.api.version>0.2.0-SNAPSHOT</druid.api.version>
+       <druid.api.version>0.2.0</druid.api.version>
    </properties>
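Note on the backwards-compatibility pattern in the HadoopIndexTask change above: the @JsonCreator constructor now accepts the legacy "config" property alongside "spec", coalesces the two (preferring "spec"), and the getter's annotation is renamed from "schema" to "spec" so the task always serializes under the new name. Below is a minimal standalone sketch of that Jackson pattern; SpecCompatSketch and its nested Task class are hypothetical stand-ins for illustration, not Druid code.

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SpecCompatSketch
    {
      // Stand-in for HadoopIndexTask: accepts both property names on input.
      static class Task
      {
        private final String spec;

        @JsonCreator
        public Task(
            @JsonProperty("spec") String spec,
            @JsonProperty("config") String config // backwards compat
        )
        {
          // Same coalescing rule as the patched constructor: prefer "spec".
          this.spec = spec == null ? config : spec;
        }

        @JsonProperty("spec") // always serialize under the new property name
        public String getSpec()
        {
          return spec;
        }
      }

      public static void main(String[] args) throws Exception
      {
        final ObjectMapper mapper = new ObjectMapper();

        // A legacy payload that still uses "config" deserializes cleanly...
        final Task legacy = mapper.readValue("{\"config\":\"myIngestionSpec\"}", Task.class);

        // ...and round-trips under "spec": prints {"spec":"myIngestionSpec"}
        System.out.println(mapper.writeValueAsString(legacy));
      }
    }

In the updated testHadoopIndexTaskSerde, the extra null is this new config argument, left unset because the test round-trips a task built with the new "spec" parameter.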