mirror of https://github.com/apache/druid.git
Merge branch 'master' of github.com:metamx/druid
This commit is contained in:
commit
b1a559365e
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -24,11 +24,11 @@
|
||||||
<artifactId>druid-services</artifactId>
|
<artifactId>druid-services</artifactId>
|
||||||
<name>druid-services</name>
|
<name>druid-services</name>
|
||||||
<description>druid-services</description>
|
<description>druid-services</description>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<modules>
|
<modules>
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid-examples</artifactId>
|
<artifactId>druid-examples</artifactId>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid-examples</artifactId>
|
<artifactId>druid-examples</artifactId>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -176,6 +176,9 @@ public class IndexerCoordinatorResource
|
||||||
if (!configManager.set(WorkerSetupData.CONFIG_KEY, workerSetupData)) {
|
if (!configManager.set(WorkerSetupData.CONFIG_KEY, workerSetupData)) {
|
||||||
return Response.status(Response.Status.BAD_REQUEST).build();
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.info("Updating Worker Setup configs: %s", workerSetupData);
|
||||||
|
|
||||||
return Response.ok().build();
|
return Response.ok().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -102,18 +102,22 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
|
Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker. Current wait time: %s",
|
||||||
|
currentlyProvisioning,
|
||||||
|
durSinceLastProvision
|
||||||
|
);
|
||||||
|
|
||||||
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
|
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
|
||||||
log.makeAlert("Worker node provisioning taking too long")
|
log.makeAlert("Worker node provisioning taking too long!")
|
||||||
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
|
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
|
||||||
.addData("provisioningCount", currentlyProvisioning.size())
|
.addData("provisioningCount", currentlyProvisioning.size())
|
||||||
.emit();
|
.emit();
|
||||||
}
|
|
||||||
|
|
||||||
log.info(
|
currentlyProvisioning.clear();
|
||||||
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
|
}
|
||||||
currentlyProvisioning
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
@ -203,18 +207,21 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
|
Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());
|
||||||
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
|
|
||||||
log.makeAlert("Worker node termination taking too long")
|
|
||||||
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
|
|
||||||
.addData("terminatingCount", currentlyTerminating.size())
|
|
||||||
.emit();
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
"%s still terminating. Wait for all nodes to terminate before trying again.",
|
"%s still terminating. Wait for all nodes to terminate before trying again.",
|
||||||
currentlyTerminating
|
currentlyTerminating
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
|
||||||
|
log.makeAlert("Worker node termination taking too long!")
|
||||||
|
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
|
||||||
|
.addData("terminatingCount", currentlyTerminating.size())
|
||||||
|
.emit();
|
||||||
|
|
||||||
|
currentlyTerminating.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -91,4 +91,17 @@ public class EC2NodeData
|
||||||
{
|
{
|
||||||
return keyName;
|
return keyName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return "EC2NodeData{" +
|
||||||
|
"amiId='" + amiId + '\'' +
|
||||||
|
", instanceType='" + instanceType + '\'' +
|
||||||
|
", minInstances=" + minInstances +
|
||||||
|
", maxInstances=" + maxInstances +
|
||||||
|
", securityGroupIds=" + securityGroupIds +
|
||||||
|
", keyName='" + keyName + '\'' +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,4 +60,14 @@ public class GalaxyUserData
|
||||||
{
|
{
|
||||||
return type;
|
return type;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return "GalaxyUserData{" +
|
||||||
|
"env='" + env + '\'' +
|
||||||
|
", version='" + version + '\'' +
|
||||||
|
", type='" + type + '\'' +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,4 +75,15 @@ public class WorkerSetupData
|
||||||
{
|
{
|
||||||
return userData;
|
return userData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return "WorkerSetupData{" +
|
||||||
|
"minVersion='" + minVersion + '\'' +
|
||||||
|
", minNumWorkers=" + minNumWorkers +
|
||||||
|
", nodeData=" + nodeData +
|
||||||
|
", userData=" + userData +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,10 @@ import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
|
||||||
import com.metamx.druid.merger.coordinator.ZkWorker;
|
import com.metamx.druid.merger.coordinator.ZkWorker;
|
||||||
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
|
||||||
import com.metamx.druid.merger.worker.Worker;
|
import com.metamx.druid.merger.worker.Worker;
|
||||||
|
import com.metamx.emitter.EmittingLogger;
|
||||||
|
import com.metamx.emitter.core.Event;
|
||||||
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
|
import com.metamx.emitter.service.ServiceEventBuilder;
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
import org.easymock.EasyMock;
|
import org.easymock.EasyMock;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
@ -90,7 +94,7 @@ public class SimpleResourceManagementStrategyTest
|
||||||
@Override
|
@Override
|
||||||
public Duration getMaxScalingDuration()
|
public Duration getMaxScalingDuration()
|
||||||
{
|
{
|
||||||
return null;
|
return new Duration(1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -184,6 +188,62 @@ public class SimpleResourceManagementStrategyTest
|
||||||
EasyMock.verify(autoScalingStrategy);
|
EasyMock.verify(autoScalingStrategy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProvisionAlert() throws Exception
|
||||||
|
{
|
||||||
|
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
|
||||||
|
EmittingLogger.registerEmitter(emitter);
|
||||||
|
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
EasyMock.replay(emitter);
|
||||||
|
|
||||||
|
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
|
||||||
|
.andReturn(Lists.<String>newArrayList()).times(2);
|
||||||
|
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
|
||||||
|
new AutoScalingData(Lists.<String>newArrayList("fake"), Lists.newArrayList("faker"))
|
||||||
|
);
|
||||||
|
EasyMock.replay(autoScalingStrategy);
|
||||||
|
|
||||||
|
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
|
||||||
|
Arrays.<TaskRunnerWorkItem>asList(
|
||||||
|
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
|
||||||
|
),
|
||||||
|
Arrays.<ZkWorker>asList(
|
||||||
|
new TestZkWorker(testTask)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertTrue(provisionedSomething);
|
||||||
|
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
|
||||||
|
DateTime createdTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp();
|
||||||
|
Assert.assertTrue(
|
||||||
|
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
|
||||||
|
);
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
provisionedSomething = simpleResourceManagementStrategy.doProvision(
|
||||||
|
Arrays.<TaskRunnerWorkItem>asList(
|
||||||
|
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
|
||||||
|
),
|
||||||
|
Arrays.<ZkWorker>asList(
|
||||||
|
new TestZkWorker(testTask)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertFalse(provisionedSomething);
|
||||||
|
Assert.assertTrue(
|
||||||
|
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
|
||||||
|
);
|
||||||
|
DateTime anotherCreatedTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp();
|
||||||
|
Assert.assertTrue(
|
||||||
|
createdTime.equals(anotherCreatedTime)
|
||||||
|
);
|
||||||
|
|
||||||
|
EasyMock.verify(autoScalingStrategy);
|
||||||
|
EasyMock.verify(emitter);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDoSuccessfulTerminate() throws Exception
|
public void testDoSuccessfulTerminate() throws Exception
|
||||||
{
|
{
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -23,7 +23,7 @@
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<packaging>pom</packaging>
|
<packaging>pom</packaging>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
<name>druid</name>
|
<name>druid</name>
|
||||||
<description>druid</description>
|
<description>druid</description>
|
||||||
<scm>
|
<scm>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.3.19-SNAPSHOT</version>
|
<version>0.3.20-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
Loading…
Reference in New Issue