From 17874eeb67f22b628e78cd540bd26510edc86771 Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 2 Oct 2013 15:55:10 -0700 Subject: [PATCH] make the CliPeon actually able to run on its own --- .../indexing/common/task/AbstractTask.java | 26 +++++++++++++- .../indexing/common/task/DeleteTask.java | 2 +- .../indexing/common/task/HadoopIndexTask.java | 2 +- .../common/task/IndexGeneratorTask.java | 2 +- .../druid/indexing/common/task/KillTask.java | 2 +- .../indexing/common/task/MergeTaskBase.java | 3 +- .../common/task/RealtimeIndexTask.java | 2 +- .../src/main/java/io/druid/cli/CliPeon.java | 35 ++++++++++++++++--- 8 files changed, 63 insertions(+), 11 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java index ac5b7c2c18a..7905776011a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java @@ -25,13 +25,21 @@ import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockAcquireAction; +import io.druid.indexing.common.actions.LockListAction; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.query.Query; import io.druid.query.QueryRunner; import org.joda.time.Interval; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + public abstract class AbstractTask implements Task { private static final Joiner ID_JOINER = Joiner.on("_"); @@ -134,7 +142,9 @@ public abstract class AbstractTask implements Task .toString(); } - /** Start helper methods **/ + /** + * Start helper methods * + */ public static String joinId(Object... objects) { return ID_JOINER.join(objects); @@ -174,4 +184,18 @@ public abstract class AbstractTask implements Task { return id.hashCode(); } + + protected Iterable getTaskLocks(TaskToolbox toolbox) throws IOException + { + final List locks = toolbox.getTaskActionClient().submit(new LockListAction()); + + if (locks.isEmpty()) { + return Arrays.asList( + toolbox.getTaskActionClient() + .submit(new LockAcquireAction(getImplicitLockInterval().get())) + ); + } + + return locks; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java index b75cb162352..32d3e49e618 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java @@ -78,7 +78,7 @@ public class DeleteTask extends AbstractTask public TaskStatus run(TaskToolbox toolbox) throws Exception { // Strategy: Create an empty segment covering the interval to be deleted - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); final Interval interval = this.getImplicitLockInterval().get(); final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]); final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 8367ae1f040..65d8274b112 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -95,7 +95,7 @@ public class HadoopIndexTask extends AbstractTask ); // We should have a lock from before we started running - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); log.info("Setting version to: %s", myLock.getVersion()); configCopy.setVersion(myLock.getVersion()); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexGeneratorTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexGeneratorTask.java index 9c766e9ea48..659ad4cfd3a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexGeneratorTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexGeneratorTask.java @@ -104,7 +104,7 @@ public class IndexGeneratorTask extends AbstractTask public TaskStatus run(final TaskToolbox toolbox) throws Exception { // We should have a lock from before we started running - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); // We know this exists final Interval interval = getImplicitLockInterval().get(); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java index 6f1422781e5..8f4068a5e46 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java @@ -66,7 +66,7 @@ public class KillTask extends AbstractTask public TaskStatus run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); if(!myLock.getDataSource().equals(getDataSource())) { throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource()); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 5936d45a278..750509f9cec 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -41,6 +41,7 @@ import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockListAction; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -118,7 +119,7 @@ public abstract class MergeTaskBase extends AbstractTask @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); final ServiceEmitter emitter = toolbox.getEmitter(); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 691b7a89a27..d9d2a58d6a9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -176,7 +176,7 @@ public class RealtimeIndexTask extends AbstractTask // Shed any locks we might have (e.g. if we were uncleanly killed and restarted) since we'll reacquire // them if we actually need them - for (final TaskLock taskLock : toolbox.getTaskActionClient().submit(new LockListAction())) { + for (final TaskLock taskLock : getTaskLocks(toolbox)) { toolbox.getTaskActionClient().submit(new LockReleaseAction(taskLock.getInterval())); } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index e255e160b01..4efa7e96e50 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -41,14 +41,19 @@ import io.druid.guice.PolyBind; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.TaskToolboxFactory; +import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.RemoteTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.index.ChatHandlerProvider; -import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider; import io.druid.indexing.common.index.NoopChatHandlerProvider; import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider; +import io.druid.indexing.coordinator.HeapMemoryTaskStorage; +import io.druid.indexing.coordinator.IndexerDBCoordinator; +import io.druid.indexing.coordinator.TaskQueue; import io.druid.indexing.coordinator.TaskRunner; +import io.druid.indexing.coordinator.TaskStorage; import io.druid.indexing.coordinator.ThreadPoolTaskRunner; import io.druid.indexing.worker.executor.ChatHandlerResource; import io.druid.indexing.worker.executor.ExecutorLifecycle; @@ -115,9 +120,8 @@ public class CliPeon extends GuiceRunnable JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class); - binder.bind(TaskActionClientFactory.class) - .to(RemoteTaskActionClientFactory.class) - .in(LazySingleton.class); + configureTaskActionClient(binder); + binder.bind(RetryPolicyFactory.class).in(LazySingleton.class); binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class); @@ -146,6 +150,29 @@ public class CliPeon extends GuiceRunnable LifecycleModule.register(binder, Server.class); } + + private void configureTaskActionClient(Binder binder) + { + PolyBind.createChoice( + binder, + "druid.peon.mode", + Key.get(TaskActionClientFactory.class), + Key.get(RemoteTaskActionClientFactory.class) + ); + final MapBinder taskActionBinder = PolyBind.optionBinder( + binder, Key.get(TaskActionClientFactory.class) + ); + taskActionBinder.addBinding("local") + .to(LocalTaskActionClientFactory.class).in(LazySingleton.class); + // all of these bindings are so that we can run the peon in local mode + binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class); + binder.bind(TaskQueue.class).in(LazySingleton.class); + binder.bind(TaskActionToolbox.class).in(LazySingleton.class); + binder.bind(IndexerDBCoordinator.class).in(LazySingleton.class); + taskActionBinder.addBinding("remote") + .to(RemoteTaskActionClientFactory.class).in(LazySingleton.class); + + } } ); }