diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java index 1d9e5f9f52e..80beace6430 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java @@ -70,6 +70,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat @Override public boolean doProvision(Collection pendingTasks, Collection zkWorkers) { + if (zkWorkers.size() >= workerSetupdDataRef.get().getMaxNumWorkers()) { + log.info( + "Cannot scale anymore. Num workers = %d, Max num workers = %d", + zkWorkers.size(), + workerSetupdDataRef.get().getMaxNumWorkers() + ); + return false; + } + List workerNodeIds = autoScalingStrategy.ipToIdLookup( Lists.newArrayList( Iterables.transform( diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java index 7fc28437300..f647f694998 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java @@ -20,8 +20,6 @@ package com.metamx.druid.merger.coordinator.setup; - - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -35,6 +33,7 @@ public class WorkerSetupData private final String minVersion; private final int minNumWorkers; + private final int maxNumWorkers; private final EC2NodeData nodeData; private final GalaxyUserData userData; @@ -42,12 +41,14 @@ public class WorkerSetupData public WorkerSetupData( @JsonProperty("minVersion") String minVersion, @JsonProperty("minNumWorkers") int minNumWorkers, + @JsonProperty("maxNumWorkers") int maxNumWorkers, @JsonProperty("nodeData") EC2NodeData nodeData, @JsonProperty("userData") GalaxyUserData userData ) { this.minVersion = minVersion; this.minNumWorkers = minNumWorkers; + this.maxNumWorkers = maxNumWorkers; this.nodeData = nodeData; this.userData = userData; } @@ -64,6 +65,12 @@ public class WorkerSetupData return minNumWorkers; } + @JsonProperty + public int getMaxNumWorkers() + { + return maxNumWorkers; + } + @JsonProperty public EC2NodeData getNodeData() { @@ -82,6 +89,7 @@ public class WorkerSetupData return "WorkerSetupData{" + "minVersion='" + minVersion + '\'' + ", minNumWorkers=" + minNumWorkers + + ", maxNumWorkers=" + maxNumWorkers + ", nodeData=" + nodeData + ", userData=" + userData + '}'; diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index a3980820268..febe9394372 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -324,7 +324,7 @@ public class RemoteTaskRunnerTest pathChildrenCache, scheduledExec, new RetryPolicyFactory(new TestRetryPolicyConfig()), - new AtomicReference(new WorkerSetupData("0", 0, null, null)) + new AtomicReference(new WorkerSetupData("0", 0, 1, null, null)) ); // Create a single worker and wait for things for be ready diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java index cd569cb77e8..e6ab815e262 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java @@ -105,6 +105,7 @@ public class EC2AutoScalingStrategyTest new WorkerSetupData( "0", 0, + 1, new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.newArrayList(), "foo"), new GalaxyUserData("env", "version", "type") ) diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java index 186723ef7df..145c9034e03 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -60,7 +60,11 @@ public class SimpleResourceManagementStrategyTest public void setUp() throws Exception { autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class); - workerSetupData = new AtomicReference(null); + workerSetupData = new AtomicReference( + new WorkerSetupData( + "0", 0, 2, null, null + ) + ); testTask = new TestTask( "task1", @@ -247,7 +251,7 @@ public class SimpleResourceManagementStrategyTest @Test public void testDoSuccessfulTerminate() throws Exception { - workerSetupData.set(new WorkerSetupData("0", 0, null, null)); + workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null)); EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList()); @@ -277,7 +281,7 @@ public class SimpleResourceManagementStrategyTest @Test public void testSomethingTerminating() throws Exception { - workerSetupData.set(new WorkerSetupData("0", 0, null, null)); + workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null)); EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList("ip")).times(2); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java index 1328619db44..382d03966b6 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java @@ -29,7 +29,6 @@ import com.metamx.druid.collect.CountingMap; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; -import javax.annotation.Nullable; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -40,16 +39,20 @@ public class DruidMasterLogger implements DruidMasterHelper { private static final Logger log = new Logger(DruidMasterLogger.class); - private void emitTieredStats(final ServiceEmitter emitter, final String formatString, final Map statMap) + private void emitTieredStats( + final ServiceEmitter emitter, + final String formatString, + final Map statMap + ) { - if (statMap != null) { + if (statMap != null) { for (Map.Entry entry : statMap.entrySet()) { String tier = entry.getKey(); Number value = entry.getValue(); emitter.emit( new ServiceMetricEvent.Builder().build( - String.format(formatString, tier), value.doubleValue() - ) + String.format(formatString, tier), value.doubleValue() + ) ); } } @@ -82,30 +85,43 @@ public class DruidMasterLogger implements DruidMasterHelper } } - emitTieredStats(emitter, "master/%s/cost/raw", - stats.getPerTierStats().get("initialCost")); + emitTieredStats( + emitter, "master/%s/cost/raw", + stats.getPerTierStats().get("initialCost") + ); - emitTieredStats(emitter, "master/%s/cost/normalization", - stats.getPerTierStats().get("normalization")); + emitTieredStats( + emitter, "master/%s/cost/normalization", + stats.getPerTierStats().get("normalization") + ); - emitTieredStats(emitter, "master/%s/moved/count", - stats.getPerTierStats().get("movedCount")); + emitTieredStats( + emitter, "master/%s/moved/count", + stats.getPerTierStats().get("movedCount") + ); - emitTieredStats(emitter, "master/%s/deleted/count", - stats.getPerTierStats().get("deletedCount")); + emitTieredStats( + emitter, "master/%s/deleted/count", + stats.getPerTierStats().get("deletedCount") + ); - emitTieredStats(emitter, "master/%s/cost/normalized", - Maps.transformEntries(stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand"), - new Maps.EntryTransformer() - { - @Override - public Number transformEntry( - @Nullable String key, @Nullable AtomicLong value - ) - { - return value.doubleValue() / 1000d; - } - })); + Map normalized = stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand"); + if (normalized != null) { + emitTieredStats( + emitter, "master/%s/cost/normalized", + Maps.transformEntries( + normalized, + new Maps.EntryTransformer() + { + @Override + public Number transformEntry(String key, AtomicLong value) + { + return value.doubleValue() / 1000d; + } + } + ) + ); + } Map unneeded = stats.getPerTierStats().get("unneededCount"); if (unneeded != null) {