Task: preflight takes TaskActionClient instead of TaskToolbox

This commit is contained in:
Gian Merlino 2013-03-18 17:12:15 -07:00
parent f47319f118
commit d163f07ae0
10 changed files with 38 additions and 32 deletions

View File

@ -27,8 +27,8 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.metamx.druid.Query;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SegmentListUsedAction;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.query.QueryRunner;
import org.joda.time.Interval;
@ -96,7 +96,7 @@ public abstract class AbstractTask implements Task
}
@Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
{
return TaskStatus.running(id);
}

View File

@ -31,6 +31,7 @@ import com.metamx.druid.indexer.granularity.GranularitySpec;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SpawnTasksAction;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
@ -144,9 +145,9 @@ public class IndexTask extends AbstractTask
}
@Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
{
toolbox.getTaskActionClient().submit(new SpawnTasksAction(toSubtasks()));
taskActionClient.submit(new SpawnTasksAction(toSubtasks()));
return TaskStatus.success(getId());
}

View File

@ -42,6 +42,7 @@ import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@ -186,7 +187,7 @@ public abstract class MergeTaskBase extends AbstractTask
* we are operating on every segment that overlaps the chosen interval.
*/
@Override
public TaskStatus preflight(TaskToolbox toolbox)
public TaskStatus preflight(TaskActionClient taskActionClient)
{
try {
final Function<DataSegment, String> toIdentifier = new Function<DataSegment, String>()
@ -199,7 +200,7 @@ public abstract class MergeTaskBase extends AbstractTask
};
final Set<String> current = ImmutableSet.copyOf(
Iterables.transform(toolbox.getTaskActionClient().submit(defaultListUsedAction()), toIdentifier)
Iterables.transform(taskActionClient.submit(defaultListUsedAction()), toIdentifier)
);
final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier));

View File

@ -25,6 +25,7 @@ import com.google.common.base.Optional;
import com.metamx.druid.Query;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.query.QueryRunner;
import org.joda.time.Interval;
@ -97,14 +98,14 @@ public interface Task
* holding a lock on our dataSource and implicit lock interval (if any). If this method throws an exception, the
* task should be considered a failure.
*
* @param toolbox Toolbox for this task
* @param taskActionClient action client for this task (not the full toolbox)
*
* @return Some kind of status (runnable means continue on to a worker, non-runnable means we completed without
* using a worker).
*
* @throws Exception
*/
public TaskStatus preflight(TaskToolbox toolbox) throws Exception;
public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception;
/**
* Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while

View File

@ -135,15 +135,13 @@ public class VersionConverterTask extends AbstractTask
}
@Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
{
if (segment != null) {
return super.preflight(toolbox);
return super.preflight(taskActionClient);
}
final TaskActionClient taskClient = toolbox.getTaskActionClient();
List<DataSegment> segments = taskClient.submit(defaultListUsedAction());
List<DataSegment> segments = taskActionClient.submit(defaultListUsedAction());
final FunctionalIterable<Task> tasks = FunctionalIterable
.create(segments)
@ -164,7 +162,7 @@ public class VersionConverterTask extends AbstractTask
}
);
taskClient.submit(new SpawnTasksAction(Lists.newArrayList(tasks)));
taskActionClient.submit(new SpawnTasksAction(Lists.newArrayList(tasks)));
return TaskStatus.success(getId());
}

View File

@ -25,8 +25,8 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.exec.TaskConsumer;
@ -51,7 +51,7 @@ public class TaskMasterLifecycle
private final ReentrantLock giant = new ReentrantLock();
private final Condition mayBeStopped = giant.newCondition();
private final TaskQueue taskQueue;
private final TaskToolboxFactory taskToolboxFactory;
private final TaskActionClientFactory taskActionClientFactory;
private volatile boolean leading = false;
private volatile TaskRunner taskRunner;
@ -61,7 +61,7 @@ public class TaskMasterLifecycle
public TaskMasterLifecycle(
final TaskQueue taskQueue,
final TaskToolboxFactory taskToolboxFactory,
final TaskActionClientFactory taskActionClientFactory,
final IndexerCoordinatorConfig indexerCoordinatorConfig,
final ServiceDiscoveryConfig serviceDiscoveryConfig,
final TaskRunnerFactory runnerFactory,
@ -71,7 +71,7 @@ public class TaskMasterLifecycle
)
{
this.taskQueue = taskQueue;
this.taskToolboxFactory = taskToolboxFactory;
this.taskActionClientFactory = taskActionClientFactory;
this.leaderSelector = new LeaderSelector(
curator, indexerCoordinatorConfig.getLeaderLatchPath(), new LeaderSelectorListener()
@ -89,7 +89,7 @@ public class TaskMasterLifecycle
final TaskConsumer taskConsumer = new TaskConsumer(
taskQueue,
taskRunner,
taskToolboxFactory,
taskActionClientFactory,
emitter
);
@ -219,9 +219,9 @@ public class TaskMasterLifecycle
return taskQueue;
}
public TaskToolbox getTaskToolbox(Task task)
public TaskActionClient getTaskActionClient(Task task)
{
return taskToolboxFactory.build(task);
return taskActionClientFactory.create(task);
}
public ResourceManagementScheduler getResourceManagementScheduler()

View File

@ -24,7 +24,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;
@ -36,7 +36,7 @@ public class TaskConsumer implements Runnable
{
private final TaskQueue queue;
private final TaskRunner runner;
private final TaskToolboxFactory toolboxFactory;
private final TaskActionClientFactory taskActionClientFactory;
private final ServiceEmitter emitter;
private final Thread thready;
@ -47,13 +47,13 @@ public class TaskConsumer implements Runnable
public TaskConsumer(
TaskQueue queue,
TaskRunner runner,
TaskToolboxFactory toolboxFactory,
TaskActionClientFactory taskActionClientFactory,
ServiceEmitter emitter
)
{
this.queue = queue;
this.runner = runner;
this.toolboxFactory = toolboxFactory;
this.taskActionClientFactory = taskActionClientFactory;
this.emitter = emitter;
this.thready = new Thread(this);
}
@ -123,7 +123,7 @@ public class TaskConsumer implements Runnable
// Run preflight checks
TaskStatus preflightStatus;
try {
preflightStatus = task.preflight(toolboxFactory.build(task));
preflightStatus = task.preflight(taskActionClientFactory.create(task));
log.info("Preflight done for task: %s", task.getId());
}
catch (Exception e) {

View File

@ -370,7 +370,10 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
taskMasterLifecycle = new TaskMasterLifecycle(
taskQueue,
taskToolboxFactory,
new LocalTaskActionClientFactory(
taskStorage,
new TaskActionToolbox(taskQueue, taskLockbox, mergerDBCoordinator, emitter)
),
config,
serviceDiscoveryConfig,
taskRunnerFactory,

View File

@ -191,8 +191,7 @@ public class IndexerCoordinatorResource
final Map<String, Object> retMap;
try {
final T ret = taskMasterLifecycle.getTaskToolbox(holder.getTask())
.getTaskActionClient()
final T ret = taskMasterLifecycle.getTaskActionClient(holder.getTask())
.submit(holder.getAction());
retMap = Maps.newHashMap();
retMap.put("result", ret);

View File

@ -50,6 +50,7 @@ import com.metamx.druid.merger.common.actions.LockAcquireAction;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.LockReleaseAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.actions.TaskActionToolbox;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.AbstractTask;
@ -87,6 +88,7 @@ public class TaskLifecycleTest
private TaskQueue tq = null;
private TaskRunner tr = null;
private MockMergerDBCoordinator mdc = null;
private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null;
private TaskConsumer tc = null;
TaskStorageQueryAdapter tsqa = null;
@ -111,6 +113,7 @@ public class TaskLifecycleTest
tl = new TaskLockbox(ts);
tq = new TaskQueue(ts, tl);
mdc = newMockMDC();
tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter()));
tb = new TaskToolboxFactory(
new TaskConfig()
@ -133,7 +136,7 @@ public class TaskLifecycleTest
return null;
}
},
new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())),
tac,
newMockEmitter(),
null, // s3 client
new DataSegmentPusher()
@ -163,7 +166,7 @@ public class TaskLifecycleTest
Executors.newSingleThreadExecutor()
);
tc = new TaskConsumer(tq, tr, tb, newMockEmitter());
tc = new TaskConsumer(tq, tr, tac, newMockEmitter());
tsqa = new TaskStorageQueryAdapter(ts);
tq.start();