address cr again

This commit is contained in:
fjy 2014-09-30 11:40:29 -07:00
parent 55db06ccb1
commit 4c23a5e9f6
3 changed files with 10 additions and 19 deletions

View File

@ -30,24 +30,15 @@ import java.util.Set;
*/ */
public class ImmutableZkWorker public class ImmutableZkWorker
{ {
private final ZkWorker mutableZkWorker;
private final Worker worker; private final Worker worker;
private final int currCapacityUsed; private final int currCapacityUsed;
private final Set<String> availabilityGroups; private final Set<String> availabilityGroups;
public ImmutableZkWorker( public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set<String> availabilityGroups)
ZkWorker mutableZkWorker
)
{ {
this.mutableZkWorker = mutableZkWorker; this.worker = worker;
this.worker = mutableZkWorker.getWorker(); this.currCapacityUsed = currCapacityUsed;
this.currCapacityUsed = mutableZkWorker.getCurrCapacityUsed(); this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
this.availabilityGroups = ImmutableSet.copyOf(mutableZkWorker.getAvailabilityGroups());
}
public ZkWorker getMutableZkWorker()
{
return mutableZkWorker;
} }
public Worker getWorker() public Worker getWorker()

View File

@ -64,7 +64,6 @@ import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
@ -522,7 +521,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return true; return true;
} else { } else {
// Nothing running this task, announce it in ZK for a worker to run it // Nothing running this task, announce it in ZK for a worker to run it
Optional<ZkWorker> zkWorker = strategy.findWorkerForTask( final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
ImmutableMap.copyOf( ImmutableMap.copyOf(
Maps.transformEntries( Maps.transformEntries(
zkWorkers, zkWorkers,
@ -540,8 +539,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
), ),
task task
); );
if (zkWorker.isPresent()) { if (immutableZkWorker.isPresent()) {
announceTask(task, zkWorker.get(), taskRunnerWorkItem); final ZkWorker zkWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost());
announceTask(task, zkWorker, taskRunnerWorkItem);
return true; return true;
} else { } else {
return false; return false;

View File

@ -44,9 +44,9 @@ public interface WorkerSelectStrategy
* @param zkWorkers An immutable map of workers to choose from. * @param zkWorkers An immutable map of workers to choose from.
* @param task The task to assign. * @param task The task to assign.
* *
* @return A {@link io.druid.indexing.overlord.ZkWorker} to run the task if one is available. * @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available.
*/ */
public Optional<ZkWorker> findWorkerForTask( public Optional<ImmutableZkWorker> findWorkerForTask(
final ImmutableMap<String, ImmutableZkWorker> zkWorkers, final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
final Task task final Task task
); );