From 1e6ce8ac9aa5c2007daa9d1525c4e0d2f4e99af7 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Mon, 29 Sep 2014 16:20:34 -0700
Subject: [PATCH] TaskLogs fixes and cleanups.

- Fix negative offsets in FileTaskLogs and HdfsTaskLogs.
- Consolidate file offset code into LogUtils (currently used in two places).
- Clean up style for HdfsTaskLogs and related classes.
- Remove unused code in ForkingTaskRunner.
---
 .../storage/hdfs/tasklog/HdfsTaskLogs.java    | 123 +++++++++---
 .../hdfs/tasklog/HdfsTaskLogsConfig.java      |  24 ++--
 .../common/tasklogs/HdfsTaskLogsTest.java     |  41 ++++++
 .../common/config/FileTaskLogsConfig.java     |   5 +
 .../common/tasklogs/FileTaskLogs.java         |   6 +-
 .../indexing/common/tasklogs/LogUtils.java    |  30 +++++
 .../indexing/overlord/ForkingTaskRunner.java  |  32 +----
 .../common/tasklogs/FileTaskLogsTest.java     |  40 ++++++
 8 files changed, 198 insertions(+), 103 deletions(-)
 create mode 100644 hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
 create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/tasklogs/LogUtils.java
 create mode 100644 indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java
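The behavioral fix here is the offset contract for streaming task logs: a
non-negative offset reads from that byte position onwards, while a negative
offset reads that many bytes from the end of the log. A minimal sketch of the
intended behavior, assuming a log file containing the four bytes "blah"; the
names and paths are illustrative, mirroring the new tests in this patch:

    // Sketch under assumed names; not part of the patch itself.
    TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(new File("/tmp/task-logs")));
    taskLogs.pushTaskLog("some-task", logFile);   // logFile contains "blah"
    // offset  0 -> "blah" (whole log)       offset  1 -> "lah" (skip one byte)
    // offset -2 -> "ah" (last two bytes)    offset -5 -> "blah" (clamped to start)
    InputStream in = taskLogs.streamTaskLog("some-task", -2).get().getInput();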
diff --git a/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
index b5db10a2a80..1a72904d12d 100644
--- a/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
+++ b/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
@@ -19,12 +19,13 @@ package io.druid.storage.hdfs.tasklog;
 
 import com.google.common.base.Optional;
-import com.google.common.io.ByteStreams;
 import com.google.common.io.InputSupplier;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
 import io.druid.tasklogs.TaskLogs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -34,71 +35,77 @@ import java.io.IOException;
 import java.io.InputStream;
 
 /**
- * Indexer hdfs task logs, to support storing hdfs tasks to hdfs
- *
- * Created by Frank Ren on 6/20/14.
+ * Indexer task logs, stored in HDFS.
  */
 public class HdfsTaskLogs implements TaskLogs
 {
-    private static final Logger log = new Logger(HdfsTaskLogs.class);
+  private static final Logger log = new Logger(HdfsTaskLogs.class);
 
-    private final HdfsTaskLogsConfig config;
+  private final HdfsTaskLogsConfig config;
 
-    @Inject
-    public HdfsTaskLogs(HdfsTaskLogsConfig config)
-    {
-        this.config = config;
+  @Inject
+  public HdfsTaskLogs(HdfsTaskLogsConfig config)
+  {
+    this.config = config;
+  }
+
+  @Override
+  public void pushTaskLog(String taskId, File logFile) throws IOException
+  {
+    final Path path = getTaskLogFileFromId(taskId);
+    log.info("Writing task log to: %s", path);
+    Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.get(conf);
+    FileUtil.copy(logFile, fs, path, false, conf);
+    log.info("Wrote task log to: %s", path);
+  }
+
+  @Override
+  public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskId, final long offset) throws IOException
+  {
+    final Path path = getTaskLogFileFromId(taskId);
+    final FileSystem fs = FileSystem.get(new Configuration());
+    if (fs.exists(path)) {
+      return Optional.<InputSupplier<InputStream>>of(
+          new InputSupplier<InputStream>()
+          {
+            @Override
+            public InputStream getInput() throws IOException
+            {
+              log.info("Reading task log from: %s", path);
+              final long seekPos;
+              if (offset < 0) {
+                final FileStatus stat = fs.getFileStatus(path);
+                seekPos = Math.max(0, stat.getLen() + offset);
+              } else {
+                seekPos = offset;
+              }
+              final FSDataInputStream inputStream = fs.open(path);
+              inputStream.seek(seekPos);
+              log.info("Read task log from: %s (seek = %,d)", path, seekPos);
+              return inputStream;
+            }
+          }
+      );
+    } else {
+      return Optional.absent();
     }
+  }
 
-    @Override
-    public void pushTaskLog(String taskId, File logFile) throws IOException
-    {
-        final Path path = getTaskLogFileFromId(taskId);
-        log.info("writing task log to: %s", path);
-        Configuration conf = new Configuration();
-        final FileSystem fs = FileSystem.get(conf);
-        FileUtil.copy(logFile, fs, path, false, conf);
-        log.info("wrote task log to: %s", path);
-    }
+  /**
+   * Due to https://issues.apache.org/jira/browse/HDFS-13, ":" is not allowed in
+   * path names, so we format paths differently for HDFS.
+   */
+  private Path getTaskLogFileFromId(String taskId)
+  {
+    return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_")));
+  }
 
-    @Override
-    public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskId, final long offset) throws IOException
-    {
-        final Path path = getTaskLogFileFromId(taskId);
-        final FileSystem fs = FileSystem.get(new Configuration());
-        if (fs.exists(path)) {
-            return Optional.<InputSupplier<InputStream>>of(
-                new InputSupplier<InputStream>() {
-                    @Override
-                    public InputStream getInput() throws IOException
-                    {
-                        log.info("reading task log from: %s", path);
-                        final InputStream inputStream = fs.open(path);
-                        ByteStreams.skipFully(inputStream, offset);
-                        log.info("read task log from: %s", path);
-                        return inputStream;
-                    }
-                }
-            );
-        } else {
-            return Optional.absent();
-        }
-    }
-
-    /**
-     * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in
-     * path names. So we format paths differently for HDFS.
-     */
-    private Path getTaskLogFileFromId(String taskId)
-    {
-        return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_")));
-    }
-
-    // some hadoop version Path.mergePaths does not exist
-    private static String mergePaths(String path1, String path2)
-    {
-        return path1 + (path1.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR) + path2;
-    }
+  // Path.mergePaths does not exist in some Hadoop versions, so merge paths ourselves.
+  private static String mergePaths(String path1, String path2)
+  {
+    return path1 + (path1.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR) + path2;
+  }
 }
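The new streamTaskLog resolves a negative offset against the file length
(taken from FileStatus) before seeking, rather than passing a negative count
to ByteStreams.skipFully as the old code did. The rule in isolation, as a
hypothetical helper not present in the patch:

    // offset >= 0: seek to offset; offset < 0: tail semantics, clamped at 0.
    static long resolveSeekPos(final long fileLength, final long offset)
    {
      return offset < 0 ? Math.max(0, fileLength + offset) : offset;
    }
    // e.g. fileLength = 10: offset 3 -> 3, offset -3 -> 7, offset -15 -> 0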
"" : Path.SEPARATOR) + path2; - } + // some hadoop version Path.mergePaths does not exist + private static String mergePaths(String path1, String path2) + { + return path1 + (path1.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR) + path2; + } } diff --git a/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogsConfig.java b/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogsConfig.java index 447dff7c52a..6e2c67734fb 100644 --- a/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogsConfig.java +++ b/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogsConfig.java @@ -23,19 +23,23 @@ import com.fasterxml.jackson.annotation.JsonProperty; import javax.validation.constraints.NotNull; /** - * Indexer hdfs task logs configuration - * - * Created by Frank Ren on 6/20/14. + * Indexer hdfs task logs configuration. */ public class HdfsTaskLogsConfig { - @JsonProperty - @NotNull - private String directory; - public String getDirectory() - { - return directory; - } + @JsonProperty + @NotNull + private String directory; + + public HdfsTaskLogsConfig(String directory) + { + this.directory = directory; + } + + public String getDirectory() + { + return directory; + } } diff --git a/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java b/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java new file mode 100644 index 00000000000..4e14ea3c19d --- /dev/null +++ b/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java @@ -0,0 +1,41 @@ +package io.druid.indexing.common.tasklogs; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteStreams; +import com.google.common.io.Files; +import io.druid.storage.hdfs.tasklog.HdfsTaskLogs; +import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig; +import io.druid.tasklogs.TaskLogs; +import org.apache.commons.io.FileUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.Map; + +public class HdfsTaskLogsTest +{ + @Test + public void testSimple() throws Exception + { + final File tmpDir = Files.createTempDir(); + try { + final File logDir = new File(tmpDir, "logs"); + final File logFile = new File(tmpDir, "log"); + Files.write("blah", logFile, Charsets.UTF_8); + final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString())); + taskLogs.pushTaskLog("foo", logFile); + + final Map expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah"); + for (Map.Entry entry : expected.entrySet()) { + final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput()); + final String string = new String(bytes); + Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue()); + } + } + finally { + FileUtils.deleteDirectory(tmpDir); + } + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java index dfc7c9a9951..d1add268faa 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java @@ -30,6 +30,11 @@ public class FileTaskLogsConfig @NotNull private File directory = new File("log"); + public FileTaskLogsConfig(File directory) + { + this.directory = 
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java
index dfc7c9a9951..d1add268faa 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java
@@ -30,6 +30,11 @@ public class FileTaskLogsConfig
   @NotNull
   private File directory = new File("log");
 
+  public FileTaskLogsConfig(File directory)
+  {
+    this.directory = directory;
+  }
+
   public File getDirectory()
   {
     return directory;
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java
index e1649b46f32..dadd91d9bea 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java
@@ -20,7 +20,6 @@
 package io.druid.indexing.common.tasklogs;
 
 import com.google.common.base.Optional;
-import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.google.common.io.InputSupplier;
 import com.google.inject.Inject;
@@ -29,7 +28,6 @@
 import io.druid.indexing.common.config.FileTaskLogsConfig;
 import io.druid.tasklogs.TaskLogs;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -69,9 +67,7 @@ public class FileTaskLogs implements TaskLogs
           @Override
           public InputStream getInput() throws IOException
           {
-            final InputStream inputStream = new FileInputStream(file);
-            ByteStreams.skipFully(inputStream, offset);
-            return inputStream;
+            return LogUtils.streamFile(file, offset);
           }
         }
     );
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/LogUtils.java b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/LogUtils.java
new file mode 100644
index 00000000000..2e5f3fff9c7
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/LogUtils.java
@@ -0,0 +1,30 @@
+package io.druid.indexing.common.tasklogs;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.channels.Channels;
+
+public class LogUtils
+{
+  /**
+   * Open a stream to a file.
+   *
+   * @param offset If zero, stream the entire log. If positive, read from this byte position onwards. If negative,
+   *               read this many bytes from the end of the file.
+   *
+   * @return an input stream over the file, positioned according to {@code offset}
+   */
+  public static InputStream streamFile(final File file, final long offset) throws IOException
+  {
+    final RandomAccessFile raf = new RandomAccessFile(file, "r");
+    final long rafLength = raf.length();
+    if (offset > 0) {
+      raf.seek(offset);
+    } else if (offset < 0 && offset < rafLength) {
+      raf.seek(Math.max(0, rafLength + offset));
+    }
+    return Channels.newInputStream(raf.getChannel());
+  }
+}
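A usage sketch for the new helper; the path is illustrative. Closing the
returned stream closes the underlying channel and file:

    final File logFile = new File("/tmp/some-task.log");
    try (InputStream in = LogUtils.streamFile(logFile, -1024)) {
      final byte[] tail = ByteStreams.toByteArray(in);  // at most the final 1,024 bytes
    }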
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
index 36aa925ef32..d7c68491e4d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
@@ -44,6 +44,7 @@ import io.druid.guice.annotations.Self;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.common.tasklogs.LogUtils;
 import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
 import io.druid.indexing.worker.config.WorkerConfig;
 import io.druid.server.DruidNode;
@@ -391,41 +392,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
           @Override
           public InputStream getInput() throws IOException
           {
-            final RandomAccessFile raf = new RandomAccessFile(processHolder.logFile, "r");
-            final long rafLength = raf.length();
-            if (offset > 0) {
-              raf.seek(offset);
-            } else if (offset < 0 && offset < rafLength) {
-              raf.seek(Math.max(0, rafLength + offset));
-            }
-            return Channels.newInputStream(raf.getChannel());
+            return LogUtils.streamFile(processHolder.logFile, offset);
           }
         }
     );
   }
 
-  private int findUnusedPort()
-  {
-    synchronized (tasks) {
-      int port = config.getStartPort();
-      int maxPortSoFar = -1;
-
-      for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
-        if (taskWorkItem.processHolder != null) {
-          if (taskWorkItem.processHolder.port > maxPortSoFar) {
-            maxPortSoFar = taskWorkItem.processHolder.port;
-          }
-
-          if (taskWorkItem.processHolder.port == port) {
-            port = maxPortSoFar + 1;
-          }
-        }
-      }
-
-      return port;
-    }
-  }
-
   private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
   {
     private volatile boolean shutdown = false;
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java
new file mode 100644
index 00000000000..be227db20ad
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java
@@ -0,0 +1,40 @@
+package io.druid.indexing.common.tasklogs;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import io.druid.indexing.common.config.FileTaskLogsConfig;
+import io.druid.tasklogs.TaskLogs;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Map;
+
+public class FileTaskLogsTest
+{
+  @Test
+  public void testSimple() throws Exception
+  {
+    final File tmpDir = Files.createTempDir();
+    try {
+      final File logDir = new File(tmpDir, "logs");
+      final File logFile = new File(tmpDir, "log");
+      Files.write("blah", logFile, Charsets.UTF_8);
+      final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir));
+      taskLogs.pushTaskLog("foo", logFile);
+
+      final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
+      for (Map.Entry<Long, String> entry : expected.entrySet()) {
+        final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
+        final String string = new String(bytes, Charsets.UTF_8);
+        Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), entry.getValue(), string);
+      }
+    }
+    finally {
+      FileUtils.deleteDirectory(tmpDir);
+    }
+  }
+}
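Both FileTaskLogs and HdfsTaskLogs present the same consumer-facing shape: an
absent Optional when no log exists, otherwise a lazy InputSupplier. A
consumption sketch, with an illustrative task id:

    final Optional<InputSupplier<InputStream>> log = taskLogs.streamTaskLog("some-task", -4096);
    if (log.isPresent()) {
      try (InputStream in = log.get().getInput()) {
        // consume the final 4 KB of the task log
      }
    } else {
      // no log stored for this task
    }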