Merge pull request #769 from metamx/task-logs-fixes

TaskLogs fixes and cleanups.
Committed by fjy on 2014-09-29 17:37:00 -06:00 · commit 3f524baa67
8 changed files with 198 additions and 103 deletions

io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java

@@ -19,12 +19,13 @@
package io.druid.storage.hdfs.tasklog;

import com.google.common.base.Optional;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.tasklogs.TaskLogs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -34,71 +35,77 @@ import java.io.IOException;
import java.io.InputStream;

/**
 * Indexer HDFS task logs, to support storing task logs in HDFS.
 */
public class HdfsTaskLogs implements TaskLogs
{
  private static final Logger log = new Logger(HdfsTaskLogs.class);

  private final HdfsTaskLogsConfig config;

  @Inject
  public HdfsTaskLogs(HdfsTaskLogsConfig config)
  {
    this.config = config;
  }

  @Override
  public void pushTaskLog(String taskId, File logFile) throws IOException
  {
    final Path path = getTaskLogFileFromId(taskId);
    log.info("Writing task log to: %s", path);
    Configuration conf = new Configuration();
    final FileSystem fs = FileSystem.get(conf);
    FileUtil.copy(logFile, fs, path, false, conf);
    log.info("Wrote task log to: %s", path);
  }

  @Override
  public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskId, final long offset) throws IOException
  {
    final Path path = getTaskLogFileFromId(taskId);
    final FileSystem fs = FileSystem.get(new Configuration());
    if (fs.exists(path)) {
      return Optional.<InputSupplier<InputStream>>of(
          new InputSupplier<InputStream>()
          {
            @Override
            public InputStream getInput() throws IOException
            {
              log.info("Reading task log from: %s", path);
              final long seekPos;
              if (offset < 0) {
                final FileStatus stat = fs.getFileStatus(path);
                seekPos = Math.max(0, stat.getLen() + offset);
              } else {
                seekPos = offset;
              }
              final FSDataInputStream inputStream = fs.open(path);
              inputStream.seek(seekPos);
              log.info("Read task log from: %s (seek = %,d)", path, seekPos);
              return inputStream;
            }
          }
      );
    } else {
      return Optional.absent();
    }
  }

  /**
   * Due to https://issues.apache.org/jira/browse/HDFS-13, ":" is not allowed in
   * path names, so we format paths differently for HDFS.
   */
  private Path getTaskLogFileFromId(String taskId)
  {
    return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_")));
  }

  // Path.mergePaths does not exist in some Hadoop versions, so merge manually.
  private static String mergePaths(String path1, String path2)
  {
    return path1 + (path1.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR) + path2;
  }
}
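The negative-offset path above makes tailing a remote task log a single call. A minimal usage sketch, assuming an illustrative log directory and task id (both hypothetical, not from this commit):

// Hypothetical usage sketch; the directory and task id are illustrative.
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.io.ByteStreams;
import com.google.common.io.InputSupplier;
import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import java.io.InputStream;

public class TailTaskLogExample
{
  public static void main(String[] args) throws Exception
  {
    final HdfsTaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig("/druid/task-logs"));

    // Negative offset: getInput() seeks to max(0, fileLength - 1024) and streams to EOF.
    final Optional<InputSupplier<InputStream>> log = taskLogs.streamTaskLog("index_task_1", -1024);
    if (log.isPresent()) {
      System.out.println(new String(ByteStreams.toByteArray(log.get().getInput()), Charsets.UTF_8));
    } else {
      System.out.println("no log stored for this task");
    }
  }
}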

io/druid/storage/hdfs/tasklog/HdfsTaskLogsConfig.java

@@ -23,19 +23,23 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;

/**
 * Indexer HDFS task logs configuration.
 */
public class HdfsTaskLogsConfig
{
  @JsonProperty
  @NotNull
  private String directory;

  public HdfsTaskLogsConfig(String directory)
  {
    this.directory = directory;
  }

  public String getDirectory()
  {
    return directory;
  }
}

io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java

@@ -0,0 +1,41 @@
package io.druid.indexing.common.tasklogs;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import io.druid.tasklogs.TaskLogs;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.util.Map;

public class HdfsTaskLogsTest
{
  @Test
  public void testSimple() throws Exception
  {
    final File tmpDir = Files.createTempDir();
    try {
      final File logDir = new File(tmpDir, "logs");
      final File logFile = new File(tmpDir, "log");
      Files.write("blah", logFile, Charsets.UTF_8);

      final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()));
      taskLogs.pushTaskLog("foo", logFile);

      final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
      for (Map.Entry<Long, String> entry : expected.entrySet()) {
        final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
        final String string = new String(bytes, Charsets.UTF_8);
        Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), entry.getValue(), string);
      }
    }
    finally {
      FileUtils.deleteDirectory(tmpDir);
    }
  }
}
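For the four-byte log "blah", the expected map exercises each offset mode: offset 1 skips one byte and yields "lah"; offset -2 reads just the trailing "ah"; and offset -5 reaches past the start of the file, so the seek position clamps to max(0, 4 - 5) = 0 and the full "blah" comes back.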

io/druid/indexing/common/config/FileTaskLogsConfig.java

@@ -30,6 +30,11 @@ public class FileTaskLogsConfig
  @NotNull
  private File directory = new File("log");

  public FileTaskLogsConfig(File directory)
  {
    this.directory = directory;
  }

  public File getDirectory()
  {
    return directory;

io/druid/indexing/common/tasklogs/FileTaskLogs.java

@@ -20,7 +20,6 @@
package io.druid.indexing.common.tasklogs;

import com.google.common.base.Optional;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
@@ -29,7 +28,6 @@ import io.druid.indexing.common.config.FileTaskLogsConfig;
import io.druid.tasklogs.TaskLogs;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -69,9 +67,7 @@ public class FileTaskLogs implements TaskLogs
          @Override
          public InputStream getInput() throws IOException
          {
            return LogUtils.streamFile(file, offset);
          }
        }
    );

io/druid/indexing/common/tasklogs/LogUtils.java

@@ -0,0 +1,30 @@
package io.druid.indexing.common.tasklogs;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;

public class LogUtils
{
  /**
   * Open a stream to a file.
   *
   * @param offset If zero, stream the entire log. If positive, read from this byte position onwards. If negative,
   *               read this many bytes from the end of the file.
   *
   * @return input stream for the file, positioned at the requested offset
   */
  public static InputStream streamFile(final File file, final long offset) throws IOException
  {
    final RandomAccessFile raf = new RandomAccessFile(file, "r");
    final long rafLength = raf.length();
    if (offset > 0) {
      raf.seek(offset);
    } else if (offset < 0 && offset < rafLength) {
      raf.seek(Math.max(0, rafLength + offset));
    }
    return Channels.newInputStream(raf.getChannel());
  }
}
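LogUtils.streamFile leaves the stream open for the caller, so each read should close it. A minimal sketch of the three offset modes against an illustrative local file (the path is hypothetical):

// Hypothetical usage sketch for LogUtils.streamFile; "/tmp/task.log" is illustrative.
import com.google.common.io.ByteStreams;
import io.druid.indexing.common.tasklogs.LogUtils;
import java.io.File;
import java.io.InputStream;

public class StreamFileExample
{
  public static void main(String[] args) throws Exception
  {
    final File logFile = new File("/tmp/task.log");

    // offset = 0: no seek, stream the whole file.
    try (InputStream whole = LogUtils.streamFile(logFile, 0)) {
      System.out.println(ByteStreams.toByteArray(whole).length + " bytes total");
    }

    // offset = 10: seek to byte 10 and stream the remainder.
    try (InputStream fromTen = LogUtils.streamFile(logFile, 10)) {
      System.out.println(ByteStreams.toByteArray(fromTen).length + " bytes after byte 10");
    }

    // offset = -100: seek to max(0, length - 100), i.e. read the last hundred bytes.
    try (InputStream tail = LogUtils.streamFile(logFile, -100)) {
      System.out.println(new String(ByteStreams.toByteArray(tail)));
    }
  }
}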

io/druid/indexing/overlord/ForkingTaskRunner.java

@@ -44,6 +44,7 @@ import io.druid.guice.annotations.Self;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.tasklogs.LogUtils;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.server.DruidNode;
@@ -391,41 +392,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
          @Override
          public InputStream getInput() throws IOException
          {
            return LogUtils.streamFile(processHolder.logFile, offset);
          }
        }
    );
  }

  private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
  {
    private volatile boolean shutdown = false;

io/druid/indexing/common/tasklogs/FileTaskLogsTest.java

@@ -0,0 +1,40 @@
package io.druid.indexing.common.tasklogs;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import io.druid.indexing.common.config.FileTaskLogsConfig;
import io.druid.tasklogs.TaskLogs;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.util.Map;

public class FileTaskLogsTest
{
  @Test
  public void testSimple() throws Exception
  {
    final File tmpDir = Files.createTempDir();
    try {
      final File logDir = new File(tmpDir, "logs");
      final File logFile = new File(tmpDir, "log");
      Files.write("blah", logFile, Charsets.UTF_8);

      final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir));
      taskLogs.pushTaskLog("foo", logFile);

      final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
      for (Map.Entry<Long, String> entry : expected.entrySet()) {
        final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
        final String string = new String(bytes, Charsets.UTF_8);
        Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), entry.getValue(), string);
      }
    }
    finally {
      FileUtils.deleteDirectory(tmpDir);
    }
  }
}