mirror of https://github.com/apache/druid.git
Introduce FileTaskLogs, and move TaskLogs module from server to indexing-service
This commit is contained in:
parent
47c1c8cab2
commit
f3cfd1d781
|
@ -23,13 +23,15 @@ import com.google.inject.Binder;
|
|||
import com.google.inject.Key;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.multibindings.MapBinder;
|
||||
import io.druid.indexing.common.config.FileTaskLogsConfig;
|
||||
import io.druid.indexing.common.tasklogs.FileTaskLogs;
|
||||
import io.druid.tasklogs.NoopTaskLogs;
|
||||
import io.druid.tasklogs.TaskLogPusher;
|
||||
import io.druid.tasklogs.TaskLogs;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TaskLogsModule implements Module
|
||||
public class IndexingServiceTaskLogsModule implements Module
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
|
@ -38,7 +40,10 @@ public class TaskLogsModule implements Module
|
|||
|
||||
final MapBinder<String, TaskLogs> taskLogBinder = Binders.taskLogsBinder(binder);
|
||||
taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
|
||||
taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
|
||||
binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
|
||||
binder.bind(FileTaskLogs.class).in(LazySingleton.class);
|
||||
JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class);
|
||||
|
||||
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package io.druid.indexing.common.config;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.io.File;
|
||||
|
||||
public class FileTaskLogsConfig
|
||||
{
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
private File directory = new File("log");
|
||||
|
||||
public File getDirectory()
|
||||
{
|
||||
return directory;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
package io.druid.indexing.common.tasklogs;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.io.Files;
|
||||
import com.google.common.io.InputSupplier;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.indexing.common.config.FileTaskLogsConfig;
|
||||
import io.druid.tasklogs.TaskLogs;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class FileTaskLogs implements TaskLogs
|
||||
{
|
||||
private static final Logger log = new Logger(FileTaskLogs.class);
|
||||
|
||||
private final FileTaskLogsConfig config;
|
||||
|
||||
@Inject
|
||||
public FileTaskLogs(
|
||||
FileTaskLogsConfig config
|
||||
)
|
||||
{
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushTaskLog(final String taskid, File file) throws IOException
|
||||
{
|
||||
if (!config.getDirectory().exists()) {
|
||||
config.getDirectory().mkdir();
|
||||
}
|
||||
final File outputFile = fileForTask(taskid);
|
||||
Files.copy(file, outputFile);
|
||||
log.info("Wrote task log to: %s", outputFile);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskid, final long offset) throws IOException
|
||||
{
|
||||
final File file = fileForTask(taskid);
|
||||
if (file.exists()) {
|
||||
return Optional.<InputSupplier<InputStream>>of(
|
||||
new InputSupplier<InputStream>()
|
||||
{
|
||||
@Override
|
||||
public InputStream getInput() throws IOException
|
||||
{
|
||||
final InputStream inputStream = new FileInputStream(file);
|
||||
ByteStreams.skipFully(inputStream, offset);
|
||||
return inputStream;
|
||||
}
|
||||
}
|
||||
);
|
||||
} else {
|
||||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
|
||||
private File fileForTask(final String taskid)
|
||||
{
|
||||
return new File(config.getDirectory(), String.format("%s.log", taskid));
|
||||
}
|
||||
}
|
|
@ -52,7 +52,6 @@ import io.druid.guice.QueryableModule;
|
|||
import io.druid.guice.ServerModule;
|
||||
import io.druid.guice.ServerViewModule;
|
||||
import io.druid.guice.StorageNodeModule;
|
||||
import io.druid.guice.TaskLogsModule;
|
||||
import io.druid.guice.annotations.Client;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
|
@ -299,7 +298,6 @@ public class Initialization
|
|||
new JacksonConfigManagerModule(),
|
||||
new IndexingServiceDiscoveryModule(),
|
||||
new DataSegmentPusherPullerModule(),
|
||||
new TaskLogsModule(),
|
||||
new FirehoseModule()
|
||||
);
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import com.metamx.common.logger.Logger;
|
|||
import io.airlift.command.Command;
|
||||
import io.druid.guice.IndexingServiceFirehoseModule;
|
||||
import io.druid.guice.IndexingServiceModuleHelper;
|
||||
import io.druid.guice.IndexingServiceTaskLogsModule;
|
||||
import io.druid.guice.Jerseys;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LazySingleton;
|
||||
|
@ -103,7 +104,8 @@ public class CliMiddleManager extends ServerRunnable
|
|||
);
|
||||
}
|
||||
},
|
||||
new IndexingServiceFirehoseModule()
|
||||
new IndexingServiceFirehoseModule(),
|
||||
new IndexingServiceTaskLogsModule()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import com.metamx.common.logger.Logger;
|
|||
import io.airlift.command.Command;
|
||||
import io.druid.guice.IndexingServiceFirehoseModule;
|
||||
import io.druid.guice.IndexingServiceModuleHelper;
|
||||
import io.druid.guice.IndexingServiceTaskLogsModule;
|
||||
import io.druid.guice.JacksonConfigProvider;
|
||||
import io.druid.guice.Jerseys;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
|
@ -211,7 +212,8 @@ public class CliOverlord extends ServerRunnable
|
|||
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
|
||||
}
|
||||
},
|
||||
new IndexingServiceFirehoseModule()
|
||||
new IndexingServiceFirehoseModule(),
|
||||
new IndexingServiceTaskLogsModule()
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue