From aff4a05ca385adbd4e3e0986aa18be155fa2b01d Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Mon, 7 Jan 2013 13:41:26 -0800 Subject: [PATCH 1/6] db based configs for indexer workers --- .../java/com/metamx/druid/db/DbConnector.java | 12 + .../merger/coordinator/RemoteTaskRunner.java | 23 +- .../config/EC2AutoScalingStrategyConfig.java | 17 -- .../config/RemoteTaskRunnerConfig.java | 7 - .../config/WorkerSetupManagerConfig.java | 17 ++ .../http/IndexerCoordinatorNode.java | 50 +++- .../http/IndexerCoordinatorResource.java | 28 ++- .../http/IndexerCoordinatorServletModule.java | 7 +- .../scaling/EC2AutoScalingStrategy.java | 36 +-- .../merger/coordinator/setup/EC2NodeData.java | 52 +++++ .../coordinator/setup/GalaxyUserData.java | 43 ++++ .../coordinator/setup/WorkerNodeData.java | 14 ++ .../coordinator/setup/WorkerSetupData.java | 52 +++++ .../coordinator/setup/WorkerSetupManager.java | 216 ++++++++++++++++++ .../coordinator/setup/WorkerUserData.java | 14 ++ .../coordinator/RemoteTaskRunnerTest.java | 37 +-- .../scaling/EC2AutoScalingStrategyTest.java | 65 +++--- 17 files changed, 582 insertions(+), 108 deletions(-) create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java index 99712df22a5..75cf0ba27e7 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnector.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java @@ -59,6 +59,18 @@ public class DbConnector ); } + public static void createWorkerSetupTable(final DBI dbi, final String workerTableName) + { + createTable( + dbi, + workerTableName, + String.format( + "CREATE table %s (minVersion TINYTEXT NOT NULL, minNumWorkers SMALLINT NOT NULL, nodeData LONGTEXT NOT NULL, userData LONGTEXT NOT NULL)", + workerTableName + ) + ); + } + public static void createTable( final DBI dbi, final String tableName, diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 2a235b88d86..addb789762f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -39,6 +39,7 @@ import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.scaling.AutoScalingData; import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.merger.worker.Worker; import com.metamx.emitter.EmittingLogger; import com.netflix.curator.framework.CuratorFramework; @@ -88,6 +89,7 @@ public class RemoteTaskRunner implements TaskRunner private 
final ScheduledExecutorService scheduledExec; private final RetryPolicyFactory retryPolicyFactory; private final ScalingStrategy strategy; + private final WorkerSetupManager workerSetupManager; // all workers that exist in ZK private final Map zkWorkers = new ConcurrentHashMap(); @@ -109,7 +111,8 @@ public class RemoteTaskRunner implements TaskRunner PathChildrenCache workerPathCache, ScheduledExecutorService scheduledExec, RetryPolicyFactory retryPolicyFactory, - ScalingStrategy strategy + ScalingStrategy strategy, + WorkerSetupManager workerSetupManager ) { this.jsonMapper = jsonMapper; @@ -119,6 +122,7 @@ public class RemoteTaskRunner implements TaskRunner this.scheduledExec = scheduledExec; this.retryPolicyFactory = retryPolicyFactory; this.strategy = strategy; + this.workerSetupManager = workerSetupManager; } @LifecycleStart @@ -169,7 +173,7 @@ public class RemoteTaskRunner implements TaskRunner public void run() { if (currentlyTerminating.isEmpty()) { - if (zkWorkers.size() <= config.getMinNumWorkers()) { + if (zkWorkers.size() <= workerSetupManager.getWorkerSetupData().getMinNumWorkers()) { return; } @@ -388,8 +392,7 @@ public class RemoteTaskRunner implements TaskRunner synchronized (statusLock) { try { if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || - event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) - { + event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); final TaskStatus taskStatus; @@ -399,7 +402,7 @@ public class RemoteTaskRunner implements TaskRunner event.getData().getData(), TaskStatus.class ); - if(!taskStatus.getId().equals(taskId)) { + if (!taskStatus.getId().equals(taskId)) { // Sanity check throw new ISE( "Worker[%s] status id does not match payload id: %s != %s", @@ -408,7 +411,8 @@ public class RemoteTaskRunner implements TaskRunner taskStatus.getId() ); } - } catch (Exception e) { + } + catch (Exception e) { log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId); retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId)); throw Throwables.propagate(e); @@ -446,7 +450,8 @@ public class RemoteTaskRunner implements TaskRunner } } } - } catch(Exception e) { + } + catch (Exception e) { log.makeAlert(e, "Failed to handle new worker status") .addData("worker", worker.getHost()) .addData("znode", event.getData().getPath()) @@ -526,7 +531,9 @@ public class RemoteTaskRunner implements TaskRunner public boolean apply(WorkerWrapper input) { return (!input.isAtCapacity() && - input.getWorker().getVersion().compareTo(config.getMinWorkerVersion()) >= 0); + input.getWorker() + .getVersion() + .compareTo(workerSetupManager.getWorkerSetupData().getMinVersion()) >= 0); } } ) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java index c364070e313..a8cfcf8df22 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java @@ -26,24 +26,7 @@ import org.skife.config.Default; */ public abstract class EC2AutoScalingStrategyConfig { - @Config("druid.indexer.amiId") - public abstract String getAmiId(); - @Config("druid.indexer.worker.port") @Default("8080") public abstract String getWorkerPort(); - - 
@Config("druid.indexer.instanceType") - public abstract String getInstanceType(); - - @Config("druid.indexer.minNumInstancesToProvision") - @Default("1") - public abstract int getMinNumInstancesToProvision(); - - @Config("druid.indexer.maxNumInstancesToProvision") - @Default("1") - public abstract int getMaxNumInstancesToProvision(); - - @Config("druid.indexer.userDataFile") - public abstract String getUserDataFile(); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java index 00b869ea6da..2e20c4ffff2 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java @@ -37,13 +37,6 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig @Default("2012-01-01T00:55:00.000Z") public abstract DateTime getTerminateResourcesOriginDateTime(); - @Config("druid.indexer.minWorkerVersion") - public abstract String getMinWorkerVersion(); - - @Config("druid.indexer.minNumWorkers") - @Default("1") - public abstract int getMinNumWorkers(); - @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion") @Default("1") public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java new file mode 100644 index 00000000000..ad7444b657e --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java @@ -0,0 +1,17 @@ +package com.metamx.druid.merger.coordinator.config; + +import org.joda.time.Duration; +import org.skife.config.Config; +import org.skife.config.Default; + +/** + */ +public abstract class WorkerSetupManagerConfig +{ + @Config("druid.indexer.workerSetupTable") + public abstract String getWorkerSetupTable(); + + @Config("druid.indexer.poll.duration") + @Default("PT1M") + public abstract Duration getPollDuration(); +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 1c11c62cb7b..348a62b9a28 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -69,6 +69,8 @@ import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; +import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig; import com.metamx.druid.realtime.S3SegmentPusher; import com.metamx.druid.realtime.S3SegmentPusherConfig; import com.metamx.druid.realtime.SegmentPusher; @@ -98,6 +100,7 @@ import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.FilterHolder; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; +import org.skife.jdbi.v2.DBI; import java.net.URL; import java.util.Arrays; @@ -133,6 +136,7 @@ public class 
IndexerCoordinatorNode extends RegisteringNode private CuratorFramework curatorFramework = null; private ScheduledExecutorFactory scheduledExecutorFactory = null; private IndexerZkConfig indexerZkConfig; + private WorkerSetupManager workerSetupManager = null; private TaskRunnerFactory taskRunnerFactory = null; private TaskMaster taskMaster = null; private Server server = null; @@ -160,14 +164,16 @@ public class IndexerCoordinatorNode extends RegisteringNode return this; } - public void setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator) + public IndexerCoordinatorNode setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator) { this.mergerDBCoordinator = mergerDBCoordinator; + return this; } - public void setTaskQueue(TaskQueue taskQueue) + public IndexerCoordinatorNode setTaskQueue(TaskQueue taskQueue) { this.taskQueue = taskQueue; + return this; } public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator) @@ -182,9 +188,16 @@ public class IndexerCoordinatorNode extends RegisteringNode return this; } - public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) + public IndexerCoordinatorNode setWorkerSetupManager(WorkerSetupManager workerSetupManager) + { + this.workerSetupManager = workerSetupManager; + return this; + } + + public IndexerCoordinatorNode setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { this.taskRunnerFactory = taskRunnerFactory; + return this; } public void init() throws Exception @@ -202,6 +215,7 @@ public class IndexerCoordinatorNode extends RegisteringNode initializeJacksonSubtypes(); initializeCurator(); initializeIndexerZkConfig(); + initializeWorkerSetupManager(); initializeTaskRunnerFactory(); initializeTaskMaster(); initializeServer(); @@ -220,7 +234,8 @@ public class IndexerCoordinatorNode extends RegisteringNode jsonMapper, config, emitter, - taskQueue + taskQueue, + workerSetupManager ) ); @@ -447,6 +462,27 @@ public class IndexerCoordinatorNode extends RegisteringNode } } + public void initializeWorkerSetupManager() + { + if (workerSetupManager == null) { + final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class); + final DBI dbi = new DbConnector(dbConnectorConfig).getDBI(); + final WorkerSetupManagerConfig workerSetupManagerConfig = configFactory.build(WorkerSetupManagerConfig.class); + + DbConnector.createWorkerSetupTable(dbi, workerSetupManagerConfig.getWorkerSetupTable()); + workerSetupManager = new WorkerSetupManager( + dbi, Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("WorkerSetupManagerExec--%d") + .build() + ), jsonMapper, workerSetupManagerConfig + ); + } + lifecycle.addManagedInstance(workerSetupManager); + } + public void initializeTaskRunnerFactory() { if (taskRunnerFactory == null) { @@ -476,7 +512,8 @@ public class IndexerCoordinatorNode extends RegisteringNode PropUtils.getProperty(props, "com.metamx.aws.secretKey") ) ), - configFactory.build(EC2AutoScalingStrategyConfig.class) + configFactory.build(EC2AutoScalingStrategyConfig.class), + workerSetupManager ); } else if (config.getStrategyImpl().equalsIgnoreCase("noop")) { strategy = new NoopScalingStrategy(); @@ -491,7 +528,8 @@ public class IndexerCoordinatorNode extends RegisteringNode new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true), retryScheduledExec, new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)), - strategy + strategy, + workerSetupManager ); } }; diff --git 
a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java index 6cf9b0a7c16..e4acd93514f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java @@ -28,6 +28,8 @@ import com.metamx.druid.merger.common.task.MergeTask; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.TaskQueue; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.emitter.service.ServiceEmitter; import javax.ws.rs.Consumes; @@ -48,18 +50,21 @@ public class IndexerCoordinatorResource private final IndexerCoordinatorConfig config; private final ServiceEmitter emitter; private final TaskQueue tasks; + private final WorkerSetupManager workerSetupManager; @Inject public IndexerCoordinatorResource( IndexerCoordinatorConfig config, ServiceEmitter emitter, - TaskQueue tasks + TaskQueue tasks, + WorkerSetupManager workerSetupManager ) throws Exception { this.config = config; this.emitter = emitter; this.tasks = tasks; + this.workerSetupManager = workerSetupManager; } @POST @@ -115,4 +120,25 @@ public class IndexerCoordinatorResource { return Response.ok(ImmutableMap.of("task", taskid)).build(); } + + @GET + @Path("/worker/setup") + @Produces("application/json") + public Response getWorkerSetupData() + { + return Response.ok(workerSetupManager.getWorkerSetupData()).build(); + } + + @POST + @Path("/worker/setup") + @Consumes("application/json") + public Response setWorkerSetupData( + final WorkerSetupData workerSetupData + ) + { + if (!workerSetupManager.setWorkerSetupData(workerSetupData)) { + return Response.status(Response.Status.BAD_REQUEST).build(); + } + return Response.ok().build(); + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java index 9c657bdc292..4cc1df9fa6f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java @@ -22,6 +22,7 @@ package com.metamx.druid.merger.coordinator.http; import com.google.inject.Provides; import com.metamx.druid.merger.coordinator.TaskQueue; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.emitter.service.ServiceEmitter; import com.sun.jersey.guice.JerseyServletModule; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; @@ -38,18 +39,21 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule private final IndexerCoordinatorConfig indexerCoordinatorConfig; private final ServiceEmitter emitter; private final TaskQueue tasks; + private final WorkerSetupManager workerSetupManager; public IndexerCoordinatorServletModule( ObjectMapper jsonMapper, IndexerCoordinatorConfig indexerCoordinatorConfig, ServiceEmitter emitter, - TaskQueue tasks + TaskQueue tasks, + WorkerSetupManager workerSetupManager ) { this.jsonMapper = jsonMapper; 
this.indexerCoordinatorConfig = indexerCoordinatorConfig; this.emitter = emitter; this.tasks = tasks; + this.workerSetupManager = workerSetupManager; } @Override @@ -60,6 +64,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig); bind(ServiceEmitter.class).toInstance(emitter); bind(TaskQueue.class).toInstance(tasks); + bind(WorkerSetupManager.class).toInstance(workerSetupManager); serve("/*").with(GuiceContainer.class); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java index 265fe62287c..eed69ae7f9e 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java @@ -24,19 +24,20 @@ import com.amazonaws.services.ec2.model.DescribeInstancesRequest; import com.amazonaws.services.ec2.model.DescribeInstancesResult; import com.amazonaws.services.ec2.model.Filter; import com.amazonaws.services.ec2.model.Instance; -import com.amazonaws.services.ec2.model.InstanceType; import com.amazonaws.services.ec2.model.Reservation; import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.google.common.base.Function; import com.google.common.collect.Lists; +import com.metamx.common.ISE; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; +import com.metamx.druid.merger.coordinator.setup.EC2NodeData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.emitter.EmittingLogger; import org.codehaus.jackson.map.ObjectMapper; -import javax.annotation.Nullable; -import java.io.File; import java.util.List; /** @@ -48,31 +49,40 @@ public class EC2AutoScalingStrategy implements ScalingStrategy private final ObjectMapper jsonMapper; private final AmazonEC2Client amazonEC2Client; private final EC2AutoScalingStrategyConfig config; + private final WorkerSetupManager workerSetupManager; public EC2AutoScalingStrategy( ObjectMapper jsonMapper, AmazonEC2Client amazonEC2Client, - EC2AutoScalingStrategyConfig config + EC2AutoScalingStrategyConfig config, + WorkerSetupManager workerSetupManager ) { this.jsonMapper = jsonMapper; this.amazonEC2Client = amazonEC2Client; this.config = config; + this.workerSetupManager = workerSetupManager; } @Override public AutoScalingData provision() { try { + WorkerSetupData setupData = workerSetupManager.getWorkerSetupData(); + if (!(setupData.getNodeData() instanceof EC2NodeData)) { + throw new ISE("DB misconfiguration! 
Node data is an instance of [%s]", setupData.getNodeData().getClass()); + } + EC2NodeData workerConfig = (EC2NodeData) setupData.getNodeData(); + log.info("Creating new instance(s)..."); RunInstancesResult result = amazonEC2Client.runInstances( new RunInstancesRequest( - config.getAmiId(), - config.getMinNumInstancesToProvision(), - config.getMaxNumInstancesToProvision() + workerConfig.getAmiId(), + workerConfig.getMinInstances(), + workerConfig.getMaxInstances() ) - .withInstanceType(InstanceType.fromValue(config.getInstanceType())) - .withUserData(jsonMapper.writeValueAsString(new File(config.getUserDataFile()))) + .withInstanceType(workerConfig.getInstanceType()) + .withUserData(jsonMapper.writeValueAsString(setupData.getUserData())) ); List instanceIds = Lists.transform( @@ -80,7 +90,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy new Function() { @Override - public String apply(@Nullable Instance input) + public String apply(Instance input) { return input.getInstanceId(); } @@ -95,7 +105,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy new Function() { @Override - public String apply(@Nullable Instance input) + public String apply(Instance input) { return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort()); } @@ -135,7 +145,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy new Function() { @Override - public String apply(@Nullable Instance input) + public String apply(Instance input) { return input.getInstanceId(); } @@ -150,7 +160,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy new Function() { @Override - public String apply(@Nullable Instance input) + public String apply(Instance input) { return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java new file mode 100644 index 00000000000..209444c6731 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java @@ -0,0 +1,52 @@ +package com.metamx.druid.merger.coordinator.setup; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + */ +public class EC2NodeData implements WorkerNodeData +{ + private final String amiId; + private final String instanceType; + private final int minInstances; + private final int maxInstances; + + @JsonCreator + public EC2NodeData( + @JsonProperty("amiId") String amiId, + @JsonProperty("instanceType") String instanceType, + @JsonProperty("minInstances") int minInstances, + @JsonProperty("maxInstances") int maxInstances + ) + { + this.amiId = amiId; + this.instanceType = instanceType; + this.minInstances = minInstances; + this.maxInstances = maxInstances; + } + + @JsonProperty + public String getAmiId() + { + return amiId; + } + + @JsonProperty + public String getInstanceType() + { + return instanceType; + } + + @JsonProperty + public int getMinInstances() + { + return minInstances; + } + + @JsonProperty + public int getMaxInstances() + { + return maxInstances; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java new file mode 100644 index 00000000000..0baa0ddfb9a --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java @@ -0,0 +1,43 @@ 
+package com.metamx.druid.merger.coordinator.setup; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + */ +public class GalaxyUserData implements WorkerUserData +{ + public final String env; + public final String ver; + public final String type; + + @JsonCreator + public GalaxyUserData( + @JsonProperty("env") String env, + @JsonProperty("ver") String ver, + @JsonProperty("type") String type + ) + { + this.env = env; + this.ver = ver; + this.type = type; + } + + @JsonProperty + public String getEnv() + { + return env; + } + + @JsonProperty + public String getVer() + { + return ver; + } + + @JsonProperty + public String getType() + { + return type; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java new file mode 100644 index 00000000000..8068a4c267b --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java @@ -0,0 +1,14 @@ +package com.metamx.druid.merger.coordinator.setup; + +import org.codehaus.jackson.annotate.JsonSubTypes; +import org.codehaus.jackson.annotate.JsonTypeInfo; + +/** + */ +@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, property="type") +@JsonSubTypes(value={ + @JsonSubTypes.Type(name="ec2", value=EC2NodeData.class) +}) +public interface WorkerNodeData +{ +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java new file mode 100644 index 00000000000..224b22167ae --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java @@ -0,0 +1,52 @@ +package com.metamx.druid.merger.coordinator.setup; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + */ +public class WorkerSetupData +{ + private final String minVersion; + private final int minNumWorkers; + private final WorkerNodeData nodeData; + private final WorkerUserData userData; + + @JsonCreator + public WorkerSetupData( + @JsonProperty("minVersion") String minVersion, + @JsonProperty("minNumWorkers") int minNumWorkers, + @JsonProperty("nodeData") WorkerNodeData nodeData, + @JsonProperty("userData") WorkerUserData userData + ) + { + this.minVersion = minVersion; + this.minNumWorkers = minNumWorkers; + this.nodeData = nodeData; + this.userData = userData; + } + + @JsonProperty + public String getMinVersion() + { + return minVersion; + } + + @JsonProperty + public int getMinNumWorkers() + { + return minNumWorkers; + } + + @JsonProperty + public WorkerNodeData getNodeData() + { + return nodeData; + } + + @JsonProperty + public WorkerUserData getUserData() + { + return userData; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java new file mode 100644 index 00000000000..a4ab8fe8d5d --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java @@ -0,0 +1,216 @@ +package com.metamx.druid.merger.coordinator.setup; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; 
+import com.metamx.common.logger.Logger; +import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig; +import org.apache.commons.collections.MapUtils; +import org.codehaus.jackson.map.ObjectMapper; +import org.joda.time.Duration; +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.FoldController; +import org.skife.jdbi.v2.Folder3; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class WorkerSetupManager +{ + private static final Logger log = new Logger(WorkerSetupManager.class); + + private final DBI dbi; + private final ObjectMapper jsonMapper; + private final ScheduledExecutorService exec; + private final WorkerSetupManagerConfig config; + + private final Object lock = new Object(); + + private volatile AtomicReference workerSetupData = new AtomicReference(null); + private volatile boolean started = false; + + public WorkerSetupManager( + DBI dbi, + ScheduledExecutorService exec, + ObjectMapper jsonMapper, + WorkerSetupManagerConfig config + ) + { + this.dbi = dbi; + this.exec = exec; + this.jsonMapper = jsonMapper; + this.config = config; + } + + @LifecycleStart + public void start() + { + synchronized (lock) { + if (started) { + return; + } + + ScheduledExecutors.scheduleWithFixedDelay( + exec, + new Duration(0), + config.getPollDuration(), + new Runnable() + { + @Override + public void run() + { + poll(); + } + } + ); + + started = true; + } + } + + @LifecycleStop + public void stop() + { + synchronized (lock) { + if (!started) { + return; + } + + started = false; + } + } + + public void poll() + { + try { + List setupDataList = dbi.withHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + return handle.createQuery( + String.format( + "SELECT minVersion, minNumWorkers, nodeData, userData FROM %s", + config.getWorkerSetupTable() + ) + ).fold( + Lists.newArrayList(), + new Folder3, Map>() + { + @Override + public ArrayList fold( + ArrayList workerNodeConfigurations, + Map stringObjectMap, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + // stringObjectMap lowercases and jackson may fail serde + workerNodeConfigurations.add( + new WorkerSetupData( + MapUtils.getString(stringObjectMap, "minVersion"), + MapUtils.getInteger(stringObjectMap, "minNumWorkers"), + jsonMapper.readValue( + MapUtils.getString(stringObjectMap, "nodeData"), + WorkerNodeData.class + ), + jsonMapper.readValue( + MapUtils.getString(stringObjectMap, "userData"), + WorkerUserData.class + ) + ) + ); + return workerNodeConfigurations; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + } + } + ); + + if (setupDataList.isEmpty()) { + throw new ISE("WTF?! No configuration found for worker nodes!"); + } else if (setupDataList.size() != 1) { + throw new ISE("WTF?! 
Found more than one configuration for worker nodes"); + } + + workerSetupData.set(setupDataList.get(0)); + } + catch (Exception e) { + log.error(e, "Exception while polling for worker setup data!"); + } + } + + @SuppressWarnings("unchecked") + public WorkerSetupData getWorkerSetupData() + { + synchronized (lock) { + if (!started) { + throw new ISE("Must start WorkerSetupManager first!"); + } + + return workerSetupData.get(); + } + } + + public boolean setWorkerSetupData(final WorkerSetupData value) + { + synchronized (lock) { + try { + if (!started) { + throw new ISE("Must start WorkerSetupManager first!"); + } + + dbi.withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + handle.createStatement(String.format("DELETE FROM %s", config.getWorkerSetupTable())).execute(); + handle.createStatement( + String.format( + "INSERT INTO %s (minVersion, minNumWorkers, nodeData, userData) VALUES (:minVersion, :minNumWorkers, :nodeData, :userData)", + config.getWorkerSetupTable() + ) + ) + .bind("minVersion", value.getMinVersion()) + .bind("minNumWorkers", value.getMinNumWorkers()) + .bind("nodeData", jsonMapper.writeValueAsString(value.getNodeData())) + .bind("userData", jsonMapper.writeValueAsString(value.getUserData())) + .execute(); + + return null; + } + } + ); + + workerSetupData.set(value); + } + catch (Exception e) { + log.error(e, "Exception updating worker config"); + return false; + } + } + + return true; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java new file mode 100644 index 00000000000..80857fb58d1 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java @@ -0,0 +1,14 @@ +package com.metamx.druid.merger.coordinator.setup; + +import org.codehaus.jackson.annotate.JsonSubTypes; +import org.codehaus.jackson.annotate.JsonTypeInfo; + +/** + */ +@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, property="classType") +@JsonSubTypes(value={ + @JsonSubTypes.Type(name="galaxy", value=GalaxyUserData.class) +}) +public interface WorkerUserData +{ +} diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index eb10731abd9..3282ca5dd3a 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -17,6 +17,8 @@ import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; import com.metamx.druid.merger.coordinator.scaling.AutoScalingData; import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.merger.worker.TaskMonitor; import com.metamx.druid.merger.worker.Worker; import com.metamx.druid.merger.worker.WorkerCuratorCoordinator; @@ -62,6 +64,7 @@ public class RemoteTaskRunnerTest private PathChildrenCache pathChildrenCache; private RemoteTaskRunner remoteTaskRunner; private TaskMonitor taskMonitor; + private WorkerSetupManager workerSetupManager; private ScheduledExecutorService scheduledExec; @@ -69,7 +72,6 @@ public class RemoteTaskRunnerTest private Worker worker1; - @Before 
public void setUp() throws Exception { @@ -141,9 +143,10 @@ public class RemoteTaskRunnerTest { remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.newHashSet()), null); try { - remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.newHashSet()), null); - fail("ISE expected"); - } catch (ISE expected) { + remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.newHashSet()), null); + fail("ISE expected"); + } + catch (ISE expected) { } } @@ -333,6 +336,17 @@ public class RemoteTaskRunnerTest private void makeRemoteTaskRunner() throws Exception { scheduledExec = EasyMock.createMock(ScheduledExecutorService.class); + workerSetupManager = EasyMock.createMock(WorkerSetupManager.class); + + EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn( + new WorkerSetupData( + "0", + 0, + null, + null + ) + ); + EasyMock.replay(workerSetupManager); remoteTaskRunner = new RemoteTaskRunner( jsonMapper, @@ -341,7 +355,8 @@ public class RemoteTaskRunnerTest pathChildrenCache, scheduledExec, new RetryPolicyFactory(new TestRetryPolicyConfig()), - new TestScalingStrategy() + new TestScalingStrategy(), + workerSetupManager ); // Create a single worker and wait for things for be ready @@ -405,18 +420,6 @@ public class RemoteTaskRunnerTest return null; } - @Override - public String getMinWorkerVersion() - { - return "0"; - } - - @Override - public int getMinNumWorkers() - { - return 0; - } - @Override public int getMaxWorkerIdleTimeMillisBeforeDeletion() { diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java index 958a2c1d836..2cbdfe83efe 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java @@ -29,6 +29,10 @@ import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; +import com.metamx.druid.merger.coordinator.setup.EC2NodeData; +import com.metamx.druid.merger.coordinator.setup.GalaxyUserData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; @@ -52,6 +56,7 @@ public class EC2AutoScalingStrategyTest private Reservation reservation; private Instance instance; private EC2AutoScalingStrategy strategy; + private WorkerSetupManager workerSetupManager; @Before public void setUp() throws Exception @@ -60,6 +65,7 @@ public class EC2AutoScalingStrategyTest runInstancesResult = EasyMock.createMock(RunInstancesResult.class); describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class); reservation = EasyMock.createMock(Reservation.class); + workerSetupManager = EasyMock.createMock(WorkerSetupManager.class); instance = new Instance() .withInstanceId(INSTANCE_ID) @@ -69,44 +75,16 @@ public class EC2AutoScalingStrategyTest strategy = new EC2AutoScalingStrategy( new DefaultObjectMapper(), - amazonEC2Client, new EC2AutoScalingStrategyConfig() - { - @Override - public String getAmiId() - { - return AMI_ID; - } - - @Override - public String getWorkerPort() - { - 
return "8080"; - } - - @Override - public String getInstanceType() - { - return "t1.micro"; - } - - @Override - public int getMinNumInstancesToProvision() - { - return 1; - } - - @Override - public int getMaxNumInstancesToProvision() - { - return 1; - } - - @Override - public String getUserDataFile() - { - return ""; - } - } + amazonEC2Client, + new EC2AutoScalingStrategyConfig() + { + @Override + public String getWorkerPort() + { + return "8080"; + } + }, + workerSetupManager ); } @@ -117,11 +95,22 @@ public class EC2AutoScalingStrategyTest EasyMock.verify(runInstancesResult); EasyMock.verify(describeInstancesResult); EasyMock.verify(reservation); + EasyMock.verify(workerSetupManager); } @Test public void testScale() { + EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn( + new WorkerSetupData( + "0", + 0, + new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1), + new GalaxyUserData("env", "ver", "type") + ) + ); + EasyMock.replay(workerSetupManager); + EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn( runInstancesResult ); From 79e9ff94a67753dfb0619e040d7965b1dba1fa27 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Mon, 7 Jan 2013 14:07:47 -0800 Subject: [PATCH 2/6] add missing headers --- .../config/WorkerSetupManagerConfig.java | 19 +++++++++++++++++++ .../coordinator/scaling/AutoScalingData.java | 19 +++++++++++++++++++ .../scaling/NoopScalingStrategy.java | 19 +++++++++++++++++++ .../merger/coordinator/setup/EC2NodeData.java | 19 +++++++++++++++++++ .../coordinator/setup/GalaxyUserData.java | 19 +++++++++++++++++++ .../coordinator/setup/WorkerNodeData.java | 19 +++++++++++++++++++ .../coordinator/setup/WorkerSetupData.java | 19 +++++++++++++++++++ .../coordinator/setup/WorkerSetupManager.java | 19 +++++++++++++++++++ .../coordinator/setup/WorkerUserData.java | 19 +++++++++++++++++++ 9 files changed, 171 insertions(+) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java index ad7444b657e..97368c9f77e 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + package com.metamx.druid.merger.coordinator.config; import org.joda.time.Duration; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java index 6cce08f8731..5a1bb4980e5 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.coordinator.scaling; import java.util.List; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java index 67eb99293e4..d723df537dc 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.coordinator.scaling; import com.metamx.emitter.EmittingLogger; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java index 209444c6731..d1531f7974a 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.coordinator.setup; import org.codehaus.jackson.annotate.JsonCreator; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java index 0baa0ddfb9a..21e09f5fd40 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.coordinator.setup; import org.codehaus.jackson.annotate.JsonCreator; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java index 8068a4c267b..b7cec6d68d0 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + package com.metamx.druid.merger.coordinator.setup; import org.codehaus.jackson.annotate.JsonSubTypes; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java index 224b22167ae..f54818f8437 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.coordinator.setup; import org.codehaus.jackson.annotate.JsonCreator; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java index a4ab8fe8d5d..03bdd770671 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.coordinator.setup; import com.google.common.base.Throwables; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java index 80857fb58d1..4a42c9b3bac 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. 
+ * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.coordinator.setup; import org.codehaus.jackson.annotate.JsonSubTypes; From 4c2da933895477110d50a23bcfcb858fe2b13776 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 9 Jan 2013 14:51:35 -0800 Subject: [PATCH 3/6] bug fixes with ec2 auto scaling --- .../java/com/metamx/druid/db/DbConnector.java | 2 +- .../merger/coordinator/RemoteTaskRunner.java | 23 ++++---- .../http/IndexerCoordinatorNode.java | 6 +- .../scaling/EC2AutoScalingStrategy.java | 57 +++++++++++++------ .../scaling/NoopScalingStrategy.java | 7 +++ .../coordinator/scaling/ScalingStrategy.java | 7 +++ .../merger/coordinator/setup/EC2NodeData.java | 22 ++++++- .../coordinator/setup/GalaxyUserData.java | 10 ++-- .../coordinator/setup/WorkerSetupData.java | 22 ++++++- .../coordinator/setup/WorkerSetupManager.java | 16 +++++- .../coordinator/RemoteTaskRunnerTest.java | 8 +++ .../scaling/EC2AutoScalingStrategyTest.java | 9 ++- pom.xml | 2 +- 13 files changed, 146 insertions(+), 45 deletions(-) diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java index 75cf0ba27e7..b918af2b2b5 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnector.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java @@ -65,7 +65,7 @@ public class DbConnector dbi, workerTableName, String.format( - "CREATE table %s (minVersion TINYTEXT NOT NULL, minNumWorkers SMALLINT NOT NULL, nodeData LONGTEXT NOT NULL, userData LONGTEXT NOT NULL)", + "CREATE table %s (minVersion TINYTEXT NOT NULL, minNumWorkers SMALLINT NOT NULL, nodeData LONGTEXT NOT NULL, userData LONGTEXT NOT NULL, securityGroupIds LONGTEXT NOT NULL, keyName TINYTEXT NOT NULL)", workerTableName ) ); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index addb789762f..0e8927cb0b9 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -54,6 +54,7 @@ import org.joda.time.Duration; import org.joda.time.Period; import javax.annotation.Nullable; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -148,7 +149,7 @@ public class RemoteTaskRunner implements TaskRunner Worker.class ); log.info("Worker[%s] removed!", worker.getHost()); - removeWorker(worker.getHost()); + removeWorker(worker); } } } @@ -222,7 +223,7 @@ public class RemoteTaskRunner implements TaskRunner } log.info( - "[%s] still terminating. Wait for all nodes to terminate before trying again.", + "%s still terminating. 
Wait for all nodes to terminate before trying again.", currentlyTerminating ); } @@ -372,7 +373,7 @@ public class RemoteTaskRunner implements TaskRunner private void addWorker(final Worker worker) { try { - currentlyProvisioning.remove(worker.getHost()); + currentlyProvisioning.removeAll(strategy.ipLookup(Arrays.asList(worker.getIp()))); final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost()); final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true); @@ -483,22 +484,22 @@ public class RemoteTaskRunner implements TaskRunner * When a ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned * to the worker. If tasks remain, they are retried. * - * @param workerId - id of the removed worker + * @param worker - the removed worker */ - private void removeWorker(final String workerId) + private void removeWorker(final Worker worker) { - currentlyTerminating.remove(workerId); + currentlyTerminating.remove(worker.getHost()); - WorkerWrapper workerWrapper = zkWorkers.get(workerId); + WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost()); if (workerWrapper != null) { try { Set tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks()); - tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), workerId))); + tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), worker.getHost()))); for (String taskId : tasksToRetry) { TaskWrapper taskWrapper = tasks.get(taskId); if (taskWrapper != null) { - retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId)); + retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId)); } } @@ -508,7 +509,7 @@ public class RemoteTaskRunner implements TaskRunner log.error(e, "Failed to cleanly remove worker[%s]"); } } - zkWorkers.remove(workerId); + zkWorkers.remove(worker.getHost()); } private WorkerWrapper findWorkerForTask() @@ -558,7 +559,7 @@ public class RemoteTaskRunner implements TaskRunner } log.info( - "[%s] still provisioning. Wait for all provisioned nodes to complete before requesting new worker.", + "%s still provisioning. 
Wait for all provisioned nodes to complete before requesting new worker.", currentlyProvisioning ); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 5248b30712e..15d76cf2fec 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -47,6 +47,9 @@ import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.loading.S3SegmentPusher; +import com.metamx.druid.loading.S3SegmentPusherConfig; +import com.metamx.druid.loading.SegmentPusher; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; @@ -70,9 +73,6 @@ import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig; import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; -import com.metamx.druid.loading.S3SegmentPusher; -import com.metamx.druid.loading.S3SegmentPusherConfig; -import com.metamx.druid.loading.SegmentPusher; import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java index eed69ae7f9e..35f9d7d25cb 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java @@ -36,6 +36,7 @@ import com.metamx.druid.merger.coordinator.setup.EC2NodeData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.emitter.EmittingLogger; +import org.apache.commons.codec.binary.Base64; import org.codehaus.jackson.map.ObjectMapper; import java.util.List; @@ -82,7 +83,15 @@ public class EC2AutoScalingStrategy implements ScalingStrategy workerConfig.getMaxInstances() ) .withInstanceType(workerConfig.getInstanceType()) - .withUserData(jsonMapper.writeValueAsString(setupData.getUserData())) + .withSecurityGroupIds(workerConfig.getSecurityGroupIds()) + .withKeyName(workerConfig.getKeyName()) + .withUserData( + Base64.encodeBase64String( + jsonMapper.writeValueAsBytes( + setupData.getUserData() + ) + ) + ) ); List instanceIds = Lists.transform( @@ -107,7 +116,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy @Override public String apply(Instance input) { - return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort()); + return input.getInstanceId(); } } ), @@ -127,7 +136,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy DescribeInstancesResult result = amazonEC2Client.describeInstances( new DescribeInstancesRequest() .withFilters( - new Filter("private-ip-address", nodeIds) + new Filter("instance-id", 
nodeIds) ) ); @@ -139,19 +148,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy try { log.info("Terminating instance[%s]", instances); amazonEC2Client.terminateInstances( - new TerminateInstancesRequest( - Lists.transform( - instances, - new Function<Instance, String>() - { - @Override - public String apply(Instance input) - { - return input.getInstanceId(); - } - } - ) - ) + new TerminateInstancesRequest(nodeIds) ); return new AutoScalingData( @@ -175,4 +172,32 @@ public class EC2AutoScalingStrategy implements ScalingStrategy return null; } + + @Override + public List<String> ipLookup(List<String> ips) + { + DescribeInstancesResult result = amazonEC2Client.describeInstances( + new DescribeInstancesRequest() + .withFilters( + new Filter("private-ip-address", ips) + ) + ); + + List<Instance> instances = Lists.newArrayList(); + for (Reservation reservation : result.getReservations()) { + instances.addAll(reservation.getInstances()); + } + + return Lists.transform( + instances, + new Function<Instance, String>() + { + @Override + public String apply(Instance input) + { + return input.getInstanceId(); + } + } + ); + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java index d723df537dc..2b412ca6202 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java @@ -43,4 +43,11 @@ public class NoopScalingStrategy implements ScalingStrategy log.info("If I were a real strategy I'd terminate %s now", nodeIds); return null; } + + @Override + public List<String> ipLookup(List<String> ips) + { + log.info("I'm not a real strategy so I'm returning what I got %s", ips); + return ips; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java index 9b7da8fb3a4..52105451ed3 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java @@ -28,4 +28,11 @@ public interface ScalingStrategy public AutoScalingData provision(); public AutoScalingData terminate(List<String> nodeIds); + + /** + * Provides a lookup of ip addresses to node ids + * @param ips + * @return + */ + public List<String> ipLookup(List<String> ips); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java index d1531f7974a..3e986f94d56 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java @@ -22,6 +22,8 @@ package com.metamx.druid.merger.coordinator.setup; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; +import java.util.List; + /** */ public class EC2NodeData implements WorkerNodeData @@ -30,19 +32,25 @@ public class EC2NodeData implements WorkerNodeData { private final String amiId; private final String instanceType; private final int minInstances; private final int maxInstances; + private final List<String> securityGroupIds; + private final String keyName; @JsonCreator public EC2NodeData( @JsonProperty("amiId") String amiId, @JsonProperty("instanceType") String instanceType, @JsonProperty("minInstances") int minInstances, -
@JsonProperty("maxInstances") int maxInstances + @JsonProperty("maxInstances") int maxInstances, + @JsonProperty("securityGroupIds") List securityGroupIds, + @JsonProperty("keyName") String keyName ) { this.amiId = amiId; this.instanceType = instanceType; this.minInstances = minInstances; this.maxInstances = maxInstances; + this.securityGroupIds = securityGroupIds; + this.keyName = keyName; } @JsonProperty @@ -68,4 +76,16 @@ public class EC2NodeData implements WorkerNodeData { return maxInstances; } + + @JsonProperty + public List getSecurityGroupIds() + { + return securityGroupIds; + } + + @JsonProperty + public String getKeyName() + { + return keyName; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java index 21e09f5fd40..046a8b55f32 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java @@ -27,18 +27,18 @@ import org.codehaus.jackson.annotate.JsonProperty; public class GalaxyUserData implements WorkerUserData { public final String env; - public final String ver; + public final String version; public final String type; @JsonCreator public GalaxyUserData( @JsonProperty("env") String env, - @JsonProperty("ver") String ver, + @JsonProperty("version") String version, @JsonProperty("type") String type ) { this.env = env; - this.ver = ver; + this.version = version; this.type = type; } @@ -49,9 +49,9 @@ public class GalaxyUserData implements WorkerUserData } @JsonProperty - public String getVer() + public String getVersion() { - return ver; + return version; } @JsonProperty diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java index f54818f8437..d0173ef06b8 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java @@ -22,6 +22,8 @@ package com.metamx.druid.merger.coordinator.setup; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; +import java.util.List; + /** */ public class WorkerSetupData @@ -30,19 +32,25 @@ public class WorkerSetupData private final int minNumWorkers; private final WorkerNodeData nodeData; private final WorkerUserData userData; + private final List securityGroupIds; + private final String keyName; @JsonCreator public WorkerSetupData( @JsonProperty("minVersion") String minVersion, @JsonProperty("minNumWorkers") int minNumWorkers, @JsonProperty("nodeData") WorkerNodeData nodeData, - @JsonProperty("userData") WorkerUserData userData + @JsonProperty("userData") WorkerUserData userData, + @JsonProperty("securityGroupIds") List securityGroupIds, + @JsonProperty("keyName") String keyName ) { this.minVersion = minVersion; this.minNumWorkers = minNumWorkers; this.nodeData = nodeData; this.userData = userData; + this.securityGroupIds = securityGroupIds; + this.keyName = keyName; } @JsonProperty @@ -68,4 +76,16 @@ public class WorkerSetupData { return userData; } + + @JsonProperty + public List getSecurityGroupIds() + { + return securityGroupIds; + } + + @JsonProperty + public String getKeyName() + { + return keyName; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java 
b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java index 03bdd770671..42f1a880eda 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java @@ -29,6 +29,7 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig; import org.apache.commons.collections.MapUtils; import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; import org.joda.time.Duration; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.FoldController; @@ -122,7 +123,7 @@ public class WorkerSetupManager { return handle.createQuery( String.format( - "SELECT minVersion, minNumWorkers, nodeData, userData FROM %s", + "SELECT minVersion, minNumWorkers, nodeData, userData, securityGroupIds, keyName FROM %s", config.getWorkerSetupTable() ) ).fold( @@ -150,7 +151,14 @@ public class WorkerSetupManager jsonMapper.readValue( MapUtils.getString(stringObjectMap, "userData"), WorkerUserData.class - ) + ), + (List<String>) jsonMapper.readValue( + MapUtils.getString(stringObjectMap, "securityGroupIds"), + new TypeReference<List<String>>() + { + } + ), + MapUtils.getString(stringObjectMap, "keyName") ) ); return workerNodeConfigurations; @@ -207,7 +215,7 @@ public class WorkerSetupManager handle.createStatement(String.format("DELETE FROM %s", config.getWorkerSetupTable())).execute(); handle.createStatement( String.format( - "INSERT INTO %s (minVersion, minNumWorkers, nodeData, userData) VALUES (:minVersion, :minNumWorkers, :nodeData, :userData)", + "INSERT INTO %s (minVersion, minNumWorkers, nodeData, userData, securityGroupIds, keyName) VALUES (:minVersion, :minNumWorkers, :nodeData, :userData, :securityGroupIds, :keyName)", config.getWorkerSetupTable() ) ) @@ -215,6 +223,8 @@ public class WorkerSetupManager .bind("minNumWorkers", value.getMinNumWorkers()) .bind("nodeData", jsonMapper.writeValueAsString(value.getNodeData())) .bind("userData", jsonMapper.writeValueAsString(value.getUserData())) + .bind("securityGroupIds", jsonMapper.writeValueAsString(value.getSecurityGroupIds())) + .bind("keyName", jsonMapper.writeValueAsString(value.getKeyName())) .execute(); return null; diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index 3282ca5dd3a..edf3499ff74 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -343,6 +343,8 @@ public class RemoteTaskRunnerTest "0", 0, null, + null, + Lists.newArrayList(), null ) ); @@ -404,6 +406,12 @@ public class RemoteTaskRunnerTest { return null; } + + @Override + public List<String> ipLookup(List<String> ips) + { + return ips; + } } private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java index 2cbdfe83efe..2660a22952e 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java @@ -27,6 +27,7 @@ import 
com.amazonaws.services.ec2.model.Reservation; import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.TerminateInstancesRequest; +import com.google.common.collect.Lists; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.druid.merger.coordinator.setup.EC2NodeData; @@ -105,8 +106,10 @@ public class EC2AutoScalingStrategyTest new WorkerSetupData( "0", 0, - new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1), - new GalaxyUserData("env", "ver", "type") + new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.newArrayList(), "foo"), + new GalaxyUserData("env", "version", "type"), + Arrays.asList("foo"), + "foo2" ) ); EasyMock.replay(workerSetupManager); @@ -133,7 +136,7 @@ public class EC2AutoScalingStrategyTest Assert.assertEquals(created.getNodeIds().size(), 1); Assert.assertEquals(created.getNodes().size(), 1); - Assert.assertEquals(String.format("%s:8080", IP), created.getNodeIds().get(0)); + Assert.assertEquals("theInstance", created.getNodeIds().get(0)); AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyHost")); diff --git a/pom.xml b/pom.xml index 5a165a16ba2..898fc197b93 100644 --- a/pom.xml +++ b/pom.xml @@ -84,7 +84,7 @@ commons-codec commons-codec - 1.3 + 1.7 commons-httpclient From d1f4317af770e854c56ccaf46d639d6aca2fdeba Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Mon, 14 Jan 2013 14:55:04 -0800 Subject: [PATCH 4/6] updates to autoscaling config based on code review comments --- .../java/com/metamx/druid/db/DbConnector.java | 2 +- .../merger/coordinator/RemoteTaskRunner.java | 7 ++-- .../config/RemoteTaskRunnerConfig.java | 2 +- .../scaling/EC2AutoScalingStrategy.java | 39 ++++++++++++------- .../coordinator/scaling/ScalingStrategy.java | 2 +- .../merger/coordinator/setup/EC2NodeData.java | 2 +- .../coordinator/setup/GalaxyUserData.java | 2 +- .../coordinator/setup/WorkerNodeData.java | 33 ---------------- .../coordinator/setup/WorkerSetupData.java | 30 +++----------- .../coordinator/setup/WorkerSetupManager.java | 32 +++------------ .../coordinator/setup/WorkerUserData.java | 33 ---------------- .../merger/worker/config/WorkerConfig.java | 3 +- .../coordinator/RemoteTaskRunnerTest.java | 2 - .../scaling/EC2AutoScalingStrategyTest.java | 6 +-- 14 files changed, 50 insertions(+), 145 deletions(-) delete mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java delete mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java index b918af2b2b5..45a0b937964 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnector.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java @@ -65,7 +65,7 @@ public class DbConnector dbi, workerTableName, String.format( - "CREATE table %s (minVersion TINYTEXT NOT NULL, minNumWorkers SMALLINT NOT NULL, nodeData LONGTEXT NOT NULL, userData LONGTEXT NOT NULL, securityGroupIds LONGTEXT NOT NULL, keyName TINYTEXT NOT NULL)", + "CREATE table %s (config LONGTEXT NOT NULL)", workerTableName ) ); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 0e8927cb0b9..5537a6b6420 100644 --- 
a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -53,7 +53,6 @@ import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Period; -import javax.annotation.Nullable; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -185,7 +184,7 @@ public class RemoteTaskRunner implements TaskRunner new Predicate() { @Override - public boolean apply(@Nullable WorkerWrapper input) + public boolean apply(WorkerWrapper input) { return input.getRunningTasks().isEmpty() && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis() @@ -201,9 +200,9 @@ public class RemoteTaskRunner implements TaskRunner new Function() { @Override - public String apply(@Nullable WorkerWrapper input) + public String apply(WorkerWrapper input) { - return input.getWorker().getHost(); + return input.getWorker().getIp(); } } ) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java index 2e20c4ffff2..c9badf7ef88 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java @@ -38,7 +38,7 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig public abstract DateTime getTerminateResourcesOriginDateTime(); @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion") - @Default("1") + @Default("10000") public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion(); @Config("druid.indexer.maxScalingDuration") diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java index 35f9d7d25cb..8d51da61afd 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java @@ -30,7 +30,6 @@ import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.google.common.base.Function; import com.google.common.collect.Lists; -import com.metamx.common.ISE; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.druid.merger.coordinator.setup.EC2NodeData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; @@ -39,6 +38,7 @@ import com.metamx.emitter.EmittingLogger; import org.apache.commons.codec.binary.Base64; import org.codehaus.jackson.map.ObjectMapper; +import javax.annotation.Nullable; import java.util.List; /** @@ -70,10 +70,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy { try { WorkerSetupData setupData = workerSetupManager.getWorkerSetupData(); - if (!(setupData.getNodeData() instanceof EC2NodeData)) { - throw new ISE("DB misconfiguration! 
Node data is an instance of [%s]", setupData.getNodeData().getClass()); - } - EC2NodeData workerConfig = (EC2NodeData) setupData.getNodeData(); + EC2NodeData workerConfig = setupData.getNodeData(); log.info("Creating new instance(s)..."); RunInstancesResult result = amazonEC2Client.runInstances( @@ -131,12 +128,12 @@ public class EC2AutoScalingStrategy implements ScalingStrategy } @Override - public AutoScalingData terminate(List nodeIds) + public AutoScalingData terminate(List ids) { DescribeInstancesResult result = amazonEC2Client.describeInstances( new DescribeInstancesRequest() .withFilters( - new Filter("instance-id", nodeIds) + new Filter("private-ip-address", ids) ) ); @@ -148,18 +145,30 @@ public class EC2AutoScalingStrategy implements ScalingStrategy try { log.info("Terminating instance[%s]", instances); amazonEC2Client.terminateInstances( - new TerminateInstancesRequest(nodeIds) + new TerminateInstancesRequest( + Lists.transform( + instances, + new Function() + { + @Override + public String apply(Instance input) + { + return input.getInstanceId(); + } + } + ) + ) ); return new AutoScalingData( Lists.transform( - instances, - new Function() + ids, + new Function() { @Override - public String apply(Instance input) + public String apply(@Nullable String input) { - return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort()); + return String.format("%s:%s", input, config.getWorkerPort()); } } ), @@ -188,7 +197,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy instances.addAll(reservation.getInstances()); } - return Lists.transform( + List retVal = Lists.transform( instances, new Function() { @@ -199,5 +208,9 @@ public class EC2AutoScalingStrategy implements ScalingStrategy } } ); + + log.info("Performing lookup: %s --> %s", ips, retVal); + + return retVal; } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java index 52105451ed3..150de1357e0 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java @@ -27,7 +27,7 @@ public interface ScalingStrategy { public AutoScalingData provision(); - public AutoScalingData terminate(List nodeIds); + public AutoScalingData terminate(List ids); /** * Provides a lookup of ip addresses to node ids diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java index 3e986f94d56..8d302df25f6 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java @@ -26,7 +26,7 @@ import java.util.List; /** */ -public class EC2NodeData implements WorkerNodeData +public class EC2NodeData { private final String amiId; private final String instanceType; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java index 046a8b55f32..876a2635273 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java @@ -24,7 +24,7 @@ import org.codehaus.jackson.annotate.JsonProperty; /** */ -public class 
GalaxyUserData implements WorkerUserData +public class GalaxyUserData { public final String env; public final String version; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java deleted file mode 100644 index b7cec6d68d0..00000000000 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.merger.coordinator.setup; - -import org.codehaus.jackson.annotate.JsonSubTypes; -import org.codehaus.jackson.annotate.JsonTypeInfo; - -/** - */ -@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, property="type") -@JsonSubTypes(value={ - @JsonSubTypes.Type(name="ec2", value=EC2NodeData.class) -}) -public interface WorkerNodeData -{ -} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java index d0173ef06b8..8395fa2d6c8 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java @@ -30,27 +30,21 @@ public class WorkerSetupData { private final String minVersion; private final int minNumWorkers; - private final WorkerNodeData nodeData; - private final WorkerUserData userData; - private final List securityGroupIds; - private final String keyName; + private final EC2NodeData nodeData; + private final GalaxyUserData userData; @JsonCreator public WorkerSetupData( @JsonProperty("minVersion") String minVersion, @JsonProperty("minNumWorkers") int minNumWorkers, - @JsonProperty("nodeData") WorkerNodeData nodeData, - @JsonProperty("userData") WorkerUserData userData, - @JsonProperty("securityGroupIds") List securityGroupIds, - @JsonProperty("keyName") String keyName + @JsonProperty("nodeData") EC2NodeData nodeData, + @JsonProperty("userData") GalaxyUserData userData ) { this.minVersion = minVersion; this.minNumWorkers = minNumWorkers; this.nodeData = nodeData; this.userData = userData; - this.securityGroupIds = securityGroupIds; - this.keyName = keyName; } @JsonProperty @@ -66,26 +60,14 @@ public class WorkerSetupData } @JsonProperty - public WorkerNodeData getNodeData() + public EC2NodeData getNodeData() { return nodeData; } @JsonProperty - public WorkerUserData getUserData() + public GalaxyUserData getUserData() { return userData; } - - @JsonProperty - public List getSecurityGroupIds() - { - return securityGroupIds; - } - - @JsonProperty - public String getKeyName() - { - return keyName; - } } diff --git 
a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java index 42f1a880eda..baa56290af7 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java @@ -123,7 +123,7 @@ public class WorkerSetupManager { return handle.createQuery( String.format( - "SELECT minVersion, minNumWorkers, nodeData, userData, securityGroupIds, keyName FROM %s", + "SELECT config FROM %s", config.getWorkerSetupTable() ) ).fold( @@ -141,24 +141,9 @@ public class WorkerSetupManager try { // stringObjectMap lowercases and jackson may fail serde workerNodeConfigurations.add( - new WorkerSetupData( - MapUtils.getString(stringObjectMap, "minVersion"), - MapUtils.getInteger(stringObjectMap, "minNumWorkers"), - jsonMapper.readValue( - MapUtils.getString(stringObjectMap, "nodeData"), - WorkerNodeData.class - ), - jsonMapper.readValue( - MapUtils.getString(stringObjectMap, "userData"), - WorkerUserData.class - ), - (List) jsonMapper.readValue( - MapUtils.getString(stringObjectMap, "securityGroupIds"), - new TypeReference>() - { - } - ), - MapUtils.getString(stringObjectMap, "keyName") + jsonMapper.readValue( + MapUtils.getString(stringObjectMap, "config"), + WorkerSetupData.class ) ); return workerNodeConfigurations; @@ -215,16 +200,11 @@ public class WorkerSetupManager handle.createStatement(String.format("DELETE FROM %s", config.getWorkerSetupTable())).execute(); handle.createStatement( String.format( - "INSERT INTO %s (minVersion, minNumWorkers, nodeData, userData, securityGroupIds, keyName) VALUES (:minVersion, :minNumWorkers, :nodeData, :userData, :securityGroupIds, :keyName)", + "INSERT INTO %s (config) VALUES (:config)", config.getWorkerSetupTable() ) ) - .bind("minVersion", value.getMinVersion()) - .bind("minNumWorkers", value.getMinNumWorkers()) - .bind("nodeData", jsonMapper.writeValueAsString(value.getNodeData())) - .bind("userData", jsonMapper.writeValueAsString(value.getUserData())) - .bind("securityGroupIds", jsonMapper.writeValueAsString(value.getSecurityGroupIds())) - .bind("keyName", jsonMapper.writeValueAsString(value.getKeyName())) + .bind("config", jsonMapper.writeValueAsString(value)) .execute(); return null; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java deleted file mode 100644 index 4a42c9b3bac..00000000000 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. 
- * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.merger.coordinator.setup; - -import org.codehaus.jackson.annotate.JsonSubTypes; -import org.codehaus.jackson.annotate.JsonTypeInfo; - -/** - */ -@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, property="classType") -@JsonSubTypes(value={ - @JsonSubTypes.Type(name="galaxy", value=GalaxyUserData.class) -}) -public interface WorkerUserData -{ -} diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java b/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java index 5b5f3a0a6e7..4689acef261 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java @@ -41,6 +41,7 @@ public abstract class WorkerConfig public int getCapacity() { - return Runtime.getRuntime().availableProcessors() - 1; + return 1; + //return Runtime.getRuntime().availableProcessors() - 1; } } diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index edf3499ff74..1e31efa121c 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -343,8 +343,6 @@ public class RemoteTaskRunnerTest "0", 0, null, - null, - Lists.newArrayList(), null ) ); diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java index 2660a22952e..c3aa8378b07 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java @@ -107,9 +107,7 @@ public class EC2AutoScalingStrategyTest "0", 0, new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.newArrayList(), "foo"), - new GalaxyUserData("env", "version", "type"), - Arrays.asList("foo"), - "foo2" + new GalaxyUserData("env", "version", "type") ) ); EasyMock.replay(workerSetupManager); @@ -138,7 +136,7 @@ public class EC2AutoScalingStrategyTest Assert.assertEquals(created.getNodes().size(), 1); Assert.assertEquals("theInstance", created.getNodeIds().get(0)); - AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyHost")); + AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP")); Assert.assertEquals(deleted.getNodeIds().size(), 1); Assert.assertEquals(deleted.getNodes().size(), 1); From 38b2041ad9f436913db6352e43983f384a914f39 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Thu, 17 Jan 2013 14:56:48 -0800 Subject: [PATCH 5/6] key/value config table --- .../java/com/metamx/druid/db/DbConnector.java | 8 +-- .../config/WorkerSetupManagerConfig.java | 7 +- .../http/IndexerCoordinatorNode.java | 2 +- .../coordinator/setup/WorkerSetupManager.java | 71 ++++++++++--------- .../merger/worker/config/WorkerConfig.java | 3 +- 5 files changed, 47 insertions(+), 44 deletions(-) diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java index 45a0b937964..f3c4c1f13d3 100644 --- 
a/common/src/main/java/com/metamx/druid/db/DbConnector.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java @@ -59,14 +59,14 @@ public class DbConnector ); } - public static void createWorkerSetupTable(final DBI dbi, final String workerTableName) + public static void createConfigTable(final DBI dbi, final String configTableName) { createTable( dbi, - workerTableName, + configTableName, String.format( - "CREATE table %s (config LONGTEXT NOT NULL)", - workerTableName + "CREATE table %s (name VARCHAR(255) NOT NULL, payload LONGTEXT NOT NULL, INDEX(name), PRIMARY KEY(name))", + configTableName ) ); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java index 97368c9f77e..16eeb1c3439 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java @@ -27,8 +27,11 @@ import org.skife.config.Default; */ public abstract class WorkerSetupManagerConfig { - @Config("druid.indexer.workerSetupTable") - public abstract String getWorkerSetupTable(); + @Config("druid.indexer.configTable") + public abstract String getConfigTable(); + + @Config("druid.indexer.workerSetupConfigName") + public abstract String getWorkerSetupConfigName(); @Config("druid.indexer.poll.duration") @Default("PT1M") diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 15d76cf2fec..3dae4046764 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -469,7 +469,7 @@ public class IndexerCoordinatorNode extends RegisteringNode final DBI dbi = new DbConnector(dbConnectorConfig).getDBI(); final WorkerSetupManagerConfig workerSetupManagerConfig = configFactory.build(WorkerSetupManagerConfig.class); - DbConnector.createWorkerSetupTable(dbi, workerSetupManagerConfig.getWorkerSetupTable()); + DbConnector.createConfigTable(dbi, workerSetupManagerConfig.getConfigTable()); workerSetupManager = new WorkerSetupManager( dbi, Executors.newScheduledThreadPool( 1, diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java index baa56290af7..5e43e68ae66 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java @@ -29,7 +29,6 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig; import org.apache.commons.collections.MapUtils; import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.type.TypeReference; import org.joda.time.Duration; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.FoldController; @@ -123,37 +122,39 @@ public class WorkerSetupManager { return handle.createQuery( String.format( - "SELECT config FROM %s", - config.getWorkerSetupTable() + "SELECT payload FROM %s WHERE name = :name", + config.getConfigTable() ) - ).fold( - Lists.<WorkerSetupData>newArrayList(), - new Folder3<ArrayList<WorkerSetupData>, Map<String, Object>>() - { - @Override - public ArrayList<WorkerSetupData> fold( 
- ArrayList<WorkerSetupData> workerNodeConfigurations, - Map<String, Object> stringObjectMap, - FoldController foldController, - StatementContext statementContext - ) throws SQLException - { - try { - // stringObjectMap lowercases and jackson may fail serde - workerNodeConfigurations.add( - jsonMapper.readValue( - MapUtils.getString(stringObjectMap, "config"), - WorkerSetupData.class - ) - ); - return workerNodeConfigurations; - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - ); + ) + .bind("name", config.getWorkerSetupConfigName()) + .fold( + Lists.<WorkerSetupData>newArrayList(), + new Folder3<ArrayList<WorkerSetupData>, Map<String, Object>>() + { + @Override + public ArrayList<WorkerSetupData> fold( + ArrayList<WorkerSetupData> workerNodeConfigurations, + Map<String, Object> stringObjectMap, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + // stringObjectMap lowercases and jackson may fail serde + workerNodeConfigurations.add( + jsonMapper.readValue( + MapUtils.getString(stringObjectMap, "payload"), + WorkerSetupData.class + ) + ); + return workerNodeConfigurations; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); } } ); @@ -197,14 +198,14 @@ public class WorkerSetupManager @Override public Void withHandle(Handle handle) throws Exception { - handle.createStatement(String.format("DELETE FROM %s", config.getWorkerSetupTable())).execute(); handle.createStatement( String.format( - "INSERT INTO %s (config) VALUES (:config)", - config.getWorkerSetupTable() + "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload", + config.getConfigTable() ) ) - .bind("config", jsonMapper.writeValueAsString(value)) + .bind("name", config.getWorkerSetupConfigName()) + .bind("payload", jsonMapper.writeValueAsString(value)) .execute(); return null; diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java b/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java index 4689acef261..5b5f3a0a6e7 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java @@ -41,6 +41,7 @@ public abstract class WorkerConfig public int getCapacity() { - return 1; - //return Runtime.getRuntime().availableProcessors() - 1; + return Runtime.getRuntime().availableProcessors() - 1; } } From ac31afbce57c2974b044ed8de1a9ed224e2eb9ad Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Fri, 18 Jan 2013 16:45:23 -0800 Subject: [PATCH 6/6] remove redundant index for primary key in config table --- common/src/main/java/com/metamx/druid/db/DbConnector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java index f3c4c1f13d3..73013ce6aa2 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnector.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java @@ -65,7 +65,7 @@ public class DbConnector dbi, configTableName, String.format( - "CREATE table %s (name VARCHAR(255) NOT NULL, payload LONGTEXT NOT NULL, INDEX(name), PRIMARY KEY(name))", + "CREATE table %s (name VARCHAR(255) NOT NULL, payload LONGTEXT NOT NULL, PRIMARY KEY(name))", configTableName ) );
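
Note (not part of the patches above): after this series, worker setup data lives in a single key/value config table and is read back by name. The SQL below is a minimal sketch of seeding such a row by hand, assuming MySQL; the table name prod_config (druid.indexer.configTable), the key workerSetup (druid.indexer.workerSetupConfigName), and every value inside the JSON payload are illustrative placeholders, not values taken from the patches. The payload field names mirror WorkerSetupData, EC2NodeData, and GalaxyUserData as of PATCH 4/6.

  -- schema as created by DbConnector.createConfigTable after PATCH 6/6
  CREATE TABLE prod_config (
    name VARCHAR(255) NOT NULL,
    payload LONGTEXT NOT NULL,
    PRIMARY KEY (name)
  );

  -- upsert one worker setup config, same shape as WorkerSetupManager.setWorkerSetupData
  INSERT INTO prod_config (name, payload)
  VALUES (
    'workerSetup',
    '{"minVersion":"0","minNumWorkers":1,"nodeData":{"amiId":"ami-12345678","instanceType":"m1.large","minInstances":1,"maxInstances":1,"securityGroupIds":["sg-abc123"],"keyName":"worker-key"},"userData":{"env":"prod","version":"1","type":"indexer"}}'
  )
  ON DUPLICATE KEY UPDATE payload = VALUES(payload);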