Allow index job to utilize hadoop cluster information from job config. (#4626)

* Allow ndex job to utilize hadoop cluster information from job config.

* Add new method that inject system configuration and then job configuration.

* Make changes to use HadoopDruidIndexerConfig.addJobProperties method.

* refactor code for overloaded addJobProperties.
This commit is contained in:
T R Kyaw 2017-08-30 17:44:33 -04:00 committed by Himanshu
parent 8dddccc687
commit d6179126ed
3 changed files with 9 additions and 2 deletions

View File

@ -561,8 +561,11 @@ public class HadoopDruidIndexerConfig
public void addJobProperties(Job job) public void addJobProperties(Job job)
{ {
Configuration conf = job.getConfiguration(); addJobProperties(job.getConfiguration());
}
public void addJobProperties(Configuration conf)
{
for (final Map.Entry<String, String> entry : schema.getTuningConfig().getJobProperties().entrySet()) { for (final Map.Entry<String, String> entry : schema.getTuningConfig().getJobProperties().entrySet()) {
conf.set(entry.getKey(), entry.getValue()); conf.set(entry.getKey(), entry.getValue());
} }

View File

@ -101,6 +101,8 @@ public class IndexGeneratorJob implements Jobby
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config) public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{ {
final Configuration conf = JobHelper.injectSystemProperties(new Configuration()); final Configuration conf = JobHelper.injectSystemProperties(new Configuration());
config.addJobProperties(conf);
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.JSON_MAPPER; final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.JSON_MAPPER;
ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder(); ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();

View File

@ -376,7 +376,9 @@ public class JobHelper
Path workingPath = config.makeIntermediatePath(); Path workingPath = config.makeIntermediatePath();
log.info("Deleting path[%s]", workingPath); log.info("Deleting path[%s]", workingPath);
try { try {
workingPath.getFileSystem(injectSystemProperties(new Configuration())).delete(workingPath, true); Configuration conf = injectSystemProperties(new Configuration());
config.addJobProperties(conf);
workingPath.getFileSystem(conf).delete(workingPath, true);
} }
catch (IOException e) { catch (IOException e) {
log.error(e, "Failed to cleanup path[%s]", workingPath); log.error(e, "Failed to cleanup path[%s]", workingPath);