diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index ecf6c6118f9..925b904c494 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -108,6 +108,7 @@ public class DetermineHashedPartitionsJob implements Jobby
       JobHelper.setupClasspath(config, groupByJob);
 
       config.addInputPaths(groupByJob);
+      config.addJobProperties(groupByJob);
       config.intoConfiguration(groupByJob);
       FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir());
 
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index b76b3b5cf3c..b05f400b13d 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -138,6 +138,7 @@ public class DeterminePartitionsJob implements Jobby
       JobHelper.setupClasspath(config, groupByJob);
 
       config.addInputPaths(groupByJob);
+      config.addJobProperties(groupByJob);
       config.intoConfiguration(groupByJob);
       FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir());
 
@@ -187,6 +188,7 @@ public class DeterminePartitionsJob implements Jobby
       dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
       JobHelper.setupClasspath(config, dimSelectionJob);
 
+      config.addJobProperties(dimSelectionJob);
       config.intoConfiguration(dimSelectionJob);
       FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath());
 
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index 881e21e4f78..d83eba87f40 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -129,6 +129,7 @@ public class HadoopDruidIndexerConfig
   private volatile DataRollupSpec rollupSpec;
   private volatile DbUpdaterJobSpec updaterJobSpec;
   private volatile boolean ignoreInvalidRows;
+  private volatile Map<String, String> jobProperties;
 
   @JsonCreator
   public HadoopDruidIndexerConfig(
@@ -148,6 +149,7 @@ public class HadoopDruidIndexerConfig
       final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
       final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
       final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
+      final @JsonProperty("jobProperties") Map<String, String> jobProperties,
       // These fields are deprecated and will be removed in the future
       final @JsonProperty("timestampColumn") String timestampColumn,
       final @JsonProperty("timestampFormat") String timestampFormat,
@@ -171,6 +173,9 @@ public class HadoopDruidIndexerConfig
     this.rollupSpec = rollupSpec;
     this.updaterJobSpec = updaterJobSpec;
     this.ignoreInvalidRows = ignoreInvalidRows;
+    this.jobProperties = (jobProperties == null
+                          ? ImmutableMap.<String, String>of()
+                          : ImmutableMap.copyOf(jobProperties));
 
     if (partitionsSpec != null) {
       Preconditions.checkArgument(
@@ -380,6 +385,17 @@ public class HadoopDruidIndexerConfig
     this.ignoreInvalidRows = ignoreInvalidRows;
   }
 
+  @JsonProperty
+  public Map<String, String> getJobProperties()
+  {
+    return jobProperties;
+  }
+
+  public void setJobProperties(Map<String, String> jobProperties)
+  {
+    this.jobProperties = jobProperties;
+  }
+
   public Optional<List<Interval>> getIntervals()
   {
     Optional<SortedSet<Interval>> setOptional = getGranularitySpec().bucketIntervals();
@@ -613,6 +629,15 @@ public class HadoopDruidIndexerConfig
     );
   }
 
+  public void addJobProperties(Job job)
+  {
+    Configuration conf = job.getConfiguration();
+
+    for (final Map.Entry<String, String> entry : jobProperties.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+  }
+
   public void intoConfiguration(Job job)
   {
     Configuration conf = job.getConfiguration();
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfigBuilder.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfigBuilder.java
index 5e7adc2ac82..ff6af0a839e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfigBuilder.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfigBuilder.java
@@ -46,7 +46,8 @@ public class HadoopDruidIndexerConfigBuilder
     return HadoopDruidIndexerConfig.jsonMapper.convertValue(schema, HadoopDruidIndexerConfig.class);
   }
 
-  public static HadoopDruidIndexerSchema toSchema(HadoopDruidIndexerConfig config){
+  public static HadoopDruidIndexerSchema toSchema(HadoopDruidIndexerConfig config)
+  {
     return HadoopDruidIndexerConfig.jsonMapper.convertValue(config, HadoopDruidIndexerSchema.class);
   }
 
@@ -62,8 +63,8 @@ public class HadoopDruidIndexerConfigBuilder
     return fromMap(
         (Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
             file, new TypeReference<Map<String, Object>>()
-        {
-        }
+            {
+            }
         )
     );
   }
@@ -79,8 +80,8 @@ public class HadoopDruidIndexerConfigBuilder
     return fromMap(
         (Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
             str, new TypeReference<Map<String, Object>>()
-        {
-        }
+            {
+            }
         )
     );
   }
@@ -112,6 +113,7 @@ public class HadoopDruidIndexerConfigBuilder
   private volatile DataRollupSpec rollupSpec;
   private volatile DbUpdaterJobSpec updaterJobSpec;
   private volatile boolean ignoreInvalidRows;
+  private volatile Map<String, String> jobProperties;
 
   public HadoopDruidIndexerConfigBuilder()
   {
@@ -131,6 +133,7 @@ public class HadoopDruidIndexerConfigBuilder
     this.rollupSpec = null;
     this.updaterJobSpec = null;
     this.ignoreInvalidRows = false;
+    this.jobProperties = ImmutableMap.of();
   }
 
   public HadoopDruidIndexerConfigBuilder withDataSource(String dataSource)
@@ -230,6 +233,12 @@ public class HadoopDruidIndexerConfigBuilder
     return this;
   }
 
+  public HadoopDruidIndexerConfigBuilder withJobProperties(Map<String, String> jobProperties)
+  {
+    this.jobProperties = ImmutableMap.copyOf(jobProperties);
+    return this;
+  }
+
   public HadoopDruidIndexerConfigBuilder withSchema(HadoopDruidIndexerSchema schema)
   {
     this.dataSource = schema.getDataSource();
@@ -248,6 +257,7 @@ public class HadoopDruidIndexerConfigBuilder
     this.rollupSpec = schema.getRollupSpec();
     this.updaterJobSpec = schema.getUpdaterJobSpec();
     this.ignoreInvalidRows = schema.isIgnoreInvalidRows();
+    this.jobProperties = schema.getJobProperties();
 
     return this;
   }
@@ -271,6 +281,7 @@ public class HadoopDruidIndexerConfigBuilder
         rollupSpec,
         updaterJobSpec,
         ignoreInvalidRows,
+        jobProperties,
         null,
         null,
         null,
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerSchema.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerSchema.java
index 34d6b2c65b1..1a123b82d6e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerSchema.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerSchema.java
@@ -53,6 +53,7 @@ public class HadoopDruidIndexerSchema
   private final DataRollupSpec rollupSpec;
   private final DbUpdaterJobSpec updaterJobSpec;
   private final boolean ignoreInvalidRows;
+  private final Map<String, String> jobProperties;
 
   @JsonCreator
   public HadoopDruidIndexerSchema(
@@ -72,11 +73,11 @@ public class HadoopDruidIndexerSchema
       final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
       final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
       final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
+      final @JsonProperty("jobProperties") Map<String, String> jobProperties,
       // These fields are deprecated and will be removed in the future
       final @JsonProperty("timestampColumn") String timestampColumn,
       final @JsonProperty("timestampFormat") String timestampFormat
-
-  )
+  )
   {
     this.dataSource = dataSource;
     this.timestampSpec = (timestampSpec == null) ? new TimestampSpec(timestampColumn, timestampFormat) : timestampSpec;
@@ -94,6 +95,9 @@ public class HadoopDruidIndexerSchema
     this.rollupSpec = rollupSpec;
     this.updaterJobSpec = updaterJobSpec;
     this.ignoreInvalidRows = ignoreInvalidRows;
+    this.jobProperties = (jobProperties == null
+                          ? ImmutableMap.<String, String>of()
+                          : ImmutableMap.copyOf(jobProperties));
   }
 
   @JsonProperty
@@ -191,4 +195,10 @@ public class HadoopDruidIndexerSchema
   {
     return ignoreInvalidRows;
   }
+
+  @JsonProperty
+  public Map<String, String> getJobProperties()
+  {
+    return jobProperties;
+  }
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 7e3ccbe437e..6ac90396608 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -163,6 +163,7 @@ public class IndexGeneratorJob implements Jobby
       FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
 
       config.addInputPaths(job);
+      config.addJobProperties(job);
       config.intoConfiguration(job);
 
       JobHelper.setupClasspath(config, job);
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index 5d88d97de48..dc385809bc4 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -398,6 +398,7 @@ public class TaskSerdeTest
             new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
             null,
             false,
+            ImmutableMap.of("foo", "bar"),
             null,
             null
         ),
@@ -414,5 +415,6 @@ public class TaskSerdeTest
     Assert.assertEquals(task.getId(), task2.getId());
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+    Assert.assertEquals(task.getSchema().getJobProperties(), task2.getSchema().getJobProperties());
   }
 }
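
Usage note (reviewer sketch, not part of the patch): with this change an indexer spec can carry a "jobProperties" JSON object whose string entries are copied verbatim into the Hadoop Configuration by the new HadoopDruidIndexerConfig.addJobProperties(Job) before each MapReduce job is submitted. Below is a minimal Java sketch of the builder path. The property keys are illustrative examples, and build() as the builder's terminal method is an assumption — the diff only shows withJobProperties(...) and the constructor call that receives jobProperties.

// A minimal sketch, assuming build() finalizes the builder and that a Job is
// constructed directly; neither appears in this diff.
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class JobPropertiesSketch
{
  public static void main(String[] args) throws Exception
  {
    HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfigBuilder()
        .withJobProperties(
            ImmutableMap.of(
                "mapreduce.job.queuename", "druid",  // example: target YARN queue
                "mapreduce.map.memory.mb", "2048"    // example: mapper memory
            )
        )
        .build(); // assumed terminal method; not shown in this diff

    // Mirrors the call sites above: the user-supplied properties are applied
    // to the job's Configuration before the job is submitted.
    Job job = new Job(new Configuration(), "example-sketch");
    config.addJobProperties(job);

    // Each entry was copied via conf.set(key, value):
    assert "druid".equals(job.getConfiguration().get("mapreduce.job.queuename"));
  }
}

Note that every call site in the patch invokes addJobProperties(job) before intoConfiguration(job), so user-supplied keys land in the Configuration before the spec itself is serialized into it.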