mirror of https://github.com/apache/druid.git
Bug fix: the indexer coordinator was not alerting when worker provisioning took too long
This commit is contained in:
parent
a49d0c5e4c
commit
ac2d4e52da
|
@ -102,7 +102,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
}
|
||||
}
|
||||
} else {
|
||||
Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
|
||||
Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
|
||||
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
|
||||
log.makeAlert("Worker node provisioning taking too long")
|
||||
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
|
||||
|
@ -111,8 +111,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
}
|
||||
|
||||
log.info(
|
||||
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
|
||||
currentlyProvisioning
|
||||
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker. Current wait time: %s",
|
||||
currentlyProvisioning,
|
||||
durSinceLastProvision
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -203,7 +204,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
return true;
|
||||
}
|
||||
} else {
|
||||
Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
|
||||
Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());
|
||||
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
|
||||
log.makeAlert("Worker node termination taking too long")
|
||||
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
|
||||
|
|
|
@ -30,6 +30,10 @@ import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
|
|||
import com.metamx.druid.merger.coordinator.ZkWorker;
|
||||
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
|
||||
import com.metamx.druid.merger.worker.Worker;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.core.Event;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceEventBuilder;
|
||||
import junit.framework.Assert;
|
||||
import org.easymock.EasyMock;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -90,7 +94,7 @@ public class SimpleResourceManagementStrategyTest
|
|||
@Override
|
||||
public Duration getMaxScalingDuration()
|
||||
{
|
||||
return null;
|
||||
return new Duration(1000);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -184,6 +188,62 @@ public class SimpleResourceManagementStrategyTest
|
|||
EasyMock.verify(autoScalingStrategy);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProvisionAlert() throws Exception
|
||||
{
|
||||
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
|
||||
EmittingLogger.registerEmitter(emitter);
|
||||
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.replay(emitter);
|
||||
|
||||
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
|
||||
.andReturn(Lists.<String>newArrayList()).times(2);
|
||||
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
|
||||
new AutoScalingData(Lists.<String>newArrayList("fake"), Lists.newArrayList("faker"))
|
||||
);
|
||||
EasyMock.replay(autoScalingStrategy);
|
||||
|
||||
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
|
||||
Arrays.<TaskRunnerWorkItem>asList(
|
||||
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
|
||||
),
|
||||
Arrays.<ZkWorker>asList(
|
||||
new TestZkWorker(testTask)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertTrue(provisionedSomething);
|
||||
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
|
||||
DateTime createdTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp();
|
||||
Assert.assertTrue(
|
||||
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
|
||||
);
|
||||
|
||||
Thread.sleep(2000);
|
||||
|
||||
provisionedSomething = simpleResourceManagementStrategy.doProvision(
|
||||
Arrays.<TaskRunnerWorkItem>asList(
|
||||
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
|
||||
),
|
||||
Arrays.<ZkWorker>asList(
|
||||
new TestZkWorker(testTask)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertFalse(provisionedSomething);
|
||||
Assert.assertTrue(
|
||||
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
|
||||
);
|
||||
DateTime anotherCreatedTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp();
|
||||
Assert.assertTrue(
|
||||
createdTime.equals(anotherCreatedTime)
|
||||
);
|
||||
|
||||
EasyMock.verify(autoScalingStrategy);
|
||||
EasyMock.verify(emitter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoSuccessfulTerminate() throws Exception
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue