diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java index 082870c83c8..05e51da314a 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java @@ -102,7 +102,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } } } else { - Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime); + Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime()); if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) { log.makeAlert("Worker node provisioning taking too long") .addData("millisSinceLastProvision", durSinceLastProvision.getMillis()) @@ -111,8 +111,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } log.info( - "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.", - currentlyProvisioning + "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker. Current wait time: %s", + currentlyProvisioning, + durSinceLastProvision ); } @@ -203,7 +204,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat return true; } } else { - Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime); + Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime()); if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) { log.makeAlert("Worker node termination taking too long") .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis()) diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java index 2052ae014bb..186723ef7df 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -30,6 +30,10 @@ import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem; import com.metamx.druid.merger.coordinator.ZkWorker; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.worker.Worker; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.core.Event; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceEventBuilder; import junit.framework.Assert; import org.easymock.EasyMock; import org.joda.time.DateTime; @@ -90,7 +94,7 @@ public class SimpleResourceManagementStrategyTest @Override public Duration getMaxScalingDuration() { - return null; + return new Duration(1000); } @Override @@ -184,6 +188,62 @@ public class SimpleResourceManagementStrategyTest EasyMock.verify(autoScalingStrategy); } + @Test + public void testProvisionAlert() throws Exception + { + ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + EmittingLogger.registerEmitter(emitter); + emitter.emit(EasyMock.anyObject()); + EasyMock.expectLastCall(); + EasyMock.replay(emitter); + + EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList()).times(2); + EasyMock.expect(autoScalingStrategy.provision()).andReturn( + new AutoScalingData(Lists.newArrayList("fake"), Lists.newArrayList("faker")) + ); + EasyMock.replay(autoScalingStrategy); + + boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( + Arrays.asList( + new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + ), + Arrays.asList( + new TestZkWorker(testTask) + ) + ); + + Assert.assertTrue(provisionedSomething); + Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); + DateTime createdTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + + Thread.sleep(2000); + + provisionedSomething = simpleResourceManagementStrategy.doProvision( + Arrays.asList( + new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + ), + Arrays.asList( + new TestZkWorker(testTask) + ) + ); + + Assert.assertFalse(provisionedSomething); + Assert.assertTrue( + simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + DateTime anotherCreatedTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + createdTime.equals(anotherCreatedTime) + ); + + EasyMock.verify(autoScalingStrategy); + EasyMock.verify(emitter); + } + @Test public void testDoSuccessfulTerminate() throws Exception {