From 4643030716fb83633d229d9b73069801aef1a518 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 7 Mar 2013 22:38:28 -0800 Subject: [PATCH] TaskToolbox: Rename getTaskActionClientFactory -> getTaskActionClient --- .../druid/merger/common/TaskToolbox.java | 2 +- .../druid/merger/common/task/DeleteTask.java | 4 +-- .../merger/common/task/HadoopIndexTask.java | 4 +-- .../task/IndexDeterminePartitionsTask.java | 2 +- .../common/task/IndexGeneratorTask.java | 4 +-- .../druid/merger/common/task/IndexTask.java | 2 +- .../druid/merger/common/task/KillTask.java | 6 ++-- .../merger/common/task/MergeTaskBase.java | 11 ++----- .../common/task/VersionConverterTask.java | 4 +-- .../http/IndexerCoordinatorResource.java | 5 +-- .../merger/coordinator/TaskLifecycleTest.java | 33 +++++++++---------- .../merger/coordinator/TaskQueueTest.java | 2 +- 12 files changed, 35 insertions(+), 44 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java index 32cb188273a..e69b0f827e7 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java @@ -80,7 +80,7 @@ public class TaskToolbox return config; } - public TaskActionClient getTaskActionClientFactory() + public TaskActionClient getTaskActionClient() { return taskActionClientFactory.create(task); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java index 67754ee00d7..86fd2a7ec37 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java @@ -77,7 +77,7 @@ public class DeleteTask extends AbstractTask public TaskStatus run(TaskToolbox toolbox) throws Exception { // Strategy: Create an empty segment covering the interval to be deleted - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); final Interval interval = this.getImplicitLockInterval().get(); final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]); final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty); @@ -104,7 +104,7 @@ public class DeleteTask extends AbstractTask segment.getVersion() ); - toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java index f0d09d5137d..6e284557529 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java @@ -93,7 +93,7 @@ public class HadoopIndexTask extends AbstractTask ); // We should have a lock from before we started running - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); log.info("Setting version to: %s", myLock.getVersion()); configCopy.setVersion(myLock.getVersion()); @@ -124,7 +124,7 @@ public class HadoopIndexTask extends AbstractTask List publishedSegments = job.getPublishedSegments(); // Request segment pushes - toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments))); // Done return TaskStatus.success(getId()); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java index 3dfe99a68f1..47f72b12501 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java @@ -258,7 +258,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask } ); - toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks)); + toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks)); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java index 1790ddc6aa5..dd928883232 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java @@ -100,7 +100,7 @@ public class IndexGeneratorTask extends AbstractTask public TaskStatus run(final TaskToolbox toolbox) throws Exception { // We should have a lock from before we started running - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); // We know this exists final Interval interval = getImplicitLockInterval().get(); @@ -190,7 +190,7 @@ public class IndexGeneratorTask extends AbstractTask ); // Request segment pushes - toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments))); // Done return TaskStatus.success(getId()); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java index d1bfc6d77fc..35babcd6a22 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java @@ -142,7 +142,7 @@ public class IndexTask extends AbstractTask @Override public TaskStatus preflight(TaskToolbox toolbox) throws Exception { - toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(toSubtasks())); + toolbox.getTaskActionClient().submit(new SpawnTasksAction(toSubtasks())); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java index e652ab69151..f4476ffd858 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java @@ -72,7 +72,7 @@ public class KillTask extends AbstractTask public TaskStatus run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); if(!myLock.getDataSource().equals(getDataSource())) { throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource()); @@ -84,7 +84,7 @@ public class KillTask extends AbstractTask // List unused segments final List unusedSegments = toolbox - .getTaskActionClientFactory() + .getTaskActionClient() .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval())); // Verify none of these segments have versions > lock version @@ -107,7 +107,7 @@ public class KillTask extends AbstractTask } // Remove metadata for these segments - toolbox.getTaskActionClientFactory().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments))); + toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments))); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java index e0b3dd6ff17..4bda0363941 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java @@ -20,15 +20,12 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -36,7 +33,6 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.hash.Hashing; import com.metamx.common.ISE; -import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.merger.common.TaskLock; import com.metamx.druid.merger.common.TaskStatus; @@ -45,7 +41,6 @@ import com.metamx.druid.merger.common.actions.LockListAction; import com.metamx.druid.merger.common.actions.SegmentInsertAction; import com.metamx.druid.shard.NoneShardSpec; import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.service.AlertEvent; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import org.joda.time.DateTime; @@ -115,7 +110,7 @@ public abstract class MergeTaskBase extends AbstractTask @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); final ServiceEmitter emitter = toolbox.getEmitter(); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments); @@ -166,7 +161,7 @@ public abstract class MergeTaskBase extends AbstractTask emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart)); emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize())); - toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); return TaskStatus.success(getId()); } @@ -196,7 +191,7 @@ public abstract class MergeTaskBase extends AbstractTask }; final Set current = ImmutableSet.copyOf( - Iterables.transform(toolbox.getTaskActionClientFactory().submit(defaultListUsedAction()), toIdentifier) + Iterables.transform(toolbox.getTaskActionClient().submit(defaultListUsedAction()), toIdentifier) ); final Set requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier)); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java index c8c0e2cbf42..cebebd218cd 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java @@ -93,7 +93,7 @@ public class VersionConverterTask extends AbstractTask return super.preflight(toolbox); } - final TaskActionClient taskClient = toolbox.getTaskActionClientFactory(); + final TaskActionClient taskClient = toolbox.getTaskActionClient(); List segments = taskClient.submit(defaultListUsedAction()); @@ -176,7 +176,7 @@ public class VersionConverterTask extends AbstractTask DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion)); updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment); - toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment))); } else { log.info("Conversion failed."); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java index b5afa1dceef..21adae8b09b 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.inject.Inject; import com.metamx.common.logger.Logger; @@ -35,8 +34,6 @@ import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.TaskMasterLifecycle; import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; -import com.metamx.druid.merger.coordinator.scaling.AutoScalingData; -import com.metamx.druid.merger.coordinator.scaling.ScalingStats; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.emitter.service.ServiceEmitter; @@ -188,7 +185,7 @@ public class IndexerCoordinatorResource public Response doAction(final TaskActionHolder holder) { final T ret = taskMasterLifecycle.getTaskToolbox(holder.getTask()) - .getTaskActionClientFactory() + .getTaskActionClient() .submit(holder.getAction()); final Map retMap = Maps.newHashMap(); diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java index 4fac2c5eded..c94369726e9 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java @@ -66,7 +66,6 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEventBuilder; import org.apache.commons.io.FileUtils; import org.easymock.EasyMock; -import org.jets3t.service.ServiceException; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.After; @@ -283,8 +282,8 @@ public class TaskLifecycleTest // Sort of similar to what realtime tasks do: // Acquire lock for first interval - final Optional lock1 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval1)); - final List locks1 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); + final Optional lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1)); + final List locks1 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertTrue("lock1 present", lock1.isPresent()); @@ -292,8 +291,8 @@ public class TaskLifecycleTest Assert.assertEquals("locks1", ImmutableList.of(lock1.get()), locks1); // Acquire lock for second interval - final Optional lock2 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval2)); - final List locks2 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); + final Optional lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2)); + final List locks2 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertTrue("lock2 present", lock2.isPresent()); @@ -301,7 +300,7 @@ public class TaskLifecycleTest Assert.assertEquals("locks2", ImmutableList.of(lock1.get(), lock2.get()), locks2); // Push first segment - toolbox.getTaskActionClientFactory() + toolbox.getTaskActionClient() .submit( new SegmentInsertAction( ImmutableSet.of( @@ -315,14 +314,14 @@ public class TaskLifecycleTest ); // Release first lock - toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval1)); - final List locks3 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); + toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1)); + final List locks3 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertEquals("locks3", ImmutableList.of(lock2.get()), locks3); // Push second segment - toolbox.getTaskActionClientFactory() + toolbox.getTaskActionClient() .submit( new SegmentInsertAction( ImmutableSet.of( @@ -336,8 +335,8 @@ public class TaskLifecycleTest ); // Release second lock - toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval2)); - final List locks4 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); + toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2)); + final List locks4 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertEquals("locks4", ImmutableList.of(), locks4); @@ -370,7 +369,7 @@ public class TaskLifecycleTest public TaskStatus run(TaskToolbox toolbox) throws Exception { final TaskLock myLock = Iterables.getOnlyElement( - toolbox.getTaskActionClientFactory() + toolbox.getTaskActionClient() .submit(new LockListAction()) ); @@ -380,7 +379,7 @@ public class TaskLifecycleTest .version(myLock.getVersion()) .build(); - toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); return TaskStatus.success(getId()); } }; @@ -406,7 +405,7 @@ public class TaskLifecycleTest @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); final DataSegment segment = DataSegment.builder() .dataSource("ds") @@ -414,7 +413,7 @@ public class TaskLifecycleTest .version(myLock.getVersion()) .build(); - toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); return TaskStatus.success(getId()); } }; @@ -440,7 +439,7 @@ public class TaskLifecycleTest @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); final DataSegment segment = DataSegment.builder() .dataSource("ds") @@ -448,7 +447,7 @@ public class TaskLifecycleTest .version(myLock.getVersion() + "1!!!1!!") .build(); - toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); return TaskStatus.success(getId()); } }; diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java index dfb0d959d1a..939dc9b6b21 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java @@ -375,7 +375,7 @@ public class TaskQueueTest @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks)); + toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks)); return TaskStatus.success(id); } };