mirror of https://github.com/apache/druid.git
TaskLogs fixes and cleanups.
- Fix negative offsets in FileTaskLogs and HdfsTaskLogs. - Consolidate file offset code into LogUtils (currently used in two places). - Clean up style for HdfsTaskLogs and related classes. - Remove unused code in ForkingTaskRunner.
This commit is contained in:
parent
5e0b6b9896
commit
1e6ce8ac9a
|
@ -19,12 +19,13 @@
|
|||
package io.druid.storage.hdfs.tasklog;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.io.InputSupplier;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.tasklogs.TaskLogs;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -34,71 +35,77 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
* Indexer hdfs task logs, to support storing hdfs tasks to hdfs
|
||||
*
|
||||
* Created by Frank Ren on 6/20/14.
|
||||
* Indexer hdfs task logs, to support storing hdfs tasks to hdfs.
|
||||
*/
|
||||
public class HdfsTaskLogs implements TaskLogs
|
||||
{
|
||||
private static final Logger log = new Logger(HdfsTaskLogs.class);
|
||||
private static final Logger log = new Logger(HdfsTaskLogs.class);
|
||||
|
||||
private final HdfsTaskLogsConfig config;
|
||||
private final HdfsTaskLogsConfig config;
|
||||
|
||||
@Inject
|
||||
public HdfsTaskLogs(HdfsTaskLogsConfig config)
|
||||
{
|
||||
this.config = config;
|
||||
@Inject
|
||||
public HdfsTaskLogs(HdfsTaskLogsConfig config)
|
||||
{
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushTaskLog(String taskId, File logFile) throws IOException
|
||||
{
|
||||
final Path path = getTaskLogFileFromId(taskId);
|
||||
log.info("Writing task log to: %s", path);
|
||||
Configuration conf = new Configuration();
|
||||
final FileSystem fs = FileSystem.get(conf);
|
||||
FileUtil.copy(logFile, fs, path, false, conf);
|
||||
log.info("Wrote task log to: %s", path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskId, final long offset) throws IOException
|
||||
{
|
||||
final Path path = getTaskLogFileFromId(taskId);
|
||||
final FileSystem fs = FileSystem.get(new Configuration());
|
||||
if (fs.exists(path)) {
|
||||
return Optional.<InputSupplier<InputStream>>of(
|
||||
new InputSupplier<InputStream>()
|
||||
{
|
||||
@Override
|
||||
public InputStream getInput() throws IOException
|
||||
{
|
||||
log.info("Reading task log from: %s", path);
|
||||
final long seekPos;
|
||||
if (offset < 0) {
|
||||
final FileStatus stat = fs.getFileStatus(path);
|
||||
seekPos = Math.max(0, stat.getLen() + offset);
|
||||
} else {
|
||||
seekPos = offset;
|
||||
}
|
||||
final FSDataInputStream inputStream = fs.open(path);
|
||||
inputStream.seek(seekPos);
|
||||
log.info("Read task log from: %s (seek = %,d)", path, seekPos);
|
||||
return inputStream;
|
||||
}
|
||||
}
|
||||
);
|
||||
} else {
|
||||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushTaskLog(String taskId, File logFile) throws IOException
|
||||
{
|
||||
final Path path = getTaskLogFileFromId(taskId);
|
||||
log.info("writing task log to: %s", path);
|
||||
Configuration conf = new Configuration();
|
||||
final FileSystem fs = FileSystem.get(conf);
|
||||
FileUtil.copy(logFile, fs, path, false, conf);
|
||||
log.info("wrote task log to: %s", path);
|
||||
}
|
||||
/**
|
||||
* Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in
|
||||
* path names. So we format paths differently for HDFS.
|
||||
*/
|
||||
private Path getTaskLogFileFromId(String taskId)
|
||||
{
|
||||
return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_")));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskId, final long offset) throws IOException
|
||||
{
|
||||
final Path path = getTaskLogFileFromId(taskId);
|
||||
final FileSystem fs = FileSystem.get(new Configuration());
|
||||
if (fs.exists(path)) {
|
||||
return Optional.<InputSupplier<InputStream>>of(
|
||||
new InputSupplier<InputStream>() {
|
||||
@Override
|
||||
public InputStream getInput() throws IOException
|
||||
{
|
||||
log.info("reading task log from: %s", path);
|
||||
final InputStream inputStream = fs.open(path);
|
||||
ByteStreams.skipFully(inputStream, offset);
|
||||
log.info("read task log from: %s", path);
|
||||
return inputStream;
|
||||
}
|
||||
}
|
||||
);
|
||||
} else {
|
||||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in
|
||||
* path names. So we format paths differently for HDFS.
|
||||
*/
|
||||
private Path getTaskLogFileFromId(String taskId)
|
||||
{
|
||||
return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_")));
|
||||
}
|
||||
|
||||
// some hadoop version Path.mergePaths does not exist
|
||||
private static String mergePaths(String path1, String path2)
|
||||
{
|
||||
return path1 + (path1.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR) + path2;
|
||||
}
|
||||
// some hadoop version Path.mergePaths does not exist
|
||||
private static String mergePaths(String path1, String path2)
|
||||
{
|
||||
return path1 + (path1.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR) + path2;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -23,19 +23,23 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import javax.validation.constraints.NotNull;
|
||||
|
||||
/**
|
||||
* Indexer hdfs task logs configuration
|
||||
*
|
||||
* Created by Frank Ren on 6/20/14.
|
||||
* Indexer hdfs task logs configuration.
|
||||
*/
|
||||
public class HdfsTaskLogsConfig
|
||||
{
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
private String directory;
|
||||
|
||||
public String getDirectory()
|
||||
{
|
||||
return directory;
|
||||
}
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
private String directory;
|
||||
|
||||
public HdfsTaskLogsConfig(String directory)
|
||||
{
|
||||
this.directory = directory;
|
||||
}
|
||||
|
||||
public String getDirectory()
|
||||
{
|
||||
return directory;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
package io.druid.indexing.common.tasklogs;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.io.Files;
|
||||
import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
|
||||
import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
|
||||
import io.druid.tasklogs.TaskLogs;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Map;
|
||||
|
||||
public class HdfsTaskLogsTest
|
||||
{
|
||||
@Test
|
||||
public void testSimple() throws Exception
|
||||
{
|
||||
final File tmpDir = Files.createTempDir();
|
||||
try {
|
||||
final File logDir = new File(tmpDir, "logs");
|
||||
final File logFile = new File(tmpDir, "log");
|
||||
Files.write("blah", logFile, Charsets.UTF_8);
|
||||
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()));
|
||||
taskLogs.pushTaskLog("foo", logFile);
|
||||
|
||||
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
|
||||
for (Map.Entry<Long, String> entry : expected.entrySet()) {
|
||||
final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
|
||||
final String string = new String(bytes);
|
||||
Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
|
||||
}
|
||||
}
|
||||
finally {
|
||||
FileUtils.deleteDirectory(tmpDir);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,6 +30,11 @@ public class FileTaskLogsConfig
|
|||
@NotNull
|
||||
private File directory = new File("log");
|
||||
|
||||
public FileTaskLogsConfig(File directory)
|
||||
{
|
||||
this.directory = directory;
|
||||
}
|
||||
|
||||
public File getDirectory()
|
||||
{
|
||||
return directory;
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.indexing.common.tasklogs;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.io.Files;
|
||||
import com.google.common.io.InputSupplier;
|
||||
import com.google.inject.Inject;
|
||||
|
@ -29,7 +28,6 @@ import io.druid.indexing.common.config.FileTaskLogsConfig;
|
|||
import io.druid.tasklogs.TaskLogs;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
|
@ -69,9 +67,7 @@ public class FileTaskLogs implements TaskLogs
|
|||
@Override
|
||||
public InputStream getInput() throws IOException
|
||||
{
|
||||
final InputStream inputStream = new FileInputStream(file);
|
||||
ByteStreams.skipFully(inputStream, offset);
|
||||
return inputStream;
|
||||
return LogUtils.streamFile(file, offset);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
package io.druid.indexing.common.tasklogs;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.channels.Channels;
|
||||
|
||||
public class LogUtils
{
  /**
   * Open a stream to a file.
   *
   * @param file   file to read
   * @param offset If zero, stream the entire log. If positive, read from this byte position onwards. If negative,
   *               read this many bytes from the end of the file (the whole file if it is shorter than that).
   *
   * @return input stream positioned according to {@code offset}; closing it releases the underlying file handle
   *
   * @throws IOException if the file cannot be opened, measured, or positioned
   */
  public static InputStream streamFile(final File file, final long offset) throws IOException
  {
    final RandomAccessFile raf = new RandomAccessFile(file, "r");
    try {
      if (offset > 0) {
        raf.seek(offset);
      } else if (offset < 0) {
        // Count back from the end of the file, but never before byte zero.
        raf.seek(Math.max(0, raf.length() + offset));
      }
      return Channels.newInputStream(raf.getChannel());
    }
    catch (IOException e) {
      // The caller never receives a stream in this case, so the file handle must be released here.
      try {
        raf.close();
      }
      catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }
}
|
|
@ -44,6 +44,7 @@ import io.druid.guice.annotations.Self;
|
|||
import io.druid.indexing.common.TaskStatus;
|
||||
import io.druid.indexing.common.config.TaskConfig;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.indexing.common.tasklogs.LogUtils;
|
||||
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
|
||||
import io.druid.indexing.worker.config.WorkerConfig;
|
||||
import io.druid.server.DruidNode;
|
||||
|
@ -391,41 +392,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
@Override
|
||||
public InputStream getInput() throws IOException
|
||||
{
|
||||
final RandomAccessFile raf = new RandomAccessFile(processHolder.logFile, "r");
|
||||
final long rafLength = raf.length();
|
||||
if (offset > 0) {
|
||||
raf.seek(offset);
|
||||
} else if (offset < 0 && offset < rafLength) {
|
||||
raf.seek(Math.max(0, rafLength + offset));
|
||||
}
|
||||
return Channels.newInputStream(raf.getChannel());
|
||||
return LogUtils.streamFile(processHolder.logFile, offset);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private int findUnusedPort()
|
||||
{
|
||||
synchronized (tasks) {
|
||||
int port = config.getStartPort();
|
||||
int maxPortSoFar = -1;
|
||||
|
||||
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
|
||||
if (taskWorkItem.processHolder != null) {
|
||||
if (taskWorkItem.processHolder.port > maxPortSoFar) {
|
||||
maxPortSoFar = taskWorkItem.processHolder.port;
|
||||
}
|
||||
|
||||
if (taskWorkItem.processHolder.port == port) {
|
||||
port = maxPortSoFar + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return port;
|
||||
}
|
||||
}
|
||||
|
||||
private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
|
||||
{
|
||||
private volatile boolean shutdown = false;
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
package io.druid.indexing.common.tasklogs;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.io.Files;
|
||||
import io.druid.indexing.common.config.FileTaskLogsConfig;
|
||||
import io.druid.tasklogs.TaskLogs;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Map;
|
||||
|
||||
public class FileTaskLogsTest
|
||||
{
|
||||
@Test
|
||||
public void testSimple() throws Exception
|
||||
{
|
||||
final File tmpDir = Files.createTempDir();
|
||||
try {
|
||||
final File logDir = new File(tmpDir, "logs");
|
||||
final File logFile = new File(tmpDir, "log");
|
||||
Files.write("blah", logFile, Charsets.UTF_8);
|
||||
final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir));
|
||||
taskLogs.pushTaskLog("foo", logFile);
|
||||
|
||||
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
|
||||
for (Map.Entry<Long, String> entry : expected.entrySet()) {
|
||||
final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
|
||||
final String string = new String(bytes);
|
||||
Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
|
||||
}
|
||||
}
|
||||
finally {
|
||||
FileUtils.deleteDirectory(tmpDir);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue