diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index f629d4f86e2..6fb90fb0d70 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -40,11 +40,13 @@ public class HdfsTaskLogs implements TaskLogs private static final Logger log = new Logger(HdfsTaskLogs.class); private final HdfsTaskLogsConfig config; + private final Configuration hadoopConfig; @Inject - public HdfsTaskLogs(HdfsTaskLogsConfig config) + public HdfsTaskLogs(HdfsTaskLogsConfig config, Configuration hadoopConfig) { this.config = config; + this.hadoopConfig = hadoopConfig; } @Override @@ -52,9 +54,8 @@ public class HdfsTaskLogs implements TaskLogs { final Path path = getTaskLogFileFromId(taskId); log.info("Writing task log to: %s", path); - Configuration conf = new Configuration(); - final FileSystem fs = path.getFileSystem(conf); - FileUtil.copy(logFile, fs, path, false, conf); + final FileSystem fs = path.getFileSystem(hadoopConfig); + FileUtil.copy(logFile, fs, path, false, hadoopConfig); log.info("Wrote task log to: %s", path); } @@ -62,7 +63,7 @@ public class HdfsTaskLogs implements TaskLogs public Optional<ByteSource> streamTaskLog(final String taskId, final long offset) throws IOException { final Path path = getTaskLogFileFromId(taskId); - final FileSystem fs = path.getFileSystem(new Configuration()); + final FileSystem fs = path.getFileSystem(hadoopConfig); if (fs.exists(path)) { return Optional.of( new ByteSource() diff --git a/extensions/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java b/extensions/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java index 0a18ef60531..b1eb1da6f27 100644 --- 
a/extensions/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java +++ b/extensions/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java @@ -25,6 +25,7 @@ import io.druid.storage.hdfs.tasklog.HdfsTaskLogs; import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig; import io.druid.tasklogs.TaskLogs; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test; @@ -41,7 +42,7 @@ public class HdfsTaskLogsTest final File logDir = new File(tmpDir, "logs"); final File logFile = new File(tmpDir, "log"); Files.write("blah", logFile, Charsets.UTF_8); - final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString())); + final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); taskLogs.pushTaskLog("foo", logFile); final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah"); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 20be5549773..f36ac384e96 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -95,17 +95,11 @@ public class IndexGeneratorJob implements Jobby public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config) { - final Configuration conf = new Configuration(); + final Configuration conf = JobHelper.injectSystemProperties(new Configuration()); final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper; ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder(); - for (String propName : System.getProperties().stringPropertyNames()) { - if (propName.startsWith("hadoop.")) { - conf.set(propName.substring("hadoop.".length()), System.getProperty(propName)); - } - } - final Path descriptorInfoDir = 
config.makeDescriptorInfoDir(); try { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 48e52971e7d..8c169e8785b 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -100,12 +100,16 @@ public class JobHelper public static void injectSystemProperties(Job job) { - final Configuration conf = job.getConfiguration(); + injectSystemProperties(job.getConfiguration()); + } + + public static Configuration injectSystemProperties(Configuration conf) { for (String propName : System.getProperties().stringPropertyNames()) { if (propName.startsWith("hadoop.")) { conf.set(propName.substring("hadoop.".length()), System.getProperty(propName)); } } + return conf; } public static void ensurePaths(HadoopDruidIndexerConfig config) @@ -143,7 +147,7 @@ public class JobHelper Path workingPath = config.makeIntermediatePath(); log.info("Deleting path[%s]", workingPath); try { - workingPath.getFileSystem(new Configuration()).delete(workingPath, true); + workingPath.getFileSystem(injectSystemProperties(new Configuration())).delete(workingPath, true); } catch (IOException e) { log.error(e, "Failed to cleanup path[%s]", workingPath); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java index 232d56a0fa4..f35825b7cf2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java @@ -57,7 +57,8 @@ public class ForkingTaskRunnerConfig "io.druid", "user.timezone", "file.encoding", - "java.io.tmpdir" + "java.io.tmpdir", + "hadoop" ); public String getJavaCommand()