From 571375a33e340373ee81cedd856a36e69d5e1f53 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Mon, 4 Mar 2013 14:40:02 -0800 Subject: [PATCH] fixing a bug with indexing service not correctly killing worker nodes --- .../SimpleResourceManagementStrategy.java | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java index 74d34d718d2..3f522908960 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.metamx.common.guava.FunctionalIterable; import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem; import com.metamx.druid.merger.coordinator.ZkWorker; @@ -32,7 +33,9 @@ import org.joda.time.DateTime; import org.joda.time.Duration; import java.util.Collection; +import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; /** @@ -83,10 +86,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat ) ); - for (String workerNodeId : workerNodeIds) { - currentlyProvisioning.remove(workerNodeId); - } - + currentlyProvisioning.removeAll(workerNodeIds); boolean nothingProvisioning = currentlyProvisioning.isEmpty(); if (nothingProvisioning) { @@ -122,31 +122,38 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat @Override public boolean doTerminate(Collection pendingTasks, Collection zkWorkers) { - List workerNodeIds = autoScalingStrategy.ipToIdLookup( - Lists.newArrayList( - Iterables.transform( - zkWorkers, - new Function() - { - @Override - public String apply(ZkWorker input) - { - return input.getWorker().getIp(); - } - } + Set workerNodeIds = Sets.newHashSet( + autoScalingStrategy.ipToIdLookup( + Lists.newArrayList( + Iterables.transform( + zkWorkers, + new Function() + { + @Override + public String apply(ZkWorker input) + { + return input.getWorker().getIp(); + } + } + ) ) ) ); - for (String workerNodeId : workerNodeIds) { - currentlyTerminating.remove(workerNodeId); + Set stillExisting = Sets.newHashSet(); + for (String s : currentlyTerminating) { + if (workerNodeIds.contains(s)) { + stillExisting.add(s); + } } - + currentlyTerminating.clear(); + currentlyTerminating.addAll(stillExisting); boolean nothingTerminating = currentlyTerminating.isEmpty(); if (nothingTerminating) { final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers(); if (zkWorkers.size() <= minNumWorkers) { + log.info("Only [%d <= %d] nodes in the cluster, not terminating anything.", zkWorkers.size(), minNumWorkers); return false; } @@ -167,13 +174,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat ) ); - if (thoseLazyWorkers.size() <= minNumWorkers) { - return false; - } - AutoScalingData terminated = autoScalingStrategy.terminate( Lists.transform( - thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size() - 1), + thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size()), new Function() { @Override