fix logic of how to assign tasks to workers

This commit is contained in:
fjy 2013-08-02 09:01:02 -07:00
parent 584ccac833
commit c33f2f06ff

View File

@ -29,6 +29,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.InputSupplier;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
@ -61,9 +62,11 @@ import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@ -670,7 +673,20 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private ZkWorker findWorkerForTask(final Task task)
{
for (ZkWorker zkWorker : zkWorkers.values()) {
TreeSet<ZkWorker> sortedWorkers = Sets.newTreeSet(
new Comparator<ZkWorker>()
{
@Override
public int compare(
ZkWorker zkWorker, ZkWorker zkWorker2
)
{
return -Ints.compare(zkWorker.getCurrCapacity(), zkWorker2.getCurrCapacity());
}
}
);
sortedWorkers.addAll(zkWorkers.values());
for (ZkWorker zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) &&
zkWorker.getWorker().getVersion().compareTo(workerSetupData.get().getMinVersion()) >= 0) {
return zkWorker;