Merger: Task logging system

This commit is contained in:
Gian Merlino 2013-04-09 10:33:52 -07:00
parent 33434f2c82
commit f90304d07b
27 changed files with 725 additions and 136 deletions

View File

@ -143,7 +143,7 @@ public class TaskToolbox
@Override
public File getCacheDirectory()
{
return new File(getTaskDir(), "fetched_segments");
return new File(getTaskWorkDir(), "fetched_segments");
}
}
);
@ -156,8 +156,7 @@ public class TaskToolbox
return retVal;
}
public File getTaskDir() {
return new File(config.getBaseTaskDir(), task.getId());
public File getTaskWorkDir() {
return new File(new File(config.getBaseTaskDir(), task.getId()), "work");
}
}

View File

@ -19,7 +19,6 @@
package com.metamx.druid.merger.common.config;
import com.metamx.druid.merger.common.task.Task;
import org.skife.config.Config;
import org.skife.config.Default;

View File

@ -0,0 +1,15 @@
package com.metamx.druid.merger.common.config;
import org.skife.config.Config;
import org.skife.config.DefaultNull;
/**
* Configuration describing where task logs are archived.
*
* Both getters are annotated with {@code @DefaultNull}, so callers should be prepared for a null result when the
* corresponding property has not been set.
*/
public abstract class TaskLogConfig
{
/**
* Bucket used for task log storage, read from the "druid.merger.logs.s3bucket" property.
*
* @return the bucket name, or null when the property is not configured
*/
@Config("druid.merger.logs.s3bucket")
@DefaultNull
public abstract String getLogStorageBucket();
/**
* Key prefix used for task log storage, read from the "druid.merger.logs.s3prefix" property.
*
* @return the key prefix, or null when the property is not configured
*/
@Config("druid.merger.logs.s3prefix")
@DefaultNull
public abstract String getLogStoragePrefix();
}

View File

@ -92,7 +92,7 @@ public class DeleteTask extends AbstractTask
.shardSpec(new NoneShardSpec())
.build();
final File outDir = new File(toolbox.getTaskDir(), segment.getIdentifier());
final File outDir = new File(toolbox.getTaskWorkDir(), segment.getIdentifier());
final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir);
// Upload the segment

View File

@ -109,7 +109,7 @@ public class IndexGeneratorTask extends AbstractTask
// Set up temporary directory for indexing
final File tmpDir = new File(
toolbox.getTaskDir(),
toolbox.getTaskWorkDir(),
String.format(
"%s_%s_%s_%s_%s",
this.getDataSource(),

View File

@ -122,7 +122,7 @@ public abstract class MergeTaskBase extends AbstractTask
final ServiceEmitter emitter = toolbox.getEmitter();
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);
final File taskDir = toolbox.getTaskDir();
final File taskDir = toolbox.getTaskWorkDir();
try {

View File

@ -144,7 +144,7 @@ public class RealtimeIndexTask extends AbstractTask
// TODO -- the ServerView, which seems kind of odd?)
final RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
windowPeriod,
new File(toolbox.getTaskDir(), "persist"),
new File(toolbox.getTaskWorkDir(), "persist"),
segmentGranularity
);

View File

@ -0,0 +1,26 @@
package com.metamx.druid.merger.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.InputSupplier;
import com.metamx.common.logger.Logger;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
/**
 * A {@link TaskLogs} implementation that keeps nothing: it never has a log to stream and it discards every
 * push request after writing an informational message.
 */
public class NoopTaskLogs implements TaskLogs
{
  // Named after this class (not the TaskLogs interface) so messages are attributed to the right source,
  // and shared statically like the logger in S3TaskLogs.
  private static final Logger log = new Logger(NoopTaskLogs.class);

  /**
   * Always reports that no log is available.
   *
   * @param taskid ignored
   * @param offset ignored
   *
   * @return an absent Optional, unconditionally
   */
  @Override
  public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, long offset) throws IOException
  {
    return Optional.absent();
  }

  /**
   * Does not store the log file; only records that the push was skipped.
   *
   * @param taskid  task whose log would have been pushed (used in the log message only)
   * @param logFile ignored
   */
  @Override
  public void pushTaskLog(String taskid, File logFile) throws IOException
  {
    log.info("Not pushing logs for task: %s", taskid);
  }
}

View File

@ -0,0 +1,110 @@
package com.metamx.druid.merger.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.io.InputSupplier;
import com.metamx.common.logger.Logger;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageService;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.StorageObject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
/**
 * Provides task logs archived on S3.
 *
 * Each task's log is stored as a single object in the configured bucket under the key
 * {@code <prefix>/<taskid>/log}.
 */
public class S3TaskLogs implements TaskLogs
{
  private static final Logger log = new Logger(S3TaskLogs.class);

  private final String bucket;
  private final String prefix;
  private final StorageService service;

  /**
   * @param bucket  bucket holding the archived logs; must not be null
   * @param prefix  key prefix under which logs are stored; must not be null
   * @param service S3 client used for all requests; must not be null
   *
   * @throws NullPointerException if any argument is null
   */
  public S3TaskLogs(String bucket, String prefix, RestS3Service service)
  {
    this.bucket = Preconditions.checkNotNull(bucket, "bucket");
    this.prefix = Preconditions.checkNotNull(prefix, "prefix");
    this.service = Preconditions.checkNotNull(service, "service");
  }

  /**
   * Looks up the archived log for a task and, if it exists, returns a supplier that opens a stream over it.
   *
   * The object details are fetched eagerly (so a missing log is detected here), while the data itself is only
   * requested when the supplier's {@code getInput()} is called.
   *
   * @param taskid task whose log is requested
   * @param offset zero for the whole log, positive to start at that byte position, negative to read that many
   *               bytes from the end; offsets that fall outside the log result in the whole log being streamed
   *
   * @return a supplier for the log, or absent when the service reports "NoSuchKey" or "NoSuchBucket"
   *
   * @throws IOException if the details lookup fails for any other reason
   */
  @Override
  public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskid, final long offset) throws IOException
  {
    final String taskKey = getTaskLogKey(taskid);

    try {
      final StorageObject objectDetails = service.getObjectDetails(bucket, taskKey, null, null, null, null);

      return Optional.<InputSupplier<InputStream>>of(
          new InputSupplier<InputStream>()
          {
            @Override
            public InputStream getInput() throws IOException
            {
              try {
                final long length = objectDetails.getContentLength();
                final long start = computeRangeStart(offset, length);
                final long end = length - 1;

                // The ETag from the details lookup is passed as a match condition.
                // NOTE(review): presumably so that an object replaced in the meantime is not served with a
                // stale byte range -- confirm against the JetS3t getObject contract.
                return service.getObject(
                    bucket,
                    taskKey,
                    null,
                    null,
                    new String[]{objectDetails.getETag()},
                    null,
                    start,
                    end
                ).getDataInputStream();
              }
              catch (ServiceException e) {
                throw new IOException(e);
              }
            }
          }
      );
    }
    catch (ServiceException e) {
      final String errorCode = e.getErrorCode();
      if ("NoSuchKey".equals(errorCode) || "NoSuchBucket".equals(errorCode)) {
        return Optional.absent();
      } else {
        throw new IOException(String.format("Failed to stream logs from: %s", taskKey), e);
      }
    }
  }

  /**
   * Uploads a local log file as the archived log of the given task, replacing whatever is stored under that key.
   *
   * @param taskid  task the log belongs to
   * @param logFile local file to upload
   *
   * @throws IOException if reading the file or talking to the service fails with an IOException; other
   *                     failures are rethrown unchecked
   */
  @Override
  public void pushTaskLog(String taskid, File logFile) throws IOException
  {
    final String taskKey = getTaskLogKey(taskid);

    try {
      log.info("Pushing task log %s to: %s", logFile, taskKey);

      final StorageObject object = new StorageObject(logFile);
      object.setKey(taskKey);
      service.putObject(bucket, object);
    }
    catch (Exception e) {
      Throwables.propagateIfInstanceOf(e, IOException.class);
      throw Throwables.propagate(e);
    }
  }

  /**
   * Translates a caller-supplied offset into the first byte position to request.
   *
   * The negative case is tested as {@code offset > -length} rather than by negating the offset, because
   * negating {@code Long.MIN_VALUE} overflows back to a negative number and would produce a negative start.
   *
   * @param offset requested offset (see {@link #streamTaskLog})
   * @param length total length of the stored log in bytes
   *
   * @return the starting byte position, always within {@code [0, length)} for a non-empty log, else 0
   */
  private static long computeRangeStart(long offset, long length)
  {
    if (offset > 0 && offset < length) {
      return offset;
    } else if (offset < 0 && offset > -length) {
      return length + offset;
    } else {
      return 0;
    }
  }

  /** Builds the object key for a task's log: {@code <prefix>/<taskid>/log}. */
  private String getTaskLogKey(String taskid)
  {
    return String.format("%s/%s/log", prefix, taskid);
  }
}

View File

@ -0,0 +1,35 @@
package com.metamx.druid.merger.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.io.InputSupplier;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
/**
 * Task log provider backed by an ordered list of delegates; the first delegate that has a log for the
 * requested task wins.
 */
public class SwitchingTaskLogProvider implements TaskLogProvider
{
  private final List<TaskLogProvider> providers;

  /**
   * @param providers delegates to consult, in priority order; the list is copied
   */
  public SwitchingTaskLogProvider(List<TaskLogProvider> providers)
  {
    this.providers = ImmutableList.copyOf(providers);
  }

  /**
   * Asks each delegate in turn and stops at the first one that returns a present result.
   *
   * @return the first available log, or absent when no delegate has one
   *
   * @throws IOException if a delegate fails before a log has been found
   */
  @Override
  public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, long offset) throws IOException
  {
    Optional<InputSupplier<InputStream>> found = Optional.absent();

    for (int i = 0; i < providers.size() && !found.isPresent(); i++) {
      found = providers.get(i).streamTaskLog(taskid, offset);
    }

    return found;
  }
}

View File

@ -0,0 +1,23 @@
package com.metamx.druid.merger.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.InputSupplier;
import java.io.IOException;
import java.io.InputStream;
/**
* Something that knows how to stream logs for tasks.
*/
public interface TaskLogProvider
{
/**
* Stream log for a task.
*
* @param taskid identifier of the task whose log is requested
* @param offset If zero, stream the entire log. If positive, attempt to read from this position onwards. If
* negative, attempt to read this many bytes from the end of the file (like <tt>tail -c</tt>).
*
* @return input supplier for this log, if available from this provider; absent otherwise
*
* @throws IOException if the provider fails while looking up the log
*/
public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, long offset) throws IOException;
}

View File

@ -0,0 +1,12 @@
package com.metamx.druid.merger.common.tasklogs;
import java.io.File;
import java.io.IOException;
/**
* Something that knows how to persist local task logs to some form of long-term storage.
*/
public interface TaskLogPusher
{
/**
* Persist the log of a task.
*
* @param taskid identifier of the task the log belongs to
* @param logFile local file containing the task's log
*
* @throws IOException if the log could not be persisted
*/
public void pushTaskLog(String taskid, File logFile) throws IOException;
}

View File

@ -0,0 +1,5 @@
package com.metamx.druid.merger.common.tasklogs;
/**
* Combined contract for a task log store: it can both stream logs ({@link TaskLogProvider}) and accept pushed
* logs ({@link TaskLogPusher}). Declares no methods of its own.
*/
public interface TaskLogs extends TaskLogProvider, TaskLogPusher
{
}

View File

@ -199,7 +199,7 @@ public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker
public TaskStatus call()
{
final long startTime = System.currentTimeMillis();
final File taskDir = toolbox.getTaskDir();
final File taskDir = toolbox.getTaskWorkDir();
TaskStatus status;

View File

@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.ListenableFuture;
@ -39,12 +40,13 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.common.tasklogs.TaskLogPusher;
import com.metamx.druid.merger.coordinator.config.ForkingTaskRunnerConfig;
import com.metamx.druid.merger.worker.executor.ExecutorMain;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
@ -54,6 +56,7 @@ import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@ -66,19 +69,22 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class);
private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
private final Object lock = new Object();
private final Object processLock = new Object();
private final ForkingTaskRunnerConfig config;
private final TaskLogPusher taskLogPusher;
private final ListeningExecutorService exec;
private final ObjectMapper jsonMapper;
private final List<ProcessHolder> processes = Lists.newArrayList();
/**
* Creates a runner that executes tasks in forked processes.
*
* @param config runner configuration
* @param taskLogPusher destination for task logs
* @param exec executor on which task processes are supervised; wrapped in a listening decorator
* @param jsonMapper mapper used for JSON serialization
*/
public ForkingTaskRunner(
ForkingTaskRunnerConfig config,
TaskLogPusher taskLogPusher,
ExecutorService exec,
ObjectMapper jsonMapper
)
{
this.config = config;
this.taskLogPusher = taskLogPusher;
this.exec = MoreExecutors.listeningDecorator(exec);
this.jsonMapper = jsonMapper;
}
@ -92,25 +98,29 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
@Override
public TaskStatus call()
{
// TODO Keep around for some amount of time?
// TODO Directory per attempt? token? uuid?
final File tempDir = Files.createTempDir();
final String attemptUUID = UUID.randomUUID().toString();
final File taskDir = new File(config.getBaseTaskDir(), task.getId());
final File attemptDir = new File(taskDir, attemptUUID);
ProcessHolder processHolder = null;
try {
final File taskFile = new File(tempDir, "task.json");
final File statusFile = new File(tempDir, "status.json");
final File logFile = new File(tempDir, "log");
if (!attemptDir.mkdirs()) {
throw new IOException(String.format("Could not create directories: %s", attemptDir));
}
// locked so we can choose childHost/childPort based on processes.size
// and make sure we don't double up on ProcessHolders for a task
synchronized (lock) {
final File taskFile = new File(attemptDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(attemptDir, "log");
// locked so we can safely assign port = findUnusedPort
synchronized (processLock) {
if (getProcessHolder(task.getId()).isPresent()) {
throw new ISE("Task already running: %s", task.getId());
}
final List<String> command = Lists.newArrayList();
final int childPort = config.getStartPort() + processes.size();
final int childPort = findUnusedPort();
final String childHost = String.format(config.getHostPattern(), childPort);
Iterables.addAll(
@ -144,34 +154,50 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
// TODO configurable
command.add(ExecutorMain.class.getName());
command.add(config.getMainClass());
command.add(taskFile.toString());
command.add(statusFile.toString());
Files.write(jsonMapper.writeValueAsBytes(task), taskFile);
jsonMapper.writeValue(taskFile, task);
log.info("Running command: %s", Joiner.on(" ").join(command));
processHolder = new ProcessHolder(
task,
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile
logFile,
childPort
);
processes.add(processHolder);
}
log.info("Logging task %s output to: %s", task.getId(), logFile);
final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput();
final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput();
final InputStream fromProc = processHolder.process.getInputStream();
ByteStreams.copy(fromProc, toLogfile);
fromProc.close();
toLogfile.close();
boolean copyFailed = false;
try {
ByteStreams.copy(fromProc, toLogfile);
} catch (Exception e) {
log.warn(e, "Failed to read from process for task: %s", task.getId());
copyFailed = true;
} finally {
Closeables.closeQuietly(fromProc);
Closeables.closeQuietly(toLogfile);
}
final int statusCode = processHolder.process.waitFor();
if (statusCode == 0) {
log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
// Upload task logs
// TODO: For very long-lived tasks, upload periodically? Truncated?
// TODO: Store task logs for each attempt separately?
taskLogPusher.pushTaskLog(task.getId(), logFile);
if (!copyFailed && statusCode == 0) {
// Process exited successfully
return jsonMapper.readValue(statusFile, TaskStatus.class);
} else {
@ -187,21 +213,18 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
throw Throwables.propagate(e);
}
finally {
if (processHolder != null) {
synchronized (lock) {
processes.remove(processHolder);
try {
if (processHolder != null) {
synchronized (processLock) {
processes.remove(processHolder);
}
}
}
if (tempDir.exists()) {
log.info("Removing temporary directory: %s", tempDir);
// TODO may want to keep this around a bit longer
// try {
// FileUtils.deleteDirectory(tempDir);
// }
// catch (IOException e) {
// log.error(e, "Failed to delete temporary directory");
// }
log.info("Removing temporary directory: %s", attemptDir);
FileUtils.deleteDirectory(attemptDir);
}
catch (Exception e) {
log.error(e, "Failed to delete temporary directory");
}
}
}
@ -212,8 +235,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
@LifecycleStop
public void stop()
{
synchronized (lock) {
exec.shutdownNow();
synchronized (processLock) {
exec.shutdown();
for (ProcessHolder processHolder : processes) {
log.info("Destroying process: %s", processHolder.process);
@ -226,12 +249,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
public void shutdown(final String taskid)
{
final Optional<ProcessHolder> processHolder = getProcessHolder(taskid);
if(processHolder.isPresent()) {
if (processHolder.isPresent()) {
final int shutdowns = processHolder.get().shutdowns.getAndIncrement();
if (shutdowns == 0) {
log.info("Attempting to gracefully shutdown task: %s", taskid);
try {
// TODO this is the WORST
// This is gross, but it may still be nicer than talking to the forked JVM via HTTP.
final OutputStream out = processHolder.get().process.getOutputStream();
out.write(
jsonMapper.writeValueAsBytes(
@ -243,10 +266,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
);
out.write('\n');
out.flush();
} catch (IOException e) {
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
// Will trigger normal failure mechanisms due to process exit
log.info("Killing process for task: %s", taskid);
processHolder.get().process.destroy();
}
@ -272,7 +297,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
}
@Override
public Optional<InputSupplier<InputStream>> getLogs(final String taskid, final long offset)
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskid, final long offset)
{
final Optional<ProcessHolder> processHolder = getProcessHolder(taskid);
@ -299,9 +324,29 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
}
}
/**
 * Chooses a port that none of the currently tracked processes is using.
 *
 * Starts from the configured start port; whenever a tracked process already holds the current candidate, the
 * candidate jumps to one past the highest port seen so far in the scan. The result is therefore free, but not
 * necessarily the lowest free port.
 *
 * @return a port not held by any entry in {@code processes}
 */
private int findUnusedPort()
{
  synchronized (processLock) {
    int candidate = config.getStartPort();
    int highestSeen = -1;

    for (ProcessHolder holder : processes) {
      highestSeen = Math.max(highestSeen, holder.port);

      if (holder.port == candidate) {
        candidate = highestSeen + 1;
      }
    }

    return candidate;
  }
}
private Optional<ProcessHolder> getProcessHolder(final String taskid)
{
synchronized (lock) {
synchronized (processLock) {
return Iterables.tryFind(
processes, new Predicate<ProcessHolder>()
{
@ -320,14 +365,16 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
private final Task task;
private final Process process;
private final File logFile;
private final int port;
private AtomicInteger shutdowns = new AtomicInteger(0);
private ProcessHolder(Task task, Process process, File logFile)
private ProcessHolder(Task task, Process process, File logFile, int port)
{
this.task = task;
this.process = process;
this.logFile = logFile;
this.port = port;
}
}
}

View File

@ -22,11 +22,14 @@ package com.metamx.druid.merger.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.io.InputSupplier;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@ -37,12 +40,14 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.RetryPolicy;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.ToStringResponseHandler;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
@ -53,6 +58,8 @@ import org.apache.zookeeper.CreateMode;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
@ -62,6 +69,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -80,13 +88,13 @@ import java.util.concurrent.atomic.AtomicReference;
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will automatically retry any tasks
* that were associated with the node.
*
* <p/>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
public class RemoteTaskRunner implements TaskRunner
public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
{
private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
private static final ToStringResponseHandler responseHandler = new ToStringResponseHandler(Charsets.UTF_8);
private static final ToStringResponseHandler STRING_RESPONSE_HANDLER = new ToStringResponseHandler(Charsets.UTF_8);
private static final Joiner JOINER = Joiner.on("/");
private final ObjectMapper jsonMapper;
@ -242,6 +250,7 @@ public class RemoteTaskRunner implements TaskRunner
/**
* Finds the worker running the task and forwards the shutdown signal to the worker.
*
* @param taskId
*/
@Override
@ -250,23 +259,18 @@ public class RemoteTaskRunner implements TaskRunner
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
if (zkWorker == null) {
// TODO Ability to shut down pending tasks
log.info("Can't shutdown! No worker running task %s", taskId);
return;
}
final RetryPolicy shutdownRetryPolicy = retryPolicyFactory.makeRetryPolicy();
URL url;
try {
url = new URL(String.format("http://%s/mmx/v1/worker/task/%s/shutdown", zkWorker.getWorker().getHost(), taskId));
}
catch (MalformedURLException e) {
throw Throwables.propagate(e);
}
final URL url = workerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId));
while (!shutdownRetryPolicy.hasExceededRetryThreshold()) {
try {
final String response = httpClient.post(url)
.go(responseHandler)
.go(STRING_RESPONSE_HANDLER)
.get();
log.info("Sent shutdown message to worker: %s, response: %s", zkWorker.getWorker().getHost(), response);
@ -291,6 +295,54 @@ public class RemoteTaskRunner implements TaskRunner
}
}
/**
 * Streams the log of a task from the worker that is currently running it.
 *
 * The worker lookup happens immediately; the HTTP request to the worker is only issued when the returned
 * supplier's {@code getInput()} is called.
 *
 * @param taskId task whose log is requested
 * @param offset offset forwarded to the worker as the {@code offset} query parameter
 *
 * @return a supplier for the worker's log stream, or absent when no worker is running the task
 */
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskId, final long offset)
{
  final ZkWorker zkWorker = findWorkerRunningTask(taskId);

  if (zkWorker == null) {
    // Worker is not running this task, it might be available in deep storage
    return Optional.absent();
  } else {
    // Worker is still running this task
    final URL url = workerURL(zkWorker.getWorker(), String.format("/task/%s/log?offset=%d", taskId, offset));
    return Optional.<InputSupplier<InputStream>>of(
        new InputSupplier<InputStream>()
        {
          @Override
          public InputStream getInput() throws IOException
          {
            try {
              return httpClient.get(url)
                               .go(new InputStreamResponseHandler())
                               .get();
            }
            catch (InterruptedException e) {
              // Restore the interrupt flag so callers further up can still observe the interruption
              // after it has been converted into an unchecked exception.
              Thread.currentThread().interrupt();
              throw Throwables.propagate(e);
            }
            catch (ExecutionException e) {
              // Unwrap if possible
              Throwables.propagateIfPossible(e.getCause(), IOException.class);
              throw Throwables.propagate(e);
            }
          }
        }
    );
  }
}
/**
 * Builds the URL of a worker endpoint: {@code http://<worker host>/mmx/worker/v1<path>}.
 *
 * @param worker worker whose host is used
 * @param path   endpoint path relative to the worker API root; must begin with '/'
 *
 * @return the assembled URL
 *
 * @throws IllegalArgumentException if {@code path} does not start with '/'
 */
private URL workerURL(Worker worker, String path)
{
  Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path);

  final String spec = String.format("http://%s/mmx/worker/v1%s", worker.getHost(), path);
  try {
    return new URL(spec);
  }
  catch (MalformedURLException e) {
    // Surfaces as an unchecked exception; callers do not handle malformed URLs.
    throw Throwables.propagate(e);
  }
}
/**
* Adds a task to the pending queue
* @param taskRunnerWorkItem

View File

@ -1,13 +0,0 @@
package com.metamx.druid.merger.coordinator;
// TODO move to common or worker?
import com.google.common.base.Optional;
import com.google.common.io.InputSupplier;
import java.io.InputStream;
/**
* Something that can supply the log of a task as an input stream.
*/
public interface TaskLogProvider
{
/**
* Get the log for a task.
*
* @param taskid identifier of the task whose log is requested
* @param offset position argument for the read -- NOTE(review): its exact semantics are not stated in this
* file; confirm against the implementations
*
* @return input supplier for the log, wrapped in an Optional
*/
public Optional<InputSupplier<InputStream>> getLogs(String taskid, long offset);
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
@ -104,10 +105,9 @@ public class TaskMasterLifecycle
leaderLifecycle.addManagedInstance(taskConsumer);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
leading = true;
try {
leaderLifecycle.start();
leading = true;
while (leading && !Thread.currentThread().isInterrupted()) {
mayBeStopped.await();
@ -213,23 +213,39 @@ public class TaskMasterLifecycle
}
}
public TaskRunner getTaskRunner()
public Optional<TaskRunner> getTaskRunner()
{
return taskRunner;
if (leading) {
return Optional.of(taskRunner);
} else {
return Optional.absent();
}
}
public TaskQueue getTaskQueue()
public Optional<TaskQueue> getTaskQueue()
{
return taskQueue;
if (leading) {
return Optional.of(taskQueue);
} else {
return Optional.absent();
}
}
public TaskActionClient getTaskActionClient(Task task)
public Optional<TaskActionClient> getTaskActionClient(Task task)
{
return taskActionClientFactory.create(task);
if (leading) {
return Optional.of(taskActionClientFactory.create(task));
} else {
return Optional.absent();
}
}
public ResourceManagementScheduler getResourceManagementScheduler()
public Optional<ResourceManagementScheduler> getResourceManagementScheduler()
{
return resourceManagementScheduler;
if (leading) {
return Optional.of(resourceManagementScheduler);
} else {
return Optional.absent();
}
}
}

View File

@ -80,6 +80,10 @@ public class TaskQueue
*/
public void bootstrap()
{
// TODO: Periodically fixup the database to refer to what we think is happening so bootstraps don't resurrect
// TODO: bogus stuff caused by leader races or whatevs. Also so that bogus stuff is detected by clients in a
// TODO: timely manner.
giant.lock();
try {

View File

@ -1,10 +1,16 @@
package com.metamx.druid.merger.coordinator.config;
import com.metamx.druid.merger.worker.executor.ExecutorMain;
import org.skife.config.Config;
import org.skife.config.Default;
import java.io.File;
public abstract class ForkingTaskRunnerConfig
{
@Config("druid.merger.taskDir")
public abstract File getBaseTaskDir();
@Config("druid.indexer.fork.java")
@Default("java")
public abstract String getJavaCommand();
@ -18,6 +24,12 @@ public abstract class ForkingTaskRunnerConfig
return System.getProperty("java.class.path");
}
/**
* Name of the main class for forked processes; this implementation returns the class name of
* {@link ExecutorMain}.
*
* NOTE(review): presumably the "druid.indexer.fork.main" property overrides this value when set and the method
* body serves as the default -- confirm against the config library's handling of non-abstract @Config methods.
*/
@Config("druid.indexer.fork.main")
public String getMainClass()
{
return ExecutorMain.class.getName();
}
@Config("druid.indexer.fork.hostpattern")
public abstract String getHostPattern();

View File

@ -21,11 +21,10 @@ package com.metamx.druid.merger.coordinator.config;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.metamx.druid.merger.common.task.Task;
import org.skife.config.Config;
import org.skife.config.Default;
import org.skife.config.DefaultNull;
import java.io.File;
import java.util.Set;
/**

View File

@ -23,10 +23,12 @@ import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
@ -59,7 +61,13 @@ import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.actions.TaskActionToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskLogConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs;
import com.metamx.druid.merger.common.tasklogs.S3TaskLogs;
import com.metamx.druid.merger.common.tasklogs.SwitchingTaskLogProvider;
import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
import com.metamx.druid.merger.common.tasklogs.TaskLogs;
import com.metamx.druid.merger.coordinator.DbTaskStorage;
import com.metamx.druid.merger.coordinator.ForkingTaskRunner;
import com.metamx.druid.merger.coordinator.HeapMemoryTaskStorage;
@ -101,6 +109,9 @@ import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
@ -110,6 +121,8 @@ import org.mortbay.resource.ResourceCollection;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.List;
import java.util.Properties;
@ -134,6 +147,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
private final Properties props;
private final ConfigurationObjectFactory configFactory;
private RestS3Service s3Service = null;
private List<Monitor> monitors = null;
private ServiceEmitter emitter = null;
private DbConnectorConfig dbConnectorConfig = null;
@ -150,6 +164,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
private HttpClient httpClient = null;
private TaskActionClientFactory taskActionClientFactory = null;
private TaskMasterLifecycle taskMasterLifecycle = null;
private TaskLogs persistentTaskLogs = null;
private TaskLogProvider taskLogProvider = null;
private Server server = null;
private boolean initialized = false;
@ -181,6 +197,12 @@ public class IndexerCoordinatorNode extends RegisteringNode
return this;
}
/**
* Supplies the S3 client this node should use.
*
* @param s3Service S3 client to store on this node
*
* @return this node, to allow call chaining
*/
public IndexerCoordinatorNode setS3Service(RestS3Service s3Service)
{
this.s3Service = s3Service;
return this;
}
public IndexerCoordinatorNode setTaskQueue(TaskQueue taskQueue)
{
this.taskQueue = taskQueue;
@ -255,6 +277,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
initializeTaskRunnerFactory(configManager);
initializeResourceManagement(configManager);
initializeTaskMasterLifecycle();
initializePersistentTaskLogs();
initializeTaskLogProvider();
initializeServer();
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
@ -273,6 +297,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
emitter,
taskMasterLifecycle,
new TaskStorageQueryAdapter(taskStorage),
taskLogProvider,
configManager
)
);
@ -365,6 +390,52 @@ public class IndexerCoordinatorNode extends RegisteringNode
}
}
/**
 * Sets up {@code persistentTaskLogs} unless it has already been set.
 *
 * When a log storage bucket is configured, logs are kept in S3 (initializing the S3 client on demand);
 * otherwise a no-op implementation is installed.
 *
 * @throws S3ServiceException if the S3 client cannot be created
 */
private void initializePersistentTaskLogs() throws S3ServiceException
{
  if (persistentTaskLogs != null) {
    return;
  }

  final TaskLogConfig taskLogConfig = configFactory.build(TaskLogConfig.class);

  if (taskLogConfig.getLogStorageBucket() == null) {
    // No bucket configured: nothing is archived.
    persistentTaskLogs = new NoopTaskLogs();
    return;
  }

  initializeS3Service();
  persistentTaskLogs = new S3TaskLogs(
      taskLogConfig.getLogStorageBucket(),
      taskLogConfig.getLogStoragePrefix(),
      s3Service
  );
}
/**
 * Sets up {@code taskLogProvider} unless it has already been set.
 *
 * The resulting provider first asks the current task runner (when one is available and it is itself a
 * {@link TaskLogProvider}) and then falls back to the persistent log storage.
 */
private void initializeTaskLogProvider()
{
  if (taskLogProvider != null) {
    return;
  }

  // Use our TaskRunner if it is also a TaskLogProvider. The runner is looked up on every request rather
  // than captured once.
  final TaskLogProvider runnerLogs = new TaskLogProvider()
  {
    @Override
    public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, long offset) throws IOException
    {
      final TaskRunner runner = taskMasterLifecycle.getTaskRunner().orNull();
      if (!(runner instanceof TaskLogProvider)) {
        return Optional.absent();
      }
      return ((TaskLogProvider) runner).streamTaskLog(taskid, offset);
    }
  };

  final List<TaskLogProvider> providers = Lists.newArrayList();
  providers.add(runnerLogs);
  // Use our persistent log storage
  providers.add(persistentTaskLogs);

  taskLogProvider = new SwitchingTaskLogProvider(providers);
}
@LifecycleStart
public synchronized void start() throws Exception
{
@ -438,6 +509,18 @@ public class IndexerCoordinatorNode extends RegisteringNode
EmittingLogger.registerEmitter(emitter);
}
/**
 * Lazily assigns {@code s3Service}; a no-op when it has already been set.
 *
 * The service is a {@link RestS3Service} built from AWS credentials read from {@code props}
 * under the keys {@code com.metamx.aws.accessKey} and {@code com.metamx.aws.secretKey}.
 *
 * @throws S3ServiceException declared for the {@link RestS3Service} construction
 */
private void initializeS3Service() throws S3ServiceException
{
  if (s3Service != null) {
    return;
  }

  final String accessKey = PropUtils.getProperty(props, "com.metamx.aws.accessKey");
  final String secretKey = PropUtils.getProperty(props, "com.metamx.aws.secretKey");
  s3Service = new RestS3Service(new AWSCredentials(accessKey, secretKey));
}
private void initializeMonitors()
{
if (monitors == null) {
@ -545,7 +628,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
.build()
);
RemoteTaskRunner remoteTaskRunner = new RemoteTaskRunner(
return new RemoteTaskRunner(
getJsonMapper(),
configFactory.build(RemoteTaskRunnerConfig.class),
curatorFramework,
@ -560,8 +643,6 @@ public class IndexerCoordinatorNode extends RegisteringNode
configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class),
httpClient
);
return remoteTaskRunner;
}
};
@ -574,6 +655,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
final ExecutorService runnerExec = Executors.newFixedThreadPool(config.getNumLocalThreads());
return new ForkingTaskRunner(
configFactory.build(ForkingTaskRunnerConfig.class),
persistentTaskLogs,
runnerExec,
getJsonMapper()
);

View File

@ -21,30 +21,40 @@ package com.metamx.druid.merger.coordinator.http;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.merger.common.actions.TaskActionHolder;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.emitter.service.ServiceEmitter;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@ -60,6 +70,7 @@ public class IndexerCoordinatorResource
private final ServiceEmitter emitter;
private final TaskMasterLifecycle taskMasterLifecycle;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogProvider taskLogProvider;
private final JacksonConfigManager configManager;
private final ObjectMapper jsonMapper;
@ -71,6 +82,7 @@ public class IndexerCoordinatorResource
ServiceEmitter emitter,
TaskMasterLifecycle taskMasterLifecycle,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogProvider taskLogProvider,
JacksonConfigManager configManager,
ObjectMapper jsonMapper
) throws Exception
@ -79,6 +91,7 @@ public class IndexerCoordinatorResource
this.emitter = emitter;
this.taskMasterLifecycle = taskMasterLifecycle;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskLogProvider = taskLogProvider;
this.configManager = configManager;
this.jsonMapper = jsonMapper;
}
@ -108,8 +121,18 @@ public class IndexerCoordinatorResource
@Produces("application/json")
public Response taskPost(final Task task)
{
taskMasterLifecycle.getTaskQueue().add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
return asLeaderWith(
taskMasterLifecycle.getTaskQueue(),
new Function<TaskQueue, Response>()
{
@Override
public Response apply(TaskQueue taskQueue)
{
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
}
);
}
@GET
@ -137,16 +160,20 @@ public class IndexerCoordinatorResource
@POST
@Path("/task/{taskid}/shutdown")
@Produces("application/json")
public Response doShutdown(@PathParam("taskid") String taskid)
public Response doShutdown(@PathParam("taskid") final String taskid)
{
try {
taskMasterLifecycle.getTaskRunner().shutdown(taskid);
}
catch (Exception e) {
return Response.serverError().build();
}
return Response.ok(ImmutableMap.of("task", taskid)).build();
return asLeaderWith(
taskMasterLifecycle.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
{
taskRunner.shutdown(taskid);
return Response.ok(ImmutableMap.of("task", taskid)).build();
}
}
);
}
// Legacy endpoint
@ -203,20 +230,30 @@ public class IndexerCoordinatorResource
@Produces("application/json")
public <T> Response doAction(final TaskActionHolder<T> holder)
{
final Map<String, Object> retMap;
return asLeaderWith(
taskMasterLifecycle.getTaskActionClient(holder.getTask()),
new Function<TaskActionClient, Response>()
{
@Override
public Response apply(TaskActionClient taskActionClient)
{
final Map<String, Object> retMap;
// TODO make sure this worker is supposed to be running this task (attempt id? token?)
// TODO make sure this worker is supposed to be running this task (attempt id? token?)
try {
final T ret = taskMasterLifecycle.getTaskActionClient(holder.getTask())
.submit(holder.getAction());
retMap = Maps.newHashMap();
retMap.put("result", ret);
} catch(IOException e) {
return Response.serverError().build();
}
try {
final T ret = taskActionClient.submit(holder.getAction());
retMap = Maps.newHashMap();
retMap.put("result", ret);
}
catch (IOException e) {
return Response.serverError().build();
}
return Response.ok().entity(retMap).build();
return Response.ok().entity(retMap).build();
}
}
);
}
@GET
@ -224,10 +261,17 @@ public class IndexerCoordinatorResource
@Produces("application/json")
public Response getPendingTasks()
{
if (taskMasterLifecycle.getTaskRunner() == null) {
return Response.noContent().build();
}
return Response.ok(taskMasterLifecycle.getTaskRunner().getPendingTasks()).build();
return asLeaderWith(
taskMasterLifecycle.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
{
return Response.ok(taskRunner.getPendingTasks()).build();
}
}
);
}
@GET
@ -235,10 +279,17 @@ public class IndexerCoordinatorResource
@Produces("application/json")
public Response getRunningTasks()
{
if (taskMasterLifecycle.getTaskRunner() == null) {
return Response.noContent().build();
}
return Response.ok(taskMasterLifecycle.getTaskRunner().getRunningTasks()).build();
return asLeaderWith(
taskMasterLifecycle.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
{
return Response.ok(taskRunner.getRunningTasks()).build();
}
}
);
}
@GET
@ -246,10 +297,17 @@ public class IndexerCoordinatorResource
@Produces("application/json")
public Response getWorkers()
{
if (taskMasterLifecycle.getTaskRunner() == null) {
return Response.noContent().build();
}
return Response.ok(taskMasterLifecycle.getTaskRunner().getWorkers()).build();
return asLeaderWith(
taskMasterLifecycle.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
{
return Response.ok(taskRunner.getWorkers()).build();
}
}
);
}
@GET
@ -257,9 +315,47 @@ public class IndexerCoordinatorResource
@Produces("application/json")
public Response getScalingState()
{
if (taskMasterLifecycle.getResourceManagementScheduler() == null) {
return Response.noContent().build();
return asLeaderWith(
taskMasterLifecycle.getResourceManagementScheduler(),
new Function<ResourceManagementScheduler, Response>()
{
@Override
public Response apply(ResourceManagementScheduler resourceManagementScheduler)
{
return Response.ok(resourceManagementScheduler.getStats()).build();
}
}
);
}
/**
 * Serves the log of a task as plain text.
 *
 * @param taskid id of the task, taken from the request path
 * @param offset value of the {@code offset} query parameter (defaults to 0); passed through
 *               unchanged to {@code taskLogProvider}
 * @return 200 with the log stream as entity, 404 when the provider has no log for the task,
 *         or 500 when looking up or opening the log throws
 */
@GET
@Path("/task/{taskid}/log")
@Produces("text/plain")
public Response doGetLog(
    @PathParam("taskid") final String taskid,
    @QueryParam("offset") @DefaultValue("0") final long offset
)
{
  try {
    final Optional<InputSupplier<InputStream>> maybeLog = taskLogProvider.streamTaskLog(taskid, offset);
    if (!maybeLog.isPresent()) {
      return Response.status(Response.Status.NOT_FOUND).build();
    }
    return Response.ok(maybeLog.get().getInput()).build();
  }
  catch (Exception e) {
    // Both the lookup and opening the stream are covered by this handler
    log.warn(e, "Failed to stream log for task %s", taskid);
    return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
  }
}
/**
 * Applies {@code f} to the value held by {@code x} when one is present; otherwise responds with
 * HTTP 503 (Service Unavailable) without invoking {@code f}.
 *
 * Going by the method name, an absent value presumably means this node is not currently the
 * leader -- TODO confirm against TaskMasterLifecycle.
 *
 * @param x optional value to operate on
 * @param f produces the response from the value when it is present
 * @return the result of {@code f}, or a 503 response when {@code x} is absent
 */
public <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
{
if (x.isPresent()) {
return f.apply(x.get());
} else {
// Encourage client to try again soon, when we'll likely have a redirect set up
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
// NOTE(review): both branches above return, so the statement below cannot be reached. It looks
// like a line of the previous getScalingState body carried along by this diff view -- confirm.
return Response.ok(taskMasterLifecycle.getResourceManagementScheduler().getStats()).build();
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
@ -41,6 +42,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
private final ServiceEmitter emitter;
private final TaskMasterLifecycle taskMasterLifecycle;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogProvider taskLogProvider;
private final JacksonConfigManager configManager;
public IndexerCoordinatorServletModule(
@ -49,6 +51,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
ServiceEmitter emitter,
TaskMasterLifecycle taskMasterLifecycle,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogProvider taskLogProvider,
JacksonConfigManager configManager
)
{
@ -57,6 +60,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
this.emitter = emitter;
this.taskMasterLifecycle = taskMasterLifecycle;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskLogProvider = taskLogProvider;
this.configManager = configManager;
}
@ -69,6 +73,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
bind(ServiceEmitter.class).toInstance(emitter);
bind(TaskMasterLifecycle.class).toInstance(taskMasterLifecycle);
bind(TaskStorageQueryAdapter.class).toInstance(taskStorageQueryAdapter);
bind(TaskLogProvider.class).toInstance(taskLogProvider);
bind(JacksonConfigManager.class).toInstance(configManager);
serve("/*").with(GuiceContainer.class);

View File

@ -40,8 +40,11 @@ import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.config.TaskLogConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs;
import com.metamx.druid.merger.common.tasklogs.S3TaskLogs;
import com.metamx.druid.merger.common.tasklogs.TaskLogs;
import com.metamx.druid.merger.coordinator.ForkingTaskRunner;
import com.metamx.druid.merger.coordinator.config.ForkingTaskRunnerConfig;
import com.metamx.druid.merger.worker.Worker;
@ -64,8 +67,12 @@ import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceProvider;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
@ -92,16 +99,17 @@ public class WorkerNode extends RegisteringNode
private final ObjectMapper jsonMapper;
private final ConfigurationObjectFactory configFactory;
private RestS3Service s3Service = null;
private List<Monitor> monitors = null;
private HttpClient httpClient = null;
private ServiceEmitter emitter = null;
private TaskConfig taskConfig = null;
private WorkerConfig workerConfig = null;
private CuratorFramework curatorFramework = null;
private ServiceDiscovery serviceDiscovery = null;
private ServiceProvider coordinatorServiceProvider = null;
private WorkerCuratorCoordinator workerCuratorCoordinator = null;
private WorkerTaskMonitor workerTaskMonitor = null;
private TaskLogs persistentTaskLogs = null;
private ForkingTaskRunner forkingTaskRunner = null;
private Server server = null;
@ -181,6 +189,7 @@ public class WorkerNode extends RegisteringNode
initializeCoordinatorServiceProvider();
initializeJacksonSubtypes();
initializeCuratorCoordinator();
initializePersistentTaskLogs();
initializeTaskRunner();
initializeWorkerTaskMonitor();
initializeServer();
@ -205,6 +214,7 @@ public class WorkerNode extends RegisteringNode
final Context root = new Context(server, "/", Context.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/mmx/worker/v1/*", 0);
}
@ -287,6 +297,18 @@ public class WorkerNode extends RegisteringNode
EmittingLogger.registerEmitter(emitter);
}
/**
 * Lazily assigns {@code s3Service}; a no-op when it has already been set.
 *
 * AWS credentials are read from {@code props} under the keys {@code com.metamx.aws.accessKey}
 * and {@code com.metamx.aws.secretKey} and handed to a new {@link RestS3Service}.
 *
 * @throws S3ServiceException declared for the {@link RestS3Service} construction
 */
private void initializeS3Service() throws S3ServiceException
{
  if (s3Service != null) {
    return;
  }

  final AWSCredentials credentials = new AWSCredentials(
      PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
      PropUtils.getProperty(props, "com.metamx.aws.secretKey")
  );
  s3Service = new RestS3Service(credentials);
}
private void initializeMonitors()
{
if (monitors == null) {
@ -298,10 +320,6 @@ public class WorkerNode extends RegisteringNode
private void initializeMergerConfig()
{
if (taskConfig == null) {
taskConfig = configFactory.build(TaskConfig.class);
}
if (workerConfig == null) {
workerConfig = configFactory.build(WorkerConfig.class);
}
@ -354,11 +372,29 @@ public class WorkerNode extends RegisteringNode
}
}
/**
 * Lazily assigns {@code persistentTaskLogs}; a no-op when it has already been set.
 *
 * A {@link TaskLogConfig} is built from {@code configFactory}. With a log storage bucket
 * configured, the S3 service is initialized and an {@link S3TaskLogs} is created from the bucket,
 * the configured prefix and {@code s3Service}; otherwise a {@link NoopTaskLogs} is used.
 *
 * @throws S3ServiceException declared because {@code initializeS3Service()} declares it
 */
private void initializePersistentTaskLogs() throws S3ServiceException
{
  if (persistentTaskLogs != null) {
    return;
  }

  final TaskLogConfig logConfig = configFactory.build(TaskLogConfig.class);
  final boolean bucketConfigured = logConfig.getLogStorageBucket() != null;
  if (!bucketConfigured) {
    persistentTaskLogs = new NoopTaskLogs();
    return;
  }

  initializeS3Service();
  persistentTaskLogs = new S3TaskLogs(
      logConfig.getLogStorageBucket(),
      logConfig.getLogStoragePrefix(),
      s3Service
  );
}
public void initializeTaskRunner()
{
if (forkingTaskRunner == null) {
forkingTaskRunner = new ForkingTaskRunner(
configFactory.build(ForkingTaskRunnerConfig.class),
persistentTaskLogs,
Executors.newFixedThreadPool(workerConfig.getCapacity()),
getJsonMapper()
);

View File

@ -20,16 +20,24 @@
package com.metamx.druid.merger.worker.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.coordinator.ForkingTaskRunner;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
/**
*/
@ -65,4 +73,26 @@ public class WorkerResource
}
return Response.ok(ImmutableMap.of("task", taskid)).build();
}
/**
 * Serves the log of a task as plain text.
 *
 * @param taskid id of the task, taken from the request path
 * @param offset value of the {@code offset} query parameter (defaults to 0); passed through
 *               unchanged to {@code taskRunner}
 * @return 200 with the log stream as entity, 404 when the runner has no log for the task,
 *         or a server error response when opening the log stream throws
 */
@GET
@Path("/task/{taskid}/log")
@Produces("text/plain")
public Response doGetLog(
    @PathParam("taskid") String taskid,
    @QueryParam("offset") @DefaultValue("0") long offset
)
{
  final Optional<InputSupplier<InputStream>> maybeLog = taskRunner.streamTaskLog(taskid, offset);
  if (!maybeLog.isPresent()) {
    return Response.status(Response.Status.NOT_FOUND).build();
  }

  // Only opening the stream is guarded; the lookup above runs outside the try
  try {
    return Response.ok(maybeLog.get().getInput()).build();
  }
  catch (Exception e) {
    log.warn(e, "Failed to read log for task: %s", taskid);
    return Response.serverError().build();
  }
}
}

View File

@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.merger.coordinator.ForkingTaskRunner;
import com.metamx.druid.merger.coordinator.http.IndexerCoordinatorResource;
import com.metamx.emitter.service.ServiceEmitter;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;