diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java
index 99712df22a5..73013ce6aa2 100644
--- a/common/src/main/java/com/metamx/druid/db/DbConnector.java
+++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java
@@ -59,6 +59,18 @@ public class DbConnector
     );
   }
 
+  public static void createConfigTable(final DBI dbi, final String configTableName)
+  {
+    createTable(
+        dbi,
+        configTableName,
+        String.format(
+            "CREATE table %s (name VARCHAR(255) NOT NULL, payload LONGTEXT NOT NULL, PRIMARY KEY(name))",
+            configTableName
+        )
+    );
+  }
+
   public static void createTable(
       final DBI dbi,
       final String tableName,
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java b/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java
index 345af0207ba..c5a2bc11826 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java
@@ -21,6 +21,7 @@ package com.metamx.druid.merger.common.index;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -138,7 +139,13 @@ public class YeOldePlumberSchool implements PlumberSchool
         IndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
       }
 
-      final DataSegment segmentToUpload = theSink.getSegment().withVersion(version);
+      // Map merged segment so we can extract dimensions
+      final QueryableIndex mappedSegment = IndexIO.loadIndex(fileToUpload);
+
+      final DataSegment segmentToUpload = theSink.getSegment()
+                                                 .withDimensions(ImmutableList.copyOf(mappedSegment.getAvailableDimensions()))
+                                                 .withVersion(version);
+
       segmentPusher.push(fileToUpload, segmentToUpload);
 
       log.info(
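
DbConnector.createConfigTable() above gives callers a one-shot way to bootstrap the name/payload config table (the LONGTEXT column and this CREATE syntax assume MySQL). A minimal wiring sketch with a hypothetical table name; initializeWorkerSetupManager() further down in this patch does the same thing with the configured name:

    final DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
    DbConnector.createConfigTable(dbi, "example_config_table");  // table name is hypothetical
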
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
index 2a235b88d86..5537a6b6420 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
@@ -39,6 +39,7 @@ import com.metamx.druid.merger.common.task.Task;
 import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
 import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
 import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
 import com.metamx.druid.merger.worker.Worker;
 import com.metamx.emitter.EmittingLogger;
 import com.netflix.curator.framework.CuratorFramework;
@@ -52,7 +53,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Period;
 
-import javax.annotation.Nullable;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -88,6 +89,7 @@ public class RemoteTaskRunner implements TaskRunner
   private final ScheduledExecutorService scheduledExec;
   private final RetryPolicyFactory retryPolicyFactory;
   private final ScalingStrategy strategy;
+  private final WorkerSetupManager workerSetupManager;
 
   // all workers that exist in ZK
   private final Map<String, WorkerWrapper> zkWorkers = new ConcurrentHashMap<String, WorkerWrapper>();
@@ -109,7 +111,8 @@ public class RemoteTaskRunner implements TaskRunner
       PathChildrenCache workerPathCache,
       ScheduledExecutorService scheduledExec,
       RetryPolicyFactory retryPolicyFactory,
-      ScalingStrategy strategy
+      ScalingStrategy strategy,
+      WorkerSetupManager workerSetupManager
   )
   {
     this.jsonMapper = jsonMapper;
@@ -119,6 +122,7 @@ public class RemoteTaskRunner implements TaskRunner
     this.scheduledExec = scheduledExec;
     this.retryPolicyFactory = retryPolicyFactory;
     this.strategy = strategy;
+    this.workerSetupManager = workerSetupManager;
   }
 
   @LifecycleStart
@@ -144,7 +148,7 @@ public class RemoteTaskRunner implements TaskRunner
               Worker.class
           );
           log.info("Worker[%s] removed!", worker.getHost());
-          removeWorker(worker.getHost());
+          removeWorker(worker);
         }
       }
     }
@@ -169,7 +173,7 @@ public class RemoteTaskRunner implements TaskRunner
           public void run()
          {
             if (currentlyTerminating.isEmpty()) {
-              if (zkWorkers.size() <= config.getMinNumWorkers()) {
+              if (zkWorkers.size() <= workerSetupManager.getWorkerSetupData().getMinNumWorkers()) {
                 return;
               }
 
@@ -180,7 +184,7 @@ public class RemoteTaskRunner implements TaskRunner
                   new Predicate<WorkerWrapper>()
                   {
                     @Override
-                    public boolean apply(@Nullable WorkerWrapper input)
+                    public boolean apply(WorkerWrapper input)
                     {
                       return input.getRunningTasks().isEmpty()
                              && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
@@ -196,9 +200,9 @@ public class RemoteTaskRunner implements TaskRunner
                       new Function<WorkerWrapper, String>()
                       {
                         @Override
-                        public String apply(@Nullable WorkerWrapper input)
+                        public String apply(WorkerWrapper input)
                         {
-                          return input.getWorker().getHost();
+                          return input.getWorker().getIp();
                         }
                       }
                   )
@@ -218,7 +222,7 @@ public class RemoteTaskRunner implements TaskRunner
               }
 
               log.info(
-                  "[%s] still terminating. Wait for all nodes to terminate before trying again.",
+                  "%s still terminating. Wait for all nodes to terminate before trying again.",
                   currentlyTerminating
               );
             }
@@ -368,7 +372,7 @@ public class RemoteTaskRunner implements TaskRunner
   private void addWorker(final Worker worker)
   {
     try {
-      currentlyProvisioning.remove(worker.getHost());
+      currentlyProvisioning.removeAll(strategy.ipLookup(Arrays.asList(worker.getIp())));
 
       final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
       final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
@@ -388,8 +392,7 @@ public class RemoteTaskRunner implements TaskRunner
           synchronized (statusLock) {
             try {
               if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) ||
-                  event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))
-              {
+                  event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                 final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
                 final TaskStatus taskStatus;
 
@@ -399,7 +402,7 @@ public class RemoteTaskRunner implements TaskRunner
                       event.getData().getData(), TaskStatus.class
                   );
 
-                  if(!taskStatus.getId().equals(taskId)) {
+                  if (!taskStatus.getId().equals(taskId)) {
                     // Sanity check
                     throw new ISE(
                         "Worker[%s] status id does not match payload id: %s != %s",
@@ -408,7 +411,8 @@ public class RemoteTaskRunner implements TaskRunner
                         taskStatus.getId()
                     );
                   }
-                } catch (Exception e) {
+                }
+                catch (Exception e) {
                   log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId);
                   retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
                   throw Throwables.propagate(e);
@@ -446,7 +450,8 @@ public class RemoteTaskRunner implements TaskRunner
               }
             }
           }
-        } catch(Exception e) {
+        }
+        catch (Exception e) {
          log.makeAlert(e, "Failed to handle new worker status")
             .addData("worker", worker.getHost())
             .addData("znode", event.getData().getPath())
@@ -478,22 +483,22 @@ public class RemoteTaskRunner implements TaskRunner
    * When an ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned
    * to the worker. If tasks remain, they are retried.
    *
-   * @param workerId - id of the removed worker
+   * @param worker - the removed worker
    */
-  private void removeWorker(final String workerId)
+  private void removeWorker(final Worker worker)
   {
-    currentlyTerminating.remove(workerId);
+    currentlyTerminating.remove(worker.getHost());
 
-    WorkerWrapper workerWrapper = zkWorkers.get(workerId);
+    WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost());
     if (workerWrapper != null) {
       try {
         Set<String> tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks());
-        tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), workerId)));
+        tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), worker.getHost())));
 
         for (String taskId : tasksToRetry) {
           TaskWrapper taskWrapper = tasks.get(taskId);
           if (taskWrapper != null) {
-            retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId));
+            retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
           }
         }
 
@@ -503,7 +508,7 @@ public class RemoteTaskRunner implements TaskRunner
         log.error(e, "Failed to cleanly remove worker[%s]");
       }
     }
-    zkWorkers.remove(workerId);
+    zkWorkers.remove(worker.getHost());
   }
 
   private WorkerWrapper findWorkerForTask()
@@ -526,7 +531,9 @@ public class RemoteTaskRunner implements TaskRunner
               public boolean apply(WorkerWrapper input)
               {
                 return (!input.isAtCapacity() &&
-                        input.getWorker().getVersion().compareTo(config.getMinWorkerVersion()) >= 0);
+                        input.getWorker()
+                             .getVersion()
+                             .compareTo(workerSetupManager.getWorkerSetupData().getMinVersion()) >= 0);
               }
             }
         )
@@ -551,7 +558,7 @@ public class RemoteTaskRunner implements TaskRunner
       }
 
       log.info(
-          "[%s] still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
+          "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
          currentlyProvisioning
      );
    }
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java
index c364070e313..a8cfcf8df22 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java
@@ -26,24 +26,7 @@ import org.skife.config.Default;
  */
 public abstract class EC2AutoScalingStrategyConfig
 {
-  @Config("druid.indexer.amiId")
-  public abstract String getAmiId();
-
   @Config("druid.indexer.worker.port")
   @Default("8080")
   public abstract String getWorkerPort();
-
-  @Config("druid.indexer.instanceType")
-  public abstract String getInstanceType();
-
-  @Config("druid.indexer.minNumInstancesToProvision")
-  @Default("1")
-  public abstract int getMinNumInstancesToProvision();
-
-  @Config("druid.indexer.maxNumInstancesToProvision")
-  @Default("1")
-  public abstract int getMaxNumInstancesToProvision();
-
-  @Config("druid.indexer.userDataFile")
-  public abstract String getUserDataFile();
 }
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java
index 00b869ea6da..c9badf7ef88 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java
@@ -37,15 +37,8 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
   @Default("2012-01-01T00:55:00.000Z")
   public abstract DateTime getTerminateResourcesOriginDateTime();
 
-  @Config("druid.indexer.minWorkerVersion")
-  public abstract String getMinWorkerVersion();
-
-  @Config("druid.indexer.minNumWorkers")
-  @Default("1")
-  public abstract int getMinNumWorkers();
-
   @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
-  @Default("1")
+  @Default("10000")
   public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
 
   @Config("druid.indexer.maxScalingDuration")
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java
new file mode 100644
index 00000000000..16eeb1c3439
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.config;
+
+import org.joda.time.Duration;
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+/**
+ */
+public abstract class WorkerSetupManagerConfig
+{
+  @Config("druid.indexer.configTable")
+  public abstract String getConfigTable();
+
+  @Config("druid.indexer.workerSetupConfigName")
+  public abstract String getWorkerSetupConfigName();
+
+  @Config("druid.indexer.poll.duration")
+  @Default("PT1M")
+  public abstract Duration getPollDuration();
+}
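
WorkerSetupManagerConfig introduces three settings. Only the poll duration has a default (PT1M, i.e. one minute, as an ISO-8601 period); the config table and the config row name must be supplied. A runtime.properties sketch with hypothetical values:

    # hypothetical values for illustration
    druid.indexer.configTable=example_config_table
    druid.indexer.workerSetupConfigName=workerSetup
    # PT1M is the default poll period
    druid.indexer.poll.duration=PT1M
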
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
index bc48c7c71de..3dae4046764 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
@@ -47,6 +47,9 @@ import com.metamx.druid.initialization.Initialization;
 import com.metamx.druid.initialization.ServerConfig;
 import com.metamx.druid.initialization.ServiceDiscoveryConfig;
 import com.metamx.druid.jackson.DefaultObjectMapper;
+import com.metamx.druid.loading.S3SegmentPusher;
+import com.metamx.druid.loading.S3SegmentPusherConfig;
+import com.metamx.druid.loading.SegmentPusher;
 import com.metamx.druid.merger.common.TaskToolbox;
 import com.metamx.druid.merger.common.config.IndexerZkConfig;
 import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
@@ -66,12 +69,11 @@ import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
 import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
 import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
 import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
+import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
 import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
 import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy;
 import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
-import com.metamx.druid.loading.S3SegmentPusher;
-import com.metamx.druid.loading.S3SegmentPusherConfig;
-import com.metamx.druid.loading.SegmentPusher;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
 import com.metamx.druid.utils.PropUtils;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.core.Emitters;
@@ -98,6 +100,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.FilterHolder;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
 
 import java.net.URL;
 import java.util.Arrays;
@@ -133,6 +136,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
   private CuratorFramework curatorFramework = null;
   private ScheduledExecutorFactory scheduledExecutorFactory = null;
   private IndexerZkConfig indexerZkConfig;
+  private WorkerSetupManager workerSetupManager = null;
   private TaskRunnerFactory taskRunnerFactory = null;
   private TaskMaster taskMaster = null;
   private Server server = null;
@@ -160,14 +164,16 @@ public class IndexerCoordinatorNode extends RegisteringNode
     return this;
   }
 
-  public void setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator)
+  public IndexerCoordinatorNode setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator)
   {
     this.mergerDBCoordinator = mergerDBCoordinator;
+    return this;
   }
 
-  public void setTaskQueue(TaskQueue taskQueue)
+  public IndexerCoordinatorNode setTaskQueue(TaskQueue taskQueue)
   {
     this.taskQueue = taskQueue;
+    return this;
   }
 
   public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator)
@@ -182,9 +188,16 @@ public class IndexerCoordinatorNode extends RegisteringNode
     return this;
   }
 
-  public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
+  public IndexerCoordinatorNode setWorkerSetupManager(WorkerSetupManager workerSetupManager)
+  {
+    this.workerSetupManager = workerSetupManager;
+    return this;
+  }
+
+  public IndexerCoordinatorNode setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
   {
     this.taskRunnerFactory = taskRunnerFactory;
+    return this;
   }
 
   public void init() throws Exception
@@ -202,6 +215,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
     initializeJacksonSubtypes();
     initializeCurator();
     initializeIndexerZkConfig();
+    initializeWorkerSetupManager();
     initializeTaskRunnerFactory();
     initializeTaskMaster();
     initializeServer();
@@ -220,7 +234,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
             jsonMapper,
             config,
             emitter,
-            taskQueue
+            taskQueue,
+            workerSetupManager
         )
     );
 
@@ -447,6 +462,27 @@ public class IndexerCoordinatorNode extends RegisteringNode
     }
   }
 
+  public void initializeWorkerSetupManager()
+  {
+    if (workerSetupManager == null) {
+      final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
+      final DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
+      final WorkerSetupManagerConfig workerSetupManagerConfig = configFactory.build(WorkerSetupManagerConfig.class);
+
+      DbConnector.createConfigTable(dbi, workerSetupManagerConfig.getConfigTable());
+      workerSetupManager = new WorkerSetupManager(
+          dbi, Executors.newScheduledThreadPool(
+          1,
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("WorkerSetupManagerExec--%d")
+              .build()
+      ), jsonMapper, workerSetupManagerConfig
+      );
+    }
+    lifecycle.addManagedInstance(workerSetupManager);
+  }
+
   public void initializeTaskRunnerFactory()
   {
     if (taskRunnerFactory == null) {
@@ -476,7 +512,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
                     PropUtils.getProperty(props, "com.metamx.aws.secretKey")
                 )
             ),
-            configFactory.build(EC2AutoScalingStrategyConfig.class)
+            configFactory.build(EC2AutoScalingStrategyConfig.class),
+            workerSetupManager
         );
       } else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
         strategy = new NoopScalingStrategy();
@@ -491,7 +528,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
                 new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true),
                 retryScheduledExec,
                 new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
-                strategy
+                strategy,
+                workerSetupManager
             );
           }
         };
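
With the setters on IndexerCoordinatorNode now returning this, external wiring can be chained fluently; a hypothetical sketch:

    indexerCoordinatorNode
        .setTaskQueue(taskQueue)
        .setMergerDBCoordinator(mergerDBCoordinator)
        .setWorkerSetupManager(workerSetupManager)
        .setTaskRunnerFactory(taskRunnerFactory);
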
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java
index 6cf9b0a7c16..e4acd93514f 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java
@@ -28,6 +28,8 @@ import com.metamx.druid.merger.common.task.MergeTask;
 import com.metamx.druid.merger.common.task.Task;
 import com.metamx.druid.merger.coordinator.TaskQueue;
 import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
 import com.metamx.emitter.service.ServiceEmitter;
 
 import javax.ws.rs.Consumes;
@@ -48,18 +50,21 @@ public class IndexerCoordinatorResource
   private final IndexerCoordinatorConfig config;
   private final ServiceEmitter emitter;
   private final TaskQueue tasks;
+  private final WorkerSetupManager workerSetupManager;
 
   @Inject
   public IndexerCoordinatorResource(
       IndexerCoordinatorConfig config,
       ServiceEmitter emitter,
-      TaskQueue tasks
+      TaskQueue tasks,
+      WorkerSetupManager workerSetupManager
   ) throws Exception
   {
     this.config = config;
     this.emitter = emitter;
     this.tasks = tasks;
+    this.workerSetupManager = workerSetupManager;
   }
 
   @POST
@@ -115,4 +120,25 @@ public class IndexerCoordinatorResource
   {
     return Response.ok(ImmutableMap.of("task", taskid)).build();
   }
+
+  @GET
+  @Path("/worker/setup")
+  @Produces("application/json")
+  public Response getWorkerSetupData()
+  {
+    return Response.ok(workerSetupManager.getWorkerSetupData()).build();
+  }
+
+  @POST
+  @Path("/worker/setup")
+  @Consumes("application/json")
+  public Response setWorkerSetupData(
+      final WorkerSetupData workerSetupData
+  )
+  {
+    if (!workerSetupManager.setWorkerSetupData(workerSetupData)) {
+      return Response.status(Response.Status.BAD_REQUEST).build();
+    }
+    return Response.ok().build();
+  }
 }
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java
index 9c657bdc292..4cc1df9fa6f 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java
@@ -22,6 +22,7 @@ package com.metamx.druid.merger.coordinator.http;
 import com.google.inject.Provides;
 import com.metamx.druid.merger.coordinator.TaskQueue;
 import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.sun.jersey.guice.JerseyServletModule;
 import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
@@ -38,18 +39,21 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
   private final IndexerCoordinatorConfig indexerCoordinatorConfig;
   private final ServiceEmitter emitter;
   private final TaskQueue tasks;
+  private final WorkerSetupManager workerSetupManager;
 
   public IndexerCoordinatorServletModule(
       ObjectMapper jsonMapper,
       IndexerCoordinatorConfig indexerCoordinatorConfig,
       ServiceEmitter emitter,
-      TaskQueue tasks
+      TaskQueue tasks,
+      WorkerSetupManager workerSetupManager
   )
   {
     this.jsonMapper = jsonMapper;
     this.indexerCoordinatorConfig = indexerCoordinatorConfig;
     this.emitter = emitter;
     this.tasks = tasks;
+    this.workerSetupManager = workerSetupManager;
   }
 
   @Override
@@ -60,6 +64,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
     bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig);
     bind(ServiceEmitter.class).toInstance(emitter);
     bind(TaskQueue.class).toInstance(tasks);
+    bind(WorkerSetupManager.class).toInstance(workerSetupManager);
 
     serve("/*").with(GuiceContainer.class);
   }
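
The new /worker/setup endpoints round-trip a WorkerSetupData document as JSON, using the @JsonProperty names defined in the setup classes later in this patch. A hypothetical POST body (all values made up):

    {
      "minVersion": "0",
      "minNumWorkers": 1,
      "nodeData": {
        "amiId": "ami-12345678",
        "instanceType": "m1.large",
        "minInstances": 1,
        "maxInstances": 1,
        "securityGroupIds": ["sg-example"],
        "keyName": "example-key"
      },
      "userData": {
        "env": "prod",
        "version": "1",
        "type": "indexer"
      }
    }
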
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java
index 6cce08f8731..5a1bb4980e5 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.merger.coordinator.scaling;
 
 import java.util.List;
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java
index 265fe62287c..8d51da61afd 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java
@@ -24,7 +24,6 @@ import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
 import com.amazonaws.services.ec2.model.DescribeInstancesResult;
 import com.amazonaws.services.ec2.model.Filter;
 import com.amazonaws.services.ec2.model.Instance;
-import com.amazonaws.services.ec2.model.InstanceType;
 import com.amazonaws.services.ec2.model.Reservation;
 import com.amazonaws.services.ec2.model.RunInstancesRequest;
 import com.amazonaws.services.ec2.model.RunInstancesResult;
@@ -32,11 +31,14 @@ import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
+import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
 import com.metamx.emitter.EmittingLogger;
+import org.apache.commons.codec.binary.Base64;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import javax.annotation.Nullable;
-import java.io.File;
 import java.util.List;
 
 /**
@@ -48,31 +50,45 @@ public class EC2AutoScalingStrategy implements ScalingStrategy
   private final ObjectMapper jsonMapper;
   private final AmazonEC2Client amazonEC2Client;
   private final EC2AutoScalingStrategyConfig config;
+  private final WorkerSetupManager workerSetupManager;
 
   public EC2AutoScalingStrategy(
       ObjectMapper jsonMapper,
       AmazonEC2Client amazonEC2Client,
-      EC2AutoScalingStrategyConfig config
+      EC2AutoScalingStrategyConfig config,
+      WorkerSetupManager workerSetupManager
   )
   {
     this.jsonMapper = jsonMapper;
     this.amazonEC2Client = amazonEC2Client;
     this.config = config;
+    this.workerSetupManager = workerSetupManager;
   }
 
   @Override
   public AutoScalingData provision()
   {
     try {
+      WorkerSetupData setupData = workerSetupManager.getWorkerSetupData();
+      EC2NodeData workerConfig = setupData.getNodeData();
+
       log.info("Creating new instance(s)...");
       RunInstancesResult result = amazonEC2Client.runInstances(
           new RunInstancesRequest(
-              config.getAmiId(),
-              config.getMinNumInstancesToProvision(),
-              config.getMaxNumInstancesToProvision()
+              workerConfig.getAmiId(),
+              workerConfig.getMinInstances(),
+              workerConfig.getMaxInstances()
           )
-              .withInstanceType(InstanceType.fromValue(config.getInstanceType()))
-              .withUserData(jsonMapper.writeValueAsString(new File(config.getUserDataFile())))
+              .withInstanceType(workerConfig.getInstanceType())
+              .withSecurityGroupIds(workerConfig.getSecurityGroupIds())
+              .withKeyName(workerConfig.getKeyName())
+              .withUserData(
+                  Base64.encodeBase64String(
+                      jsonMapper.writeValueAsBytes(
+                          setupData.getUserData()
+                      )
+                  )
+              )
       );
 
       List<String> instanceIds = Lists.transform(
@@ -80,7 +96,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy
           new Function<Instance, String>()
           {
             @Override
-            public String apply(@Nullable Instance input)
+            public String apply(Instance input)
             {
               return input.getInstanceId();
             }
@@ -95,9 +111,9 @@ public class EC2AutoScalingStrategy implements ScalingStrategy
               new Function<Instance, String>()
               {
                 @Override
-                public String apply(@Nullable Instance input)
+                public String apply(Instance input)
                 {
-                  return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort());
+                  return input.getInstanceId();
                 }
               }
           ),
@@ -112,12 +128,12 @@ public class EC2AutoScalingStrategy implements ScalingStrategy
   }
 
   @Override
-  public AutoScalingData terminate(List<String> nodeIds)
+  public AutoScalingData terminate(List<String> ids)
   {
     DescribeInstancesResult result = amazonEC2Client.describeInstances(
         new DescribeInstancesRequest()
             .withFilters(
-                new Filter("private-ip-address", nodeIds)
+                new Filter("private-ip-address", ids)
             )
     );
 
@@ -135,7 +151,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy
         new Function<Instance, String>()
         {
           @Override
-          public String apply(@Nullable Instance input)
+          public String apply(Instance input)
           {
             return input.getInstanceId();
           }
@@ -146,13 +162,13 @@ public class EC2AutoScalingStrategy implements ScalingStrategy
 
     return new AutoScalingData(
         Lists.transform(
-            instances,
-            new Function<Instance, String>()
+            ids,
+            new Function<String, String>()
             {
               @Override
-              public String apply(@Nullable Instance input)
+              public String apply(@Nullable String input)
               {
-                return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort());
+                return String.format("%s:%s", input, config.getWorkerPort());
              }
            }
        ),
@@ -165,4 +181,36 @@ public class EC2AutoScalingStrategy implements ScalingStrategy
 
     return null;
   }
+
+  @Override
+  public List<String> ipLookup(List<String> ips)
+  {
+    DescribeInstancesResult result = amazonEC2Client.describeInstances(
+        new DescribeInstancesRequest()
+            .withFilters(
+                new Filter("private-ip-address", ips)
+            )
+    );
+
+    List<Instance> instances = Lists.newArrayList();
+    for (Reservation reservation : result.getReservations()) {
+      instances.addAll(reservation.getInstances());
+    }
+
+    List<String> retVal = Lists.transform(
+        instances,
+        new Function<Instance, String>()
+        {
+          @Override
+          public String apply(Instance input)
+          {
+            return input.getInstanceId();
+          }
+        }
+    );
+
+    log.info("Performing lookup: %s --> %s", ips, retVal);
+
+    return retVal;
+  }
 }
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java
index 67eb99293e4..2b412ca6202 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.merger.coordinator.scaling;
 
 import com.metamx.emitter.EmittingLogger;
@@ -24,4 +43,11 @@ public class NoopScalingStrategy implements ScalingStrategy
     log.info("If I were a real strategy I'd terminate %s now", nodeIds);
     return null;
   }
+
+  @Override
+  public List<String> ipLookup(List<String> ips)
+  {
+    log.info("I'm not a real strategy so I'm returning what I got %s", ips);
+    return ips;
+  }
 }
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java
index 9b7da8fb3a4..150de1357e0 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java
@@ -27,5 +27,12 @@ public interface ScalingStrategy
 {
   public AutoScalingData provision();
 
-  public AutoScalingData terminate(List<String> nodeIds);
+  public AutoScalingData terminate(List<String> ids);
+
+  /**
+   * Provides a lookup of IP addresses to node ids.
+   * @param ips worker IP addresses
+   * @return the corresponding node ids
+   */
+  public List<String> ipLookup(List<String> ips);
 }
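
ipLookup() exists because provision() now reports EC2 instance ids while workers announce themselves by IP: RemoteTaskRunner.addWorker() (above) translates a worker's IP back to an instance id before clearing it from the provisioning set. The contract, sketched with hypothetical values:

    // EC2 strategy: private IPs -> instance ids, e.g. ["10.1.2.3"] -> ["i-1234abcd"]
    // Noop strategy: identity; returns the input list unchanged
    currentlyProvisioning.removeAll(strategy.ipLookup(Arrays.asList(worker.getIp())));
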
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java
new file mode 100644
index 00000000000..8d302df25f6
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java
@@ -0,0 +1,91 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.setup;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.util.List;
+
+/**
+ */
+public class EC2NodeData
+{
+  private final String amiId;
+  private final String instanceType;
+  private final int minInstances;
+  private final int maxInstances;
+  private final List<String> securityGroupIds;
+  private final String keyName;
+
+  @JsonCreator
+  public EC2NodeData(
+      @JsonProperty("amiId") String amiId,
+      @JsonProperty("instanceType") String instanceType,
+      @JsonProperty("minInstances") int minInstances,
+      @JsonProperty("maxInstances") int maxInstances,
+      @JsonProperty("securityGroupIds") List<String> securityGroupIds,
+      @JsonProperty("keyName") String keyName
+  )
+  {
+    this.amiId = amiId;
+    this.instanceType = instanceType;
+    this.minInstances = minInstances;
+    this.maxInstances = maxInstances;
+    this.securityGroupIds = securityGroupIds;
+    this.keyName = keyName;
+  }
+
+  @JsonProperty
+  public String getAmiId()
+  {
+    return amiId;
+  }
+
+  @JsonProperty
+  public String getInstanceType()
+  {
+    return instanceType;
+  }
+
+  @JsonProperty
+  public int getMinInstances()
+  {
+    return minInstances;
+  }
+
+  @JsonProperty
+  public int getMaxInstances()
+  {
+    return maxInstances;
+  }
+
+  @JsonProperty
+  public List<String> getSecurityGroupIds()
+  {
+    return securityGroupIds;
+  }
+
+  @JsonProperty
+  public String getKeyName()
+  {
+    return keyName;
+  }
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java
new file mode 100644
index 00000000000..876a2635273
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java
@@ -0,0 +1,62 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.setup;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ */
+public class GalaxyUserData
+{
+  public final String env;
+  public final String version;
+  public final String type;
+
+  @JsonCreator
+  public GalaxyUserData(
+      @JsonProperty("env") String env,
+      @JsonProperty("version") String version,
+      @JsonProperty("type") String type
+  )
+  {
+    this.env = env;
+    this.version = version;
+    this.type = type;
+  }
+
+  @JsonProperty
+  public String getEnv()
+  {
+    return env;
+  }
+
+  @JsonProperty
+  public String getVersion()
+  {
+    return version;
+  }
+
+  @JsonProperty
+  public String getType()
+  {
+    return type;
+  }
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java
new file mode 100644
index 00000000000..8395fa2d6c8
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java
@@ -0,0 +1,73 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.setup;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.util.List;
+
+/**
+ */
+public class WorkerSetupData
+{
+  private final String minVersion;
+  private final int minNumWorkers;
+  private final EC2NodeData nodeData;
+  private final GalaxyUserData userData;
+
+  @JsonCreator
+  public WorkerSetupData(
+      @JsonProperty("minVersion") String minVersion,
+      @JsonProperty("minNumWorkers") int minNumWorkers,
+      @JsonProperty("nodeData") EC2NodeData nodeData,
+      @JsonProperty("userData") GalaxyUserData userData
+  )
+  {
+    this.minVersion = minVersion;
+    this.minNumWorkers = minNumWorkers;
+    this.nodeData = nodeData;
+    this.userData = userData;
+  }
+
+  @JsonProperty
+  public String getMinVersion()
+  {
+    return minVersion;
+  }
+
+  @JsonProperty
+  public int getMinNumWorkers()
+  {
+    return minNumWorkers;
+  }
+
+  @JsonProperty
+  public EC2NodeData getNodeData()
+  {
+    return nodeData;
+  }
+
+  @JsonProperty
+  public GalaxyUserData getUserData()
+  {
+    return userData;
+  }
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java
new file mode 100644
index 00000000000..5e43e68ae66
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java
@@ -0,0 +1,226 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.setup;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.common.concurrent.ScheduledExecutors;
+import com.metamx.common.lifecycle.LifecycleStart;
+import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
+import org.apache.commons.collections.MapUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.Duration;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.FoldController;
+import org.skife.jdbi.v2.Folder3;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ */
+public class WorkerSetupManager
+{
+  private static final Logger log = new Logger(WorkerSetupManager.class);
+
+  private final DBI dbi;
+  private final ObjectMapper jsonMapper;
+  private final ScheduledExecutorService exec;
+  private final WorkerSetupManagerConfig config;
+
+  private final Object lock = new Object();
+
+  private volatile AtomicReference<WorkerSetupData> workerSetupData = new AtomicReference<WorkerSetupData>(null);
+  private volatile boolean started = false;
+
+  public WorkerSetupManager(
+      DBI dbi,
+      ScheduledExecutorService exec,
+      ObjectMapper jsonMapper,
+      WorkerSetupManagerConfig config
+  )
+  {
+    this.dbi = dbi;
+    this.exec = exec;
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    synchronized (lock) {
+      if (started) {
+        return;
+      }
+
+      ScheduledExecutors.scheduleWithFixedDelay(
+          exec,
+          new Duration(0),
+          config.getPollDuration(),
+          new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              poll();
+            }
+          }
+      );
+
+      started = true;
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    synchronized (lock) {
+      if (!started) {
+        return;
+      }
+
+      started = false;
+    }
+  }
+
+  public void poll()
+  {
+    try {
+      List<WorkerSetupData> setupDataList = dbi.withHandle(
+          new HandleCallback<List<WorkerSetupData>>()
+          {
+            @Override
+            public List<WorkerSetupData> withHandle(Handle handle) throws Exception
+            {
+              return handle.createQuery(
+                  String.format(
+                      "SELECT payload FROM %s WHERE name = :name",
+                      config.getConfigTable()
+                  )
+              )
+                           .bind("name", config.getWorkerSetupConfigName())
+                           .fold(
+                               Lists.<WorkerSetupData>newArrayList(),
+                               new Folder3<ArrayList<WorkerSetupData>, Map<String, Object>>()
+                               {
+                                 @Override
+                                 public ArrayList<WorkerSetupData> fold(
+                                     ArrayList<WorkerSetupData> workerNodeConfigurations,
+                                     Map<String, Object> stringObjectMap,
+                                     FoldController foldController,
+                                     StatementContext statementContext
+                                 ) throws SQLException
+                                 {
+                                   try {
+                                     // stringObjectMap lowercases and jackson may fail serde
+                                     workerNodeConfigurations.add(
+                                         jsonMapper.readValue(
+                                             MapUtils.getString(stringObjectMap, "payload"),
+                                             WorkerSetupData.class
+                                         )
+                                     );
+                                     return workerNodeConfigurations;
+                                   }
+                                   catch (Exception e) {
+                                     throw Throwables.propagate(e);
+                                   }
+                                 }
+                               }
+                           );
+            }
+          }
+      );
+
+      if (setupDataList.isEmpty()) {
+        throw new ISE("WTF?! No configuration found for worker nodes!");
+      } else if (setupDataList.size() != 1) {
+        throw new ISE("WTF?! Found more than one configuration for worker nodes");
+      }
+
+      workerSetupData.set(setupDataList.get(0));
+    }
+    catch (Exception e) {
+      log.error(e, "Exception while polling for worker setup data!");
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public WorkerSetupData getWorkerSetupData()
+  {
+    synchronized (lock) {
+      if (!started) {
+        throw new ISE("Must start WorkerSetupManager first!");
+      }
+
+      return workerSetupData.get();
+    }
+  }
+
+  public boolean setWorkerSetupData(final WorkerSetupData value)
+  {
+    synchronized (lock) {
+      try {
+        if (!started) {
+          throw new ISE("Must start WorkerSetupManager first!");
+        }
+
+        dbi.withHandle(
+            new HandleCallback<Void>()
+            {
+              @Override
+              public Void withHandle(Handle handle) throws Exception
+              {
+                handle.createStatement(
+                    String.format(
+                        "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
+                        config.getConfigTable()
+                    )
+                )
+                      .bind("name", config.getWorkerSetupConfigName())
+                      .bind("payload", jsonMapper.writeValueAsString(value))
+                      .execute();
+
+                return null;
+              }
+            }
+        );
+
+        workerSetupData.set(value);
+      }
+      catch (Exception e) {
+        log.error(e, "Exception updating worker config");
+        return false;
+      }
+    }
+
+    return true;
+  }
+}
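
getWorkerSetupData() throws if the manager was never started, and poll() only logs on failure, so a fresh cluster needs its config row seeded before the RemoteTaskRunner can make any scaling decision. One way to bootstrap it (all values hypothetical) is through the manager itself, which issues the MySQL-style INSERT ... ON DUPLICATE KEY UPDATE shown above:

    workerSetupManager.start();  // begins the periodic poll
    workerSetupManager.setWorkerSetupData(
        new WorkerSetupData(
            "0",   // minVersion a worker must report
            1,     // minNumWorkers to keep alive
            new EC2NodeData("ami-12345678", "m1.large", 1, 1,
                            Arrays.asList("sg-example"), "example-key"),
            new GalaxyUserData("prod", "1", "indexer")
        )
    );
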
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
index eb10731abd9..1e31efa121c 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
@@ -17,6 +17,8 @@ import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
 import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
 import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
 import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
 import com.metamx.druid.merger.worker.TaskMonitor;
 import com.metamx.druid.merger.worker.Worker;
 import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
@@ -62,6 +64,7 @@ public class RemoteTaskRunnerTest
   private PathChildrenCache pathChildrenCache;
   private RemoteTaskRunner remoteTaskRunner;
   private TaskMonitor taskMonitor;
+  private WorkerSetupManager workerSetupManager;
 
   private ScheduledExecutorService scheduledExec;
 
@@ -69,7 +72,6 @@ public class RemoteTaskRunnerTest
 
   private Worker worker1;
 
-
   @Before
   public void setUp() throws Exception
   {
@@ -141,9 +143,10 @@ public class RemoteTaskRunnerTest
   {
     remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.newHashSet()), null);
     try {
-      remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.newHashSet()), null);
-      fail("ISE expected");
-    } catch (ISE expected) {
+      remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.newHashSet()), null);
+      fail("ISE expected");
+    }
+    catch (ISE expected) {
     }
   }
 
@@ -333,6 +336,17 @@ public class RemoteTaskRunnerTest
   private void makeRemoteTaskRunner() throws Exception
   {
     scheduledExec = EasyMock.createMock(ScheduledExecutorService.class);
+    workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
+
+    EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(
+        new WorkerSetupData(
+            "0",
+            0,
+            null,
+            null
+        )
+    );
+    EasyMock.replay(workerSetupManager);
 
     remoteTaskRunner = new RemoteTaskRunner(
         jsonMapper,
@@ -341,7 +355,8 @@ public class RemoteTaskRunnerTest
         pathChildrenCache,
         scheduledExec,
         new RetryPolicyFactory(new TestRetryPolicyConfig()),
-        new TestScalingStrategy()
+        new TestScalingStrategy(),
+        workerSetupManager
     );
 
     // Create a single worker and wait for things for be ready
@@ -389,6 +404,12 @@ public class RemoteTaskRunnerTest
     {
       return null;
     }
+
+    @Override
+    public List<String> ipLookup(List<String> ips)
+    {
+      return ips;
+    }
   }
 
   private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
@@ -405,18 +426,6 @@ public class RemoteTaskRunnerTest
       return null;
     }
 
-    @Override
-    public String getMinWorkerVersion()
-    {
-      return "0";
-    }
-
-    @Override
-    public int getMinNumWorkers()
-    {
-      return 0;
-    }
-
     @Override
     public int getMaxWorkerIdleTimeMillisBeforeDeletion()
     {
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java
index 958a2c1d836..c3aa8378b07 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java
@@ -27,8 +27,13 @@ import com.amazonaws.services.ec2.model.Reservation;
 import com.amazonaws.services.ec2.model.RunInstancesRequest;
 import com.amazonaws.services.ec2.model.RunInstancesResult;
 import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
+import com.google.common.collect.Lists;
 import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
+import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
+import com.metamx.druid.merger.coordinator.setup.GalaxyUserData;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Assert;
@@ -52,6 +57,7 @@ public class EC2AutoScalingStrategyTest
   private Reservation reservation;
   private Instance instance;
   private EC2AutoScalingStrategy strategy;
+  private WorkerSetupManager workerSetupManager;
 
   @Before
   public void setUp() throws Exception
@@ -60,6 +66,7 @@ public class EC2AutoScalingStrategyTest
     runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
     describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class);
     reservation = EasyMock.createMock(Reservation.class);
+    workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
 
     instance = new Instance()
         .withInstanceId(INSTANCE_ID)
@@ -69,44 +76,16 @@ public class EC2AutoScalingStrategyTest
 
     strategy = new EC2AutoScalingStrategy(
         new DefaultObjectMapper(),
-        amazonEC2Client, new EC2AutoScalingStrategyConfig()
-        {
-          @Override
-          public String getAmiId()
-          {
-            return AMI_ID;
-          }
-
-          @Override
-          public String getWorkerPort()
-          {
-            return "8080";
-          }
-
-          @Override
-          public String getInstanceType()
-          {
-            return "t1.micro";
-          }
-
-          @Override
-          public int getMinNumInstancesToProvision()
-          {
-            return 1;
-          }
-
-          @Override
-          public int getMaxNumInstancesToProvision()
-          {
-            return 1;
-          }
-
-          @Override
-          public String getUserDataFile()
-          {
-            return "";
-          }
-        }
+        amazonEC2Client,
+        new EC2AutoScalingStrategyConfig()
+        {
+          @Override
+          public String getWorkerPort()
+          {
+            return "8080";
+          }
+        },
+        workerSetupManager
     );
   }
 
@@ -117,11 +96,22 @@ public class EC2AutoScalingStrategyTest
     EasyMock.verify(runInstancesResult);
     EasyMock.verify(describeInstancesResult);
     EasyMock.verify(reservation);
+    EasyMock.verify(workerSetupManager);
   }
 
   @Test
   public void testScale()
   {
+    EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(
+        new WorkerSetupData(
+            "0",
+            0,
+            new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"),
+            new GalaxyUserData("env", "version", "type")
+        )
+    );
+    EasyMock.replay(workerSetupManager);
+
     EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn(
         runInstancesResult
     );
@@ -144,9 +134,9 @@ public class EC2AutoScalingStrategyTest
     Assert.assertEquals(created.getNodeIds().size(), 1);
     Assert.assertEquals(created.getNodes().size(), 1);
-    Assert.assertEquals(String.format("%s:8080", IP), created.getNodeIds().get(0));
+    Assert.assertEquals("theInstance", created.getNodeIds().get(0));
 
-    AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyHost"));
+    AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP"));
 
     Assert.assertEquals(deleted.getNodeIds().size(), 1);
     Assert.assertEquals(deleted.getNodes().size(), 1);
diff --git a/pom.xml b/pom.xml
index c580894b379..4310add56c6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,7 +84,7 @@
             <dependency>
                 <groupId>commons-codec</groupId>
                 <artifactId>commons-codec</artifactId>
-                <version>1.3</version>
+                <version>1.7</version>
            </dependency>
            <dependency>
                <groupId>commons-httpclient</groupId>
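
The commons-codec bump from 1.3 to 1.7 is required rather than incidental: EC2AutoScalingStrategy above now calls

    Base64.encodeBase64String(jsonMapper.writeValueAsBytes(setupData.getUserData()))

and encodeBase64String() is not available in commons-codec 1.3 (it appeared around 1.4).
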