Merge pull request #2558 from gianm/hdfs-log-overwrite

HdfsTaskLogs: Allow overwriting existing logs.
Fangjin Yang 2016-02-26 15:50:51 -08:00
commit dd060eb826
2 changed files with 54 additions and 23 deletions
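
Why the change: the previous implementation pushed logs with FileUtil.copy(logFile, fs, path, false, hadoopConfig). That single-file copy variant checks the destination and throws an IOException reporting that the target already exists, so a task that pushed its log a second time (for example, after a restart) would fail. The new code writes through FileSystem.create(path, true), where the boolean is Hadoop's overwrite flag. A minimal standalone sketch of that flag's semantics (not part of the commit; the path is made up for illustration):

    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OverwriteFlagSketch
    {
      public static void main(String[] args) throws Exception
      {
        final Path path = new Path("/tmp/overwrite-sketch");  // hypothetical location
        final FileSystem fs = path.getFileSystem(new Configuration());

        // overwrite = true: create() succeeds whether or not the file already
        // exists, replacing any previous contents. This is the behavior the
        // commit adopts for task logs.
        try (OutputStream out = fs.create(path, true)) {
          out.write("first".getBytes(StandardCharsets.UTF_8));
        }

        // overwrite = false: create() throws an IOException when the file
        // already exists, which is the failure mode the commit removes.
        try (OutputStream out = fs.create(path, false)) {
          out.write("second".getBytes(StandardCharsets.UTF_8));
        }
      }
    }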

HdfsTaskLogs.java

@@ -20,6 +20,7 @@ package io.druid.storage.hdfs.tasklog;

 import com.google.common.base.Optional;
 import com.google.common.io.ByteSource;
+import com.google.common.io.ByteStreams;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
 import io.druid.tasklogs.TaskLogs;
@@ -27,12 +28,13 @@ import org.apache.hadoop.conf.Configuration;

 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;

 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;

 /**
  * Indexer hdfs task logs, to support storing task logs in hdfs.
@@ -57,9 +59,14 @@ public class HdfsTaskLogs implements TaskLogs

     final Path path = getTaskLogFileFromId(taskId);
     log.info("Writing task log to: %s", path);
     final FileSystem fs = path.getFileSystem(hadoopConfig);
-    FileUtil.copy(logFile, fs, path, false, hadoopConfig);
+    try (
+        final InputStream in = new FileInputStream(logFile);
+        final OutputStream out = fs.create(path, true)
+    ) {
+      ByteStreams.copy(in, out);
+      log.info("Wrote task log to: %s", path);
+    }
   }

   @Override
   public Optional<ByteSource> streamTaskLog(final String taskId, final long offset) throws IOException
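
A detail worth noting in the new pushTaskLog: Guava's ByteStreams.copy(in, out) drains the input into the output but closes neither stream, which is why both are opened in the try-with-resources header; they are closed in reverse order (output first) even if the copy throws. A compact sketch of the same idiom, with hypothetical names:

    import com.google.common.io.ByteStreams;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class StreamCopySketch
    {
      // Copy a local file to a Hadoop FileSystem path, overwriting any
      // existing file at the destination. ByteStreams.copy closes neither
      // stream, so both are managed by try-with-resources.
      public static void copyLocalFile(File src, FileSystem fs, Path dst) throws IOException
      {
        try (
            InputStream in = new FileInputStream(src);
            OutputStream out = fs.create(dst, true)
        ) {
          ByteStreams.copy(in, out);
        }
      }
    }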

HdfsTaskLogsTest.java

@@ -26,21 +26,25 @@ import com.google.common.io.Files;

 import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
 import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
 import io.druid.tasklogs.TaskLogs;
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;

 import java.io.File;
+import java.io.IOException;
 import java.util.Map;

 public class HdfsTaskLogsTest
 {
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
   @Test
-  public void testSimple() throws Exception
+  public void testStream() throws Exception
   {
-    final File tmpDir = Files.createTempDir();
-    try {
+    final File tmpDir = tempFolder.newFolder();
     final File logDir = new File(tmpDir, "logs");
     final File logFile = new File(tmpDir, "log");
     Files.write("blah", logFile, Charsets.UTF_8);
@@ -49,13 +53,33 @@ public class HdfsTaskLogsTest

     final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
     for (Map.Entry<Long, String> entry : expected.entrySet()) {
-      final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
-      final String string = new String(bytes);
+      final String string = readLog(taskLogs, entry.getKey());
       Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
     }
-    }
-    finally {
-      FileUtils.deleteDirectory(tmpDir);
-    }
   }
+
+  @Test
+  public void testOverwrite() throws Exception
+  {
+    final File tmpDir = tempFolder.newFolder();
+    final File logDir = new File(tmpDir, "logs");
+    final File logFile = new File(tmpDir, "log");
+    final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
+
+    Files.write("blah", logFile, Charsets.UTF_8);
+    taskLogs.pushTaskLog("foo", logFile);
+    Assert.assertEquals("blah", readLog(taskLogs, 0));
+
+    Files.write("blah blah", logFile, Charsets.UTF_8);
+    taskLogs.pushTaskLog("foo", logFile);
+    Assert.assertEquals("blah blah", readLog(taskLogs, 0));
+  }
+
+  private String readLog(TaskLogs taskLogs, long offset) throws IOException
+  {
+    return new String(
+        ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", offset).get().openStream()),
+        Charsets.UTF_8
+    );
+  }
 }
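
The expected map in testStream encodes the offset convention of streamTaskLog: a nonnegative offset counts from the start of the log, while a negative offset counts back from the end, clamped to the start of the file. A sketch of that arithmetic, consistent with the test's four offsets against the 4-byte log "blah" (an illustration of the convention, not the method's actual code):

    // Resolve a signed offset against a log of the given length.
    //   resolve(0, 4)  -> 0  (reads "blah")
    //   resolve(1, 4)  -> 1  (reads "lah")
    //   resolve(-2, 4) -> 2  (reads "ah")
    //   resolve(-5, 4) -> 0  (reads "blah", clamped to the start)
    static long resolve(long offset, long length)
    {
      return offset >= 0
             ? Math.min(offset, length)
             : Math.max(0, length + offset);
    }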