diff --git a/server/src/main/java/io/druid/guice/TaskLogsModule.java b/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java similarity index 79% rename from server/src/main/java/io/druid/guice/TaskLogsModule.java rename to indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java index eedd12caabc..6d9b0e038b1 100644 --- a/server/src/main/java/io/druid/guice/TaskLogsModule.java +++ b/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java @@ -23,13 +23,15 @@ import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.multibindings.MapBinder; +import io.druid.indexing.common.config.FileTaskLogsConfig; +import io.druid.indexing.common.tasklogs.FileTaskLogs; import io.druid.tasklogs.NoopTaskLogs; import io.druid.tasklogs.TaskLogPusher; import io.druid.tasklogs.TaskLogs; /** */ -public class TaskLogsModule implements Module +public class IndexingServiceTaskLogsModule implements Module { @Override public void configure(Binder binder) @@ -38,7 +40,10 @@ public class TaskLogsModule implements Module final MapBinder taskLogBinder = Binders.taskLogsBinder(binder); taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class); + taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class); binder.bind(NoopTaskLogs.class).in(LazySingleton.class); + binder.bind(FileTaskLogs.class).in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class); binder.bind(TaskLogPusher.class).to(TaskLogs.class); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java new file mode 100644 index 00000000000..a07bade39cf --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java @@ -0,0 +1,18 @@ +package io.druid.indexing.common.config; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.NotNull; +import java.io.File; + +public class FileTaskLogsConfig +{ + @JsonProperty + @NotNull + private File directory = new File("log"); + + public File getDirectory() + { + return directory; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java new file mode 100644 index 00000000000..771f918463b --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java @@ -0,0 +1,68 @@ +package io.druid.indexing.common.tasklogs; + +import com.google.common.base.Optional; +import com.google.common.io.ByteStreams; +import com.google.common.io.Files; +import com.google.common.io.InputSupplier; +import com.google.inject.Inject; +import com.metamx.common.logger.Logger; +import io.druid.indexing.common.config.FileTaskLogsConfig; +import io.druid.tasklogs.TaskLogs; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; + +public class FileTaskLogs implements TaskLogs +{ + private static final Logger log = new Logger(FileTaskLogs.class); + + private final FileTaskLogsConfig config; + + @Inject + public FileTaskLogs( + FileTaskLogsConfig config + ) + { + this.config = config; + } + + @Override + public void pushTaskLog(final String taskid, File file) throws IOException + { + if (!config.getDirectory().exists()) { + config.getDirectory().mkdir(); + } + final File outputFile = fileForTask(taskid); + Files.copy(file, outputFile); + log.info("Wrote task log to: %s", outputFile); + } + + @Override + public Optional> streamTaskLog(final String taskid, final long offset) throws IOException + { + final File file = fileForTask(taskid); + if (file.exists()) { + return Optional.>of( + new InputSupplier() + { + @Override + public InputStream getInput() throws IOException + { + final InputStream inputStream = new FileInputStream(file); + ByteStreams.skipFully(inputStream, offset); + return inputStream; + } + } + ); + } else { + return Optional.absent(); + } + } + + private File fileForTask(final String taskid) + { + return new File(config.getDirectory(), String.format("%s.log", taskid)); + } +} diff --git a/server/src/main/java/io/druid/initialization/Initialization.java b/server/src/main/java/io/druid/initialization/Initialization.java index 408cc2c1d0d..bb30718a33f 100644 --- a/server/src/main/java/io/druid/initialization/Initialization.java +++ b/server/src/main/java/io/druid/initialization/Initialization.java @@ -52,7 +52,6 @@ import io.druid.guice.QueryableModule; import io.druid.guice.ServerModule; import io.druid.guice.ServerViewModule; import io.druid.guice.StorageNodeModule; -import io.druid.guice.TaskLogsModule; import io.druid.guice.annotations.Client; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; @@ -299,7 +298,6 @@ public class Initialization new JacksonConfigManagerModule(), new IndexingServiceDiscoveryModule(), new DataSegmentPusherPullerModule(), - new TaskLogsModule(), new FirehoseModule() ); diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index 95319e7ee55..d0867b42aa6 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -28,6 +28,7 @@ import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceModuleHelper; +import io.druid.guice.IndexingServiceTaskLogsModule; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; @@ -103,7 +104,8 @@ public class CliMiddleManager extends ServerRunnable ); } }, - new IndexingServiceFirehoseModule() + new IndexingServiceFirehoseModule(), + new IndexingServiceTaskLogsModule() ); } } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index ebee2eeea21..3e1313c51ba 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -32,6 +32,7 @@ import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceModuleHelper; +import io.druid.guice.IndexingServiceTaskLogsModule; import io.druid.guice.JacksonConfigProvider; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; @@ -211,7 +212,8 @@ public class CliOverlord extends ServerRunnable JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); } }, - new IndexingServiceFirehoseModule() + new IndexingServiceFirehoseModule(), + new IndexingServiceTaskLogsModule() ); }