Merge branch 'master' into indexing_console

This commit is contained in:
Fangjin Yang 2013-03-04 15:23:34 -08:00
commit 10ec2288ee
14 changed files with 46 additions and 36 deletions

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
</parent>
<modules>

View File

@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.merger.coordinator.ZkWorker;
@ -32,7 +33,9 @@ import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
/**
@ -83,10 +86,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
)
);
for (String workerNodeId : workerNodeIds) {
currentlyProvisioning.remove(workerNodeId);
}
currentlyProvisioning.removeAll(workerNodeIds);
boolean nothingProvisioning = currentlyProvisioning.isEmpty();
if (nothingProvisioning) {
@ -122,7 +122,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
@Override
public boolean doTerminate(Collection<TaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Set<String> workerNodeIds = Sets.newHashSet(
autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
zkWorkers,
@ -136,17 +137,23 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
)
)
)
);
for (String workerNodeId : workerNodeIds) {
currentlyTerminating.remove(workerNodeId);
Set<String> stillExisting = Sets.newHashSet();
for (String s : currentlyTerminating) {
if (workerNodeIds.contains(s)) {
stillExisting.add(s);
}
}
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);
boolean nothingTerminating = currentlyTerminating.isEmpty();
if (nothingTerminating) {
final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers();
if (zkWorkers.size() <= minNumWorkers) {
log.info("Only [%d <= %d] nodes in the cluster, not terminating anything.", zkWorkers.size(), minNumWorkers);
return false;
}
@ -167,13 +174,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
)
);
if (thoseLazyWorkers.size() <= minNumWorkers) {
int maxPossibleNodesTerminated = zkWorkers.size() - minNumWorkers;
int numNodesToTerminate = Math.min(maxPossibleNodesTerminated, thoseLazyWorkers.size());
if (numNodesToTerminate <= 0) {
log.info("Found no nodes to terminate.");
return false;
}
AutoScalingData terminated = autoScalingStrategy.terminate(
Lists.transform(
thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size() - 1),
thoseLazyWorkers.subList(0, numNodesToTerminate),
new Function<ZkWorker, String>()
{
@Override

View File

@ -223,9 +223,9 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(workerSetupManager);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2);
.andReturn(Lists.<String>newArrayList("ip")).times(2);
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList("foobar"), Lists.newArrayList("foobrick"))
new AutoScalingData(Lists.<String>newArrayList("ip"), Lists.newArrayList("ip"))
);
EasyMock.replay(autoScalingStrategy);

View File

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
</parent>
<properties>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.17-SNAPSHOT</version>
<version>0.3.19-SNAPSHOT</version>
</parent>
<dependencies>