HdfsTaskLogs: Allow overwriting existing logs.

Necessary because ForkingTaskRunner pushes logs when gracefully stopping,
but it may need to re-push those logs when the task finishes for real after
restoring.
This commit is contained in:
Gian Merlino 2016-02-26 15:10:19 -08:00
parent eb13d7afe3
commit 7e70881101
2 changed files with 54 additions and 23 deletions
extensions/hdfs-storage/src
main/java/io/druid/storage/hdfs/tasklog
test/java/io/druid/indexing/common/tasklogs

View File

@ -20,6 +20,7 @@ package io.druid.storage.hdfs.tasklog;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.tasklogs.TaskLogs;
@ -27,12 +28,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Indexer hdfs task logs, to support storing hdfs tasks to hdfs.
@ -57,8 +59,13 @@ public class HdfsTaskLogs implements TaskLogs
final Path path = getTaskLogFileFromId(taskId);
log.info("Writing task log to: %s", path);
final FileSystem fs = path.getFileSystem(hadoopConfig);
FileUtil.copy(logFile, fs, path, false, hadoopConfig);
log.info("Wrote task log to: %s", path);
try (
final InputStream in = new FileInputStream(logFile);
final OutputStream out = fs.create(path, true)
) {
ByteStreams.copy(in, out);
log.info("Wrote task log to: %s", path);
}
}
@Override

View File

@ -26,36 +26,60 @@ import com.google.common.io.Files;
import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import io.druid.tasklogs.TaskLogs;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Map;
public class HdfsTaskLogsTest
{
@Test
public void testSimple() throws Exception
{
final File tmpDir = Files.createTempDir();
try {
final File logDir = new File(tmpDir, "logs");
final File logFile = new File(tmpDir, "log");
Files.write("blah", logFile, Charsets.UTF_8);
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
taskLogs.pushTaskLog("foo", logFile);
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
for (Map.Entry<Long, String> entry : expected.entrySet()) {
final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
final String string = new String(bytes);
Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
}
}
finally {
FileUtils.deleteDirectory(tmpDir);
@Test
public void testStream() throws Exception
{
final File tmpDir = tempFolder.newFolder();
final File logDir = new File(tmpDir, "logs");
final File logFile = new File(tmpDir, "log");
Files.write("blah", logFile, Charsets.UTF_8);
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
taskLogs.pushTaskLog("foo", logFile);
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
for (Map.Entry<Long, String> entry : expected.entrySet()) {
final String string = readLog(taskLogs, entry.getKey());
Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
}
}
@Test
public void testOverwrite() throws Exception
{
final File tmpDir = tempFolder.newFolder();
final File logDir = new File(tmpDir, "logs");
final File logFile = new File(tmpDir, "log");
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
Files.write("blah", logFile, Charsets.UTF_8);
taskLogs.pushTaskLog("foo", logFile);
Assert.assertEquals("blah", readLog(taskLogs, 0));
Files.write("blah blah", logFile, Charsets.UTF_8);
taskLogs.pushTaskLog("foo", logFile);
Assert.assertEquals("blah blah", readLog(taskLogs, 0));
}
private String readLog(TaskLogs taskLogs, long offset) throws IOException
{
return new String(
ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", offset).get().openStream()),
Charsets.UTF_8
);
}
}