mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
Merge pull request #610 from metamx/ec2-user-data
More flexible EC2 user data classes.
This commit is contained in:
commit
b0517dc55d
@ -69,10 +69,9 @@ A sample worker setup spec is shown below:
|
||||
"keyName":"keyName"
|
||||
},
|
||||
"userData":{
|
||||
"classType":"galaxy",
|
||||
"env":"druid",
|
||||
"version":"druid_version",
|
||||
"type":"sample_cluster/worker"
|
||||
"impl":"string",
|
||||
"data":"version=:VERSION:",
|
||||
"versionReplacementString":":VERSION:"
|
||||
}
|
||||
}
|
||||
```
|
||||
@ -81,8 +80,8 @@ Issuing a GET request at the same URL will return the current worker setup spec
|
||||
|
||||
|Property|Description|Default|
|
||||
|--------|-----------|-------|
|
||||
|`minVersion`|The coordinator only assigns tasks to workers with a version greater than the minVersion. If this is not specified, the minVersion will be the same as the coordinator version.|none|
|
||||
|`minVersion`|The coordinator only assigns tasks to workers with a version greater than the minVersion. If this is not specified, the minVersion will be druid.indexer.runner.minWorkerVersion.|none|
|
||||
|`minNumWorkers`|The minimum number of workers that can be in the cluster at any given time.|0|
|
||||
|`maxNumWorkers`|The maximum number of workers that can be in the cluster at any given time.|0|
|
||||
|`nodeData`|A JSON object that contains metadata about new nodes to create.|none|
|
||||
|`userData`|A JSON object that contains metadata about how the node should register itself on startup. This data is sent with node creation requests.|none|
|
||||
|`nodeData`|A JSON object that describes how to launch new nodes. Currently, only EC2 is supported.|none; required|
|
||||
|`userData`|A JSON object that describes how to configure new nodes. Currently, only EC2 is supported. If you have set druid.indexer.autoscale.workerVersion, this must have a versionReplacementString. Otherwise, a versionReplacementString is not necessary.|none; optional|
|
||||
|
@ -22,6 +22,7 @@ Additional peon configs include:
|
||||
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|/tmp/persistent/tasks|
|
||||
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing|
|
||||
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000|
|
||||
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
|
||||
|`druid.indexer.task.chathandler.type`|Choices are "noop" and "announce". Certain tasks will use service discovery to announce an HTTP endpoint that events can be posted to.|noop|
|
||||
|
||||
If the peon is running in remote mode, there must be an overlord up and running. Running peons in remote mode require the following configurations:
|
||||
|
@ -29,19 +29,14 @@ import com.amazonaws.services.ec2.model.Reservation;
|
||||
import com.amazonaws.services.ec2.model.RunInstancesRequest;
|
||||
import com.amazonaws.services.ec2.model.RunInstancesResult;
|
||||
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.indexing.overlord.setup.EC2NodeData;
|
||||
import io.druid.indexing.overlord.setup.GalaxyUserData;
|
||||
import io.druid.indexing.overlord.setup.WorkerSetupData;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -50,20 +45,17 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(EC2AutoScalingStrategy.class);
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final AmazonEC2 amazonEC2Client;
|
||||
private final SimpleResourceManagementConfig config;
|
||||
private final Supplier<WorkerSetupData> workerSetupDataRef;
|
||||
|
||||
@Inject
|
||||
public EC2AutoScalingStrategy(
|
||||
@Json ObjectMapper jsonMapper,
|
||||
AmazonEC2 amazonEC2Client,
|
||||
SimpleResourceManagementConfig config,
|
||||
Supplier<WorkerSetupData> workerSetupDataRef
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.amazonEC2Client = amazonEC2Client;
|
||||
this.config = config;
|
||||
this.workerSetupDataRef = workerSetupDataRef;
|
||||
@ -73,15 +65,21 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
|
||||
public AutoScalingData provision()
|
||||
{
|
||||
try {
|
||||
WorkerSetupData setupData = workerSetupDataRef.get();
|
||||
EC2NodeData workerConfig = setupData.getNodeData();
|
||||
final WorkerSetupData setupData = workerSetupDataRef.get();
|
||||
final EC2NodeData workerConfig = setupData.getNodeData();
|
||||
final String userDataBase64;
|
||||
|
||||
GalaxyUserData userData = setupData.getUserData();
|
||||
if (config.getWorkerVersion() != null) {
|
||||
userData = userData.withVersion(config.getWorkerVersion());
|
||||
if (setupData.getUserData() == null) {
|
||||
userDataBase64 = null;
|
||||
} else {
|
||||
if (config.getWorkerVersion() == null) {
|
||||
userDataBase64 = setupData.getUserData().getUserDataBase64();
|
||||
} else {
|
||||
userDataBase64 = setupData.getUserData().withVersion(config.getWorkerVersion()).getUserDataBase64();
|
||||
}
|
||||
}
|
||||
|
||||
RunInstancesResult result = amazonEC2Client.runInstances(
|
||||
final RunInstancesResult result = amazonEC2Client.runInstances(
|
||||
new RunInstancesRequest(
|
||||
workerConfig.getAmiId(),
|
||||
workerConfig.getMinInstances(),
|
||||
@ -91,16 +89,10 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
|
||||
.withSecurityGroupIds(workerConfig.getSecurityGroupIds())
|
||||
.withPlacement(new Placement(setupData.getAvailabilityZone()))
|
||||
.withKeyName(workerConfig.getKeyName())
|
||||
.withUserData(
|
||||
Base64.encodeBase64String(
|
||||
jsonMapper.writeValueAsBytes(
|
||||
userData
|
||||
)
|
||||
)
|
||||
)
|
||||
.withUserData(userDataBase64)
|
||||
);
|
||||
|
||||
List<String> instanceIds = Lists.transform(
|
||||
final List<String> instanceIds = Lists.transform(
|
||||
result.getReservation().getInstances(),
|
||||
new Function<Instance, String>()
|
||||
{
|
||||
|
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord.setup;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
|
||||
/**
|
||||
* Represents any user data that may be needed to launch EC2 instances.
|
||||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "impl", defaultImpl = GalaxyEC2UserData.class)
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "galaxy", value = GalaxyEC2UserData.class),
|
||||
@JsonSubTypes.Type(name = "string", value = StringEC2UserData.class)
|
||||
})
|
||||
public interface EC2UserData<T extends EC2UserData>
|
||||
{
|
||||
/**
|
||||
* Return a copy of this instance with a different worker version. If no changes are needed (possibly because the
|
||||
* user data does not depend on the worker version) then it is OK to return "this".
|
||||
*/
|
||||
public EC2UserData<T> withVersion(String version);
|
||||
|
||||
public String getUserDataBase64();
|
||||
}
|
@ -19,24 +19,32 @@
|
||||
|
||||
package io.druid.indexing.overlord.setup;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class GalaxyUserData
|
||||
public class GalaxyEC2UserData implements EC2UserData<GalaxyEC2UserData>
|
||||
{
|
||||
public final String env;
|
||||
public final String version;
|
||||
public final String type;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final String env;
|
||||
private final String version;
|
||||
private final String type;
|
||||
|
||||
@JsonCreator
|
||||
public GalaxyUserData(
|
||||
public GalaxyEC2UserData(
|
||||
@JacksonInject @Json ObjectMapper jsonMapper,
|
||||
@JsonProperty("env") String env,
|
||||
@JsonProperty("version") String version,
|
||||
@JsonProperty("type") String type
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.env = env;
|
||||
this.version = version;
|
||||
this.type = type;
|
||||
@ -60,9 +68,21 @@ public class GalaxyUserData
|
||||
return type;
|
||||
}
|
||||
|
||||
public GalaxyUserData withVersion(String ver)
|
||||
@Override
|
||||
public GalaxyEC2UserData withVersion(String ver)
|
||||
{
|
||||
return new GalaxyUserData(env, ver, type);
|
||||
return new GalaxyEC2UserData(jsonMapper, env, ver, type);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUserDataBase64()
|
||||
{
|
||||
try {
|
||||
return Base64.encodeBase64String(jsonMapper.writeValueAsBytes(this));
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord.setup;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.api.client.util.Charsets;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
|
||||
public class StringEC2UserData implements EC2UserData<StringEC2UserData>
|
||||
{
|
||||
private final String data;
|
||||
private final String versionReplacementString;
|
||||
private final String version;
|
||||
|
||||
@JsonCreator
|
||||
public StringEC2UserData(
|
||||
@JsonProperty("data") String data,
|
||||
@JsonProperty("versionReplacementString") String versionReplacementString,
|
||||
@JsonProperty("version") String version
|
||||
)
|
||||
{
|
||||
this.data = data;
|
||||
this.versionReplacementString = versionReplacementString;
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getData()
|
||||
{
|
||||
return data;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getVersionReplacementString()
|
||||
{
|
||||
return versionReplacementString;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getVersion()
|
||||
{
|
||||
return version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StringEC2UserData withVersion(final String _version)
|
||||
{
|
||||
return new StringEC2UserData(data, versionReplacementString, _version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUserDataBase64()
|
||||
{
|
||||
final String finalData;
|
||||
if (versionReplacementString != null && version != null) {
|
||||
finalData = data.replace(versionReplacementString, version);
|
||||
} else {
|
||||
finalData = data;
|
||||
}
|
||||
return Base64.encodeBase64String(finalData.getBytes(Charsets.UTF_8));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "StringEC2UserData{" +
|
||||
"data='" + data + '\'' +
|
||||
", versionReplacementString='" + versionReplacementString + '\'' +
|
||||
", version='" + version + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
@ -33,7 +33,7 @@ public class WorkerSetupData
|
||||
private final int maxNumWorkers;
|
||||
private final String availabilityZone;
|
||||
private final EC2NodeData nodeData;
|
||||
private final GalaxyUserData userData;
|
||||
private final EC2UserData userData;
|
||||
|
||||
@JsonCreator
|
||||
public WorkerSetupData(
|
||||
@ -42,7 +42,7 @@ public class WorkerSetupData
|
||||
@JsonProperty("maxNumWorkers") int maxNumWorkers,
|
||||
@JsonProperty("availabilityZone") String availabilityZone,
|
||||
@JsonProperty("nodeData") EC2NodeData nodeData,
|
||||
@JsonProperty("userData") GalaxyUserData userData
|
||||
@JsonProperty("userData") EC2UserData userData
|
||||
)
|
||||
{
|
||||
this.minVersion = minVersion;
|
||||
@ -84,7 +84,7 @@ public class WorkerSetupData
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public GalaxyUserData getUserData()
|
||||
public EC2UserData getUserData()
|
||||
{
|
||||
return userData;
|
||||
}
|
||||
|
@ -19,15 +19,47 @@
|
||||
|
||||
package io.druid.indexing.common;
|
||||
|
||||
import com.fasterxml.jackson.databind.BeanProperty;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.metamx.common.ISE;
|
||||
import io.druid.guice.ServerModule;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TestUtils
|
||||
{
|
||||
public static final ObjectMapper MAPPER = new DefaultObjectMapper();
|
||||
|
||||
static {
|
||||
final List<? extends Module> list = new ServerModule().getJacksonModules();
|
||||
for (Module module : list) {
|
||||
MAPPER.registerModule(module);
|
||||
}
|
||||
MAPPER.setInjectableValues(
|
||||
new InjectableValues()
|
||||
{
|
||||
@Override
|
||||
public Object findInjectableValue(
|
||||
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
|
||||
)
|
||||
{
|
||||
if (valueId.equals("com.fasterxml.jackson.databind.ObjectMapper")) {
|
||||
return TestUtils.MAPPER;
|
||||
}
|
||||
throw new ISE("No Injectable value found");
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public static boolean conditionValid(IndexingServiceCondition condition)
|
||||
{
|
||||
try {
|
||||
|
@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import io.druid.indexing.common.TestUtils;
|
||||
import io.druid.indexing.overlord.setup.EC2UserData;
|
||||
import io.druid.indexing.overlord.setup.GalaxyEC2UserData;
|
||||
import io.druid.indexing.overlord.setup.StringEC2UserData;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class WorkerSetupDataTest
|
||||
{
|
||||
@Test
|
||||
public void testGalaxyEC2UserDataSerde() throws IOException
|
||||
{
|
||||
final String json = "{\"env\":\"druid\",\"version\":null,\"type\":\"typical\"}";
|
||||
final GalaxyEC2UserData userData = (GalaxyEC2UserData) TestUtils.MAPPER.readValue(json, EC2UserData.class);
|
||||
Assert.assertEquals("druid", userData.getEnv());
|
||||
Assert.assertEquals("typical", userData.getType());
|
||||
Assert.assertNull(userData.getVersion());
|
||||
Assert.assertEquals("1234", userData.withVersion("1234").getVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringEC2UserDataSerde() throws IOException
|
||||
{
|
||||
final String json = "{\"impl\":\"string\",\"data\":\"hey :ver:\",\"versionReplacementString\":\":ver:\",\"version\":\"1234\"}";
|
||||
final StringEC2UserData userData = (StringEC2UserData) TestUtils.MAPPER.readValue(json, EC2UserData.class);
|
||||
Assert.assertEquals("hey :ver:", userData.getData());
|
||||
Assert.assertEquals("1234", userData.getVersion());
|
||||
Assert.assertEquals(
|
||||
Base64.encodeBase64String("hey 1234".getBytes(Charsets.UTF_8)),
|
||||
userData.getUserDataBase64()
|
||||
);
|
||||
Assert.assertEquals(
|
||||
Base64.encodeBase64String("hey xyz".getBytes(Charsets.UTF_8)),
|
||||
userData.withVersion("xyz").getUserDataBase64()
|
||||
);
|
||||
}
|
||||
}
|
@ -30,7 +30,7 @@ import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.common.guava.DSuppliers;
|
||||
import io.druid.indexing.overlord.setup.EC2NodeData;
|
||||
import io.druid.indexing.overlord.setup.GalaxyUserData;
|
||||
import io.druid.indexing.overlord.setup.GalaxyEC2UserData;
|
||||
import io.druid.indexing.overlord.setup.WorkerSetupData;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import org.easymock.EasyMock;
|
||||
@ -75,7 +75,6 @@ public class EC2AutoScalingStrategyTest
|
||||
.withPrivateIpAddress(IP);
|
||||
|
||||
strategy = new EC2AutoScalingStrategy(
|
||||
new DefaultObjectMapper(),
|
||||
amazonEC2Client,
|
||||
new SimpleResourceManagementConfig().setWorkerPort(8080).setWorkerVersion(""),
|
||||
DSuppliers.of(workerSetupData)
|
||||
@ -101,7 +100,7 @@ public class EC2AutoScalingStrategyTest
|
||||
1,
|
||||
"",
|
||||
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"),
|
||||
new GalaxyUserData("env", "version", "type")
|
||||
new GalaxyEC2UserData(new DefaultObjectMapper(), "env", "version", "type")
|
||||
)
|
||||
);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user