TaskToolbox: Rename getTaskActionClientFactory -> getTaskActionClient

This commit is contained in:
Gian Merlino 2013-03-07 22:38:28 -08:00
parent 08bff3f472
commit 4643030716
12 changed files with 35 additions and 44 deletions

View File

@ -80,7 +80,7 @@ public class TaskToolbox
return config; return config;
} }
public TaskActionClient getTaskActionClientFactory() public TaskActionClient getTaskActionClient()
{ {
return taskActionClientFactory.create(task); return taskActionClientFactory.create(task);
} }

View File

@ -77,7 +77,7 @@ public class DeleteTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
// Strategy: Create an empty segment covering the interval to be deleted // Strategy: Create an empty segment covering the interval to be deleted
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final Interval interval = this.getImplicitLockInterval().get(); final Interval interval = this.getImplicitLockInterval().get();
final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]); final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty); final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty);
@ -104,7 +104,7 @@ public class DeleteTask extends AbstractTask
segment.getVersion() segment.getVersion()
); );
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }

View File

@ -93,7 +93,7 @@ public class HadoopIndexTask extends AbstractTask
); );
// We should have a lock from before we started running // We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
log.info("Setting version to: %s", myLock.getVersion()); log.info("Setting version to: %s", myLock.getVersion());
configCopy.setVersion(myLock.getVersion()); configCopy.setVersion(myLock.getVersion());
@ -124,7 +124,7 @@ public class HadoopIndexTask extends AbstractTask
List<DataSegment> publishedSegments = job.getPublishedSegments(); List<DataSegment> publishedSegments = job.getPublishedSegments();
// Request segment pushes // Request segment pushes
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments))); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
// Done // Done
return TaskStatus.success(getId()); return TaskStatus.success(getId());

View File

@ -258,7 +258,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
} }
); );
toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks)); toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }

View File

@ -100,7 +100,7 @@ public class IndexGeneratorTask extends AbstractTask
public TaskStatus run(final TaskToolbox toolbox) throws Exception public TaskStatus run(final TaskToolbox toolbox) throws Exception
{ {
// We should have a lock from before we started running // We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
// We know this exists // We know this exists
final Interval interval = getImplicitLockInterval().get(); final Interval interval = getImplicitLockInterval().get();
@ -190,7 +190,7 @@ public class IndexGeneratorTask extends AbstractTask
); );
// Request segment pushes // Request segment pushes
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments))); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments)));
// Done // Done
return TaskStatus.success(getId()); return TaskStatus.success(getId());

View File

@ -142,7 +142,7 @@ public class IndexTask extends AbstractTask
@Override @Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception public TaskStatus preflight(TaskToolbox toolbox) throws Exception
{ {
toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(toSubtasks())); toolbox.getTaskActionClient().submit(new SpawnTasksAction(toSubtasks()));
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }

View File

@ -72,7 +72,7 @@ public class KillTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
// Confirm we have a lock (will throw if there isn't exactly one element) // Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
if(!myLock.getDataSource().equals(getDataSource())) { if(!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource()); throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
@ -84,7 +84,7 @@ public class KillTask extends AbstractTask
// List unused segments // List unused segments
final List<DataSegment> unusedSegments = toolbox final List<DataSegment> unusedSegments = toolbox
.getTaskActionClientFactory() .getTaskActionClient()
.submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval())); .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version // Verify none of these segments have versions > lock version
@ -107,7 +107,7 @@ public class KillTask extends AbstractTask
} }
// Remove metadata for these segments // Remove metadata for these segments
toolbox.getTaskActionClientFactory().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments))); toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }

View File

@ -20,15 +20,12 @@
package com.metamx.druid.merger.common.task; package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -36,7 +33,6 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.hash.Hashing; import com.google.common.hash.Hashing;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskLock; import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskStatus;
@ -45,7 +41,6 @@ import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction; import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -115,7 +110,7 @@ public abstract class MergeTaskBase extends AbstractTask
@Override @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final ServiceEmitter emitter = toolbox.getEmitter(); final ServiceEmitter emitter = toolbox.getEmitter();
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments); final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);
@ -166,7 +161,7 @@ public abstract class MergeTaskBase extends AbstractTask
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart)); emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize())); emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }
@ -196,7 +191,7 @@ public abstract class MergeTaskBase extends AbstractTask
}; };
final Set<String> current = ImmutableSet.copyOf( final Set<String> current = ImmutableSet.copyOf(
Iterables.transform(toolbox.getTaskActionClientFactory().submit(defaultListUsedAction()), toIdentifier) Iterables.transform(toolbox.getTaskActionClient().submit(defaultListUsedAction()), toIdentifier)
); );
final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier)); final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier));

View File

@ -93,7 +93,7 @@ public class VersionConverterTask extends AbstractTask
return super.preflight(toolbox); return super.preflight(toolbox);
} }
final TaskActionClient taskClient = toolbox.getTaskActionClientFactory(); final TaskActionClient taskClient = toolbox.getTaskActionClient();
List<DataSegment> segments = taskClient.submit(defaultListUsedAction()); List<DataSegment> segments = taskClient.submit(defaultListUsedAction());
@ -176,7 +176,7 @@ public class VersionConverterTask extends AbstractTask
DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion)); DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion));
updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment); updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment))); toolbox.getTaskActionClient().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
} else { } else {
log.info("Conversion failed."); log.info("Conversion failed.");
} }

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -35,8 +34,6 @@ import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle; import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter; import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStats;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
@ -188,7 +185,7 @@ public class IndexerCoordinatorResource
public <T> Response doAction(final TaskActionHolder<T> holder) public <T> Response doAction(final TaskActionHolder<T> holder)
{ {
final T ret = taskMasterLifecycle.getTaskToolbox(holder.getTask()) final T ret = taskMasterLifecycle.getTaskToolbox(holder.getTask())
.getTaskActionClientFactory() .getTaskActionClient()
.submit(holder.getAction()); .submit(holder.getAction());
final Map<String, Object> retMap = Maps.newHashMap(); final Map<String, Object> retMap = Maps.newHashMap();

View File

@ -66,7 +66,6 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder; import com.metamx.emitter.service.ServiceEventBuilder;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.jets3t.service.ServiceException;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.After; import org.junit.After;
@ -283,8 +282,8 @@ public class TaskLifecycleTest
// Sort of similar to what realtime tasks do: // Sort of similar to what realtime tasks do:
// Acquire lock for first interval // Acquire lock for first interval
final Optional<TaskLock> lock1 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval1)); final Optional<TaskLock> lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1));
final List<TaskLock> locks1 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); final List<TaskLock> locks1 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity) // (Confirm lock sanity)
Assert.assertTrue("lock1 present", lock1.isPresent()); Assert.assertTrue("lock1 present", lock1.isPresent());
@ -292,8 +291,8 @@ public class TaskLifecycleTest
Assert.assertEquals("locks1", ImmutableList.of(lock1.get()), locks1); Assert.assertEquals("locks1", ImmutableList.of(lock1.get()), locks1);
// Acquire lock for second interval // Acquire lock for second interval
final Optional<TaskLock> lock2 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval2)); final Optional<TaskLock> lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2));
final List<TaskLock> locks2 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); final List<TaskLock> locks2 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity) // (Confirm lock sanity)
Assert.assertTrue("lock2 present", lock2.isPresent()); Assert.assertTrue("lock2 present", lock2.isPresent());
@ -301,7 +300,7 @@ public class TaskLifecycleTest
Assert.assertEquals("locks2", ImmutableList.of(lock1.get(), lock2.get()), locks2); Assert.assertEquals("locks2", ImmutableList.of(lock1.get(), lock2.get()), locks2);
// Push first segment // Push first segment
toolbox.getTaskActionClientFactory() toolbox.getTaskActionClient()
.submit( .submit(
new SegmentInsertAction( new SegmentInsertAction(
ImmutableSet.of( ImmutableSet.of(
@ -315,14 +314,14 @@ public class TaskLifecycleTest
); );
// Release first lock // Release first lock
toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval1)); toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1));
final List<TaskLock> locks3 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); final List<TaskLock> locks3 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity) // (Confirm lock sanity)
Assert.assertEquals("locks3", ImmutableList.of(lock2.get()), locks3); Assert.assertEquals("locks3", ImmutableList.of(lock2.get()), locks3);
// Push second segment // Push second segment
toolbox.getTaskActionClientFactory() toolbox.getTaskActionClient()
.submit( .submit(
new SegmentInsertAction( new SegmentInsertAction(
ImmutableSet.of( ImmutableSet.of(
@ -336,8 +335,8 @@ public class TaskLifecycleTest
); );
// Release second lock // Release second lock
toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval2)); toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2));
final List<TaskLock> locks4 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); final List<TaskLock> locks4 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity) // (Confirm lock sanity)
Assert.assertEquals("locks4", ImmutableList.<TaskLock>of(), locks4); Assert.assertEquals("locks4", ImmutableList.<TaskLock>of(), locks4);
@ -370,7 +369,7 @@ public class TaskLifecycleTest
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
final TaskLock myLock = Iterables.getOnlyElement( final TaskLock myLock = Iterables.getOnlyElement(
toolbox.getTaskActionClientFactory() toolbox.getTaskActionClient()
.submit(new LockListAction()) .submit(new LockListAction())
); );
@ -380,7 +379,7 @@ public class TaskLifecycleTest
.version(myLock.getVersion()) .version(myLock.getVersion())
.build(); .build();
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment))); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }
}; };
@ -406,7 +405,7 @@ public class TaskLifecycleTest
@Override @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final DataSegment segment = DataSegment.builder() final DataSegment segment = DataSegment.builder()
.dataSource("ds") .dataSource("ds")
@ -414,7 +413,7 @@ public class TaskLifecycleTest
.version(myLock.getVersion()) .version(myLock.getVersion())
.build(); .build();
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment))); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }
}; };
@ -440,7 +439,7 @@ public class TaskLifecycleTest
@Override @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final DataSegment segment = DataSegment.builder() final DataSegment segment = DataSegment.builder()
.dataSource("ds") .dataSource("ds")
@ -448,7 +447,7 @@ public class TaskLifecycleTest
.version(myLock.getVersion() + "1!!!1!!") .version(myLock.getVersion() + "1!!!1!!")
.build(); .build();
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment))); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }
}; };

View File

@ -375,7 +375,7 @@ public class TaskQueueTest
@Override @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks)); toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
return TaskStatus.success(id); return TaskStatus.success(id);
} }
}; };