mirror of https://github.com/apache/druid.git
Ability to run tasks in separate JVMs, and cancel tasks.
- ForkingTaskRunner spawns new JVMs for each task - Added ExecutorMain, ExecutorNode to be the spawned JVM - LocalTaskRunner renamed ExecutorServiceTaskRunner - Add shutdown method to Task and TaskRunner
This commit is contained in:
parent
d1eb61d451
commit
ec566ee37a
|
@ -101,6 +101,12 @@ public abstract class AbstractTask implements Task
|
|||
return TaskStatus.running(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown()
|
||||
{
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -119,4 +119,10 @@ public interface Task
|
|||
* @throws Exception
|
||||
*/
|
||||
public TaskStatus run(TaskToolbox toolbox) throws Exception;
|
||||
|
||||
/**
|
||||
* Best-effort task cancellation. May or may not do anything. Calling this multiple times may have
|
||||
* a stronger effect.
|
||||
*/
|
||||
public void shutdown();
|
||||
}
|
||||
|
|
|
@ -21,22 +21,30 @@ package com.metamx.druid.merger.coordinator;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.Query;
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.common.TaskToolbox;
|
||||
import com.metamx.druid.merger.common.TaskToolboxFactory;
|
||||
import com.metamx.druid.merger.common.task.Task;
|
||||
import com.metamx.druid.query.NoopQueryRunner;
|
||||
import com.metamx.druid.query.QueryRunner;
|
||||
import com.metamx.druid.query.segment.QuerySegmentWalker;
|
||||
import com.metamx.druid.query.segment.SegmentDescriptor;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
@ -46,16 +54,15 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||
/**
|
||||
* Runs tasks in a JVM thread using an ExecutorService.
|
||||
*/
|
||||
public class LocalTaskRunner implements TaskRunner
|
||||
public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker
|
||||
{
|
||||
private final TaskToolboxFactory toolboxFactory;
|
||||
private final ListeningExecutorService exec;
|
||||
|
||||
private final Set<TaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<TaskRunnerWorkItem>();
|
||||
|
||||
private static final Logger log = new Logger(LocalTaskRunner.class);
|
||||
private static final EmittingLogger log = new EmittingLogger(ExecutorServiceTaskRunner.class);
|
||||
|
||||
public LocalTaskRunner(
|
||||
public ExecutorServiceTaskRunner(
|
||||
TaskToolboxFactory toolboxFactory,
|
||||
ExecutorService exec
|
||||
)
|
||||
|
@ -67,6 +74,7 @@ public class LocalTaskRunner implements TaskRunner
|
|||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
// TODO is this right
|
||||
exec.shutdownNow();
|
||||
}
|
||||
|
||||
|
@ -74,7 +82,17 @@ public class LocalTaskRunner implements TaskRunner
|
|||
public ListenableFuture<TaskStatus> run(final Task task)
|
||||
{
|
||||
final TaskToolbox toolbox = toolboxFactory.build(task);
|
||||
return exec.submit(new LocalTaskRunnerCallable(task, toolbox));
|
||||
return exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown(final String taskid)
|
||||
{
|
||||
for (final TaskRunnerWorkItem runningItem : runningItems) {
|
||||
if (runningItem.getTask().getId().equals(taskid)) {
|
||||
runningItem.getTask().shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -97,8 +115,8 @@ public class LocalTaskRunner implements TaskRunner
|
|||
@Override
|
||||
public TaskRunnerWorkItem apply(Runnable input)
|
||||
{
|
||||
if (input instanceof LocalTaskRunnerCallable) {
|
||||
return ((LocalTaskRunnerCallable) input).getTaskRunnerWorkItem();
|
||||
if (input instanceof ExecutorServiceTaskRunnerCallable) {
|
||||
return ((ExecutorServiceTaskRunnerCallable) input).getTaskRunnerWorkItem();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -116,14 +134,60 @@ public class LocalTaskRunner implements TaskRunner
|
|||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
private static class LocalTaskRunnerCallable implements Callable<TaskStatus>
|
||||
@Override
|
||||
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
|
||||
{
|
||||
return getQueryRunnerImpl(query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
|
||||
{
|
||||
return getQueryRunnerImpl(query);
|
||||
}
|
||||
|
||||
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
|
||||
{
|
||||
QueryRunner<T> queryRunner = null;
|
||||
|
||||
final List<Task> runningTasks = Lists.transform(
|
||||
ImmutableList.copyOf(getRunningTasks()), new Function<TaskRunnerWorkItem, Task>()
|
||||
{
|
||||
@Override
|
||||
public Task apply(TaskRunnerWorkItem o)
|
||||
{
|
||||
return o.getTask();
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
for (final Task task : runningTasks) {
|
||||
if (task.getDataSource().equals(query.getDataSource())) {
|
||||
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
|
||||
|
||||
if (taskQueryRunner != null) {
|
||||
if (queryRunner == null) {
|
||||
queryRunner = taskQueryRunner;
|
||||
} else {
|
||||
log.makeAlert("Found too many query runners for datasource")
|
||||
.addData("dataSource", query.getDataSource())
|
||||
.emit();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner;
|
||||
}
|
||||
|
||||
private static class ExecutorServiceTaskRunnerCallable implements Callable<TaskStatus>
|
||||
{
|
||||
private final Task task;
|
||||
private final TaskToolbox toolbox;
|
||||
|
||||
private final DateTime createdTime;
|
||||
|
||||
public LocalTaskRunnerCallable(Task task, TaskToolbox toolbox)
|
||||
public ExecutorServiceTaskRunnerCallable(Task task, TaskToolbox toolbox)
|
||||
{
|
||||
this.task = task;
|
||||
this.toolbox = toolbox;
|
||||
|
@ -135,6 +199,7 @@ public class LocalTaskRunner implements TaskRunner
|
|||
public TaskStatus call()
|
||||
{
|
||||
final long startTime = System.currentTimeMillis();
|
||||
final File taskDir = toolbox.getTaskDir();
|
||||
|
||||
TaskStatus status;
|
||||
|
||||
|
@ -156,20 +221,22 @@ public class LocalTaskRunner implements TaskRunner
|
|||
}
|
||||
|
||||
try {
|
||||
final File taskDir = toolbox.getTaskDir();
|
||||
|
||||
if (taskDir.exists()) {
|
||||
log.info("Removing task directory: %s", taskDir);
|
||||
FileUtils.deleteDirectory(taskDir);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Failed to delete task directory: %s", task.getId());
|
||||
log.makeAlert(e, "Failed to delete task directory")
|
||||
.addData("taskDir", taskDir.toString())
|
||||
.addData("task", task.getId())
|
||||
.emit();
|
||||
}
|
||||
|
||||
try {
|
||||
return status.withDuration(System.currentTimeMillis() - startTime);
|
||||
} catch(Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Uncaught Exception during callback for task[%s]", task);
|
||||
throw Throwables.propagate(e);
|
||||
}
|
|
@ -0,0 +1,293 @@
|
|||
package com.metamx.druid.merger.coordinator;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.CharMatcher;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.io.Files;
|
||||
import com.google.common.io.InputSupplier;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.common.actions.TaskActionClient;
|
||||
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
|
||||
import com.metamx.druid.merger.common.task.Task;
|
||||
import com.metamx.druid.merger.coordinator.config.ForkingTaskRunnerConfig;
|
||||
import com.metamx.druid.merger.worker.executor.ExecutorMain;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.channels.Channels;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Runs tasks in separate processes using {@link ExecutorMain}.
|
||||
*/
|
||||
public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class);
|
||||
private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
|
||||
|
||||
private final Object lock = new Object();
|
||||
private final ForkingTaskRunnerConfig config;
|
||||
private final ListeningExecutorService exec;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final List<ProcessHolder> processes = Lists.newArrayList();
|
||||
|
||||
public ForkingTaskRunner(
|
||||
ForkingTaskRunnerConfig config,
|
||||
ExecutorService exec,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.config = config;
|
||||
this.exec = MoreExecutors.listeningDecorator(exec);
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<TaskStatus> run(final Task task)
|
||||
{
|
||||
return exec.submit(
|
||||
new Callable<TaskStatus>()
|
||||
{
|
||||
@Override
|
||||
public TaskStatus call()
|
||||
{
|
||||
// TODO Keep around for some amount of time?
|
||||
// TODO Directory per attempt? token? uuid?
|
||||
final File tempDir = Files.createTempDir();
|
||||
ProcessHolder processHolder = null;
|
||||
|
||||
try {
|
||||
final File taskFile = new File(tempDir, "task.json");
|
||||
final File statusFile = new File(tempDir, "status.json");
|
||||
final File logFile = new File(tempDir, "log");
|
||||
|
||||
// locked so we can choose childHost/childPort based on processes.size
|
||||
// and make sure we don't double up on ProcessHolders for a task
|
||||
synchronized (lock) {
|
||||
if (getProcessHolder(task.getId()).isPresent()) {
|
||||
throw new ISE("Task already running: %s", task.getId());
|
||||
}
|
||||
|
||||
final List<String> command = Lists.newArrayList();
|
||||
final int childPort = config.getStartPort() + processes.size();
|
||||
final String childHost = String.format(config.getHostPattern(), childPort);
|
||||
|
||||
Iterables.addAll(
|
||||
command,
|
||||
ImmutableList.of(
|
||||
config.getJavaCommand(),
|
||||
"-cp",
|
||||
config.getJavaClasspath()
|
||||
)
|
||||
);
|
||||
|
||||
Iterables.addAll(
|
||||
command,
|
||||
Splitter.on(CharMatcher.WHITESPACE)
|
||||
.omitEmptyStrings()
|
||||
.split(config.getJavaOptions())
|
||||
);
|
||||
|
||||
for (String propName : System.getProperties().stringPropertyNames()) {
|
||||
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
|
||||
command.add(
|
||||
String.format(
|
||||
"-D%s=%s",
|
||||
propName.substring(CHILD_PROPERTY_PREFIX.length()),
|
||||
System.getProperty(propName)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
command.add(String.format("-Ddruid.host=%s", childHost));
|
||||
command.add(String.format("-Ddruid.port=%d", childPort));
|
||||
|
||||
// TODO configurable
|
||||
command.add(ExecutorMain.class.getName());
|
||||
command.add(taskFile.toString());
|
||||
command.add(statusFile.toString());
|
||||
|
||||
Files.write(jsonMapper.writeValueAsBytes(task), taskFile);
|
||||
|
||||
log.info("Running command: %s", Joiner.on(" ").join(command));
|
||||
processHolder = new ProcessHolder(
|
||||
task,
|
||||
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
|
||||
logFile
|
||||
);
|
||||
|
||||
processes.add(processHolder);
|
||||
}
|
||||
|
||||
log.info("Logging task %s output to: %s", task.getId(), logFile);
|
||||
final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput();
|
||||
|
||||
final InputStream fromProc = processHolder.process.getInputStream();
|
||||
ByteStreams.copy(fromProc, toLogfile);
|
||||
fromProc.close();
|
||||
toLogfile.close();
|
||||
|
||||
final int statusCode = processHolder.process.waitFor();
|
||||
|
||||
if (statusCode == 0) {
|
||||
// Process exited successfully
|
||||
return jsonMapper.readValue(statusFile, TaskStatus.class);
|
||||
} else {
|
||||
// Process exited unsuccessfully
|
||||
return TaskStatus.failure(task.getId());
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
log.info(e, "Interrupted while waiting for process!");
|
||||
return TaskStatus.failure(task.getId());
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
finally {
|
||||
if (processHolder != null) {
|
||||
synchronized (lock) {
|
||||
processes.remove(processHolder);
|
||||
}
|
||||
}
|
||||
|
||||
if (tempDir.exists()) {
|
||||
log.info("Removing temporary directory: %s", tempDir);
|
||||
// TODO may want to keep this around a bit longer
|
||||
// try {
|
||||
// FileUtils.deleteDirectory(tempDir);
|
||||
// }
|
||||
// catch (IOException e) {
|
||||
// log.error(e, "Failed to delete temporary directory");
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
synchronized (lock) {
|
||||
exec.shutdownNow();
|
||||
|
||||
for (ProcessHolder processHolder : processes) {
|
||||
log.info("Destroying process: %s", processHolder.process);
|
||||
processHolder.process.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown(final String taskid)
|
||||
{
|
||||
// TODO shutdown harder after more shutdowns
|
||||
final Optional<ProcessHolder> processHolder = getProcessHolder(taskid);
|
||||
if(processHolder.isPresent()) {
|
||||
processHolder.get().shutdowns.incrementAndGet();
|
||||
processHolder.get().process.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<TaskRunnerWorkItem> getRunningTasks()
|
||||
{
|
||||
return ImmutableList.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<TaskRunnerWorkItem> getPendingTasks()
|
||||
{
|
||||
return ImmutableList.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<ZkWorker> getWorkers()
|
||||
{
|
||||
return ImmutableList.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<InputSupplier<InputStream>> getLogs(final String taskid, final long offset)
|
||||
{
|
||||
final Optional<ProcessHolder> processHolder = getProcessHolder(taskid);
|
||||
|
||||
if (processHolder.isPresent()) {
|
||||
return Optional.<InputSupplier<InputStream>>of(
|
||||
new InputSupplier<InputStream>()
|
||||
{
|
||||
@Override
|
||||
public InputStream getInput() throws IOException
|
||||
{
|
||||
final RandomAccessFile raf = new RandomAccessFile(processHolder.get().logFile, "r");
|
||||
final long rafLength = raf.length();
|
||||
if (offset > 0) {
|
||||
raf.seek(offset);
|
||||
} else if (offset < 0 && offset < rafLength) {
|
||||
raf.seek(rafLength + offset);
|
||||
}
|
||||
return Channels.newInputStream(raf.getChannel());
|
||||
}
|
||||
}
|
||||
);
|
||||
} else {
|
||||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<ProcessHolder> getProcessHolder(final String taskid)
|
||||
{
|
||||
synchronized (lock) {
|
||||
return Iterables.tryFind(
|
||||
processes, new Predicate<ProcessHolder>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(ProcessHolder processHolder)
|
||||
{
|
||||
return processHolder.task.getId().equals(taskid);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private static class ProcessHolder
|
||||
{
|
||||
private final Task task;
|
||||
private final Process process;
|
||||
private final File logFile;
|
||||
|
||||
private AtomicInteger shutdowns = new AtomicInteger(0);
|
||||
|
||||
private ProcessHolder(Task task, Process process, File logFile)
|
||||
{
|
||||
this.task = task;
|
||||
this.process = process;
|
||||
this.logFile = logFile;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -227,6 +227,12 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
return taskRunnerWorkItem.getResult();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown(String taskid)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem)
|
||||
{
|
||||
log.info("Added pending task %s", taskRunnerWorkItem.getTask().getId());
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
package com.metamx.druid.merger.coordinator;
|
||||
|
||||
// TODO move to common or worker?
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.io.InputSupplier;
|
||||
|
||||
import java.io.InputStream;
|
||||
|
||||
public interface TaskLogProvider
|
||||
{
|
||||
public Optional<InputSupplier<InputStream>> getLogs(String taskid, long offset);
|
||||
}
|
|
@ -39,6 +39,12 @@ public interface TaskRunner
|
|||
*/
|
||||
public ListenableFuture<TaskStatus> run(Task task);
|
||||
|
||||
/**
|
||||
* Best-effort task cancellation. May or may not do anything. Calling this multiple times may have
|
||||
* a stronger effect.
|
||||
*/
|
||||
public void shutdown(String taskid);
|
||||
|
||||
public Collection<TaskRunnerWorkItem> getRunningTasks();
|
||||
|
||||
public Collection<TaskRunnerWorkItem> getPendingTasks();
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
package com.metamx.druid.merger.coordinator.config;
|
||||
|
||||
import org.skife.config.Config;
|
||||
import org.skife.config.Default;
|
||||
|
||||
public abstract class ForkingTaskRunnerConfig
|
||||
{
|
||||
@Config("druid.indexer.fork.java")
|
||||
@Default("java")
|
||||
public abstract String getJavaCommand();
|
||||
|
||||
@Config("druid.indexer.fork.opts")
|
||||
@Default("")
|
||||
public abstract String getJavaOptions();
|
||||
|
||||
@Config("druid.indexer.fork.classpath")
|
||||
public String getJavaClasspath() {
|
||||
return System.getProperty("java.class.path");
|
||||
}
|
||||
|
||||
@Config("druid.indexer.fork.hostpattern")
|
||||
public abstract String getHostPattern();
|
||||
|
||||
@Config("druid.indexer.fork.startport")
|
||||
public abstract int getStartPort();
|
||||
}
|
|
@ -21,11 +21,10 @@ package com.metamx.druid.merger.coordinator.http;
|
|||
|
||||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
import com.amazonaws.services.ec2.AmazonEC2Client;
|
||||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
@ -40,11 +39,7 @@ import com.metamx.common.lifecycle.Lifecycle;
|
|||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.BaseServerNode;
|
||||
import com.metamx.druid.client.ClientConfig;
|
||||
import com.metamx.druid.client.ClientInventoryManager;
|
||||
import com.metamx.druid.client.MutableServerView;
|
||||
import com.metamx.druid.client.OnlyNewSegmentWatcherServerView;
|
||||
import com.metamx.druid.RegisteringNode;
|
||||
import com.metamx.druid.config.ConfigManager;
|
||||
import com.metamx.druid.config.ConfigManagerConfig;
|
||||
import com.metamx.druid.config.JacksonConfigManager;
|
||||
|
@ -56,23 +51,18 @@ import com.metamx.druid.http.RedirectInfo;
|
|||
import com.metamx.druid.http.StatusServlet;
|
||||
import com.metamx.druid.initialization.Initialization;
|
||||
import com.metamx.druid.initialization.ServerConfig;
|
||||
import com.metamx.druid.initialization.ServerInit;
|
||||
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.loading.DataSegmentKiller;
|
||||
import com.metamx.druid.loading.DataSegmentPusher;
|
||||
import com.metamx.druid.loading.S3DataSegmentKiller;
|
||||
import com.metamx.druid.merger.common.RetryPolicyFactory;
|
||||
import com.metamx.druid.merger.common.TaskToolboxFactory;
|
||||
import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory;
|
||||
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
|
||||
import com.metamx.druid.merger.common.actions.TaskActionToolbox;
|
||||
import com.metamx.druid.merger.common.config.IndexerZkConfig;
|
||||
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
|
||||
import com.metamx.druid.merger.common.config.TaskConfig;
|
||||
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
|
||||
import com.metamx.druid.merger.coordinator.DbTaskStorage;
|
||||
import com.metamx.druid.merger.coordinator.ForkingTaskRunner;
|
||||
import com.metamx.druid.merger.coordinator.HeapMemoryTaskStorage;
|
||||
import com.metamx.druid.merger.coordinator.LocalTaskRunner;
|
||||
import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
|
||||
import com.metamx.druid.merger.coordinator.RemoteTaskRunner;
|
||||
import com.metamx.druid.merger.coordinator.TaskLockbox;
|
||||
|
@ -83,6 +73,7 @@ import com.metamx.druid.merger.coordinator.TaskRunnerFactory;
|
|||
import com.metamx.druid.merger.coordinator.TaskStorage;
|
||||
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
|
||||
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
|
||||
import com.metamx.druid.merger.coordinator.config.ForkingTaskRunnerConfig;
|
||||
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
|
||||
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
|
||||
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
|
||||
|
@ -95,9 +86,6 @@ import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerFa
|
|||
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy;
|
||||
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig;
|
||||
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
|
||||
import com.metamx.druid.realtime.SegmentAnnouncer;
|
||||
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
|
||||
import com.metamx.druid.realtime.ZkSegmentAnnouncerConfig;
|
||||
import com.metamx.druid.utils.PropUtils;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.core.Emitters;
|
||||
|
@ -113,9 +101,6 @@ import com.metamx.metrics.MonitorSchedulerConfig;
|
|||
import com.metamx.metrics.SysMonitor;
|
||||
import com.netflix.curator.framework.CuratorFramework;
|
||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
|
||||
import org.jets3t.service.S3ServiceException;
|
||||
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
||||
import org.jets3t.service.security.AWSCredentials;
|
||||
import org.mortbay.jetty.Server;
|
||||
import org.mortbay.jetty.servlet.Context;
|
||||
import org.mortbay.jetty.servlet.DefaultServlet;
|
||||
|
@ -135,7 +120,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
|
||||
public class IndexerCoordinatorNode extends RegisteringNode
|
||||
{
|
||||
private static final Logger log = new Logger(IndexerCoordinatorNode.class);
|
||||
|
||||
|
@ -145,6 +130,7 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
}
|
||||
|
||||
private final Lifecycle lifecycle;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final Properties props;
|
||||
private final ConfigurationObjectFactory configFactory;
|
||||
|
||||
|
@ -152,11 +138,7 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
private ServiceEmitter emitter = null;
|
||||
private DbConnectorConfig dbConnectorConfig = null;
|
||||
private DBI dbi = null;
|
||||
private RestS3Service s3Service = null;
|
||||
private IndexerCoordinatorConfig config = null;
|
||||
private TaskConfig taskConfig = null;
|
||||
private DataSegmentPusher segmentPusher = null;
|
||||
private TaskToolboxFactory taskToolboxFactory = null;
|
||||
private MergerDBCoordinator mergerDBCoordinator = null;
|
||||
private TaskStorage taskStorage = null;
|
||||
private TaskQueue taskQueue = null;
|
||||
|
@ -166,8 +148,8 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
private IndexerZkConfig indexerZkConfig;
|
||||
private TaskRunnerFactory taskRunnerFactory = null;
|
||||
private ResourceManagementSchedulerFactory resourceManagementSchedulerFactory = null;
|
||||
private TaskActionClientFactory taskActionClientFactory = null;
|
||||
private TaskMasterLifecycle taskMasterLifecycle = null;
|
||||
private MutableServerView newSegmentServerView = null;
|
||||
private Server server = null;
|
||||
|
||||
private boolean initialized = false;
|
||||
|
@ -176,14 +158,14 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
Properties props,
|
||||
Lifecycle lifecycle,
|
||||
ObjectMapper jsonMapper,
|
||||
ObjectMapper smileMapper,
|
||||
ConfigurationObjectFactory configFactory
|
||||
)
|
||||
{
|
||||
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
|
||||
super(ImmutableList.of(jsonMapper));
|
||||
|
||||
this.lifecycle = lifecycle;
|
||||
this.props = props;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.configFactory = configFactory;
|
||||
}
|
||||
|
||||
|
@ -205,30 +187,12 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
return this;
|
||||
}
|
||||
|
||||
public IndexerCoordinatorNode setNewSegmentServerView(MutableServerView newSegmentServerView)
|
||||
{
|
||||
this.newSegmentServerView = newSegmentServerView;
|
||||
return this;
|
||||
}
|
||||
|
||||
public IndexerCoordinatorNode setS3Service(RestS3Service s3Service)
|
||||
{
|
||||
this.s3Service = s3Service;
|
||||
return this;
|
||||
}
|
||||
|
||||
public IndexerCoordinatorNode setTaskLockbox(TaskLockbox taskLockbox)
|
||||
{
|
||||
this.taskLockbox = taskLockbox;
|
||||
return this;
|
||||
}
|
||||
|
||||
public IndexerCoordinatorNode setSegmentPusher(DataSegmentPusher segmentPusher)
|
||||
{
|
||||
this.segmentPusher = segmentPusher;
|
||||
return this;
|
||||
}
|
||||
|
||||
public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator)
|
||||
{
|
||||
this.mergerDBCoordinator = mergeDbCoordinator;
|
||||
|
@ -273,19 +237,14 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
initializeEmitter();
|
||||
initializeMonitors();
|
||||
initializeIndexerCoordinatorConfig();
|
||||
initializeTaskConfig();
|
||||
initializeS3Service();
|
||||
initializeMergeDBCoordinator();
|
||||
initializeNewSegmentServerView();
|
||||
initializeTaskStorage();
|
||||
initializeTaskLockbox();
|
||||
initializeTaskQueue();
|
||||
initializeDataSegmentPusher();
|
||||
initializeTaskToolbox();
|
||||
initializeJacksonInjections();
|
||||
initializeJacksonSubtypes();
|
||||
initializeCurator();
|
||||
initializeIndexerZkConfig();
|
||||
initializeTaskActionClientFactory();
|
||||
initializeTaskRunnerFactory(configManager);
|
||||
initializeResourceManagement(configManager);
|
||||
initializeTaskMasterLifecycle();
|
||||
|
@ -364,16 +323,28 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
initialized = true;
|
||||
}
|
||||
|
||||
private ObjectMapper getJsonMapper()
|
||||
{
|
||||
return jsonMapper;
|
||||
}
|
||||
|
||||
private void initializeTaskActionClientFactory()
|
||||
{
|
||||
if (taskActionClientFactory == null) {
|
||||
taskActionClientFactory = new LocalTaskActionClientFactory(
|
||||
taskStorage,
|
||||
new TaskActionToolbox(taskQueue, taskLockbox, mergerDBCoordinator, emitter)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeTaskMasterLifecycle()
|
||||
{
|
||||
if (taskMasterLifecycle == null) {
|
||||
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
|
||||
taskMasterLifecycle = new TaskMasterLifecycle(
|
||||
taskQueue,
|
||||
new LocalTaskActionClientFactory(
|
||||
taskStorage,
|
||||
new TaskActionToolbox(taskQueue, taskLockbox, mergerDBCoordinator, emitter)
|
||||
),
|
||||
taskActionClientFactory,
|
||||
config,
|
||||
serviceDiscoveryConfig,
|
||||
taskRunnerFactory,
|
||||
|
@ -389,7 +360,7 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
public synchronized void start() throws Exception
|
||||
{
|
||||
if (!initialized) {
|
||||
init();
|
||||
doInit();
|
||||
}
|
||||
|
||||
lifecycle.start();
|
||||
|
@ -432,16 +403,6 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
}
|
||||
}
|
||||
|
||||
private void initializeJacksonInjections()
|
||||
{
|
||||
InjectableValues.Std injectables = new InjectableValues.Std();
|
||||
|
||||
injectables.addValue("s3Client", s3Service)
|
||||
.addValue("segmentPusher", segmentPusher);
|
||||
|
||||
getJsonMapper().setInjectableValues(injectables);
|
||||
}
|
||||
|
||||
private void initializeJacksonSubtypes()
|
||||
{
|
||||
getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
|
||||
|
@ -489,71 +450,6 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
}
|
||||
}
|
||||
|
||||
private void initializeTaskConfig()
|
||||
{
|
||||
if (taskConfig == null) {
|
||||
taskConfig = configFactory.build(TaskConfig.class);
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeNewSegmentServerView()
|
||||
{
|
||||
if (newSegmentServerView == null) {
|
||||
final MutableServerView view = new OnlyNewSegmentWatcherServerView();
|
||||
final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
|
||||
getConfigFactory().build(ClientConfig.class),
|
||||
getPhoneBook(),
|
||||
view
|
||||
);
|
||||
lifecycle.addManagedInstance(clientInventoryManager);
|
||||
|
||||
this.newSegmentServerView = view;
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeS3Service() throws S3ServiceException
|
||||
{
|
||||
this.s3Service = new RestS3Service(
|
||||
new AWSCredentials(
|
||||
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
|
||||
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
public void initializeDataSegmentPusher()
|
||||
{
|
||||
if (segmentPusher == null) {
|
||||
segmentPusher = ServerInit.getSegmentPusher(props, configFactory, getJsonMapper());
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeTaskToolbox()
|
||||
{
|
||||
if (taskToolboxFactory == null) {
|
||||
final SegmentAnnouncer segmentAnnouncer = new ZkSegmentAnnouncer(
|
||||
configFactory.build(ZkSegmentAnnouncerConfig.class),
|
||||
getPhoneBook()
|
||||
);
|
||||
final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
|
||||
taskToolboxFactory = new TaskToolboxFactory(
|
||||
taskConfig,
|
||||
new LocalTaskActionClientFactory(
|
||||
taskStorage,
|
||||
new TaskActionToolbox(taskQueue, taskLockbox, mergerDBCoordinator, emitter)
|
||||
),
|
||||
emitter,
|
||||
s3Service,
|
||||
segmentPusher,
|
||||
dataSegmentKiller,
|
||||
segmentAnnouncer,
|
||||
newSegmentServerView,
|
||||
getConglomerate(),
|
||||
getJsonMapper()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeMergeDBCoordinator()
|
||||
{
|
||||
if (mergerDBCoordinator == null) {
|
||||
|
@ -657,7 +553,11 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
public TaskRunner build()
|
||||
{
|
||||
final ExecutorService runnerExec = Executors.newFixedThreadPool(config.getNumLocalThreads());
|
||||
return new LocalTaskRunner(taskToolboxFactory, runnerExec);
|
||||
return new ForkingTaskRunner(
|
||||
configFactory.build(ForkingTaskRunnerConfig.class),
|
||||
runnerExec,
|
||||
getJsonMapper()
|
||||
);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
|
@ -753,13 +653,8 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
|
||||
public IndexerCoordinatorNode build()
|
||||
{
|
||||
if (jsonMapper == null && smileMapper == null) {
|
||||
if (jsonMapper == null) {
|
||||
jsonMapper = new DefaultObjectMapper();
|
||||
smileMapper = new DefaultObjectMapper(new SmileFactory());
|
||||
smileMapper.getJsonFactory().setCodec(smileMapper);
|
||||
}
|
||||
else if (jsonMapper == null || smileMapper == null) {
|
||||
throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
|
||||
}
|
||||
|
||||
if (lifecycle == null) {
|
||||
|
@ -774,7 +669,7 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
|
|||
configFactory = Config.createFactory(props);
|
||||
}
|
||||
|
||||
return new IndexerCoordinatorNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
|
||||
return new IndexerCoordinatorNode(props, lifecycle, jsonMapper, configFactory);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -190,11 +190,12 @@ public class IndexerCoordinatorResource
|
|||
{
|
||||
final Map<String, Object> retMap;
|
||||
|
||||
// TODO make sure this worker is supposed to be running this task (attempt id? token?)
|
||||
|
||||
try {
|
||||
final T ret = taskMasterLifecycle.getTaskActionClient(holder.getTask())
|
||||
.submit(holder.getAction());
|
||||
retMap = Maps.newHashMap();
|
||||
retMap.put("result", ret);
|
||||
retMap = ImmutableMap.<String, Object>of("result", ret);
|
||||
} catch(IOException e) {
|
||||
return Response.serverError().build();
|
||||
}
|
||||
|
|
|
@ -19,24 +19,22 @@
|
|||
|
||||
package com.metamx.druid.merger.worker;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.druid.Query;
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.common.TaskToolbox;
|
||||
import com.metamx.druid.merger.common.TaskToolboxFactory;
|
||||
import com.metamx.druid.merger.common.task.Task;
|
||||
import com.metamx.druid.query.NoopQueryRunner;
|
||||
import com.metamx.druid.query.QueryRunner;
|
||||
import com.metamx.druid.merger.coordinator.TaskRunner;
|
||||
import com.metamx.druid.merger.coordinator.TaskRunnerFactory;
|
||||
import com.metamx.druid.query.segment.QuerySegmentWalker;
|
||||
import com.metamx.druid.query.segment.SegmentDescriptor;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.netflix.curator.framework.CuratorFramework;
|
||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
|
||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
|
||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.List;
|
||||
|
@ -50,29 +48,32 @@ import java.util.concurrent.ExecutorService;
|
|||
* The monitor implements {@link QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
|
||||
* realtime index tasks.
|
||||
*/
|
||||
public class WorkerTaskMonitor implements QuerySegmentWalker
|
||||
public class WorkerTaskMonitor
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class);
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final PathChildrenCache pathChildrenCache;
|
||||
private final CuratorFramework cf;
|
||||
private final WorkerCuratorCoordinator workerCuratorCoordinator;
|
||||
private final TaskToolboxFactory toolboxFactory;
|
||||
private final TaskRunner taskRunner;
|
||||
private final ExecutorService exec;
|
||||
private final List<Task> running = new CopyOnWriteArrayList<Task>();
|
||||
|
||||
public WorkerTaskMonitor(
|
||||
ObjectMapper jsonMapper,
|
||||
PathChildrenCache pathChildrenCache,
|
||||
CuratorFramework cf,
|
||||
WorkerCuratorCoordinator workerCuratorCoordinator,
|
||||
TaskToolboxFactory toolboxFactory,
|
||||
TaskRunner taskRunner,
|
||||
ExecutorService exec
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.pathChildrenCache = pathChildrenCache;
|
||||
this.cf = cf;
|
||||
this.workerCuratorCoordinator = workerCuratorCoordinator;
|
||||
this.toolboxFactory = toolboxFactory;
|
||||
this.taskRunner = taskRunner;
|
||||
this.exec = exec;
|
||||
}
|
||||
|
||||
|
@ -94,11 +95,10 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
|
|||
throws Exception
|
||||
{
|
||||
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
|
||||
final Task task = toolboxFactory.getObjectMapper().readValue(
|
||||
final Task task = jsonMapper.readValue(
|
||||
cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()),
|
||||
Task.class
|
||||
);
|
||||
final TaskToolbox toolbox = toolboxFactory.build(task);
|
||||
|
||||
if (isTaskRunning(task)) {
|
||||
log.warn("Got task %s that I am already running...", task.getId());
|
||||
|
@ -113,7 +113,6 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
|
|||
public void run()
|
||||
{
|
||||
final long startTime = System.currentTimeMillis();
|
||||
final File taskDir = toolbox.getTaskDir();
|
||||
|
||||
log.info("Running task [%s]", task.getId());
|
||||
running.add(task);
|
||||
|
@ -122,7 +121,7 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
|
|||
try {
|
||||
workerCuratorCoordinator.unannounceTask(task.getId());
|
||||
workerCuratorCoordinator.announceStatus(TaskStatus.running(task.getId()));
|
||||
taskStatus = task.run(toolbox);
|
||||
taskStatus = taskRunner.run(task).get();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.makeAlert(e, "Failed to run task")
|
||||
|
@ -144,19 +143,6 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
|
|||
.addData("task", task.getId())
|
||||
.emit();
|
||||
}
|
||||
|
||||
try {
|
||||
if (taskDir.exists()) {
|
||||
log.info("Removing task directory: %s", taskDir);
|
||||
FileUtils.deleteDirectory(taskDir);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.makeAlert(e, "Failed to delete task directory")
|
||||
.addData("taskDir", taskDir.toString())
|
||||
.addData("task", task.getId())
|
||||
.emit();
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -196,38 +182,4 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
|
|||
.emit();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
|
||||
{
|
||||
return getQueryRunnerImpl(query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
|
||||
{
|
||||
return getQueryRunnerImpl(query);
|
||||
}
|
||||
|
||||
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query) {
|
||||
QueryRunner<T> queryRunner = null;
|
||||
|
||||
for (final Task task : running) {
|
||||
if (task.getDataSource().equals(query.getDataSource())) {
|
||||
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
|
||||
|
||||
if (taskQueryRunner != null) {
|
||||
if (queryRunner == null) {
|
||||
queryRunner = taskQueryRunner;
|
||||
} else {
|
||||
log.makeAlert("Found too many query runners for datasource")
|
||||
.addData("dataSource", query.getDataSource())
|
||||
.emit();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.merger.worker.executor;
|
||||
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.log.LogLevelAdjuster;
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.common.task.Task;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ExecutorMain
|
||||
{
|
||||
private static final Logger log = new Logger(ExecutorMain.class);
|
||||
|
||||
public static void main(String[] args) throws Exception
|
||||
{
|
||||
LogLevelAdjuster.register();
|
||||
|
||||
if (args.length != 2) {
|
||||
log.info("Usage: ExecutorMain <task.json> <status.json>");
|
||||
System.exit(2);
|
||||
}
|
||||
|
||||
final ExecutorNode node = ExecutorNode.builder().build();
|
||||
final Lifecycle lifecycle = new Lifecycle();
|
||||
|
||||
lifecycle.addManagedInstance(node);
|
||||
|
||||
try {
|
||||
lifecycle.start();
|
||||
}
|
||||
catch (Throwable t) {
|
||||
log.info(t, "Throwable caught at startup, committing seppuku");
|
||||
System.exit(2);
|
||||
}
|
||||
|
||||
try {
|
||||
final Task task = node.getJsonMapper().readValue(new File(args[0]), Task.class);
|
||||
|
||||
log.info(
|
||||
"Running with task: %s",
|
||||
node.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(task)
|
||||
);
|
||||
|
||||
final TaskStatus status = node.run(task).get();
|
||||
|
||||
log.info(
|
||||
"Task completed with status: %s",
|
||||
node.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(status)
|
||||
);
|
||||
|
||||
node.getJsonMapper().writeValue(new File(args[1]), status);
|
||||
} finally {
|
||||
lifecycle.stop();
|
||||
}
|
||||
|
||||
// TODO maybe this shouldn't be needed?
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,514 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.merger.worker.executor;
|
||||
|
||||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.config.Config;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.druid.BaseServerNode;
|
||||
import com.metamx.druid.client.ClientConfig;
|
||||
import com.metamx.druid.client.ClientInventoryManager;
|
||||
import com.metamx.druid.client.MutableServerView;
|
||||
import com.metamx.druid.client.OnlyNewSegmentWatcherServerView;
|
||||
import com.metamx.druid.http.QueryServlet;
|
||||
import com.metamx.druid.http.StatusServlet;
|
||||
import com.metamx.druid.initialization.CuratorConfig;
|
||||
import com.metamx.druid.initialization.Initialization;
|
||||
import com.metamx.druid.initialization.ServerConfig;
|
||||
import com.metamx.druid.initialization.ServerInit;
|
||||
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.loading.DataSegmentKiller;
|
||||
import com.metamx.druid.loading.DataSegmentPusher;
|
||||
import com.metamx.druid.loading.S3DataSegmentKiller;
|
||||
import com.metamx.druid.merger.common.RetryPolicyFactory;
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.common.TaskToolboxFactory;
|
||||
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
|
||||
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
|
||||
import com.metamx.druid.merger.common.config.TaskConfig;
|
||||
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
|
||||
import com.metamx.druid.merger.common.task.Task;
|
||||
import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner;
|
||||
import com.metamx.druid.merger.worker.config.WorkerConfig;
|
||||
import com.metamx.druid.realtime.SegmentAnnouncer;
|
||||
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
|
||||
import com.metamx.druid.realtime.ZkSegmentAnnouncerConfig;
|
||||
import com.metamx.druid.utils.PropUtils;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.core.Emitters;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.HttpClientConfig;
|
||||
import com.metamx.http.client.HttpClientInit;
|
||||
import com.metamx.metrics.JvmMonitor;
|
||||
import com.metamx.metrics.Monitor;
|
||||
import com.metamx.metrics.MonitorScheduler;
|
||||
import com.metamx.metrics.MonitorSchedulerConfig;
|
||||
import com.metamx.metrics.SysMonitor;
|
||||
import com.netflix.curator.framework.CuratorFramework;
|
||||
import com.netflix.curator.x.discovery.ServiceDiscovery;
|
||||
import com.netflix.curator.x.discovery.ServiceProvider;
|
||||
import org.jets3t.service.S3ServiceException;
|
||||
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
||||
import org.jets3t.service.security.AWSCredentials;
|
||||
import org.mortbay.jetty.Server;
|
||||
import org.mortbay.jetty.servlet.Context;
|
||||
import org.mortbay.jetty.servlet.ServletHolder;
|
||||
import org.skife.config.ConfigurationObjectFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ExecutorNode extends BaseServerNode<ExecutorNode>
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(ExecutorNode.class);
|
||||
|
||||
public static Builder builder()
|
||||
{
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
private final Lifecycle lifecycle;
|
||||
private final Properties props;
|
||||
private final ConfigurationObjectFactory configFactory;
|
||||
|
||||
private RestS3Service s3Service = null;
|
||||
private List<Monitor> monitors = null;
|
||||
private HttpClient httpClient = null;
|
||||
private ServiceEmitter emitter = null;
|
||||
private TaskConfig taskConfig = null;
|
||||
private WorkerConfig workerConfig = null;
|
||||
private DataSegmentPusher segmentPusher = null;
|
||||
private TaskToolboxFactory taskToolboxFactory = null;
|
||||
private CuratorFramework curatorFramework = null;
|
||||
private ServiceDiscovery serviceDiscovery = null;
|
||||
private ServiceProvider coordinatorServiceProvider = null;
|
||||
private MutableServerView newSegmentServerView = null;
|
||||
private Server server = null;
|
||||
private ExecutorServiceTaskRunner taskRunner = null;
|
||||
|
||||
public ExecutorNode(
|
||||
Properties props,
|
||||
Lifecycle lifecycle,
|
||||
ObjectMapper jsonMapper,
|
||||
ObjectMapper smileMapper,
|
||||
ConfigurationObjectFactory configFactory
|
||||
)
|
||||
{
|
||||
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
|
||||
|
||||
this.lifecycle = lifecycle;
|
||||
this.props = props;
|
||||
this.configFactory = configFactory;
|
||||
}
|
||||
|
||||
public ExecutorNode setHttpClient(HttpClient httpClient)
|
||||
{
|
||||
this.httpClient = httpClient;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecutorNode setEmitter(ServiceEmitter emitter)
|
||||
{
|
||||
this.emitter = emitter;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecutorNode setS3Service(RestS3Service s3Service)
|
||||
{
|
||||
this.s3Service = s3Service;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecutorNode setSegmentPusher(DataSegmentPusher segmentPusher)
|
||||
{
|
||||
this.segmentPusher = segmentPusher;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecutorNode setTaskToolboxFactory(TaskToolboxFactory taskToolboxFactory)
|
||||
{
|
||||
this.taskToolboxFactory = taskToolboxFactory;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecutorNode setCuratorFramework(CuratorFramework curatorFramework)
|
||||
{
|
||||
this.curatorFramework = curatorFramework;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecutorNode setCoordinatorServiceProvider(ServiceProvider coordinatorServiceProvider)
|
||||
{
|
||||
this.coordinatorServiceProvider = coordinatorServiceProvider;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecutorNode setServiceDiscovery(ServiceDiscovery serviceDiscovery)
|
||||
{
|
||||
this.serviceDiscovery = serviceDiscovery;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecutorNode setNewSegmentServerView(MutableServerView newSegmentServerView)
|
||||
{
|
||||
this.newSegmentServerView = newSegmentServerView;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doInit() throws Exception
|
||||
{
|
||||
initializeHttpClient();
|
||||
initializeEmitter();
|
||||
initializeS3Service();
|
||||
initializeMonitors();
|
||||
initializeMergerConfig();
|
||||
initializeCuratorFramework();
|
||||
initializeServiceDiscovery();
|
||||
initializeCoordinatorServiceProvider();
|
||||
initializeNewSegmentServerView();
|
||||
initializeDataSegmentPusher();
|
||||
initializeTaskToolbox();
|
||||
initializeTaskRunner();
|
||||
initializeJacksonInjections();
|
||||
initializeJacksonSubtypes();
|
||||
initializeServer();
|
||||
|
||||
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
|
||||
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
|
||||
final MonitorScheduler monitorScheduler = new MonitorScheduler(
|
||||
configFactory.build(MonitorSchedulerConfig.class),
|
||||
globalScheduledExec,
|
||||
emitter,
|
||||
monitors
|
||||
);
|
||||
lifecycle.addManagedInstance(monitorScheduler);
|
||||
|
||||
final Context root = new Context(server, "/", Context.SESSIONS);
|
||||
|
||||
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
|
||||
root.addServlet(
|
||||
new ServletHolder(
|
||||
new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger())
|
||||
),
|
||||
"/druid/v2/*"
|
||||
);
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public synchronized void start() throws Exception
|
||||
{
|
||||
init();
|
||||
lifecycle.start();
|
||||
}
|
||||
|
||||
@LifecycleStop
|
||||
public synchronized void stop()
|
||||
{
|
||||
lifecycle.stop();
|
||||
}
|
||||
|
||||
public synchronized ListenableFuture<TaskStatus> run(Task task)
|
||||
{
|
||||
return taskRunner.run(task);
|
||||
}
|
||||
|
||||
private void initializeServer()
|
||||
{
|
||||
if (server == null) {
|
||||
server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class));
|
||||
|
||||
lifecycle.addHandler(
|
||||
new Lifecycle.Handler()
|
||||
{
|
||||
@Override
|
||||
public void start() throws Exception
|
||||
{
|
||||
log.info("Starting Jetty");
|
||||
server.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop()
|
||||
{
|
||||
log.info("Stopping Jetty");
|
||||
try {
|
||||
server.stop();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Exception thrown while stopping Jetty");
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeJacksonInjections()
|
||||
{
|
||||
InjectableValues.Std injectables = new InjectableValues.Std();
|
||||
|
||||
injectables.addValue("s3Client", s3Service)
|
||||
.addValue("segmentPusher", segmentPusher);
|
||||
|
||||
getJsonMapper().setInjectableValues(injectables);
|
||||
}
|
||||
|
||||
private void initializeJacksonSubtypes()
|
||||
{
|
||||
getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
|
||||
}
|
||||
|
||||
private void initializeHttpClient()
|
||||
{
|
||||
if (httpClient == null) {
|
||||
httpClient = HttpClientInit.createClient(
|
||||
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeEmitter()
|
||||
{
|
||||
if (emitter == null) {
|
||||
emitter = new ServiceEmitter(
|
||||
PropUtils.getProperty(props, "druid.service"),
|
||||
PropUtils.getProperty(props, "druid.host"),
|
||||
Emitters.create(props, httpClient, getJsonMapper(), lifecycle)
|
||||
);
|
||||
}
|
||||
EmittingLogger.registerEmitter(emitter);
|
||||
}
|
||||
|
||||
private void initializeS3Service() throws S3ServiceException
|
||||
{
|
||||
if(s3Service == null) {
|
||||
s3Service = new RestS3Service(
|
||||
new AWSCredentials(
|
||||
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
|
||||
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeMonitors()
|
||||
{
|
||||
if (monitors == null) {
|
||||
monitors = Lists.newArrayList();
|
||||
monitors.add(new JvmMonitor());
|
||||
monitors.add(new SysMonitor());
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeMergerConfig()
|
||||
{
|
||||
if (taskConfig == null) {
|
||||
taskConfig = configFactory.build(TaskConfig.class);
|
||||
}
|
||||
|
||||
if (workerConfig == null) {
|
||||
workerConfig = configFactory.build(WorkerConfig.class);
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeDataSegmentPusher()
|
||||
{
|
||||
if (segmentPusher == null) {
|
||||
segmentPusher = ServerInit.getSegmentPusher(props, configFactory, getJsonMapper());
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeTaskToolbox() throws S3ServiceException
|
||||
{
|
||||
if (taskToolboxFactory == null) {
|
||||
final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
|
||||
final SegmentAnnouncer segmentAnnouncer = new ZkSegmentAnnouncer(
|
||||
configFactory.build(ZkSegmentAnnouncerConfig.class),
|
||||
getPhoneBook()
|
||||
);
|
||||
lifecycle.addManagedInstance(segmentAnnouncer);
|
||||
taskToolboxFactory = new TaskToolboxFactory(
|
||||
taskConfig,
|
||||
new RemoteTaskActionClientFactory(
|
||||
httpClient,
|
||||
coordinatorServiceProvider,
|
||||
new RetryPolicyFactory(
|
||||
configFactory.buildWithReplacements(
|
||||
RetryPolicyConfig.class,
|
||||
ImmutableMap.of("base_path", "druid.worker.taskActionClient")
|
||||
)
|
||||
),
|
||||
getJsonMapper()
|
||||
),
|
||||
emitter,
|
||||
s3Service,
|
||||
segmentPusher,
|
||||
dataSegmentKiller,
|
||||
segmentAnnouncer,
|
||||
newSegmentServerView,
|
||||
getConglomerate(),
|
||||
getJsonMapper()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeCuratorFramework() throws IOException
|
||||
{
|
||||
if (curatorFramework == null) {
|
||||
final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class);
|
||||
curatorFramework = Initialization.makeCuratorFrameworkClient(
|
||||
curatorConfig,
|
||||
lifecycle
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeServiceDiscovery() throws Exception
|
||||
{
|
||||
if (serviceDiscovery == null) {
|
||||
final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
|
||||
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
|
||||
curatorFramework,
|
||||
config,
|
||||
lifecycle
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeCoordinatorServiceProvider()
|
||||
{
|
||||
if (coordinatorServiceProvider == null) {
|
||||
this.coordinatorServiceProvider = Initialization.makeServiceProvider(
|
||||
workerConfig.getMasterService(),
|
||||
serviceDiscovery,
|
||||
lifecycle
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeNewSegmentServerView()
|
||||
{
|
||||
if (newSegmentServerView == null) {
|
||||
final MutableServerView view = new OnlyNewSegmentWatcherServerView();
|
||||
final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
|
||||
getConfigFactory().build(ClientConfig.class),
|
||||
getPhoneBook(),
|
||||
view
|
||||
);
|
||||
lifecycle.addManagedInstance(clientInventoryManager);
|
||||
|
||||
this.newSegmentServerView = view;
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeTaskRunner()
|
||||
{
|
||||
if (taskRunner == null) {
|
||||
final ExecutorServiceTaskRunner taskRunner = new ExecutorServiceTaskRunner(
|
||||
taskToolboxFactory,
|
||||
Executors.newSingleThreadExecutor(
|
||||
new ThreadFactoryBuilder()
|
||||
.setNameFormat("task-runner-%d")
|
||||
.build()
|
||||
)
|
||||
);
|
||||
|
||||
this.taskRunner = taskRunner;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Builder
|
||||
{
|
||||
private ObjectMapper jsonMapper = null;
|
||||
private ObjectMapper smileMapper = null;
|
||||
private Lifecycle lifecycle = null;
|
||||
private Properties props = null;
|
||||
private ConfigurationObjectFactory configFactory = null;
|
||||
|
||||
public Builder withMapper(ObjectMapper jsonMapper)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withLifecycle(Lifecycle lifecycle)
|
||||
{
|
||||
this.lifecycle = lifecycle;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withProps(Properties props)
|
||||
{
|
||||
this.props = props;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
|
||||
{
|
||||
this.configFactory = configFactory;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecutorNode build()
|
||||
{
|
||||
if (jsonMapper == null && smileMapper == null) {
|
||||
jsonMapper = new DefaultObjectMapper();
|
||||
smileMapper = new DefaultObjectMapper(new SmileFactory());
|
||||
smileMapper.getJsonFactory().setCodec(smileMapper);
|
||||
}
|
||||
else if (jsonMapper == null || smileMapper == null) {
|
||||
throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
|
||||
}
|
||||
|
||||
if (lifecycle == null) {
|
||||
lifecycle = new Lifecycle();
|
||||
}
|
||||
|
||||
if (props == null) {
|
||||
props = Initialization.loadProperties();
|
||||
}
|
||||
|
||||
if (configFactory == null) {
|
||||
configFactory = Config.createFactory(props);
|
||||
}
|
||||
|
||||
return new ExecutorNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,49 +19,31 @@
|
|||
|
||||
package com.metamx.druid.merger.worker.http;
|
||||
|
||||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.inject.servlet.GuiceFilter;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.config.Config;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.druid.BaseServerNode;
|
||||
import com.metamx.druid.client.ClientConfig;
|
||||
import com.metamx.druid.client.ClientInventoryManager;
|
||||
import com.metamx.druid.client.MutableServerView;
|
||||
import com.metamx.druid.client.OnlyNewSegmentWatcherServerView;
|
||||
import com.metamx.druid.http.QueryServlet;
|
||||
import com.metamx.druid.RegisteringNode;
|
||||
import com.metamx.druid.http.StatusServlet;
|
||||
import com.metamx.druid.initialization.CuratorConfig;
|
||||
import com.metamx.druid.initialization.Initialization;
|
||||
import com.metamx.druid.initialization.ServerConfig;
|
||||
import com.metamx.druid.initialization.ServerInit;
|
||||
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.loading.DataSegmentKiller;
|
||||
import com.metamx.druid.loading.DataSegmentPusher;
|
||||
import com.metamx.druid.loading.S3DataSegmentKiller;
|
||||
import com.metamx.druid.merger.common.RetryPolicyFactory;
|
||||
import com.metamx.druid.merger.common.TaskToolboxFactory;
|
||||
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
|
||||
import com.metamx.druid.merger.common.config.IndexerZkConfig;
|
||||
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
|
||||
import com.metamx.druid.merger.common.config.TaskConfig;
|
||||
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
|
||||
import com.metamx.druid.merger.coordinator.ForkingTaskRunner;
|
||||
import com.metamx.druid.merger.coordinator.config.ForkingTaskRunnerConfig;
|
||||
import com.metamx.druid.merger.worker.Worker;
|
||||
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
|
||||
import com.metamx.druid.merger.worker.WorkerTaskMonitor;
|
||||
import com.metamx.druid.merger.worker.config.WorkerConfig;
|
||||
import com.metamx.druid.realtime.SegmentAnnouncer;
|
||||
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
|
||||
import com.metamx.druid.realtime.ZkSegmentAnnouncerConfig;
|
||||
import com.metamx.druid.utils.PropUtils;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.core.Emitters;
|
||||
|
@ -78,12 +60,8 @@ import com.netflix.curator.framework.CuratorFramework;
|
|||
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
|
||||
import com.netflix.curator.x.discovery.ServiceDiscovery;
|
||||
import com.netflix.curator.x.discovery.ServiceProvider;
|
||||
import org.jets3t.service.S3ServiceException;
|
||||
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
||||
import org.jets3t.service.security.AWSCredentials;
|
||||
import org.mortbay.jetty.Server;
|
||||
import org.mortbay.jetty.servlet.Context;
|
||||
import org.mortbay.jetty.servlet.DefaultServlet;
|
||||
import org.mortbay.jetty.servlet.ServletHolder;
|
||||
import org.skife.config.ConfigurationObjectFactory;
|
||||
|
||||
|
@ -96,7 +74,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class WorkerNode extends BaseServerNode<WorkerNode>
|
||||
public class WorkerNode extends RegisteringNode
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(WorkerNode.class);
|
||||
|
||||
|
@ -107,22 +85,19 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
|
||||
private final Lifecycle lifecycle;
|
||||
private final Properties props;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ConfigurationObjectFactory configFactory;
|
||||
|
||||
private RestS3Service s3Service = null;
|
||||
private List<Monitor> monitors = null;
|
||||
private HttpClient httpClient = null;
|
||||
private ServiceEmitter emitter = null;
|
||||
private TaskConfig taskConfig = null;
|
||||
private WorkerConfig workerConfig = null;
|
||||
private DataSegmentPusher segmentPusher = null;
|
||||
private TaskToolboxFactory taskToolboxFactory = null;
|
||||
private CuratorFramework curatorFramework = null;
|
||||
private ServiceDiscovery serviceDiscovery = null;
|
||||
private ServiceProvider coordinatorServiceProvider = null;
|
||||
private WorkerCuratorCoordinator workerCuratorCoordinator = null;
|
||||
private WorkerTaskMonitor workerTaskMonitor = null;
|
||||
private MutableServerView newSegmentServerView = null;
|
||||
private Server server = null;
|
||||
|
||||
private boolean initialized = false;
|
||||
|
@ -131,14 +106,14 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
Properties props,
|
||||
Lifecycle lifecycle,
|
||||
ObjectMapper jsonMapper,
|
||||
ObjectMapper smileMapper,
|
||||
ConfigurationObjectFactory configFactory
|
||||
)
|
||||
{
|
||||
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
|
||||
super(ImmutableList.of(jsonMapper));
|
||||
|
||||
this.lifecycle = lifecycle;
|
||||
this.props = props;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.configFactory = configFactory;
|
||||
}
|
||||
|
||||
|
@ -154,24 +129,6 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
return this;
|
||||
}
|
||||
|
||||
public WorkerNode setS3Service(RestS3Service s3Service)
|
||||
{
|
||||
this.s3Service = s3Service;
|
||||
return this;
|
||||
}
|
||||
|
||||
public WorkerNode setSegmentPusher(DataSegmentPusher segmentPusher)
|
||||
{
|
||||
this.segmentPusher = segmentPusher;
|
||||
return this;
|
||||
}
|
||||
|
||||
public WorkerNode setTaskToolboxFactory(TaskToolboxFactory taskToolboxFactory)
|
||||
{
|
||||
this.taskToolboxFactory = taskToolboxFactory;
|
||||
return this;
|
||||
}
|
||||
|
||||
public WorkerNode setCuratorFramework(CuratorFramework curatorFramework)
|
||||
{
|
||||
this.curatorFramework = curatorFramework;
|
||||
|
@ -196,33 +153,21 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
return this;
|
||||
}
|
||||
|
||||
public WorkerNode setNewSegmentServerView(MutableServerView newSegmentServerView)
|
||||
{
|
||||
this.newSegmentServerView = newSegmentServerView;
|
||||
return this;
|
||||
}
|
||||
|
||||
public WorkerNode setWorkerTaskMonitor(WorkerTaskMonitor workerTaskMonitor)
|
||||
{
|
||||
this.workerTaskMonitor = workerTaskMonitor;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doInit() throws Exception
|
||||
{
|
||||
initializeHttpClient();
|
||||
initializeEmitter();
|
||||
initializeS3Service();
|
||||
initializeMonitors();
|
||||
initializeMergerConfig();
|
||||
initializeCuratorFramework();
|
||||
initializeServiceDiscovery();
|
||||
initializeCoordinatorServiceProvider();
|
||||
initializeNewSegmentServerView();
|
||||
initializeDataSegmentPusher();
|
||||
initializeTaskToolbox();
|
||||
initializeJacksonInjections();
|
||||
initializeJacksonSubtypes();
|
||||
initializeCuratorCoordinator();
|
||||
initializeWorkerTaskMonitor();
|
||||
|
@ -241,21 +186,13 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
final Context root = new Context(server, "/", Context.SESSIONS);
|
||||
|
||||
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
|
||||
root.addServlet(new ServletHolder(new DefaultServlet()), "/mmx/*");
|
||||
root.addServlet(
|
||||
new ServletHolder(
|
||||
new QueryServlet(getJsonMapper(), getSmileMapper(), workerTaskMonitor, emitter, getRequestLogger())
|
||||
),
|
||||
"/druid/v2/*"
|
||||
);
|
||||
root.addFilter(GuiceFilter.class, "/mmx/indexer/worker/v1/*", 0);
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public synchronized void start() throws Exception
|
||||
{
|
||||
if (!initialized) {
|
||||
init();
|
||||
doInit();
|
||||
}
|
||||
|
||||
lifecycle.start();
|
||||
|
@ -267,6 +204,11 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
lifecycle.stop();
|
||||
}
|
||||
|
||||
private ObjectMapper getJsonMapper()
|
||||
{
|
||||
return jsonMapper;
|
||||
}
|
||||
|
||||
private void initializeServer()
|
||||
{
|
||||
if (server == null) {
|
||||
|
@ -298,16 +240,6 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
}
|
||||
}
|
||||
|
||||
private void initializeJacksonInjections()
|
||||
{
|
||||
InjectableValues.Std injectables = new InjectableValues.Std();
|
||||
|
||||
injectables.addValue("s3Client", s3Service)
|
||||
.addValue("segmentPusher", segmentPusher);
|
||||
|
||||
getJsonMapper().setInjectableValues(injectables);
|
||||
}
|
||||
|
||||
private void initializeJacksonSubtypes()
|
||||
{
|
||||
getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
|
||||
|
@ -334,18 +266,6 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
EmittingLogger.registerEmitter(emitter);
|
||||
}
|
||||
|
||||
private void initializeS3Service() throws S3ServiceException
|
||||
{
|
||||
if(s3Service == null) {
|
||||
s3Service = new RestS3Service(
|
||||
new AWSCredentials(
|
||||
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
|
||||
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeMonitors()
|
||||
{
|
||||
if (monitors == null) {
|
||||
|
@ -366,47 +286,6 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
}
|
||||
}
|
||||
|
||||
public void initializeDataSegmentPusher()
|
||||
{
|
||||
if (segmentPusher == null) {
|
||||
segmentPusher = ServerInit.getSegmentPusher(props, configFactory, getJsonMapper());
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeTaskToolbox() throws S3ServiceException
|
||||
{
|
||||
if (taskToolboxFactory == null) {
|
||||
final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
|
||||
final SegmentAnnouncer segmentAnnouncer = new ZkSegmentAnnouncer(
|
||||
configFactory.build(ZkSegmentAnnouncerConfig.class),
|
||||
getPhoneBook()
|
||||
);
|
||||
lifecycle.addManagedInstance(segmentAnnouncer);
|
||||
taskToolboxFactory = new TaskToolboxFactory(
|
||||
taskConfig,
|
||||
new RemoteTaskActionClientFactory(
|
||||
httpClient,
|
||||
coordinatorServiceProvider,
|
||||
new RetryPolicyFactory(
|
||||
configFactory.buildWithReplacements(
|
||||
RetryPolicyConfig.class,
|
||||
ImmutableMap.of("base_path", "druid.worker.taskActionClient")
|
||||
)
|
||||
),
|
||||
getJsonMapper()
|
||||
),
|
||||
emitter,
|
||||
s3Service,
|
||||
segmentPusher,
|
||||
dataSegmentKiller,
|
||||
segmentAnnouncer,
|
||||
newSegmentServerView,
|
||||
getConglomerate(),
|
||||
getJsonMapper()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeCuratorFramework() throws IOException
|
||||
{
|
||||
if (curatorFramework == null) {
|
||||
|
@ -454,21 +333,6 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
}
|
||||
}
|
||||
|
||||
private void initializeNewSegmentServerView()
|
||||
{
|
||||
if (newSegmentServerView == null) {
|
||||
final MutableServerView view = new OnlyNewSegmentWatcherServerView();
|
||||
final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
|
||||
getConfigFactory().build(ClientConfig.class),
|
||||
getPhoneBook(),
|
||||
view
|
||||
);
|
||||
lifecycle.addManagedInstance(clientInventoryManager);
|
||||
|
||||
this.newSegmentServerView = view;
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeWorkerTaskMonitor()
|
||||
{
|
||||
if (workerTaskMonitor == null) {
|
||||
|
@ -479,10 +343,15 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
false
|
||||
);
|
||||
workerTaskMonitor = new WorkerTaskMonitor(
|
||||
getJsonMapper(),
|
||||
pathChildrenCache,
|
||||
curatorFramework,
|
||||
workerCuratorCoordinator,
|
||||
taskToolboxFactory,
|
||||
new ForkingTaskRunner(
|
||||
configFactory.build(ForkingTaskRunnerConfig.class),
|
||||
Executors.newFixedThreadPool(workerConfig.getCapacity()),
|
||||
getJsonMapper()
|
||||
),
|
||||
workerExec
|
||||
);
|
||||
lifecycle.addManagedInstance(workerTaskMonitor);
|
||||
|
@ -492,7 +361,6 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
public static class Builder
|
||||
{
|
||||
private ObjectMapper jsonMapper = null;
|
||||
private ObjectMapper smileMapper = null;
|
||||
private Lifecycle lifecycle = null;
|
||||
private Properties props = null;
|
||||
private ConfigurationObjectFactory configFactory = null;
|
||||
|
@ -523,13 +391,8 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
|
||||
public WorkerNode build()
|
||||
{
|
||||
if (jsonMapper == null && smileMapper == null) {
|
||||
if (jsonMapper == null) {
|
||||
jsonMapper = new DefaultObjectMapper();
|
||||
smileMapper = new DefaultObjectMapper(new SmileFactory());
|
||||
smileMapper.getJsonFactory().setCodec(smileMapper);
|
||||
}
|
||||
else if (jsonMapper == null || smileMapper == null) {
|
||||
throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
|
||||
}
|
||||
|
||||
if (lifecycle == null) {
|
||||
|
@ -544,7 +407,7 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
|
|||
configFactory = Config.createFactory(props);
|
||||
}
|
||||
|
||||
return new WorkerNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
|
||||
return new WorkerNode(props, lifecycle, jsonMapper, configFactory);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -281,35 +281,38 @@ public class RemoteTaskRunnerTest
|
|||
workerCuratorCoordinator.start();
|
||||
|
||||
workerTaskMonitor = new WorkerTaskMonitor(
|
||||
jsonMapper,
|
||||
new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true),
|
||||
cf,
|
||||
workerCuratorCoordinator,
|
||||
new TaskToolboxFactory(
|
||||
new TaskConfig()
|
||||
{
|
||||
@Override
|
||||
public File getBaseTaskDir()
|
||||
{
|
||||
try {
|
||||
return File.createTempFile("billy", "yay");
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
new ExecutorServiceTaskRunner(
|
||||
new TaskToolboxFactory(
|
||||
new TaskConfig()
|
||||
{
|
||||
@Override
|
||||
public File getBaseTaskDir()
|
||||
{
|
||||
try {
|
||||
return File.createTempFile("billy", "yay");
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getDefaultRowFlushBoundary()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
@Override
|
||||
public int getDefaultRowFlushBoundary()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHadoopWorkingPath()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}, null, null, null, null, null, null, null, null, jsonMapper
|
||||
@Override
|
||||
public String getHadoopWorkingPath()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}, null, null, null, null, null, null, null, null, jsonMapper
|
||||
), Executors.newSingleThreadExecutor()
|
||||
),
|
||||
Executors.newSingleThreadExecutor()
|
||||
);
|
||||
|
|
|
@ -161,7 +161,7 @@ public class TaskLifecycleTest
|
|||
new DefaultObjectMapper()
|
||||
);
|
||||
|
||||
tr = new LocalTaskRunner(
|
||||
tr = new ExecutorServiceTaskRunner(
|
||||
tb,
|
||||
Executors.newSingleThreadExecutor()
|
||||
);
|
||||
|
|
Loading…
Reference in New Issue