Merge branch 'master' into stop_task

Conflicts:
	merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
This commit is contained in:
Fangjin Yang 2013-03-26 14:14:23 -07:00
commit 537f527960
18 changed files with 82 additions and 49 deletions

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId> <artifactId>druid-services</artifactId>
<name>druid-services</name> <name>druid-services</name>
<description>druid-services</description> <description>druid-services</description>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>

View File

@ -9,7 +9,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId> <artifactId>druid-examples</artifactId>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -9,7 +9,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId> <artifactId>druid-examples</artifactId>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
@ -160,11 +160,6 @@
<groupId>com.github.sgroschupf</groupId> <groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId> <artifactId>zkclient</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.3</version>
</dependency>
<!-- Tests --> <!-- Tests -->
<dependency> <dependency>

View File

@ -70,6 +70,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
@Override @Override
public boolean doProvision(Collection<TaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers) public boolean doProvision(Collection<TaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{ {
if (zkWorkers.size() >= workerSetupdDataRef.get().getMaxNumWorkers()) {
log.info(
"Cannot scale anymore. Num workers = %d, Max num workers = %d",
zkWorkers.size(),
workerSetupdDataRef.get().getMaxNumWorkers()
);
return false;
}
List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup( List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Lists.newArrayList( Lists.newArrayList(
Iterables.transform( Iterables.transform(

View File

@ -20,8 +20,6 @@
package com.metamx.druid.merger.coordinator.setup; package com.metamx.druid.merger.coordinator.setup;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
@ -35,6 +33,7 @@ public class WorkerSetupData
private final String minVersion; private final String minVersion;
private final int minNumWorkers; private final int minNumWorkers;
private final int maxNumWorkers;
private final EC2NodeData nodeData; private final EC2NodeData nodeData;
private final GalaxyUserData userData; private final GalaxyUserData userData;
@ -42,12 +41,14 @@ public class WorkerSetupData
public WorkerSetupData( public WorkerSetupData(
@JsonProperty("minVersion") String minVersion, @JsonProperty("minVersion") String minVersion,
@JsonProperty("minNumWorkers") int minNumWorkers, @JsonProperty("minNumWorkers") int minNumWorkers,
@JsonProperty("maxNumWorkers") int maxNumWorkers,
@JsonProperty("nodeData") EC2NodeData nodeData, @JsonProperty("nodeData") EC2NodeData nodeData,
@JsonProperty("userData") GalaxyUserData userData @JsonProperty("userData") GalaxyUserData userData
) )
{ {
this.minVersion = minVersion; this.minVersion = minVersion;
this.minNumWorkers = minNumWorkers; this.minNumWorkers = minNumWorkers;
this.maxNumWorkers = maxNumWorkers;
this.nodeData = nodeData; this.nodeData = nodeData;
this.userData = userData; this.userData = userData;
} }
@ -64,6 +65,12 @@ public class WorkerSetupData
return minNumWorkers; return minNumWorkers;
} }
@JsonProperty
public int getMaxNumWorkers()
{
return maxNumWorkers;
}
@JsonProperty @JsonProperty
public EC2NodeData getNodeData() public EC2NodeData getNodeData()
{ {
@ -82,6 +89,7 @@ public class WorkerSetupData
return "WorkerSetupData{" + return "WorkerSetupData{" +
"minVersion='" + minVersion + '\'' + "minVersion='" + minVersion + '\'' +
", minNumWorkers=" + minNumWorkers + ", minNumWorkers=" + minNumWorkers +
", maxNumWorkers=" + maxNumWorkers +
", nodeData=" + nodeData + ", nodeData=" + nodeData +
", userData=" + userData + ", userData=" + userData +
'}'; '}';

View File

@ -331,7 +331,7 @@ public class RemoteTaskRunnerTest
pathChildrenCache, pathChildrenCache,
scheduledExec, scheduledExec,
new RetryPolicyFactory(new TestRetryPolicyConfig()), new RetryPolicyFactory(new TestRetryPolicyConfig()),
new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, null, null)), new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, 1, null, null)),
null null
); );

View File

@ -105,6 +105,7 @@ public class EC2AutoScalingStrategyTest
new WorkerSetupData( new WorkerSetupData(
"0", "0",
0, 0,
1,
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"), new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"),
new GalaxyUserData("env", "version", "type") new GalaxyUserData("env", "version", "type")
) )

View File

@ -60,7 +60,11 @@ public class SimpleResourceManagementStrategyTest
public void setUp() throws Exception public void setUp() throws Exception
{ {
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class); autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
workerSetupData = new AtomicReference<WorkerSetupData>(null); workerSetupData = new AtomicReference<WorkerSetupData>(
new WorkerSetupData(
"0", 0, 2, null, null
)
);
testTask = new TestTask( testTask = new TestTask(
"task1", "task1",
@ -251,7 +255,7 @@ public class SimpleResourceManagementStrategyTest
@Test @Test
public void testDoSuccessfulTerminate() throws Exception public void testDoSuccessfulTerminate() throws Exception
{ {
workerSetupData.set(new WorkerSetupData("0", 0, null, null)); workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null));
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()); .andReturn(Lists.<String>newArrayList());
@ -281,7 +285,7 @@ public class SimpleResourceManagementStrategyTest
@Test @Test
public void testSomethingTerminating() throws Exception public void testSomethingTerminating() throws Exception
{ {
workerSetupData.set(new WorkerSetupData("0", 0, null, null)); workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null));
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2); .andReturn(Lists.<String>newArrayList("ip")).times(2);

View File

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
<name>druid</name> <name>druid</name>
<description>druid</description> <description>druid</description>
<scm> <scm>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
</parent> </parent>
<properties> <properties>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.28-SNAPSHOT</version> <version>0.3.29-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -29,7 +29,6 @@ import com.metamx.druid.collect.CountingMap;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import javax.annotation.Nullable;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -40,16 +39,20 @@ public class DruidMasterLogger implements DruidMasterHelper
{ {
private static final Logger log = new Logger(DruidMasterLogger.class); private static final Logger log = new Logger(DruidMasterLogger.class);
private <T extends Number> void emitTieredStats(final ServiceEmitter emitter, final String formatString, final Map<String, T> statMap) private <T extends Number> void emitTieredStats(
final ServiceEmitter emitter,
final String formatString,
final Map<String, T> statMap
)
{ {
if (statMap != null) { if (statMap != null) {
for (Map.Entry<String, T> entry : statMap.entrySet()) { for (Map.Entry<String, T> entry : statMap.entrySet()) {
String tier = entry.getKey(); String tier = entry.getKey();
Number value = entry.getValue(); Number value = entry.getValue();
emitter.emit( emitter.emit(
new ServiceMetricEvent.Builder().build( new ServiceMetricEvent.Builder().build(
String.format(formatString, tier), value.doubleValue() String.format(formatString, tier), value.doubleValue()
) )
); );
} }
} }
@ -82,30 +85,43 @@ public class DruidMasterLogger implements DruidMasterHelper
} }
} }
emitTieredStats(emitter, "master/%s/cost/raw", emitTieredStats(
stats.getPerTierStats().get("initialCost")); emitter, "master/%s/cost/raw",
stats.getPerTierStats().get("initialCost")
);
emitTieredStats(emitter, "master/%s/cost/normalization", emitTieredStats(
stats.getPerTierStats().get("normalization")); emitter, "master/%s/cost/normalization",
stats.getPerTierStats().get("normalization")
);
emitTieredStats(emitter, "master/%s/moved/count", emitTieredStats(
stats.getPerTierStats().get("movedCount")); emitter, "master/%s/moved/count",
stats.getPerTierStats().get("movedCount")
);
emitTieredStats(emitter, "master/%s/deleted/count", emitTieredStats(
stats.getPerTierStats().get("deletedCount")); emitter, "master/%s/deleted/count",
stats.getPerTierStats().get("deletedCount")
);
emitTieredStats(emitter, "master/%s/cost/normalized", Map<String, AtomicLong> normalized = stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand");
Maps.transformEntries(stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand"), if (normalized != null) {
new Maps.EntryTransformer<String, AtomicLong, Number>() emitTieredStats(
{ emitter, "master/%s/cost/normalized",
@Override Maps.transformEntries(
public Number transformEntry( normalized,
@Nullable String key, @Nullable AtomicLong value new Maps.EntryTransformer<String, AtomicLong, Number>()
) {
{ @Override
return value.doubleValue() / 1000d; public Number transformEntry(String key, AtomicLong value)
} {
})); return value.doubleValue() / 1000d;
}
}
)
);
}
Map<String, AtomicLong> unneeded = stats.getPerTierStats().get("unneededCount"); Map<String, AtomicLong> unneeded = stats.getPerTierStats().get("unneededCount");
if (unneeded != null) { if (unneeded != null) {