bug fix: HDFS task log and indexing task do not work properly with Hadoop HA

Author: flow
Date:   2015-05-21 20:26:58 +08:00
Commit: 07659f30ab (parent e6d22565ad)
5 changed files with 17 additions and 16 deletions
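The changes below replace bare new Configuration() instances with either an injected Hadoop configuration or one populated from hadoop.* system properties. A bare new Configuration() only sees the *-site.xml files on its classpath, so an HA logical URI such as hdfs://<nameservice>/... cannot be resolved unless the client-side HA settings are present. A minimal illustrative sketch of those settings follows; the nameservice name "druid-ha" and the host:port values are assumptions, not taken from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HaConfigSketch
{
  public static void main(String[] args) throws Exception
  {
    // Without the HA properties below, resolving hdfs://druid-ha/... fails because
    // "druid-ha" is a logical nameservice, not a real host name.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://druid-ha");                         // assumed nameservice name
    conf.set("dfs.nameservices", "druid-ha");
    conf.set("dfs.ha.namenodes.druid-ha", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.druid-ha.nn1", "namenode1:8020"); // assumed hosts/ports
    conf.set("dfs.namenode.rpc-address.druid-ha.nn2", "namenode2:8020");
    conf.set("dfs.client.failover.proxy.provider.druid-ha",
             "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    // With the HA settings in place, the logical URI resolves to the active NameNode.
    FileSystem fs = new Path("hdfs://druid-ha/druid/taskLogs/foo").getFileSystem(conf);
    System.out.println(fs.getUri());
  }
}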

HdfsTaskLogs.java

@@ -40,11 +40,13 @@ public class HdfsTaskLogs implements TaskLogs
   private static final Logger log = new Logger(HdfsTaskLogs.class);
 
   private final HdfsTaskLogsConfig config;
+  private final Configuration hadoopConfig;
 
   @Inject
-  public HdfsTaskLogs(HdfsTaskLogsConfig config)
+  public HdfsTaskLogs(HdfsTaskLogsConfig config, Configuration hadoopConfig)
   {
     this.config = config;
+    this.hadoopConfig = hadoopConfig;
   }
 
   @Override
@@ -52,9 +54,8 @@ public class HdfsTaskLogs implements TaskLogs
   {
     final Path path = getTaskLogFileFromId(taskId);
     log.info("Writing task log to: %s", path);
-    Configuration conf = new Configuration();
-    final FileSystem fs = path.getFileSystem(conf);
-    FileUtil.copy(logFile, fs, path, false, conf);
+    final FileSystem fs = path.getFileSystem(hadoopConfig);
+    FileUtil.copy(logFile, fs, path, false, hadoopConfig);
     log.info("Wrote task log to: %s", path);
   }
 
@@ -62,7 +63,7 @@ public class HdfsTaskLogs implements TaskLogs
   public Optional<ByteSource> streamTaskLog(final String taskId, final long offset) throws IOException
   {
     final Path path = getTaskLogFileFromId(taskId);
-    final FileSystem fs = path.getFileSystem(new Configuration());
+    final FileSystem fs = path.getFileSystem(hadoopConfig);
     if (fs.exists(path)) {
       return Optional.<ByteSource>of(
           new ByteSource()
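For the new HdfsTaskLogs constructor to be satisfiable, the HDFS storage module has to expose a Hadoop Configuration to Guice. Below is a hypothetical binding of that shape, not the actual Druid module code; the module name and the way the configuration is populated are assumptions.

import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.hadoop.conf.Configuration;

// Hypothetical module: binds one shared Hadoop Configuration instance so that
// @Inject constructors such as HdfsTaskLogs(HdfsTaskLogsConfig, Configuration)
// all receive the same, fully populated (HA-aware) configuration.
public class HdfsConfigurationModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    Configuration conf = new Configuration();
    // Site files and/or HA properties would be applied to conf here before binding.
    binder.bind(Configuration.class).toInstance(conf);
  }
}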

HdfsTaskLogsTest.java

@@ -25,6 +25,7 @@ import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
 import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
 import io.druid.tasklogs.TaskLogs;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -41,7 +42,7 @@ public class HdfsTaskLogsTest
     final File logDir = new File(tmpDir, "logs");
     final File logFile = new File(tmpDir, "log");
     Files.write("blah", logFile, Charsets.UTF_8);
-    final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()));
+    final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
     taskLogs.pushTaskLog("foo", logFile);
 
     final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");

IndexGeneratorJob.java

@@ -95,17 +95,11 @@ public class IndexGeneratorJob implements Jobby
   public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
   {
-    final Configuration conf = new Configuration();
+    final Configuration conf = JobHelper.injectSystemProperties(new Configuration());
     final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
     ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
 
-    for (String propName : System.getProperties().stringPropertyNames()) {
-      if (propName.startsWith("hadoop.")) {
-        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-      }
-    }
-
     final Path descriptorInfoDir = config.makeDescriptorInfoDir();
 
     try {

JobHelper.java

@@ -100,12 +100,16 @@ public class JobHelper
   public static void injectSystemProperties(Job job)
   {
-    final Configuration conf = job.getConfiguration();
+    injectSystemProperties(job.getConfiguration());
+  }
+
+  public static Configuration injectSystemProperties(Configuration conf) {
     for (String propName : System.getProperties().stringPropertyNames()) {
       if (propName.startsWith("hadoop.")) {
         conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
       }
     }
+    return conf;
   }
 
   public static void ensurePaths(HadoopDruidIndexerConfig config)
@@ -143,7 +147,7 @@ public class JobHelper
     Path workingPath = config.makeIntermediatePath();
     log.info("Deleting path[%s]", workingPath);
     try {
-      workingPath.getFileSystem(new Configuration()).delete(workingPath, true);
+      workingPath.getFileSystem(injectSystemProperties(new Configuration())).delete(workingPath, true);
     }
     catch (IOException e) {
       log.error(e, "Failed to cleanup path[%s]", workingPath);
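The new injectSystemProperties(Configuration) overload strips the "hadoop." prefix from JVM system properties and copies the remainder into the given Configuration, so any Hadoop setting, including HA nameservice properties, can be passed as -Dhadoop.<key>=<value>. A small self-contained driver illustrating the effect; the property values are assumptions, and the helper body is copied from the diff above so the example compiles on its own.

import org.apache.hadoop.conf.Configuration;

public class InjectSystemPropertiesSketch
{
  // Same prefix-stripping logic as JobHelper.injectSystemProperties(Configuration).
  static Configuration injectSystemProperties(Configuration conf)
  {
    for (String propName : System.getProperties().stringPropertyNames()) {
      if (propName.startsWith("hadoop.")) {
        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
      }
    }
    return conf;
  }

  public static void main(String[] args)
  {
    // Simulates launching the JVM with -Dhadoop.fs.defaultFS=hdfs://druid-ha (assumed value).
    System.setProperty("hadoop.fs.defaultFS", "hdfs://druid-ha");
    System.setProperty("hadoop.dfs.nameservices", "druid-ha");

    Configuration conf = injectSystemProperties(new Configuration());
    System.out.println(conf.get("fs.defaultFS"));     // prints hdfs://druid-ha
    System.out.println(conf.get("dfs.nameservices")); // prints druid-ha
  }
}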

ForkingTaskRunnerConfig.java

@@ -57,7 +57,8 @@ public class ForkingTaskRunnerConfig
       "io.druid",
       "user.timezone",
       "file.encoding",
-      "java.io.tmpdir"
+      "java.io.tmpdir",
+      "hadoop"
   );
 
   public String getJavaCommand()
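Adding "hadoop" to this whitelist lets the forking task runner forward -Dhadoop.* system properties from the parent JVM to the peon it spawns, so indexing tasks inherit the HA settings. Below is a rough sketch of that prefix filtering, not the actual Druid code; the command list and the printout are only for illustration, and entries of the real whitelist that sit above "io.druid" are omitted.

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;

public class AllowedPrefixesSketch
{
  public static void main(String[] args)
  {
    // Visible tail of the whitelist from ForkingTaskRunnerConfig, including the new "hadoop" entry.
    List<String> allowedPrefixes = ImmutableList.of(
        "io.druid", "user.timezone", "file.encoding", "java.io.tmpdir", "hadoop"
    );

    // System properties whose names start with an allowed prefix are re-emitted
    // as -D flags on the forked task's command line.
    List<String> command = new ArrayList<>();
    for (String propName : System.getProperties().stringPropertyNames()) {
      for (String prefix : allowedPrefixes) {
        if (propName.startsWith(prefix)) {
          command.add(String.format("-D%s=%s", propName, System.getProperty(propName)));
          break;
        }
      }
    }
    System.out.println(command);
  }
}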