mirror of https://github.com/apache/druid.git
Merge pull request #1379 from flowbehappy/fix-hadoop-ha
bug fix: hdfs task log and indexing task not work properly with Hadoop HA
This commit is contained in:
commit
5ad5d7d18b
|
@ -40,11 +40,13 @@ public class HdfsTaskLogs implements TaskLogs
|
||||||
private static final Logger log = new Logger(HdfsTaskLogs.class);
|
private static final Logger log = new Logger(HdfsTaskLogs.class);
|
||||||
|
|
||||||
private final HdfsTaskLogsConfig config;
|
private final HdfsTaskLogsConfig config;
|
||||||
|
private final Configuration hadoopConfig;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public HdfsTaskLogs(HdfsTaskLogsConfig config)
|
public HdfsTaskLogs(HdfsTaskLogsConfig config, Configuration hadoopConfig)
|
||||||
{
|
{
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
this.hadoopConfig = hadoopConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -52,9 +54,8 @@ public class HdfsTaskLogs implements TaskLogs
|
||||||
{
|
{
|
||||||
final Path path = getTaskLogFileFromId(taskId);
|
final Path path = getTaskLogFileFromId(taskId);
|
||||||
log.info("Writing task log to: %s", path);
|
log.info("Writing task log to: %s", path);
|
||||||
Configuration conf = new Configuration();
|
final FileSystem fs = path.getFileSystem(hadoopConfig);
|
||||||
final FileSystem fs = path.getFileSystem(conf);
|
FileUtil.copy(logFile, fs, path, false, hadoopConfig);
|
||||||
FileUtil.copy(logFile, fs, path, false, conf);
|
|
||||||
log.info("Wrote task log to: %s", path);
|
log.info("Wrote task log to: %s", path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,7 +63,7 @@ public class HdfsTaskLogs implements TaskLogs
|
||||||
public Optional<ByteSource> streamTaskLog(final String taskId, final long offset) throws IOException
|
public Optional<ByteSource> streamTaskLog(final String taskId, final long offset) throws IOException
|
||||||
{
|
{
|
||||||
final Path path = getTaskLogFileFromId(taskId);
|
final Path path = getTaskLogFileFromId(taskId);
|
||||||
final FileSystem fs = path.getFileSystem(new Configuration());
|
final FileSystem fs = path.getFileSystem(hadoopConfig);
|
||||||
if (fs.exists(path)) {
|
if (fs.exists(path)) {
|
||||||
return Optional.<ByteSource>of(
|
return Optional.<ByteSource>of(
|
||||||
new ByteSource()
|
new ByteSource()
|
||||||
|
|
|
@ -25,6 +25,7 @@ import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
|
||||||
import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
|
import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
|
||||||
import io.druid.tasklogs.TaskLogs;
|
import io.druid.tasklogs.TaskLogs;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -41,7 +42,7 @@ public class HdfsTaskLogsTest
|
||||||
final File logDir = new File(tmpDir, "logs");
|
final File logDir = new File(tmpDir, "logs");
|
||||||
final File logFile = new File(tmpDir, "log");
|
final File logFile = new File(tmpDir, "log");
|
||||||
Files.write("blah", logFile, Charsets.UTF_8);
|
Files.write("blah", logFile, Charsets.UTF_8);
|
||||||
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()));
|
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
|
||||||
taskLogs.pushTaskLog("foo", logFile);
|
taskLogs.pushTaskLog("foo", logFile);
|
||||||
|
|
||||||
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
|
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
|
||||||
|
|
|
@ -95,17 +95,11 @@ public class IndexGeneratorJob implements Jobby
|
||||||
|
|
||||||
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
|
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
|
||||||
{
|
{
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = JobHelper.injectSystemProperties(new Configuration());
|
||||||
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
|
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
|
||||||
|
|
||||||
ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
|
ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
|
||||||
|
|
||||||
for (String propName : System.getProperties().stringPropertyNames()) {
|
|
||||||
if (propName.startsWith("hadoop.")) {
|
|
||||||
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final Path descriptorInfoDir = config.makeDescriptorInfoDir();
|
final Path descriptorInfoDir = config.makeDescriptorInfoDir();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -100,12 +100,16 @@ public class JobHelper
|
||||||
|
|
||||||
public static void injectSystemProperties(Job job)
|
public static void injectSystemProperties(Job job)
|
||||||
{
|
{
|
||||||
final Configuration conf = job.getConfiguration();
|
injectSystemProperties(job.getConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Configuration injectSystemProperties(Configuration conf) {
|
||||||
for (String propName : System.getProperties().stringPropertyNames()) {
|
for (String propName : System.getProperties().stringPropertyNames()) {
|
||||||
if (propName.startsWith("hadoop.")) {
|
if (propName.startsWith("hadoop.")) {
|
||||||
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
|
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void ensurePaths(HadoopDruidIndexerConfig config)
|
public static void ensurePaths(HadoopDruidIndexerConfig config)
|
||||||
|
@ -143,7 +147,7 @@ public class JobHelper
|
||||||
Path workingPath = config.makeIntermediatePath();
|
Path workingPath = config.makeIntermediatePath();
|
||||||
log.info("Deleting path[%s]", workingPath);
|
log.info("Deleting path[%s]", workingPath);
|
||||||
try {
|
try {
|
||||||
workingPath.getFileSystem(new Configuration()).delete(workingPath, true);
|
workingPath.getFileSystem(injectSystemProperties(new Configuration())).delete(workingPath, true);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
log.error(e, "Failed to cleanup path[%s]", workingPath);
|
log.error(e, "Failed to cleanup path[%s]", workingPath);
|
||||||
|
|
|
@ -57,7 +57,8 @@ public class ForkingTaskRunnerConfig
|
||||||
"io.druid",
|
"io.druid",
|
||||||
"user.timezone",
|
"user.timezone",
|
||||||
"file.encoding",
|
"file.encoding",
|
||||||
"java.io.tmpdir"
|
"java.io.tmpdir",
|
||||||
|
"hadoop"
|
||||||
);
|
);
|
||||||
|
|
||||||
public String getJavaCommand()
|
public String getJavaCommand()
|
||||||
|
|
Loading…
Reference in New Issue