Merge branch 'master' of github.com:metamx/druid

This commit is contained in:
Eric Tschetter 2013-01-23 18:37:03 -06:00
commit 9b6244ec15
20 changed files with 798 additions and 137 deletions

View File

@ -59,6 +59,18 @@ public class DbConnector
); );
} }
public static void createConfigTable(final DBI dbi, final String configTableName)
{
createTable(
dbi,
configTableName,
String.format(
"CREATE table %s (name VARCHAR(255) NOT NULL, payload LONGTEXT NOT NULL, PRIMARY KEY(name))",
configTableName
)
);
}
public static void createTable( public static void createTable(
final DBI dbi, final DBI dbi,
final String tableName, final String tableName,

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.common.index;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -138,7 +139,13 @@ public class YeOldePlumberSchool implements PlumberSchool
IndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload); IndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
} }
final DataSegment segmentToUpload = theSink.getSegment().withVersion(version); // Map merged segment so we can extract dimensions
final QueryableIndex mappedSegment = IndexIO.loadIndex(fileToUpload);
final DataSegment segmentToUpload = theSink.getSegment()
.withDimensions(ImmutableList.copyOf(mappedSegment.getAvailableDimensions()))
.withVersion(version);
segmentPusher.push(fileToUpload, segmentToUpload); segmentPusher.push(fileToUpload, segmentToUpload);
log.info( log.info(

View File

@ -39,6 +39,7 @@ import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData; import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.Worker; import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.netflix.curator.framework.CuratorFramework; import com.netflix.curator.framework.CuratorFramework;
@ -52,7 +53,7 @@ import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.joda.time.Period; import org.joda.time.Period;
import javax.annotation.Nullable; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -88,6 +89,7 @@ public class RemoteTaskRunner implements TaskRunner
private final ScheduledExecutorService scheduledExec; private final ScheduledExecutorService scheduledExec;
private final RetryPolicyFactory retryPolicyFactory; private final RetryPolicyFactory retryPolicyFactory;
private final ScalingStrategy strategy; private final ScalingStrategy strategy;
private final WorkerSetupManager workerSetupManager;
// all workers that exist in ZK // all workers that exist in ZK
private final Map<String, WorkerWrapper> zkWorkers = new ConcurrentHashMap<String, WorkerWrapper>(); private final Map<String, WorkerWrapper> zkWorkers = new ConcurrentHashMap<String, WorkerWrapper>();
@ -109,7 +111,8 @@ public class RemoteTaskRunner implements TaskRunner
PathChildrenCache workerPathCache, PathChildrenCache workerPathCache,
ScheduledExecutorService scheduledExec, ScheduledExecutorService scheduledExec,
RetryPolicyFactory retryPolicyFactory, RetryPolicyFactory retryPolicyFactory,
ScalingStrategy strategy ScalingStrategy strategy,
WorkerSetupManager workerSetupManager
) )
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
@ -119,6 +122,7 @@ public class RemoteTaskRunner implements TaskRunner
this.scheduledExec = scheduledExec; this.scheduledExec = scheduledExec;
this.retryPolicyFactory = retryPolicyFactory; this.retryPolicyFactory = retryPolicyFactory;
this.strategy = strategy; this.strategy = strategy;
this.workerSetupManager = workerSetupManager;
} }
@LifecycleStart @LifecycleStart
@ -144,7 +148,7 @@ public class RemoteTaskRunner implements TaskRunner
Worker.class Worker.class
); );
log.info("Worker[%s] removed!", worker.getHost()); log.info("Worker[%s] removed!", worker.getHost());
removeWorker(worker.getHost()); removeWorker(worker);
} }
} }
} }
@ -169,7 +173,7 @@ public class RemoteTaskRunner implements TaskRunner
public void run() public void run()
{ {
if (currentlyTerminating.isEmpty()) { if (currentlyTerminating.isEmpty()) {
if (zkWorkers.size() <= config.getMinNumWorkers()) { if (zkWorkers.size() <= workerSetupManager.getWorkerSetupData().getMinNumWorkers()) {
return; return;
} }
@ -180,7 +184,7 @@ public class RemoteTaskRunner implements TaskRunner
new Predicate<WorkerWrapper>() new Predicate<WorkerWrapper>()
{ {
@Override @Override
public boolean apply(@Nullable WorkerWrapper input) public boolean apply(WorkerWrapper input)
{ {
return input.getRunningTasks().isEmpty() return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis() && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
@ -196,9 +200,9 @@ public class RemoteTaskRunner implements TaskRunner
new Function<WorkerWrapper, String>() new Function<WorkerWrapper, String>()
{ {
@Override @Override
public String apply(@Nullable WorkerWrapper input) public String apply(WorkerWrapper input)
{ {
return input.getWorker().getHost(); return input.getWorker().getIp();
} }
} }
) )
@ -218,7 +222,7 @@ public class RemoteTaskRunner implements TaskRunner
} }
log.info( log.info(
"[%s] still terminating. Wait for all nodes to terminate before trying again.", "%s still terminating. Wait for all nodes to terminate before trying again.",
currentlyTerminating currentlyTerminating
); );
} }
@ -368,7 +372,7 @@ public class RemoteTaskRunner implements TaskRunner
private void addWorker(final Worker worker) private void addWorker(final Worker worker)
{ {
try { try {
currentlyProvisioning.remove(worker.getHost()); currentlyProvisioning.removeAll(strategy.ipLookup(Arrays.<String>asList(worker.getIp())));
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost()); final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true); final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
@ -388,8 +392,7 @@ public class RemoteTaskRunner implements TaskRunner
synchronized (statusLock) { synchronized (statusLock) {
try { try {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) ||
event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
{
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskStatus taskStatus; final TaskStatus taskStatus;
@ -399,7 +402,7 @@ public class RemoteTaskRunner implements TaskRunner
event.getData().getData(), TaskStatus.class event.getData().getData(), TaskStatus.class
); );
if(!taskStatus.getId().equals(taskId)) { if (!taskStatus.getId().equals(taskId)) {
// Sanity check // Sanity check
throw new ISE( throw new ISE(
"Worker[%s] status id does not match payload id: %s != %s", "Worker[%s] status id does not match payload id: %s != %s",
@ -408,7 +411,8 @@ public class RemoteTaskRunner implements TaskRunner
taskStatus.getId() taskStatus.getId()
); );
} }
} catch (Exception e) { }
catch (Exception e) {
log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId); log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId);
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId)); retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
throw Throwables.propagate(e); throw Throwables.propagate(e);
@ -446,7 +450,8 @@ public class RemoteTaskRunner implements TaskRunner
} }
} }
} }
} catch(Exception e) { }
catch (Exception e) {
log.makeAlert(e, "Failed to handle new worker status") log.makeAlert(e, "Failed to handle new worker status")
.addData("worker", worker.getHost()) .addData("worker", worker.getHost())
.addData("znode", event.getData().getPath()) .addData("znode", event.getData().getPath())
@ -478,22 +483,22 @@ public class RemoteTaskRunner implements TaskRunner
* When a ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned * When a ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned
* to the worker. If tasks remain, they are retried. * to the worker. If tasks remain, they are retried.
* *
* @param workerId - id of the removed worker * @param worker - the removed worker
*/ */
private void removeWorker(final String workerId) private void removeWorker(final Worker worker)
{ {
currentlyTerminating.remove(workerId); currentlyTerminating.remove(worker.getHost());
WorkerWrapper workerWrapper = zkWorkers.get(workerId); WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost());
if (workerWrapper != null) { if (workerWrapper != null) {
try { try {
Set<String> tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks()); Set<String> tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks());
tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), workerId))); tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), worker.getHost())));
for (String taskId : tasksToRetry) { for (String taskId : tasksToRetry) {
TaskWrapper taskWrapper = tasks.get(taskId); TaskWrapper taskWrapper = tasks.get(taskId);
if (taskWrapper != null) { if (taskWrapper != null) {
retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId)); retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
} }
} }
@ -503,7 +508,7 @@ public class RemoteTaskRunner implements TaskRunner
log.error(e, "Failed to cleanly remove worker[%s]"); log.error(e, "Failed to cleanly remove worker[%s]");
} }
} }
zkWorkers.remove(workerId); zkWorkers.remove(worker.getHost());
} }
private WorkerWrapper findWorkerForTask() private WorkerWrapper findWorkerForTask()
@ -526,7 +531,9 @@ public class RemoteTaskRunner implements TaskRunner
public boolean apply(WorkerWrapper input) public boolean apply(WorkerWrapper input)
{ {
return (!input.isAtCapacity() && return (!input.isAtCapacity() &&
input.getWorker().getVersion().compareTo(config.getMinWorkerVersion()) >= 0); input.getWorker()
.getVersion()
.compareTo(workerSetupManager.getWorkerSetupData().getMinVersion()) >= 0);
} }
} }
) )
@ -551,7 +558,7 @@ public class RemoteTaskRunner implements TaskRunner
} }
log.info( log.info(
"[%s] still provisioning. Wait for all provisioned nodes to complete before requesting new worker.", "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
currentlyProvisioning currentlyProvisioning
); );
} }

View File

@ -26,24 +26,7 @@ import org.skife.config.Default;
*/ */
public abstract class EC2AutoScalingStrategyConfig public abstract class EC2AutoScalingStrategyConfig
{ {
@Config("druid.indexer.amiId")
public abstract String getAmiId();
@Config("druid.indexer.worker.port") @Config("druid.indexer.worker.port")
@Default("8080") @Default("8080")
public abstract String getWorkerPort(); public abstract String getWorkerPort();
@Config("druid.indexer.instanceType")
public abstract String getInstanceType();
@Config("druid.indexer.minNumInstancesToProvision")
@Default("1")
public abstract int getMinNumInstancesToProvision();
@Config("druid.indexer.maxNumInstancesToProvision")
@Default("1")
public abstract int getMaxNumInstancesToProvision();
@Config("druid.indexer.userDataFile")
public abstract String getUserDataFile();
} }

View File

@ -37,15 +37,8 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
@Default("2012-01-01T00:55:00.000Z") @Default("2012-01-01T00:55:00.000Z")
public abstract DateTime getTerminateResourcesOriginDateTime(); public abstract DateTime getTerminateResourcesOriginDateTime();
@Config("druid.indexer.minWorkerVersion")
public abstract String getMinWorkerVersion();
@Config("druid.indexer.minNumWorkers")
@Default("1")
public abstract int getMinNumWorkers();
@Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion") @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
@Default("1") @Default("10000")
public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion(); public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
@Config("druid.indexer.maxScalingDuration") @Config("druid.indexer.maxScalingDuration")

View File

@ -0,0 +1,39 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.config;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class WorkerSetupManagerConfig
{
@Config("druid.indexer.configTable")
public abstract String getConfigTable();
@Config("druid.indexer.workerSetupConfigName")
public abstract String getWorkerSetupConfigName();
@Config("druid.indexer.poll.duration")
@Default("PT1M")
public abstract Duration getPollDuration();
}

View File

@ -47,6 +47,9 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
@ -66,12 +69,11 @@ import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig; import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.loading.S3SegmentPusher; import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters; import com.metamx.emitter.core.Emitters;
@ -98,6 +100,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.FilterHolder; import org.mortbay.jetty.servlet.FilterHolder;
import org.mortbay.jetty.servlet.ServletHolder; import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory; import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import java.net.URL; import java.net.URL;
import java.util.Arrays; import java.util.Arrays;
@ -133,6 +136,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
private CuratorFramework curatorFramework = null; private CuratorFramework curatorFramework = null;
private ScheduledExecutorFactory scheduledExecutorFactory = null; private ScheduledExecutorFactory scheduledExecutorFactory = null;
private IndexerZkConfig indexerZkConfig; private IndexerZkConfig indexerZkConfig;
private WorkerSetupManager workerSetupManager = null;
private TaskRunnerFactory taskRunnerFactory = null; private TaskRunnerFactory taskRunnerFactory = null;
private TaskMaster taskMaster = null; private TaskMaster taskMaster = null;
private Server server = null; private Server server = null;
@ -160,14 +164,16 @@ public class IndexerCoordinatorNode extends RegisteringNode
return this; return this;
} }
public void setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator) public IndexerCoordinatorNode setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator)
{ {
this.mergerDBCoordinator = mergerDBCoordinator; this.mergerDBCoordinator = mergerDBCoordinator;
return this;
} }
public void setTaskQueue(TaskQueue taskQueue) public IndexerCoordinatorNode setTaskQueue(TaskQueue taskQueue)
{ {
this.taskQueue = taskQueue; this.taskQueue = taskQueue;
return this;
} }
public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator) public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator)
@ -182,9 +188,16 @@ public class IndexerCoordinatorNode extends RegisteringNode
return this; return this;
} }
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) public IndexerCoordinatorNode setWorkerSetupManager(WorkerSetupManager workerSetupManager)
{
this.workerSetupManager = workerSetupManager;
return this;
}
public IndexerCoordinatorNode setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
{ {
this.taskRunnerFactory = taskRunnerFactory; this.taskRunnerFactory = taskRunnerFactory;
return this;
} }
public void init() throws Exception public void init() throws Exception
@ -202,6 +215,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
initializeJacksonSubtypes(); initializeJacksonSubtypes();
initializeCurator(); initializeCurator();
initializeIndexerZkConfig(); initializeIndexerZkConfig();
initializeWorkerSetupManager();
initializeTaskRunnerFactory(); initializeTaskRunnerFactory();
initializeTaskMaster(); initializeTaskMaster();
initializeServer(); initializeServer();
@ -220,7 +234,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
jsonMapper, jsonMapper,
config, config,
emitter, emitter,
taskQueue taskQueue,
workerSetupManager
) )
); );
@ -447,6 +462,27 @@ public class IndexerCoordinatorNode extends RegisteringNode
} }
} }
public void initializeWorkerSetupManager()
{
if (workerSetupManager == null) {
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
final DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
final WorkerSetupManagerConfig workerSetupManagerConfig = configFactory.build(WorkerSetupManagerConfig.class);
DbConnector.createConfigTable(dbi, workerSetupManagerConfig.getConfigTable());
workerSetupManager = new WorkerSetupManager(
dbi, Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("WorkerSetupManagerExec--%d")
.build()
), jsonMapper, workerSetupManagerConfig
);
}
lifecycle.addManagedInstance(workerSetupManager);
}
public void initializeTaskRunnerFactory() public void initializeTaskRunnerFactory()
{ {
if (taskRunnerFactory == null) { if (taskRunnerFactory == null) {
@ -476,7 +512,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
PropUtils.getProperty(props, "com.metamx.aws.secretKey") PropUtils.getProperty(props, "com.metamx.aws.secretKey")
) )
), ),
configFactory.build(EC2AutoScalingStrategyConfig.class) configFactory.build(EC2AutoScalingStrategyConfig.class),
workerSetupManager
); );
} else if (config.getStrategyImpl().equalsIgnoreCase("noop")) { } else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
strategy = new NoopScalingStrategy(); strategy = new NoopScalingStrategy();
@ -491,7 +528,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true), new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true),
retryScheduledExec, retryScheduledExec,
new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)), new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
strategy strategy,
workerSetupManager
); );
} }
}; };

View File

@ -28,6 +28,8 @@ import com.metamx.druid.merger.common.task.MergeTask;
import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskQueue; import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
@ -48,18 +50,21 @@ public class IndexerCoordinatorResource
private final IndexerCoordinatorConfig config; private final IndexerCoordinatorConfig config;
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final TaskQueue tasks; private final TaskQueue tasks;
private final WorkerSetupManager workerSetupManager;
@Inject @Inject
public IndexerCoordinatorResource( public IndexerCoordinatorResource(
IndexerCoordinatorConfig config, IndexerCoordinatorConfig config,
ServiceEmitter emitter, ServiceEmitter emitter,
TaskQueue tasks TaskQueue tasks,
WorkerSetupManager workerSetupManager
) throws Exception ) throws Exception
{ {
this.config = config; this.config = config;
this.emitter = emitter; this.emitter = emitter;
this.tasks = tasks; this.tasks = tasks;
this.workerSetupManager = workerSetupManager;
} }
@POST @POST
@ -115,4 +120,25 @@ public class IndexerCoordinatorResource
{ {
return Response.ok(ImmutableMap.of("task", taskid)).build(); return Response.ok(ImmutableMap.of("task", taskid)).build();
} }
@GET
@Path("/worker/setup")
@Produces("application/json")
public Response getWorkerSetupData()
{
return Response.ok(workerSetupManager.getWorkerSetupData()).build();
}
@POST
@Path("/worker/setup")
@Consumes("application/json")
public Response setWorkerSetupData(
final WorkerSetupData workerSetupData
)
{
if (!workerSetupManager.setWorkerSetupData(workerSetupData)) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
return Response.ok().build();
}
} }

View File

@ -22,6 +22,7 @@ package com.metamx.druid.merger.coordinator.http;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.metamx.druid.merger.coordinator.TaskQueue; import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.sun.jersey.guice.JerseyServletModule; import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
@ -38,18 +39,21 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
private final IndexerCoordinatorConfig indexerCoordinatorConfig; private final IndexerCoordinatorConfig indexerCoordinatorConfig;
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final TaskQueue tasks; private final TaskQueue tasks;
private final WorkerSetupManager workerSetupManager;
public IndexerCoordinatorServletModule( public IndexerCoordinatorServletModule(
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
IndexerCoordinatorConfig indexerCoordinatorConfig, IndexerCoordinatorConfig indexerCoordinatorConfig,
ServiceEmitter emitter, ServiceEmitter emitter,
TaskQueue tasks TaskQueue tasks,
WorkerSetupManager workerSetupManager
) )
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.indexerCoordinatorConfig = indexerCoordinatorConfig; this.indexerCoordinatorConfig = indexerCoordinatorConfig;
this.emitter = emitter; this.emitter = emitter;
this.tasks = tasks; this.tasks = tasks;
this.workerSetupManager = workerSetupManager;
} }
@Override @Override
@ -60,6 +64,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig); bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig);
bind(ServiceEmitter.class).toInstance(emitter); bind(ServiceEmitter.class).toInstance(emitter);
bind(TaskQueue.class).toInstance(tasks); bind(TaskQueue.class).toInstance(tasks);
bind(WorkerSetupManager.class).toInstance(workerSetupManager);
serve("/*").with(GuiceContainer.class); serve("/*").with(GuiceContainer.class);
} }

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling; package com.metamx.druid.merger.coordinator.scaling;
import java.util.List; import java.util.List;

View File

@ -24,7 +24,6 @@ import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
import com.amazonaws.services.ec2.model.DescribeInstancesResult; import com.amazonaws.services.ec2.model.DescribeInstancesResult;
import com.amazonaws.services.ec2.model.Filter; import com.amazonaws.services.ec2.model.Filter;
import com.amazonaws.services.ec2.model.Instance; import com.amazonaws.services.ec2.model.Instance;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.ec2.model.Reservation; import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.RunInstancesResult;
@ -32,11 +31,14 @@ import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.apache.commons.codec.binary.Base64;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.File;
import java.util.List; import java.util.List;
/** /**
@ -48,31 +50,45 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final AmazonEC2Client amazonEC2Client; private final AmazonEC2Client amazonEC2Client;
private final EC2AutoScalingStrategyConfig config; private final EC2AutoScalingStrategyConfig config;
private final WorkerSetupManager workerSetupManager;
public EC2AutoScalingStrategy( public EC2AutoScalingStrategy(
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
AmazonEC2Client amazonEC2Client, AmazonEC2Client amazonEC2Client,
EC2AutoScalingStrategyConfig config EC2AutoScalingStrategyConfig config,
WorkerSetupManager workerSetupManager
) )
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.amazonEC2Client = amazonEC2Client; this.amazonEC2Client = amazonEC2Client;
this.config = config; this.config = config;
this.workerSetupManager = workerSetupManager;
} }
@Override @Override
public AutoScalingData<Instance> provision() public AutoScalingData<Instance> provision()
{ {
try { try {
WorkerSetupData setupData = workerSetupManager.getWorkerSetupData();
EC2NodeData workerConfig = setupData.getNodeData();
log.info("Creating new instance(s)..."); log.info("Creating new instance(s)...");
RunInstancesResult result = amazonEC2Client.runInstances( RunInstancesResult result = amazonEC2Client.runInstances(
new RunInstancesRequest( new RunInstancesRequest(
config.getAmiId(), workerConfig.getAmiId(),
config.getMinNumInstancesToProvision(), workerConfig.getMinInstances(),
config.getMaxNumInstancesToProvision() workerConfig.getMaxInstances()
) )
.withInstanceType(InstanceType.fromValue(config.getInstanceType())) .withInstanceType(workerConfig.getInstanceType())
.withUserData(jsonMapper.writeValueAsString(new File(config.getUserDataFile()))) .withSecurityGroupIds(workerConfig.getSecurityGroupIds())
.withKeyName(workerConfig.getKeyName())
.withUserData(
Base64.encodeBase64String(
jsonMapper.writeValueAsBytes(
setupData.getUserData()
)
)
)
); );
List<String> instanceIds = Lists.transform( List<String> instanceIds = Lists.transform(
@ -80,7 +96,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
new Function<Instance, String>() new Function<Instance, String>()
{ {
@Override @Override
public String apply(@Nullable Instance input) public String apply(Instance input)
{ {
return input.getInstanceId(); return input.getInstanceId();
} }
@ -95,9 +111,9 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
new Function<Instance, String>() new Function<Instance, String>()
{ {
@Override @Override
public String apply(@Nullable Instance input) public String apply(Instance input)
{ {
return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort()); return input.getInstanceId();
} }
} }
), ),
@ -112,12 +128,12 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
} }
@Override @Override
public AutoScalingData<Instance> terminate(List<String> nodeIds) public AutoScalingData<Instance> terminate(List<String> ids)
{ {
DescribeInstancesResult result = amazonEC2Client.describeInstances( DescribeInstancesResult result = amazonEC2Client.describeInstances(
new DescribeInstancesRequest() new DescribeInstancesRequest()
.withFilters( .withFilters(
new Filter("private-ip-address", nodeIds) new Filter("private-ip-address", ids)
) )
); );
@ -135,7 +151,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
new Function<Instance, String>() new Function<Instance, String>()
{ {
@Override @Override
public String apply(@Nullable Instance input) public String apply(Instance input)
{ {
return input.getInstanceId(); return input.getInstanceId();
} }
@ -146,13 +162,13 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
return new AutoScalingData<Instance>( return new AutoScalingData<Instance>(
Lists.transform( Lists.transform(
instances, ids,
new Function<Instance, String>() new Function<String, String>()
{ {
@Override @Override
public String apply(@Nullable Instance input) public String apply(@Nullable String input)
{ {
return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort()); return String.format("%s:%s", input, config.getWorkerPort());
} }
} }
), ),
@ -165,4 +181,36 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
return null; return null;
} }
@Override
public List<String> ipLookup(List<String> ips)
{
DescribeInstancesResult result = amazonEC2Client.describeInstances(
new DescribeInstancesRequest()
.withFilters(
new Filter("private-ip-address", ips)
)
);
List<Instance> instances = Lists.newArrayList();
for (Reservation reservation : result.getReservations()) {
instances.addAll(reservation.getInstances());
}
List<String> retVal = Lists.transform(
instances,
new Function<Instance, String>()
{
@Override
public String apply(Instance input)
{
return input.getInstanceId();
}
}
);
log.info("Performing lookup: %s --> %s", ips, retVal);
return retVal;
}
} }

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling; package com.metamx.druid.merger.coordinator.scaling;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
@ -24,4 +43,11 @@ public class NoopScalingStrategy implements ScalingStrategy<String>
log.info("If I were a real strategy I'd terminate %s now", nodeIds); log.info("If I were a real strategy I'd terminate %s now", nodeIds);
return null; return null;
} }
@Override
public List<String> ipLookup(List<String> ips)
{
log.info("I'm not a real strategy so I'm returning what I got %s", ips);
return ips;
}
} }

View File

@ -27,5 +27,12 @@ public interface ScalingStrategy<T>
{ {
public AutoScalingData<T> provision(); public AutoScalingData<T> provision();
public AutoScalingData<T> terminate(List<String> nodeIds); public AutoScalingData<T> terminate(List<String> ids);
/**
* Provides a lookup of ip addresses to node ids
* @param ips
* @return
*/
public List<String> ipLookup(List<String> ips);
} }

View File

@ -0,0 +1,91 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.setup;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.List;
/**
*/
public class EC2NodeData
{
private final String amiId;
private final String instanceType;
private final int minInstances;
private final int maxInstances;
private final List<String> securityGroupIds;
private final String keyName;
@JsonCreator
public EC2NodeData(
@JsonProperty("amiId") String amiId,
@JsonProperty("instanceType") String instanceType,
@JsonProperty("minInstances") int minInstances,
@JsonProperty("maxInstances") int maxInstances,
@JsonProperty("securityGroupIds") List<String> securityGroupIds,
@JsonProperty("keyName") String keyName
)
{
this.amiId = amiId;
this.instanceType = instanceType;
this.minInstances = minInstances;
this.maxInstances = maxInstances;
this.securityGroupIds = securityGroupIds;
this.keyName = keyName;
}
@JsonProperty
public String getAmiId()
{
return amiId;
}
@JsonProperty
public String getInstanceType()
{
return instanceType;
}
@JsonProperty
public int getMinInstances()
{
return minInstances;
}
@JsonProperty
public int getMaxInstances()
{
return maxInstances;
}
@JsonProperty
public List<String> getSecurityGroupIds()
{
return securityGroupIds;
}
@JsonProperty
public String getKeyName()
{
return keyName;
}
}

View File

@ -0,0 +1,62 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.setup;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
/**
*/
public class GalaxyUserData
{
public final String env;
public final String version;
public final String type;
@JsonCreator
public GalaxyUserData(
@JsonProperty("env") String env,
@JsonProperty("version") String version,
@JsonProperty("type") String type
)
{
this.env = env;
this.version = version;
this.type = type;
}
@JsonProperty
public String getEnv()
{
return env;
}
@JsonProperty
public String getVersion()
{
return version;
}
@JsonProperty
public String getType()
{
return type;
}
}

View File

@ -0,0 +1,73 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.setup;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.List;
/**
*/
public class WorkerSetupData
{
private final String minVersion;
private final int minNumWorkers;
private final EC2NodeData nodeData;
private final GalaxyUserData userData;
@JsonCreator
public WorkerSetupData(
@JsonProperty("minVersion") String minVersion,
@JsonProperty("minNumWorkers") int minNumWorkers,
@JsonProperty("nodeData") EC2NodeData nodeData,
@JsonProperty("userData") GalaxyUserData userData
)
{
this.minVersion = minVersion;
this.minNumWorkers = minNumWorkers;
this.nodeData = nodeData;
this.userData = userData;
}
@JsonProperty
public String getMinVersion()
{
return minVersion;
}
@JsonProperty
public int getMinNumWorkers()
{
return minNumWorkers;
}
@JsonProperty
public EC2NodeData getNodeData()
{
return nodeData;
}
@JsonProperty
public GalaxyUserData getUserData()
{
return userData;
}
}

View File

@ -0,0 +1,226 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.setup;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
import org.apache.commons.collections.MapUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.Duration;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class WorkerSetupManager
{
private static final Logger log = new Logger(WorkerSetupManager.class);
private final DBI dbi;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService exec;
private final WorkerSetupManagerConfig config;
private final Object lock = new Object();
private volatile AtomicReference<WorkerSetupData> workerSetupData = new AtomicReference<WorkerSetupData>(null);
private volatile boolean started = false;
public WorkerSetupManager(
DBI dbi,
ScheduledExecutorService exec,
ObjectMapper jsonMapper,
WorkerSetupManagerConfig config
)
{
this.dbi = dbi;
this.exec = exec;
this.jsonMapper = jsonMapper;
this.config = config;
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
config.getPollDuration(),
new Runnable()
{
@Override
public void run()
{
poll();
}
}
);
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
started = false;
}
}
public void poll()
{
try {
List<WorkerSetupData> setupDataList = dbi.withHandle(
new HandleCallback<List<WorkerSetupData>>()
{
@Override
public List<WorkerSetupData> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE name = :name",
config.getConfigTable()
)
)
.bind("name", config.getWorkerSetupConfigName())
.fold(
Lists.<WorkerSetupData>newArrayList(),
new Folder3<ArrayList<WorkerSetupData>, Map<String, Object>>()
{
@Override
public ArrayList<WorkerSetupData> fold(
ArrayList<WorkerSetupData> workerNodeConfigurations,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
// stringObjectMap lowercases and jackson may fail serde
workerNodeConfigurations.add(
jsonMapper.readValue(
MapUtils.getString(stringObjectMap, "payload"),
WorkerSetupData.class
)
);
return workerNodeConfigurations;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
);
if (setupDataList.isEmpty()) {
throw new ISE("WTF?! No configuration found for worker nodes!");
} else if (setupDataList.size() != 1) {
throw new ISE("WTF?! Found more than one configuration for worker nodes");
}
workerSetupData.set(setupDataList.get(0));
}
catch (Exception e) {
log.error(e, "Exception while polling for worker setup data!");
}
}
@SuppressWarnings("unchecked")
public WorkerSetupData getWorkerSetupData()
{
synchronized (lock) {
if (!started) {
throw new ISE("Must start WorkerSetupManager first!");
}
return workerSetupData.get();
}
}
public boolean setWorkerSetupData(final WorkerSetupData value)
{
synchronized (lock) {
try {
if (!started) {
throw new ISE("Must start WorkerSetupManager first!");
}
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
config.getConfigTable()
)
)
.bind("name", config.getWorkerSetupConfigName())
.bind("payload", jsonMapper.writeValueAsString(value))
.execute();
return null;
}
}
);
workerSetupData.set(value);
}
catch (Exception e) {
log.error(e, "Exception updating worker config");
return false;
}
}
return true;
}
}

View File

@ -17,6 +17,8 @@ import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData; import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.TaskMonitor; import com.metamx.druid.merger.worker.TaskMonitor;
import com.metamx.druid.merger.worker.Worker; import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator; import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
@ -62,6 +64,7 @@ public class RemoteTaskRunnerTest
private PathChildrenCache pathChildrenCache; private PathChildrenCache pathChildrenCache;
private RemoteTaskRunner remoteTaskRunner; private RemoteTaskRunner remoteTaskRunner;
private TaskMonitor taskMonitor; private TaskMonitor taskMonitor;
private WorkerSetupManager workerSetupManager;
private ScheduledExecutorService scheduledExec; private ScheduledExecutorService scheduledExec;
@ -69,7 +72,6 @@ public class RemoteTaskRunnerTest
private Worker worker1; private Worker worker1;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
@ -141,9 +143,10 @@ public class RemoteTaskRunnerTest
{ {
remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null); remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
try { try {
remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null); remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
fail("ISE expected"); fail("ISE expected");
} catch (ISE expected) { }
catch (ISE expected) {
} }
} }
@ -333,6 +336,17 @@ public class RemoteTaskRunnerTest
private void makeRemoteTaskRunner() throws Exception private void makeRemoteTaskRunner() throws Exception
{ {
scheduledExec = EasyMock.createMock(ScheduledExecutorService.class); scheduledExec = EasyMock.createMock(ScheduledExecutorService.class);
workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(
new WorkerSetupData(
"0",
0,
null,
null
)
);
EasyMock.replay(workerSetupManager);
remoteTaskRunner = new RemoteTaskRunner( remoteTaskRunner = new RemoteTaskRunner(
jsonMapper, jsonMapper,
@ -341,7 +355,8 @@ public class RemoteTaskRunnerTest
pathChildrenCache, pathChildrenCache,
scheduledExec, scheduledExec,
new RetryPolicyFactory(new TestRetryPolicyConfig()), new RetryPolicyFactory(new TestRetryPolicyConfig()),
new TestScalingStrategy() new TestScalingStrategy(),
workerSetupManager
); );
// Create a single worker and wait for things for be ready // Create a single worker and wait for things for be ready
@ -389,6 +404,12 @@ public class RemoteTaskRunnerTest
{ {
return null; return null;
} }
@Override
public List<String> ipLookup(List<String> ips)
{
return ips;
}
} }
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
@ -405,18 +426,6 @@ public class RemoteTaskRunnerTest
return null; return null;
} }
@Override
public String getMinWorkerVersion()
{
return "0";
}
@Override
public int getMinNumWorkers()
{
return 0;
}
@Override @Override
public int getMaxWorkerIdleTimeMillisBeforeDeletion() public int getMaxWorkerIdleTimeMillisBeforeDeletion()
{ {

View File

@ -27,8 +27,13 @@ import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.collect.Lists;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
import com.metamx.druid.merger.coordinator.setup.GalaxyUserData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -52,6 +57,7 @@ public class EC2AutoScalingStrategyTest
private Reservation reservation; private Reservation reservation;
private Instance instance; private Instance instance;
private EC2AutoScalingStrategy strategy; private EC2AutoScalingStrategy strategy;
private WorkerSetupManager workerSetupManager;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
@ -60,6 +66,7 @@ public class EC2AutoScalingStrategyTest
runInstancesResult = EasyMock.createMock(RunInstancesResult.class); runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class); describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class);
reservation = EasyMock.createMock(Reservation.class); reservation = EasyMock.createMock(Reservation.class);
workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
instance = new Instance() instance = new Instance()
.withInstanceId(INSTANCE_ID) .withInstanceId(INSTANCE_ID)
@ -69,44 +76,16 @@ public class EC2AutoScalingStrategyTest
strategy = new EC2AutoScalingStrategy( strategy = new EC2AutoScalingStrategy(
new DefaultObjectMapper(), new DefaultObjectMapper(),
amazonEC2Client, new EC2AutoScalingStrategyConfig() amazonEC2Client,
{ new EC2AutoScalingStrategyConfig()
@Override {
public String getAmiId() @Override
{ public String getWorkerPort()
return AMI_ID; {
} return "8080";
}
@Override },
public String getWorkerPort() workerSetupManager
{
return "8080";
}
@Override
public String getInstanceType()
{
return "t1.micro";
}
@Override
public int getMinNumInstancesToProvision()
{
return 1;
}
@Override
public int getMaxNumInstancesToProvision()
{
return 1;
}
@Override
public String getUserDataFile()
{
return "";
}
}
); );
} }
@ -117,11 +96,22 @@ public class EC2AutoScalingStrategyTest
EasyMock.verify(runInstancesResult); EasyMock.verify(runInstancesResult);
EasyMock.verify(describeInstancesResult); EasyMock.verify(describeInstancesResult);
EasyMock.verify(reservation); EasyMock.verify(reservation);
EasyMock.verify(workerSetupManager);
} }
@Test @Test
public void testScale() public void testScale()
{ {
EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(
new WorkerSetupData(
"0",
0,
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"),
new GalaxyUserData("env", "version", "type")
)
);
EasyMock.replay(workerSetupManager);
EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn( EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn(
runInstancesResult runInstancesResult
); );
@ -144,9 +134,9 @@ public class EC2AutoScalingStrategyTest
Assert.assertEquals(created.getNodeIds().size(), 1); Assert.assertEquals(created.getNodeIds().size(), 1);
Assert.assertEquals(created.getNodes().size(), 1); Assert.assertEquals(created.getNodes().size(), 1);
Assert.assertEquals(String.format("%s:8080", IP), created.getNodeIds().get(0)); Assert.assertEquals("theInstance", created.getNodeIds().get(0));
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyHost")); AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP"));
Assert.assertEquals(deleted.getNodeIds().size(), 1); Assert.assertEquals(deleted.getNodeIds().size(), 1);
Assert.assertEquals(deleted.getNodes().size(), 1); Assert.assertEquals(deleted.getNodes().size(), 1);

View File

@ -84,7 +84,7 @@
<dependency> <dependency>
<groupId>commons-codec</groupId> <groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId> <artifactId>commons-codec</artifactId>
<version>1.3</version> <version>1.7</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-httpclient</groupId> <groupId>commons-httpclient</groupId>