mirror of https://github.com/apache/druid.git
Fix a bug where the indexing service did not correctly kill worker nodes
This commit is contained in:
parent
75cefa05b5
commit
665f1909c6
|
@ -23,6 +23,7 @@ import com.google.common.base.Function;
|
|||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
|
||||
import com.metamx.druid.merger.coordinator.ZkWorker;
|
||||
|
@ -32,7 +33,9 @@ import org.joda.time.DateTime;
|
|||
import org.joda.time.Duration;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
||||
/**
|
||||
|
@ -83,10 +86,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
)
|
||||
);
|
||||
|
||||
for (String workerNodeId : workerNodeIds) {
|
||||
currentlyProvisioning.remove(workerNodeId);
|
||||
}
|
||||
|
||||
currentlyProvisioning.removeAll(workerNodeIds);
|
||||
boolean nothingProvisioning = currentlyProvisioning.isEmpty();
|
||||
|
||||
if (nothingProvisioning) {
|
||||
|
@ -122,31 +122,38 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
@Override
|
||||
public boolean doTerminate(Collection<TaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
|
||||
{
|
||||
List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
|
||||
Lists.newArrayList(
|
||||
Iterables.transform(
|
||||
zkWorkers,
|
||||
new Function<ZkWorker, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(ZkWorker input)
|
||||
{
|
||||
return input.getWorker().getIp();
|
||||
}
|
||||
}
|
||||
Set<String> workerNodeIds = Sets.newHashSet(
|
||||
autoScalingStrategy.ipToIdLookup(
|
||||
Lists.newArrayList(
|
||||
Iterables.transform(
|
||||
zkWorkers,
|
||||
new Function<ZkWorker, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(ZkWorker input)
|
||||
{
|
||||
return input.getWorker().getIp();
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
for (String workerNodeId : workerNodeIds) {
|
||||
currentlyTerminating.remove(workerNodeId);
|
||||
Set<String> stillExisting = Sets.newHashSet();
|
||||
for (String s : currentlyTerminating) {
|
||||
if (workerNodeIds.contains(s)) {
|
||||
stillExisting.add(s);
|
||||
}
|
||||
}
|
||||
|
||||
currentlyTerminating.clear();
|
||||
currentlyTerminating.addAll(stillExisting);
|
||||
boolean nothingTerminating = currentlyTerminating.isEmpty();
|
||||
|
||||
if (nothingTerminating) {
|
||||
final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers();
|
||||
if (zkWorkers.size() <= minNumWorkers) {
|
||||
log.info("Only [%d <= %d] nodes in the cluster, not terminating anything.", zkWorkers.size(), minNumWorkers);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -167,13 +174,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
)
|
||||
);
|
||||
|
||||
if (thoseLazyWorkers.size() <= minNumWorkers) {
|
||||
return false;
|
||||
}
|
||||
|
||||
AutoScalingData terminated = autoScalingStrategy.terminate(
|
||||
Lists.transform(
|
||||
thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size() - 1),
|
||||
thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size()),
|
||||
new Function<ZkWorker, String>()
|
||||
{
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue