From 4c23a5e9f655cf32b22048c96e49c94098d1f345 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 11:40:29 -0700 Subject: [PATCH] address cr again --- .../indexing/overlord/ImmutableZkWorker.java | 17 ++++------------- .../indexing/overlord/RemoteTaskRunner.java | 8 ++++---- .../overlord/setup/WorkerSelectStrategy.java | 4 ++-- 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java index 893ca3d9a72..51d05d222d0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java @@ -30,24 +30,15 @@ import java.util.Set; */ public class ImmutableZkWorker { - private final ZkWorker mutableZkWorker; private final Worker worker; private final int currCapacityUsed; private final Set availabilityGroups; - public ImmutableZkWorker( - ZkWorker mutableZkWorker - ) + public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set availabilityGroups) { - this.mutableZkWorker = mutableZkWorker; - this.worker = mutableZkWorker.getWorker(); - this.currCapacityUsed = mutableZkWorker.getCurrCapacityUsed(); - this.availabilityGroups = ImmutableSet.copyOf(mutableZkWorker.getAvailabilityGroups()); - } - - public ZkWorker getMutableZkWorker() - { - return mutableZkWorker; + this.worker = worker; + this.currCapacityUsed = currCapacityUsed; + this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups); } public Worker getWorker() diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index df14f4f1ac0..7dc1322a366 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -64,7 +64,6 @@ import org.apache.zookeeper.KeeperException; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; -import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; @@ -522,7 +521,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return true; } else { // Nothing running this task, announce it in ZK for a worker to run it - Optional zkWorker = strategy.findWorkerForTask( + final Optional immutableZkWorker = strategy.findWorkerForTask( ImmutableMap.copyOf( Maps.transformEntries( zkWorkers, @@ -540,8 +539,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer ), task ); - if (zkWorker.isPresent()) { - announceTask(task, zkWorker.get(), taskRunnerWorkItem); + if (immutableZkWorker.isPresent()) { + final ZkWorker zkWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost()); + announceTask(task, zkWorker, taskRunnerWorkItem); return true; } else { return false; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java index 6a627cb25f6..80103b6b721 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -44,9 +44,9 @@ public interface WorkerSelectStrategy * @param zkWorkers An immutable map of workers to choose from. * @param task The task to assign. * - * @return A {@link io.druid.indexing.overlord.ZkWorker} to run the task if one is available. + * @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available. */ - public Optional findWorkerForTask( + public Optional findWorkerForTask( final ImmutableMap zkWorkers, final Task task );