From 7e708811019cfa4f1e854f31c6d7b260b2250450 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Fri, 26 Feb 2016 15:10:19 -0800
Subject: [PATCH] HdfsTaskLogs: Allow overwriting existing logs.

Necessary because ForkingTaskRunner pushes logs when gracefully stopping, but
it may need to re-push those logs when the task finishes for real after
restoring.
---
 .../storage/hdfs/tasklog/HdfsTaskLogs.java    | 13 +++-
 .../common/tasklogs/HdfsTaskLogsTest.java     | 64 +++++++++++++------
 2 files changed, 54 insertions(+), 23 deletions(-)

diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
index d40d4013cef..3fcf7df4773 100644
--- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
+++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
@@ -20,6 +20,7 @@ package io.druid.storage.hdfs.tasklog;
 
 import com.google.common.base.Optional;
 import com.google.common.io.ByteSource;
+import com.google.common.io.ByteStreams;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
 import io.druid.tasklogs.TaskLogs;
@@ -27,12 +28,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 /**
  * Indexer HDFS task logs, to support storing task logs in HDFS.
@@ -57,8 +59,13 @@ public class HdfsTaskLogs implements TaskLogs
     final Path path = getTaskLogFileFromId(taskId);
     log.info("Writing task log to: %s", path);
     final FileSystem fs = path.getFileSystem(hadoopConfig);
-    FileUtil.copy(logFile, fs, path, false, hadoopConfig);
-    log.info("Wrote task log to: %s", path);
+    try (
+        final InputStream in = new FileInputStream(logFile);
+        final OutputStream out = fs.create(path, true)
+    ) {
+      ByteStreams.copy(in, out);
+      log.info("Wrote task log to: %s", path);
+    }
   }
 
   @Override
diff --git a/extensions/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java b/extensions/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
index 0eab79d835c..c3b253096a3 100644
--- a/extensions/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
+++ b/extensions/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
@@ -26,36 +26,60 @@ import com.google.common.io.Files;
 import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
 import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
 import io.druid.tasklogs.TaskLogs;
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Map;
 
 public class HdfsTaskLogsTest
 {
-  @Test
-  public void testSimple() throws Exception
-  {
-    final File tmpDir = Files.createTempDir();
-    try {
-      final File logDir = new File(tmpDir, "logs");
-      final File logFile = new File(tmpDir, "log");
-      Files.write("blah", logFile, Charsets.UTF_8);
-      final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
-      taskLogs.pushTaskLog("foo", logFile);
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
 
-      final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
-      for (Map.Entry<Long, String> entry : expected.entrySet()) {
-        final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
-        final String string = new String(bytes);
-        Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
-      }
-    }
-    finally {
-      FileUtils.deleteDirectory(tmpDir);
+  @Test
+  public void testStream() throws Exception
+  {
+    final File tmpDir = tempFolder.newFolder();
+    final File logDir = new File(tmpDir, "logs");
+    final File logFile = new File(tmpDir, "log");
+    Files.write("blah", logFile, Charsets.UTF_8);
+    final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
+    taskLogs.pushTaskLog("foo", logFile);
+
+    final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
+    for (Map.Entry<Long, String> entry : expected.entrySet()) {
+      final String string = readLog(taskLogs, entry.getKey());
+      Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
     }
   }
+
+  @Test
+  public void testOverwrite() throws Exception
+  {
+    final File tmpDir = tempFolder.newFolder();
+    final File logDir = new File(tmpDir, "logs");
+    final File logFile = new File(tmpDir, "log");
+    final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
+
+    Files.write("blah", logFile, Charsets.UTF_8);
+    taskLogs.pushTaskLog("foo", logFile);
+    Assert.assertEquals("blah", readLog(taskLogs, 0));
+
+    Files.write("blah blah", logFile, Charsets.UTF_8);
+    taskLogs.pushTaskLog("foo", logFile);
+    Assert.assertEquals("blah blah", readLog(taskLogs, 0));
+  }
+
+  private String readLog(TaskLogs taskLogs, long offset) throws IOException
+  {
+    return new String(
+        ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", offset).get().openStream()),
+        Charsets.UTF_8
+    );
+  }
 }
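
A note on the behavior change for readers of this patch: the second argument
to fs.create(path, true) is Hadoop's "overwrite" flag. The FileUtil.copy
overload used previously does not overwrite an existing destination, so
re-pushing the same task's log failed; that is the failure mode the commit
message describes. Below is a minimal caller-side sketch of the re-push
sequence. It is illustrative only and not part of the patch: the class name
RepushSketch, the task id, and the file paths are hypothetical, and the
restore flow is paraphrased from the commit message.

    import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
    import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
    import io.druid.tasklogs.TaskLogs;
    import org.apache.hadoop.conf.Configuration;

    import java.io.File;

    // Hypothetical driver mimicking the ForkingTaskRunner scenario: push once
    // at graceful stop, push again once the task finishes for real. With this
    // patch, the second push overwrites the first instead of failing.
    public class RepushSketch
    {
      public static void main(String[] args) throws Exception
      {
        final TaskLogs taskLogs = new HdfsTaskLogs(
            new HdfsTaskLogsConfig("/tmp/druid/indexing-logs"), // illustrative log directory
            new Configuration()                                 // default Hadoop configuration
        );
        final File logFile = new File("/tmp/druid/task.log");   // illustrative local log file

        taskLogs.pushTaskLog("some_task_id", logFile);          // first push, at graceful stop

        // ... the task is restored and runs to completion; logFile grows ...

        taskLogs.pushTaskLog("some_task_id", logFile);          // re-push now succeeds, overwriting
      }
    }

The testOverwrite method in the updated test class exercises exactly this
push-twice sequence, using a local-filesystem Configuration so that no HDFS
cluster is needed.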