make the CliPeon actually able to run on its own

This commit is contained in:
fjy 2013-10-02 15:55:10 -07:00
parent bc8db7daa5
commit 17874eeb67
8 changed files with 63 additions and 11 deletions

View File

@ -25,13 +25,21 @@ import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public abstract class AbstractTask implements Task
{
private static final Joiner ID_JOINER = Joiner.on("_");
@ -134,7 +142,9 @@ public abstract class AbstractTask implements Task
.toString();
}
/** Start helper methods **/
/**
* Start helper methods *
*/
public static String joinId(Object... objects)
{
return ID_JOINER.join(objects);
@ -174,4 +184,18 @@ public abstract class AbstractTask implements Task
{
return id.hashCode();
}
protected Iterable<TaskLock> getTaskLocks(TaskToolbox toolbox) throws IOException
{
final List<TaskLock> locks = toolbox.getTaskActionClient().submit(new LockListAction());
if (locks.isEmpty()) {
return Arrays.asList(
toolbox.getTaskActionClient()
.submit(new LockAcquireAction(getImplicitLockInterval().get()))
);
}
return locks;
}
}

View File

@ -78,7 +78,7 @@ public class DeleteTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Strategy: Create an empty segment covering the interval to be deleted
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Interval interval = this.getImplicitLockInterval().get();
final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty);

View File

@ -95,7 +95,7 @@ public class HadoopIndexTask extends AbstractTask
);
// We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
log.info("Setting version to: %s", myLock.getVersion());
configCopy.setVersion(myLock.getVersion());

View File

@ -104,7 +104,7 @@ public class IndexGeneratorTask extends AbstractTask
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
// We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
// We know this exists
final Interval interval = getImplicitLockInterval().get();

View File

@ -66,7 +66,7 @@ public class KillTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
if(!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());

View File

@ -41,6 +41,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
@ -118,7 +119,7 @@ public abstract class MergeTaskBase extends AbstractTask
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final ServiceEmitter emitter = toolbox.getEmitter();
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);

View File

@ -176,7 +176,7 @@ public class RealtimeIndexTask extends AbstractTask
// Shed any locks we might have (e.g. if we were uncleanly killed and restarted) since we'll reacquire
// them if we actually need them
for (final TaskLock taskLock : toolbox.getTaskActionClient().submit(new LockListAction())) {
for (final TaskLock taskLock : getTaskLocks(toolbox)) {
toolbox.getTaskActionClient().submit(new LockReleaseAction(taskLock.getInterval()));
}

View File

@ -41,14 +41,19 @@ import io.druid.guice.PolyBind;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider;
import io.druid.indexing.common.index.NoopChatHandlerProvider;
import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider;
import io.druid.indexing.coordinator.HeapMemoryTaskStorage;
import io.druid.indexing.coordinator.IndexerDBCoordinator;
import io.druid.indexing.coordinator.TaskQueue;
import io.druid.indexing.coordinator.TaskRunner;
import io.druid.indexing.coordinator.TaskStorage;
import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
import io.druid.indexing.worker.executor.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
@ -115,9 +120,8 @@ public class CliPeon extends GuiceRunnable
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class);
binder.bind(TaskActionClientFactory.class)
.to(RemoteTaskActionClientFactory.class)
.in(LazySingleton.class);
configureTaskActionClient(binder);
binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class);
@ -146,6 +150,29 @@ public class CliPeon extends GuiceRunnable
LifecycleModule.register(binder, Server.class);
}
private void configureTaskActionClient(Binder binder)
{
PolyBind.createChoice(
binder,
"druid.peon.mode",
Key.get(TaskActionClientFactory.class),
Key.get(RemoteTaskActionClientFactory.class)
);
final MapBinder<String, TaskActionClientFactory> taskActionBinder = PolyBind.optionBinder(
binder, Key.get(TaskActionClientFactory.class)
);
taskActionBinder.addBinding("local")
.to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
// all of these bindings are so that we can run the peon in local mode
binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
binder.bind(TaskQueue.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(IndexerDBCoordinator.class).in(LazySingleton.class);
taskActionBinder.addBinding("remote")
.to(RemoteTaskActionClientFactory.class).in(LazySingleton.class);
}
}
);
}