This commit is contained in:
Charles Allen 2014-11-06 11:21:36 -08:00
parent 8b1edfd464
commit d52530e0de
10 changed files with 42 additions and 28 deletions

View File

@ -19,6 +19,7 @@
package io.druid.storage.hdfs.tasklog;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
@ -61,16 +62,16 @@ public class HdfsTaskLogs implements TaskLogs
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskId, final long offset) throws IOException
public Optional<ByteSource> streamTaskLog(final String taskId, final long offset) throws IOException
{
final Path path = getTaskLogFileFromId(taskId);
final FileSystem fs = FileSystem.get(new Configuration());
if (fs.exists(path)) {
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
return Optional.<ByteSource>of(
new ByteSource()
{
@Override
public InputStream getInput() throws IOException
public InputStream openStream() throws IOException
{
log.info("Reading task log from: %s", path);
final long seekPos;

View File

@ -20,6 +20,7 @@
package io.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
@ -57,15 +58,15 @@ public class FileTaskLogs implements TaskLogs
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskid, final long offset) throws IOException
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
{
final File file = fileForTask(taskid);
if (file.exists()) {
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
return Optional.<ByteSource>of(
new ByteSource()
{
@Override
public InputStream getInput() throws IOException
public InputStream openStream() throws IOException
{
return LogUtils.streamFile(file, offset);
}

View File

@ -1,5 +1,7 @@
package io.druid.indexing.common.tasklogs;
import com.google.common.io.ByteSource;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;

View File

@ -21,6 +21,7 @@ package io.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import io.druid.tasklogs.TaskLogStreamer;
@ -43,10 +44,10 @@ public class SwitchingTaskLogStreamer implements TaskLogStreamer
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, long offset) throws IOException
public Optional<ByteSource> streamTaskLog(String taskid, long offset) throws IOException
{
for (TaskLogStreamer provider : providers) {
final Optional<InputSupplier<InputStream>> stream = provider.streamTaskLog(taskid, offset);
final Optional<ByteSource> stream = provider.streamTaskLog(taskid, offset);
if (stream.isPresent()) {
return stream;
}

View File

@ -20,6 +20,7 @@
package io.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import io.druid.indexing.overlord.TaskMaster;
@ -43,7 +44,7 @@ public class TaskRunnerTaskLogStreamer implements TaskLogStreamer
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, long offset) throws IOException
public Optional<ByteSource> streamTaskLog(String taskid, long offset) throws IOException
{
final TaskRunner runner = taskMaster.getTaskRunner().orNull();
if (runner instanceof TaskLogStreamer) {

View File

@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import com.google.common.io.Files;
@ -373,7 +374,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskid, final long offset)
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset)
{
final ProcessHolder processHolder;
@ -386,11 +387,11 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
}
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
return Optional.<ByteSource>of(
new ByteSource()
{
@Override
public InputStream getInput() throws IOException
public InputStream openStream() throws IOException
{
return LogUtils.streamFile(processHolder.logFile, offset);
}

View File

@ -31,6 +31,8 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@ -374,7 +376,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskId, final long offset)
public Optional<ByteSource> streamTaskLog(final String taskId, final long offset)
{
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
@ -384,11 +386,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} else {
// Worker is still running this task
final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/log?offset=%d", taskId, offset));
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
return Optional.<ByteSource>of(
new ByteSource()
{
@Override
public InputStream getInput() throws IOException
public InputStream openStream() throws IOException
{
try {
return httpClient.get(url)

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteSource;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
@ -394,9 +395,11 @@ public class OverlordResource
)
{
try {
final Optional<InputSupplier<InputStream>> stream = taskLogStreamer.streamTaskLog(taskid, offset);
final Optional<ByteSource> stream = taskLogStreamer.streamTaskLog(taskid, offset);
if (stream.isPresent()) {
return Response.ok(stream.get().getInput()).build();
try(InputStream istream = stream.get().openStream()) {
return Response.ok(istream).build();
}
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(

View File

@ -24,6 +24,7 @@ import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
@ -163,11 +164,11 @@ public class WorkerResource
@QueryParam("offset") @DefaultValue("0") long offset
)
{
final Optional<InputSupplier<InputStream>> stream = taskRunner.streamTaskLog(taskid, offset);
final Optional<ByteSource> stream = taskRunner.streamTaskLog(taskid, offset);
if (stream.isPresent()) {
try {
return Response.ok(stream.get().getInput()).build();
try (InputStream logStream = stream.get().openStream()) {
return Response.ok(logStream).build();
}
catch (Exception e) {
log.warn(e, "Failed to read log for task: %s", taskid);

View File

@ -21,6 +21,7 @@ package io.druid.storage.s3;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
@ -53,18 +54,18 @@ public class S3TaskLogs implements TaskLogs
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskid, final long offset) throws IOException
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid);
try {
final StorageObject objectDetails = service.getObjectDetails(config.getS3Bucket(), taskKey, null, null, null, null);
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
return Optional.<ByteSource>of(
new ByteSource()
{
@Override
public InputStream getInput() throws IOException
public InputStream openStream() throws IOException
{
try {
final long start;