Allow config-based overriding of hadoop job properties.

Gian Merlino 2014-05-06 09:11:01 -07:00
parent a747ed0011
commit bdf9e74a3b
7 changed files with 59 additions and 7 deletions
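
Note (editorial, not part of the commit): the change lets a user put arbitrary Hadoop properties into the indexer configuration and have them copied onto each job's Configuration before submission. Below is a minimal, self-contained sketch of that mechanism, assuming only stock Hadoop and Guava on the classpath; the class name and the property names and values are illustrative examples, not defaults introduced by this commit.

import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class JobPropertiesSketch
{
  public static void main(String[] args)
  {
    // Hypothetical user-supplied overrides, as they might appear under "jobProperties" in the spec.
    Map<String, String> jobProperties = ImmutableMap.of(
        "mapreduce.job.queuename", "indexing",
        "mapreduce.map.memory.mb", "2048"
    );

    // Copy every entry onto the Hadoop Configuration, the same loop the new addJobProperties(Job) uses below.
    Configuration conf = new Configuration();
    for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
      conf.set(entry.getKey(), entry.getValue());
    }

    System.out.println(conf.get("mapreduce.job.queuename")); // -> indexing
  }
}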

DetermineHashedPartitionsJob.java

@ -108,6 +108,7 @@ public class DetermineHashedPartitionsJob implements Jobby
JobHelper.setupClasspath(config, groupByJob);
config.addInputPaths(groupByJob);
config.addJobProperties(groupByJob);
config.intoConfiguration(groupByJob);
FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir());

DeterminePartitionsJob.java

@ -138,6 +138,7 @@ public class DeterminePartitionsJob implements Jobby
JobHelper.setupClasspath(config, groupByJob);
config.addInputPaths(groupByJob);
config.addJobProperties(groupByJob);
config.intoConfiguration(groupByJob);
FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir());
@ -187,6 +188,7 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
JobHelper.setupClasspath(config, dimSelectionJob);
config.addJobProperties(dimSelectionJob);
config.intoConfiguration(dimSelectionJob);
FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath());

HadoopDruidIndexerConfig.java

@ -129,6 +129,7 @@ public class HadoopDruidIndexerConfig
private volatile DataRollupSpec rollupSpec;
private volatile DbUpdaterJobSpec updaterJobSpec;
private volatile boolean ignoreInvalidRows;
private volatile Map<String, String> jobProperties;
@JsonCreator
public HadoopDruidIndexerConfig(
@ -148,6 +149,7 @@ public class HadoopDruidIndexerConfig
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
// These fields are deprecated and will be removed in the future
final @JsonProperty("timestampColumn") String timestampColumn,
final @JsonProperty("timestampFormat") String timestampFormat,
@ -171,6 +173,9 @@ public class HadoopDruidIndexerConfig
this.rollupSpec = rollupSpec;
this.updaterJobSpec = updaterJobSpec;
this.ignoreInvalidRows = ignoreInvalidRows;
this.jobProperties = (jobProperties == null
? ImmutableMap.<String, String>of()
: ImmutableMap.copyOf(jobProperties));
if (partitionsSpec != null) {
Preconditions.checkArgument(
@ -380,6 +385,17 @@ public class HadoopDruidIndexerConfig
this.ignoreInvalidRows = ignoreInvalidRows;
}
@JsonProperty
public Map<String, String> getJobProperties()
{
return jobProperties;
}
public void setJobProperties(Map<String, String> jobProperties)
{
this.jobProperties = jobProperties;
}
public Optional<List<Interval>> getIntervals()
{
Optional<SortedSet<Interval>> setOptional = getGranularitySpec().bucketIntervals();
@ -613,6 +629,15 @@ public class HadoopDruidIndexerConfig
);
}
public void addJobProperties(Job job)
{
Configuration conf = job.getConfiguration();
for (final Map.Entry<String, String> entry : jobProperties.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
}
public void intoConfiguration(Job job)
{
Configuration conf = job.getConfiguration();

HadoopDruidIndexerConfigBuilder.java

@ -46,7 +46,8 @@ public class HadoopDruidIndexerConfigBuilder
return HadoopDruidIndexerConfig.jsonMapper.convertValue(schema, HadoopDruidIndexerConfig.class);
}
public static HadoopDruidIndexerSchema toSchema(HadoopDruidIndexerConfig config){
public static HadoopDruidIndexerSchema toSchema(HadoopDruidIndexerConfig config)
{
return HadoopDruidIndexerConfig.jsonMapper.convertValue(config, HadoopDruidIndexerSchema.class);
}
@ -62,8 +63,8 @@ public class HadoopDruidIndexerConfigBuilder
return fromMap(
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
file, new TypeReference<Map<String, Object>>()
{
}
{
}
)
);
}
@ -79,8 +80,8 @@ public class HadoopDruidIndexerConfigBuilder
return fromMap(
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
str, new TypeReference<Map<String, Object>>()
{
}
{
}
)
);
}
@ -112,6 +113,7 @@ public class HadoopDruidIndexerConfigBuilder
private volatile DataRollupSpec rollupSpec;
private volatile DbUpdaterJobSpec updaterJobSpec;
private volatile boolean ignoreInvalidRows;
private volatile Map<String, String> jobProperties;
public HadoopDruidIndexerConfigBuilder()
{
@ -131,6 +133,7 @@ public class HadoopDruidIndexerConfigBuilder
this.rollupSpec = null;
this.updaterJobSpec = null;
this.ignoreInvalidRows = false;
this.jobProperties = ImmutableMap.of();
}
public HadoopDruidIndexerConfigBuilder withDataSource(String dataSource)
@ -230,6 +233,12 @@ public class HadoopDruidIndexerConfigBuilder
return this;
}
public HadoopDruidIndexerConfigBuilder withJobProperties(Map<String, String> jobProperties)
{
this.jobProperties = ImmutableMap.copyOf(jobProperties);
return this;
}
public HadoopDruidIndexerConfigBuilder withSchema(HadoopDruidIndexerSchema schema)
{
this.dataSource = schema.getDataSource();
@ -248,6 +257,7 @@ public class HadoopDruidIndexerConfigBuilder
this.rollupSpec = schema.getRollupSpec();
this.updaterJobSpec = schema.getUpdaterJobSpec();
this.ignoreInvalidRows = schema.isIgnoreInvalidRows();
this.jobProperties = schema.getJobProperties();
return this;
}
@ -271,6 +281,7 @@ public class HadoopDruidIndexerConfigBuilder
rollupSpec,
updaterJobSpec,
ignoreInvalidRows,
jobProperties,
null,
null,
null,

HadoopDruidIndexerSchema.java

@ -53,6 +53,7 @@ public class HadoopDruidIndexerSchema
private final DataRollupSpec rollupSpec;
private final DbUpdaterJobSpec updaterJobSpec;
private final boolean ignoreInvalidRows;
private final Map<String, String> jobProperties;
@JsonCreator
public HadoopDruidIndexerSchema(
@ -72,11 +73,11 @@ public class HadoopDruidIndexerSchema
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
// These fields are deprecated and will be removed in the future
final @JsonProperty("timestampColumn") String timestampColumn,
final @JsonProperty("timestampFormat") String timestampFormat
)
)
{
this.dataSource = dataSource;
this.timestampSpec = (timestampSpec == null) ? new TimestampSpec(timestampColumn, timestampFormat) : timestampSpec;
@ -94,6 +95,9 @@ public class HadoopDruidIndexerSchema
this.rollupSpec = rollupSpec;
this.updaterJobSpec = updaterJobSpec;
this.ignoreInvalidRows = ignoreInvalidRows;
this.jobProperties = (jobProperties == null
? ImmutableMap.<String, String>of()
: ImmutableMap.copyOf(jobProperties));
}
@JsonProperty
@ -191,4 +195,10 @@ public class HadoopDruidIndexerSchema
{
return ignoreInvalidRows;
}
@JsonProperty
public Map<String, String> getJobProperties()
{
return jobProperties;
}
}
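
Note (editorial, not part of the commit): a minimal sketch of the Jackson binding pattern used by the schema above, with a stripped-down stand-in class (SpecSketch, hypothetical) rather than the real HadoopDruidIndexerSchema. It shows how a "jobProperties" object in the JSON spec ends up as an immutable map, defaulting to empty when the key is absent; the queue-name property is only an example.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class SpecSketch
{
  private final Map<String, String> jobProperties;

  @JsonCreator
  public SpecSketch(@JsonProperty("jobProperties") Map<String, String> jobProperties)
  {
    // Same null-handling as the schema: an absent jobProperties key becomes an empty immutable map.
    this.jobProperties = (jobProperties == null)
                         ? ImmutableMap.<String, String>of()
                         : ImmutableMap.copyOf(jobProperties);
  }

  @JsonProperty
  public Map<String, String> getJobProperties()
  {
    return jobProperties;
  }

  public static void main(String[] args) throws Exception
  {
    String json = "{\"jobProperties\": {\"mapreduce.job.queuename\": \"indexing\"}}";
    SpecSketch spec = new ObjectMapper().readValue(json, SpecSketch.class);
    System.out.println(spec.getJobProperties()); // {mapreduce.job.queuename=indexing}
  }
}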

IndexGeneratorJob.java

@ -163,6 +163,7 @@ public class IndexGeneratorJob implements Jobby
FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
config.addInputPaths(job);
config.addJobProperties(job);
config.intoConfiguration(job);
JobHelper.setupClasspath(config, job);

TaskSerdeTest.java

@ -398,6 +398,7 @@ public class TaskSerdeTest
new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
null,
false,
ImmutableMap.of("foo", "bar"),
null,
null
),
@ -414,5 +415,6 @@ public class TaskSerdeTest
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getSchema().getJobProperties(), task2.getSchema().getJobProperties());
}
}