diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java index 99712df22a5..75cf0ba27e7 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnector.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java @@ -59,6 +59,18 @@ public class DbConnector ); } + public static void createWorkerSetupTable(final DBI dbi, final String workerTableName) + { + createTable( + dbi, + workerTableName, + String.format( + "CREATE table %s (minVersion TINYTEXT NOT NULL, minNumWorkers SMALLINT NOT NULL, nodeData LONGTEXT NOT NULL, userData LONGTEXT NOT NULL)", + workerTableName + ) + ); + } + public static void createTable( final DBI dbi, final String tableName, diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 2a235b88d86..addb789762f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -39,6 +39,7 @@ import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.scaling.AutoScalingData; import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.merger.worker.Worker; import com.metamx.emitter.EmittingLogger; import com.netflix.curator.framework.CuratorFramework; @@ -88,6 +89,7 @@ public class RemoteTaskRunner implements TaskRunner private final ScheduledExecutorService scheduledExec; private final RetryPolicyFactory retryPolicyFactory; private final ScalingStrategy strategy; + private final WorkerSetupManager workerSetupManager; // all workers that exist in ZK private final Map zkWorkers = new ConcurrentHashMap(); @@ -109,7 +111,8 @@ public class RemoteTaskRunner implements TaskRunner PathChildrenCache workerPathCache, ScheduledExecutorService scheduledExec, RetryPolicyFactory retryPolicyFactory, - ScalingStrategy strategy + ScalingStrategy strategy, + WorkerSetupManager workerSetupManager ) { this.jsonMapper = jsonMapper; @@ -119,6 +122,7 @@ public class RemoteTaskRunner implements TaskRunner this.scheduledExec = scheduledExec; this.retryPolicyFactory = retryPolicyFactory; this.strategy = strategy; + this.workerSetupManager = workerSetupManager; } @LifecycleStart @@ -169,7 +173,7 @@ public class RemoteTaskRunner implements TaskRunner public void run() { if (currentlyTerminating.isEmpty()) { - if (zkWorkers.size() <= config.getMinNumWorkers()) { + if (zkWorkers.size() <= workerSetupManager.getWorkerSetupData().getMinNumWorkers()) { return; } @@ -388,8 +392,7 @@ public class RemoteTaskRunner implements TaskRunner synchronized (statusLock) { try { if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || - event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) - { + event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); final TaskStatus taskStatus; @@ -399,7 +402,7 @@ public class RemoteTaskRunner implements TaskRunner event.getData().getData(), TaskStatus.class ); - if(!taskStatus.getId().equals(taskId)) { + if (!taskStatus.getId().equals(taskId)) { // Sanity check throw new ISE( "Worker[%s] status id does not match payload id: %s != %s", @@ -408,7 +411,8 @@ public class RemoteTaskRunner implements TaskRunner taskStatus.getId() ); } - } catch (Exception e) { + } + catch (Exception e) { log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId); retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId)); throw Throwables.propagate(e); @@ -446,7 +450,8 @@ public class RemoteTaskRunner implements TaskRunner } } } - } catch(Exception e) { + } + catch (Exception e) { log.makeAlert(e, "Failed to handle new worker status") .addData("worker", worker.getHost()) .addData("znode", event.getData().getPath()) @@ -526,7 +531,9 @@ public class RemoteTaskRunner implements TaskRunner public boolean apply(WorkerWrapper input) { return (!input.isAtCapacity() && - input.getWorker().getVersion().compareTo(config.getMinWorkerVersion()) >= 0); + input.getWorker() + .getVersion() + .compareTo(workerSetupManager.getWorkerSetupData().getMinVersion()) >= 0); } } ) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java index c364070e313..a8cfcf8df22 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java @@ -26,24 +26,7 @@ import org.skife.config.Default; */ public abstract class EC2AutoScalingStrategyConfig { - @Config("druid.indexer.amiId") - public abstract String getAmiId(); - @Config("druid.indexer.worker.port") @Default("8080") public abstract String getWorkerPort(); - - @Config("druid.indexer.instanceType") - public abstract String getInstanceType(); - - @Config("druid.indexer.minNumInstancesToProvision") - @Default("1") - public abstract int getMinNumInstancesToProvision(); - - @Config("druid.indexer.maxNumInstancesToProvision") - @Default("1") - public abstract int getMaxNumInstancesToProvision(); - - @Config("druid.indexer.userDataFile") - public abstract String getUserDataFile(); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java index 00b869ea6da..2e20c4ffff2 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java @@ -37,13 +37,6 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig @Default("2012-01-01T00:55:00.000Z") public abstract DateTime getTerminateResourcesOriginDateTime(); - @Config("druid.indexer.minWorkerVersion") - public abstract String getMinWorkerVersion(); - - @Config("druid.indexer.minNumWorkers") - @Default("1") - public abstract int getMinNumWorkers(); - @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion") @Default("1") public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java new file mode 100644 index 00000000000..ad7444b657e --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java @@ -0,0 +1,17 @@ +package com.metamx.druid.merger.coordinator.config; + +import org.joda.time.Duration; +import org.skife.config.Config; +import org.skife.config.Default; + +/** + */ +public abstract class WorkerSetupManagerConfig +{ + @Config("druid.indexer.workerSetupTable") + public abstract String getWorkerSetupTable(); + + @Config("druid.indexer.poll.duration") + @Default("PT1M") + public abstract Duration getPollDuration(); +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 1c11c62cb7b..348a62b9a28 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -69,6 +69,8 @@ import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; +import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig; import com.metamx.druid.realtime.S3SegmentPusher; import com.metamx.druid.realtime.S3SegmentPusherConfig; import com.metamx.druid.realtime.SegmentPusher; @@ -98,6 +100,7 @@ import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.FilterHolder; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; +import org.skife.jdbi.v2.DBI; import java.net.URL; import java.util.Arrays; @@ -133,6 +136,7 @@ public class IndexerCoordinatorNode extends RegisteringNode private CuratorFramework curatorFramework = null; private ScheduledExecutorFactory scheduledExecutorFactory = null; private IndexerZkConfig indexerZkConfig; + private WorkerSetupManager workerSetupManager = null; private TaskRunnerFactory taskRunnerFactory = null; private TaskMaster taskMaster = null; private Server server = null; @@ -160,14 +164,16 @@ public class IndexerCoordinatorNode extends RegisteringNode return this; } - public void setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator) + public IndexerCoordinatorNode setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator) { this.mergerDBCoordinator = mergerDBCoordinator; + return this; } - public void setTaskQueue(TaskQueue taskQueue) + public IndexerCoordinatorNode setTaskQueue(TaskQueue taskQueue) { this.taskQueue = taskQueue; + return this; } public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator) @@ -182,9 +188,16 @@ public class IndexerCoordinatorNode extends RegisteringNode return this; } - public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) + public IndexerCoordinatorNode setWorkerSetupManager(WorkerSetupManager workerSetupManager) + { + this.workerSetupManager = workerSetupManager; + return this; + } + + public IndexerCoordinatorNode setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { this.taskRunnerFactory = taskRunnerFactory; + return this; } public void init() throws Exception @@ -202,6 +215,7 @@ public class IndexerCoordinatorNode extends RegisteringNode initializeJacksonSubtypes(); initializeCurator(); initializeIndexerZkConfig(); + initializeWorkerSetupManager(); initializeTaskRunnerFactory(); initializeTaskMaster(); initializeServer(); @@ -220,7 +234,8 @@ public class IndexerCoordinatorNode extends RegisteringNode jsonMapper, config, emitter, - taskQueue + taskQueue, + workerSetupManager ) ); @@ -447,6 +462,27 @@ public class IndexerCoordinatorNode extends RegisteringNode } } + public void initializeWorkerSetupManager() + { + if (workerSetupManager == null) { + final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class); + final DBI dbi = new DbConnector(dbConnectorConfig).getDBI(); + final WorkerSetupManagerConfig workerSetupManagerConfig = configFactory.build(WorkerSetupManagerConfig.class); + + DbConnector.createWorkerSetupTable(dbi, workerSetupManagerConfig.getWorkerSetupTable()); + workerSetupManager = new WorkerSetupManager( + dbi, Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("WorkerSetupManagerExec--%d") + .build() + ), jsonMapper, workerSetupManagerConfig + ); + } + lifecycle.addManagedInstance(workerSetupManager); + } + public void initializeTaskRunnerFactory() { if (taskRunnerFactory == null) { @@ -476,7 +512,8 @@ public class IndexerCoordinatorNode extends RegisteringNode PropUtils.getProperty(props, "com.metamx.aws.secretKey") ) ), - configFactory.build(EC2AutoScalingStrategyConfig.class) + configFactory.build(EC2AutoScalingStrategyConfig.class), + workerSetupManager ); } else if (config.getStrategyImpl().equalsIgnoreCase("noop")) { strategy = new NoopScalingStrategy(); @@ -491,7 +528,8 @@ public class IndexerCoordinatorNode extends RegisteringNode new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true), retryScheduledExec, new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)), - strategy + strategy, + workerSetupManager ); } }; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java index 6cf9b0a7c16..e4acd93514f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java @@ -28,6 +28,8 @@ import com.metamx.druid.merger.common.task.MergeTask; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.TaskQueue; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.emitter.service.ServiceEmitter; import javax.ws.rs.Consumes; @@ -48,18 +50,21 @@ public class IndexerCoordinatorResource private final IndexerCoordinatorConfig config; private final ServiceEmitter emitter; private final TaskQueue tasks; + private final WorkerSetupManager workerSetupManager; @Inject public IndexerCoordinatorResource( IndexerCoordinatorConfig config, ServiceEmitter emitter, - TaskQueue tasks + TaskQueue tasks, + WorkerSetupManager workerSetupManager ) throws Exception { this.config = config; this.emitter = emitter; this.tasks = tasks; + this.workerSetupManager = workerSetupManager; } @POST @@ -115,4 +120,25 @@ public class IndexerCoordinatorResource { return Response.ok(ImmutableMap.of("task", taskid)).build(); } + + @GET + @Path("/worker/setup") + @Produces("application/json") + public Response getWorkerSetupData() + { + return Response.ok(workerSetupManager.getWorkerSetupData()).build(); + } + + @POST + @Path("/worker/setup") + @Consumes("application/json") + public Response setWorkerSetupData( + final WorkerSetupData workerSetupData + ) + { + if (!workerSetupManager.setWorkerSetupData(workerSetupData)) { + return Response.status(Response.Status.BAD_REQUEST).build(); + } + return Response.ok().build(); + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java index 9c657bdc292..4cc1df9fa6f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java @@ -22,6 +22,7 @@ package com.metamx.druid.merger.coordinator.http; import com.google.inject.Provides; import com.metamx.druid.merger.coordinator.TaskQueue; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.emitter.service.ServiceEmitter; import com.sun.jersey.guice.JerseyServletModule; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; @@ -38,18 +39,21 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule private final IndexerCoordinatorConfig indexerCoordinatorConfig; private final ServiceEmitter emitter; private final TaskQueue tasks; + private final WorkerSetupManager workerSetupManager; public IndexerCoordinatorServletModule( ObjectMapper jsonMapper, IndexerCoordinatorConfig indexerCoordinatorConfig, ServiceEmitter emitter, - TaskQueue tasks + TaskQueue tasks, + WorkerSetupManager workerSetupManager ) { this.jsonMapper = jsonMapper; this.indexerCoordinatorConfig = indexerCoordinatorConfig; this.emitter = emitter; this.tasks = tasks; + this.workerSetupManager = workerSetupManager; } @Override @@ -60,6 +64,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig); bind(ServiceEmitter.class).toInstance(emitter); bind(TaskQueue.class).toInstance(tasks); + bind(WorkerSetupManager.class).toInstance(workerSetupManager); serve("/*").with(GuiceContainer.class); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java index 265fe62287c..eed69ae7f9e 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java @@ -24,19 +24,20 @@ import com.amazonaws.services.ec2.model.DescribeInstancesRequest; import com.amazonaws.services.ec2.model.DescribeInstancesResult; import com.amazonaws.services.ec2.model.Filter; import com.amazonaws.services.ec2.model.Instance; -import com.amazonaws.services.ec2.model.InstanceType; import com.amazonaws.services.ec2.model.Reservation; import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.google.common.base.Function; import com.google.common.collect.Lists; +import com.metamx.common.ISE; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; +import com.metamx.druid.merger.coordinator.setup.EC2NodeData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.emitter.EmittingLogger; import org.codehaus.jackson.map.ObjectMapper; -import javax.annotation.Nullable; -import java.io.File; import java.util.List; /** @@ -48,31 +49,40 @@ public class EC2AutoScalingStrategy implements ScalingStrategy private final ObjectMapper jsonMapper; private final AmazonEC2Client amazonEC2Client; private final EC2AutoScalingStrategyConfig config; + private final WorkerSetupManager workerSetupManager; public EC2AutoScalingStrategy( ObjectMapper jsonMapper, AmazonEC2Client amazonEC2Client, - EC2AutoScalingStrategyConfig config + EC2AutoScalingStrategyConfig config, + WorkerSetupManager workerSetupManager ) { this.jsonMapper = jsonMapper; this.amazonEC2Client = amazonEC2Client; this.config = config; + this.workerSetupManager = workerSetupManager; } @Override public AutoScalingData provision() { try { + WorkerSetupData setupData = workerSetupManager.getWorkerSetupData(); + if (!(setupData.getNodeData() instanceof EC2NodeData)) { + throw new ISE("DB misconfiguration! Node data is an instance of [%s]", setupData.getNodeData().getClass()); + } + EC2NodeData workerConfig = (EC2NodeData) setupData.getNodeData(); + log.info("Creating new instance(s)..."); RunInstancesResult result = amazonEC2Client.runInstances( new RunInstancesRequest( - config.getAmiId(), - config.getMinNumInstancesToProvision(), - config.getMaxNumInstancesToProvision() + workerConfig.getAmiId(), + workerConfig.getMinInstances(), + workerConfig.getMaxInstances() ) - .withInstanceType(InstanceType.fromValue(config.getInstanceType())) - .withUserData(jsonMapper.writeValueAsString(new File(config.getUserDataFile()))) + .withInstanceType(workerConfig.getInstanceType()) + .withUserData(jsonMapper.writeValueAsString(setupData.getUserData())) ); List instanceIds = Lists.transform( @@ -80,7 +90,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy new Function() { @Override - public String apply(@Nullable Instance input) + public String apply(Instance input) { return input.getInstanceId(); } @@ -95,7 +105,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy new Function() { @Override - public String apply(@Nullable Instance input) + public String apply(Instance input) { return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort()); } @@ -135,7 +145,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy new Function() { @Override - public String apply(@Nullable Instance input) + public String apply(Instance input) { return input.getInstanceId(); } @@ -150,7 +160,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy new Function() { @Override - public String apply(@Nullable Instance input) + public String apply(Instance input) { return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java new file mode 100644 index 00000000000..209444c6731 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java @@ -0,0 +1,52 @@ +package com.metamx.druid.merger.coordinator.setup; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + */ +public class EC2NodeData implements WorkerNodeData +{ + private final String amiId; + private final String instanceType; + private final int minInstances; + private final int maxInstances; + + @JsonCreator + public EC2NodeData( + @JsonProperty("amiId") String amiId, + @JsonProperty("instanceType") String instanceType, + @JsonProperty("minInstances") int minInstances, + @JsonProperty("maxInstances") int maxInstances + ) + { + this.amiId = amiId; + this.instanceType = instanceType; + this.minInstances = minInstances; + this.maxInstances = maxInstances; + } + + @JsonProperty + public String getAmiId() + { + return amiId; + } + + @JsonProperty + public String getInstanceType() + { + return instanceType; + } + + @JsonProperty + public int getMinInstances() + { + return minInstances; + } + + @JsonProperty + public int getMaxInstances() + { + return maxInstances; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java new file mode 100644 index 00000000000..0baa0ddfb9a --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java @@ -0,0 +1,43 @@ +package com.metamx.druid.merger.coordinator.setup; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + */ +public class GalaxyUserData implements WorkerUserData +{ + public final String env; + public final String ver; + public final String type; + + @JsonCreator + public GalaxyUserData( + @JsonProperty("env") String env, + @JsonProperty("ver") String ver, + @JsonProperty("type") String type + ) + { + this.env = env; + this.ver = ver; + this.type = type; + } + + @JsonProperty + public String getEnv() + { + return env; + } + + @JsonProperty + public String getVer() + { + return ver; + } + + @JsonProperty + public String getType() + { + return type; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java new file mode 100644 index 00000000000..8068a4c267b --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerNodeData.java @@ -0,0 +1,14 @@ +package com.metamx.druid.merger.coordinator.setup; + +import org.codehaus.jackson.annotate.JsonSubTypes; +import org.codehaus.jackson.annotate.JsonTypeInfo; + +/** + */ +@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, property="type") +@JsonSubTypes(value={ + @JsonSubTypes.Type(name="ec2", value=EC2NodeData.class) +}) +public interface WorkerNodeData +{ +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java new file mode 100644 index 00000000000..224b22167ae --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java @@ -0,0 +1,52 @@ +package com.metamx.druid.merger.coordinator.setup; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + */ +public class WorkerSetupData +{ + private final String minVersion; + private final int minNumWorkers; + private final WorkerNodeData nodeData; + private final WorkerUserData userData; + + @JsonCreator + public WorkerSetupData( + @JsonProperty("minVersion") String minVersion, + @JsonProperty("minNumWorkers") int minNumWorkers, + @JsonProperty("nodeData") WorkerNodeData nodeData, + @JsonProperty("userData") WorkerUserData userData + ) + { + this.minVersion = minVersion; + this.minNumWorkers = minNumWorkers; + this.nodeData = nodeData; + this.userData = userData; + } + + @JsonProperty + public String getMinVersion() + { + return minVersion; + } + + @JsonProperty + public int getMinNumWorkers() + { + return minNumWorkers; + } + + @JsonProperty + public WorkerNodeData getNodeData() + { + return nodeData; + } + + @JsonProperty + public WorkerUserData getUserData() + { + return userData; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java new file mode 100644 index 00000000000..a4ab8fe8d5d --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java @@ -0,0 +1,216 @@ +package com.metamx.druid.merger.coordinator.setup; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig; +import org.apache.commons.collections.MapUtils; +import org.codehaus.jackson.map.ObjectMapper; +import org.joda.time.Duration; +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.FoldController; +import org.skife.jdbi.v2.Folder3; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class WorkerSetupManager +{ + private static final Logger log = new Logger(WorkerSetupManager.class); + + private final DBI dbi; + private final ObjectMapper jsonMapper; + private final ScheduledExecutorService exec; + private final WorkerSetupManagerConfig config; + + private final Object lock = new Object(); + + private volatile AtomicReference workerSetupData = new AtomicReference(null); + private volatile boolean started = false; + + public WorkerSetupManager( + DBI dbi, + ScheduledExecutorService exec, + ObjectMapper jsonMapper, + WorkerSetupManagerConfig config + ) + { + this.dbi = dbi; + this.exec = exec; + this.jsonMapper = jsonMapper; + this.config = config; + } + + @LifecycleStart + public void start() + { + synchronized (lock) { + if (started) { + return; + } + + ScheduledExecutors.scheduleWithFixedDelay( + exec, + new Duration(0), + config.getPollDuration(), + new Runnable() + { + @Override + public void run() + { + poll(); + } + } + ); + + started = true; + } + } + + @LifecycleStop + public void stop() + { + synchronized (lock) { + if (!started) { + return; + } + + started = false; + } + } + + public void poll() + { + try { + List setupDataList = dbi.withHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + return handle.createQuery( + String.format( + "SELECT minVersion, minNumWorkers, nodeData, userData FROM %s", + config.getWorkerSetupTable() + ) + ).fold( + Lists.newArrayList(), + new Folder3, Map>() + { + @Override + public ArrayList fold( + ArrayList workerNodeConfigurations, + Map stringObjectMap, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + // stringObjectMap lowercases and jackson may fail serde + workerNodeConfigurations.add( + new WorkerSetupData( + MapUtils.getString(stringObjectMap, "minVersion"), + MapUtils.getInteger(stringObjectMap, "minNumWorkers"), + jsonMapper.readValue( + MapUtils.getString(stringObjectMap, "nodeData"), + WorkerNodeData.class + ), + jsonMapper.readValue( + MapUtils.getString(stringObjectMap, "userData"), + WorkerUserData.class + ) + ) + ); + return workerNodeConfigurations; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + } + } + ); + + if (setupDataList.isEmpty()) { + throw new ISE("WTF?! No configuration found for worker nodes!"); + } else if (setupDataList.size() != 1) { + throw new ISE("WTF?! Found more than one configuration for worker nodes"); + } + + workerSetupData.set(setupDataList.get(0)); + } + catch (Exception e) { + log.error(e, "Exception while polling for worker setup data!"); + } + } + + @SuppressWarnings("unchecked") + public WorkerSetupData getWorkerSetupData() + { + synchronized (lock) { + if (!started) { + throw new ISE("Must start WorkerSetupManager first!"); + } + + return workerSetupData.get(); + } + } + + public boolean setWorkerSetupData(final WorkerSetupData value) + { + synchronized (lock) { + try { + if (!started) { + throw new ISE("Must start WorkerSetupManager first!"); + } + + dbi.withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + handle.createStatement(String.format("DELETE FROM %s", config.getWorkerSetupTable())).execute(); + handle.createStatement( + String.format( + "INSERT INTO %s (minVersion, minNumWorkers, nodeData, userData) VALUES (:minVersion, :minNumWorkers, :nodeData, :userData)", + config.getWorkerSetupTable() + ) + ) + .bind("minVersion", value.getMinVersion()) + .bind("minNumWorkers", value.getMinNumWorkers()) + .bind("nodeData", jsonMapper.writeValueAsString(value.getNodeData())) + .bind("userData", jsonMapper.writeValueAsString(value.getUserData())) + .execute(); + + return null; + } + } + ); + + workerSetupData.set(value); + } + catch (Exception e) { + log.error(e, "Exception updating worker config"); + return false; + } + } + + return true; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java new file mode 100644 index 00000000000..80857fb58d1 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerUserData.java @@ -0,0 +1,14 @@ +package com.metamx.druid.merger.coordinator.setup; + +import org.codehaus.jackson.annotate.JsonSubTypes; +import org.codehaus.jackson.annotate.JsonTypeInfo; + +/** + */ +@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, property="classType") +@JsonSubTypes(value={ + @JsonSubTypes.Type(name="galaxy", value=GalaxyUserData.class) +}) +public interface WorkerUserData +{ +} diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index eb10731abd9..3282ca5dd3a 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -17,6 +17,8 @@ import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; import com.metamx.druid.merger.coordinator.scaling.AutoScalingData; import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.merger.worker.TaskMonitor; import com.metamx.druid.merger.worker.Worker; import com.metamx.druid.merger.worker.WorkerCuratorCoordinator; @@ -62,6 +64,7 @@ public class RemoteTaskRunnerTest private PathChildrenCache pathChildrenCache; private RemoteTaskRunner remoteTaskRunner; private TaskMonitor taskMonitor; + private WorkerSetupManager workerSetupManager; private ScheduledExecutorService scheduledExec; @@ -69,7 +72,6 @@ public class RemoteTaskRunnerTest private Worker worker1; - @Before public void setUp() throws Exception { @@ -141,9 +143,10 @@ public class RemoteTaskRunnerTest { remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.newHashSet()), null); try { - remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.newHashSet()), null); - fail("ISE expected"); - } catch (ISE expected) { + remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.newHashSet()), null); + fail("ISE expected"); + } + catch (ISE expected) { } } @@ -333,6 +336,17 @@ public class RemoteTaskRunnerTest private void makeRemoteTaskRunner() throws Exception { scheduledExec = EasyMock.createMock(ScheduledExecutorService.class); + workerSetupManager = EasyMock.createMock(WorkerSetupManager.class); + + EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn( + new WorkerSetupData( + "0", + 0, + null, + null + ) + ); + EasyMock.replay(workerSetupManager); remoteTaskRunner = new RemoteTaskRunner( jsonMapper, @@ -341,7 +355,8 @@ public class RemoteTaskRunnerTest pathChildrenCache, scheduledExec, new RetryPolicyFactory(new TestRetryPolicyConfig()), - new TestScalingStrategy() + new TestScalingStrategy(), + workerSetupManager ); // Create a single worker and wait for things for be ready @@ -405,18 +420,6 @@ public class RemoteTaskRunnerTest return null; } - @Override - public String getMinWorkerVersion() - { - return "0"; - } - - @Override - public int getMinNumWorkers() - { - return 0; - } - @Override public int getMaxWorkerIdleTimeMillisBeforeDeletion() { diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java index 958a2c1d836..2cbdfe83efe 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java @@ -29,6 +29,10 @@ import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; +import com.metamx.druid.merger.coordinator.setup.EC2NodeData; +import com.metamx.druid.merger.coordinator.setup.GalaxyUserData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; @@ -52,6 +56,7 @@ public class EC2AutoScalingStrategyTest private Reservation reservation; private Instance instance; private EC2AutoScalingStrategy strategy; + private WorkerSetupManager workerSetupManager; @Before public void setUp() throws Exception @@ -60,6 +65,7 @@ public class EC2AutoScalingStrategyTest runInstancesResult = EasyMock.createMock(RunInstancesResult.class); describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class); reservation = EasyMock.createMock(Reservation.class); + workerSetupManager = EasyMock.createMock(WorkerSetupManager.class); instance = new Instance() .withInstanceId(INSTANCE_ID) @@ -69,44 +75,16 @@ public class EC2AutoScalingStrategyTest strategy = new EC2AutoScalingStrategy( new DefaultObjectMapper(), - amazonEC2Client, new EC2AutoScalingStrategyConfig() - { - @Override - public String getAmiId() - { - return AMI_ID; - } - - @Override - public String getWorkerPort() - { - return "8080"; - } - - @Override - public String getInstanceType() - { - return "t1.micro"; - } - - @Override - public int getMinNumInstancesToProvision() - { - return 1; - } - - @Override - public int getMaxNumInstancesToProvision() - { - return 1; - } - - @Override - public String getUserDataFile() - { - return ""; - } - } + amazonEC2Client, + new EC2AutoScalingStrategyConfig() + { + @Override + public String getWorkerPort() + { + return "8080"; + } + }, + workerSetupManager ); } @@ -117,11 +95,22 @@ public class EC2AutoScalingStrategyTest EasyMock.verify(runInstancesResult); EasyMock.verify(describeInstancesResult); EasyMock.verify(reservation); + EasyMock.verify(workerSetupManager); } @Test public void testScale() { + EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn( + new WorkerSetupData( + "0", + 0, + new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1), + new GalaxyUserData("env", "ver", "type") + ) + ); + EasyMock.replay(workerSetupManager); + EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn( runInstancesResult );