Fixing a bug where the indexing service was not correctly killing worker nodes

Fangjin Yang 2013-03-04 14:40:02 -08:00
parent 6977fe03bd
commit 571375a33e
1 changed file with 27 additions and 24 deletions

SimpleResourceManagementStrategy.java

@@ -23,6 +23,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
 import com.metamx.druid.merger.coordinator.ZkWorker;
@@ -32,7 +33,9 @@ import org.joda.time.DateTime;
 import org.joda.time.Duration;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 /**
@@ -83,10 +86,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
         )
     );
 
-    for (String workerNodeId : workerNodeIds) {
-      currentlyProvisioning.remove(workerNodeId);
-    }
-
+    currentlyProvisioning.removeAll(workerNodeIds);
     boolean nothingProvisioning = currentlyProvisioning.isEmpty();
 
     if (nothingProvisioning) {
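The provisioning half of the change only tightens the code: looping over the looked-up node ids and calling remove on each one is collapsed into a single bulk removeAll against the tracking set. Below is a minimal sketch of that tracking pattern, with hypothetical instance ids and class names rather than anything taken from the Druid codebase:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;

class ProvisioningTrackerSketch
{
  // Ids of nodes we have asked the autoscaler to create but have not yet seen appear.
  private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();

  void exampleUsage()
  {
    currentlyProvisioning.add("i-aaa");
    currentlyProvisioning.add("i-bbb");

    // Ids the cloud provider currently reports (hypothetical lookup result).
    List<String> workerNodeIds = Arrays.asList("i-aaa");

    // Bulk removal; behaviourally identical to looping and calling remove(id) per element.
    currentlyProvisioning.removeAll(workerNodeIds);

    // "i-bbb" is still pending, so nothingProvisioning would be false here.
    boolean nothingProvisioning = currentlyProvisioning.isEmpty();
  }
}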
@@ -122,31 +122,38 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
   @Override
   public boolean doTerminate(Collection<TaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
   {
-    List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
-        Lists.newArrayList(
-            Iterables.transform(
-                zkWorkers,
-                new Function<ZkWorker, String>()
-                {
-                  @Override
-                  public String apply(ZkWorker input)
-                  {
-                    return input.getWorker().getIp();
-                  }
-                }
+    Set<String> workerNodeIds = Sets.newHashSet(
+        autoScalingStrategy.ipToIdLookup(
+            Lists.newArrayList(
+                Iterables.transform(
+                    zkWorkers,
+                    new Function<ZkWorker, String>()
+                    {
+                      @Override
+                      public String apply(ZkWorker input)
+                      {
+                        return input.getWorker().getIp();
+                      }
+                    }
+                )
             )
         )
     );
 
-    for (String workerNodeId : workerNodeIds) {
-      currentlyTerminating.remove(workerNodeId);
-    }
+    Set<String> stillExisting = Sets.newHashSet();
+    for (String s : currentlyTerminating) {
+      if (workerNodeIds.contains(s)) {
+        stillExisting.add(s);
+      }
+    }
+    currentlyTerminating.clear();
+    currentlyTerminating.addAll(stillExisting);
 
     boolean nothingTerminating = currentlyTerminating.isEmpty();
 
     if (nothingTerminating) {
       final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers();
       if (zkWorkers.size() <= minNumWorkers) {
+        log.info("Only [%d <= %d] nodes in the cluster, not terminating anything.", zkWorkers.size(), minNumWorkers);
         return false;
       }
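Reading the diff, the substance of the fix is in how currentlyTerminating is reconciled against the autoscaler's view. The old loop removed the ids that still exist, so the id of a node that had actually been terminated was never cleared; currentlyTerminating could then never become empty and further terminations were skipped. The new code keeps only the ids the lookup still reports, i.e. the intersection of the tracking set with workerNodeIds. Below is a minimal sketch of that reconciliation, assuming Guava on the classpath and using hypothetical instance ids and class names; Set.retainAll would express the same intersection in one call:

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

import com.google.common.collect.Sets;

class TerminationTrackerSketch
{
  // Ids of nodes we have asked the autoscaler to terminate.
  private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();

  // Keep only the ids the cloud provider still reports; ids that have vanished
  // are treated as fully terminated and dropped from the tracking set.
  void reconcile(Set<String> workerNodeIds)
  {
    Set<String> stillExisting = Sets.newHashSet();
    for (String id : currentlyTerminating) {
      if (workerNodeIds.contains(id)) {
        stillExisting.add(id);
      }
    }
    currentlyTerminating.clear();
    currentlyTerminating.addAll(stillExisting);
    // Equivalent one-liner: currentlyTerminating.retainAll(workerNodeIds);
  }

  void exampleUsage()
  {
    currentlyTerminating.add("i-aaa"); // still spinning down (hypothetical id)
    currentlyTerminating.add("i-bbb"); // already gone (hypothetical id)

    reconcile(Sets.newHashSet("i-aaa")); // hypothetical lookup result

    // Only "i-aaa" remains tracked; once it disappears from the lookup as well,
    // the set empties and doTerminate is free to schedule new terminations.
  }
}

The committed version builds stillExisting explicitly and then swaps it in with clear()/addAll(); neither that nor retainAll is atomic on a ConcurrentSkipListSet, so the difference between the two forms is stylistic.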
@@ -167,13 +174,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
         )
     );
 
-    if (thoseLazyWorkers.size() <= minNumWorkers) {
-      return false;
-    }
-
     AutoScalingData terminated = autoScalingStrategy.terminate(
         Lists.transform(
-            thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size() - 1),
+            thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size()),
             new Function<ZkWorker, String>()
             {
               @Override