mirror of https://github.com/apache/druid.git
Merge pull request #53 from metamx/autoscaling
Use a database to store configurations for indexer workers
This commit is contained in:
commit
07131c51ed
|
@ -59,6 +59,18 @@ public class DbConnector
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void createConfigTable(final DBI dbi, final String configTableName)
|
||||||
|
{
|
||||||
|
createTable(
|
||||||
|
dbi,
|
||||||
|
configTableName,
|
||||||
|
String.format(
|
||||||
|
"CREATE table %s (name VARCHAR(255) NOT NULL, payload LONGTEXT NOT NULL, PRIMARY KEY(name))",
|
||||||
|
configTableName
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
public static void createTable(
|
public static void createTable(
|
||||||
final DBI dbi,
|
final DBI dbi,
|
||||||
final String tableName,
|
final String tableName,
|
||||||
|
|
|
@ -39,6 +39,7 @@ import com.metamx.druid.merger.common.task.Task;
|
||||||
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
|
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
|
||||||
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
|
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
|
||||||
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
|
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
|
||||||
import com.metamx.druid.merger.worker.Worker;
|
import com.metamx.druid.merger.worker.Worker;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import com.netflix.curator.framework.CuratorFramework;
|
import com.netflix.curator.framework.CuratorFramework;
|
||||||
|
@ -52,7 +53,7 @@ import org.joda.time.DateTime;
|
||||||
import org.joda.time.Duration;
|
import org.joda.time.Duration;
|
||||||
import org.joda.time.Period;
|
import org.joda.time.Period;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import java.util.Arrays;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -88,6 +89,7 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
private final ScheduledExecutorService scheduledExec;
|
private final ScheduledExecutorService scheduledExec;
|
||||||
private final RetryPolicyFactory retryPolicyFactory;
|
private final RetryPolicyFactory retryPolicyFactory;
|
||||||
private final ScalingStrategy strategy;
|
private final ScalingStrategy strategy;
|
||||||
|
private final WorkerSetupManager workerSetupManager;
|
||||||
|
|
||||||
// all workers that exist in ZK
|
// all workers that exist in ZK
|
||||||
private final Map<String, WorkerWrapper> zkWorkers = new ConcurrentHashMap<String, WorkerWrapper>();
|
private final Map<String, WorkerWrapper> zkWorkers = new ConcurrentHashMap<String, WorkerWrapper>();
|
||||||
|
@ -109,7 +111,8 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
PathChildrenCache workerPathCache,
|
PathChildrenCache workerPathCache,
|
||||||
ScheduledExecutorService scheduledExec,
|
ScheduledExecutorService scheduledExec,
|
||||||
RetryPolicyFactory retryPolicyFactory,
|
RetryPolicyFactory retryPolicyFactory,
|
||||||
ScalingStrategy strategy
|
ScalingStrategy strategy,
|
||||||
|
WorkerSetupManager workerSetupManager
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.jsonMapper = jsonMapper;
|
this.jsonMapper = jsonMapper;
|
||||||
|
@ -119,6 +122,7 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
this.scheduledExec = scheduledExec;
|
this.scheduledExec = scheduledExec;
|
||||||
this.retryPolicyFactory = retryPolicyFactory;
|
this.retryPolicyFactory = retryPolicyFactory;
|
||||||
this.strategy = strategy;
|
this.strategy = strategy;
|
||||||
|
this.workerSetupManager = workerSetupManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@LifecycleStart
|
@LifecycleStart
|
||||||
|
@ -144,7 +148,7 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
Worker.class
|
Worker.class
|
||||||
);
|
);
|
||||||
log.info("Worker[%s] removed!", worker.getHost());
|
log.info("Worker[%s] removed!", worker.getHost());
|
||||||
removeWorker(worker.getHost());
|
removeWorker(worker);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -169,7 +173,7 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
public void run()
|
public void run()
|
||||||
{
|
{
|
||||||
if (currentlyTerminating.isEmpty()) {
|
if (currentlyTerminating.isEmpty()) {
|
||||||
if (zkWorkers.size() <= config.getMinNumWorkers()) {
|
if (zkWorkers.size() <= workerSetupManager.getWorkerSetupData().getMinNumWorkers()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -180,7 +184,7 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
new Predicate<WorkerWrapper>()
|
new Predicate<WorkerWrapper>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(@Nullable WorkerWrapper input)
|
public boolean apply(WorkerWrapper input)
|
||||||
{
|
{
|
||||||
return input.getRunningTasks().isEmpty()
|
return input.getRunningTasks().isEmpty()
|
||||||
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
|
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
|
||||||
|
@ -196,9 +200,9 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
new Function<WorkerWrapper, String>()
|
new Function<WorkerWrapper, String>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public String apply(@Nullable WorkerWrapper input)
|
public String apply(WorkerWrapper input)
|
||||||
{
|
{
|
||||||
return input.getWorker().getHost();
|
return input.getWorker().getIp();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -218,7 +222,7 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
"[%s] still terminating. Wait for all nodes to terminate before trying again.",
|
"%s still terminating. Wait for all nodes to terminate before trying again.",
|
||||||
currentlyTerminating
|
currentlyTerminating
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -368,7 +372,7 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
private void addWorker(final Worker worker)
|
private void addWorker(final Worker worker)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
currentlyProvisioning.remove(worker.getHost());
|
currentlyProvisioning.removeAll(strategy.ipLookup(Arrays.<String>asList(worker.getIp())));
|
||||||
|
|
||||||
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
|
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
|
||||||
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
|
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
|
||||||
|
@ -388,8 +392,7 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
synchronized (statusLock) {
|
synchronized (statusLock) {
|
||||||
try {
|
try {
|
||||||
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) ||
|
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) ||
|
||||||
event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))
|
event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
|
||||||
{
|
|
||||||
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
|
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
|
||||||
final TaskStatus taskStatus;
|
final TaskStatus taskStatus;
|
||||||
|
|
||||||
|
@ -399,7 +402,7 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
event.getData().getData(), TaskStatus.class
|
event.getData().getData(), TaskStatus.class
|
||||||
);
|
);
|
||||||
|
|
||||||
if(!taskStatus.getId().equals(taskId)) {
|
if (!taskStatus.getId().equals(taskId)) {
|
||||||
// Sanity check
|
// Sanity check
|
||||||
throw new ISE(
|
throw new ISE(
|
||||||
"Worker[%s] status id does not match payload id: %s != %s",
|
"Worker[%s] status id does not match payload id: %s != %s",
|
||||||
|
@ -408,7 +411,8 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
taskStatus.getId()
|
taskStatus.getId()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
}
|
||||||
|
catch (Exception e) {
|
||||||
log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId);
|
log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId);
|
||||||
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
|
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
|
@ -446,7 +450,8 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch(Exception e) {
|
}
|
||||||
|
catch (Exception e) {
|
||||||
log.makeAlert(e, "Failed to handle new worker status")
|
log.makeAlert(e, "Failed to handle new worker status")
|
||||||
.addData("worker", worker.getHost())
|
.addData("worker", worker.getHost())
|
||||||
.addData("znode", event.getData().getPath())
|
.addData("znode", event.getData().getPath())
|
||||||
|
@ -478,22 +483,22 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
* When a ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned
|
* When a ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned
|
||||||
* to the worker. If tasks remain, they are retried.
|
* to the worker. If tasks remain, they are retried.
|
||||||
*
|
*
|
||||||
* @param workerId - id of the removed worker
|
* @param worker - the removed worker
|
||||||
*/
|
*/
|
||||||
private void removeWorker(final String workerId)
|
private void removeWorker(final Worker worker)
|
||||||
{
|
{
|
||||||
currentlyTerminating.remove(workerId);
|
currentlyTerminating.remove(worker.getHost());
|
||||||
|
|
||||||
WorkerWrapper workerWrapper = zkWorkers.get(workerId);
|
WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost());
|
||||||
if (workerWrapper != null) {
|
if (workerWrapper != null) {
|
||||||
try {
|
try {
|
||||||
Set<String> tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks());
|
Set<String> tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks());
|
||||||
tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), workerId)));
|
tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), worker.getHost())));
|
||||||
|
|
||||||
for (String taskId : tasksToRetry) {
|
for (String taskId : tasksToRetry) {
|
||||||
TaskWrapper taskWrapper = tasks.get(taskId);
|
TaskWrapper taskWrapper = tasks.get(taskId);
|
||||||
if (taskWrapper != null) {
|
if (taskWrapper != null) {
|
||||||
retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId));
|
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -503,7 +508,7 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
log.error(e, "Failed to cleanly remove worker[%s]");
|
log.error(e, "Failed to cleanly remove worker[%s]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
zkWorkers.remove(workerId);
|
zkWorkers.remove(worker.getHost());
|
||||||
}
|
}
|
||||||
|
|
||||||
private WorkerWrapper findWorkerForTask()
|
private WorkerWrapper findWorkerForTask()
|
||||||
|
@ -526,7 +531,9 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
public boolean apply(WorkerWrapper input)
|
public boolean apply(WorkerWrapper input)
|
||||||
{
|
{
|
||||||
return (!input.isAtCapacity() &&
|
return (!input.isAtCapacity() &&
|
||||||
input.getWorker().getVersion().compareTo(config.getMinWorkerVersion()) >= 0);
|
input.getWorker()
|
||||||
|
.getVersion()
|
||||||
|
.compareTo(workerSetupManager.getWorkerSetupData().getMinVersion()) >= 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -551,7 +558,7 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
"[%s] still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
|
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
|
||||||
currentlyProvisioning
|
currentlyProvisioning
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,24 +26,7 @@ import org.skife.config.Default;
|
||||||
*/
|
*/
|
||||||
public abstract class EC2AutoScalingStrategyConfig
|
public abstract class EC2AutoScalingStrategyConfig
|
||||||
{
|
{
|
||||||
@Config("druid.indexer.amiId")
|
|
||||||
public abstract String getAmiId();
|
|
||||||
|
|
||||||
@Config("druid.indexer.worker.port")
|
@Config("druid.indexer.worker.port")
|
||||||
@Default("8080")
|
@Default("8080")
|
||||||
public abstract String getWorkerPort();
|
public abstract String getWorkerPort();
|
||||||
|
|
||||||
@Config("druid.indexer.instanceType")
|
|
||||||
public abstract String getInstanceType();
|
|
||||||
|
|
||||||
@Config("druid.indexer.minNumInstancesToProvision")
|
|
||||||
@Default("1")
|
|
||||||
public abstract int getMinNumInstancesToProvision();
|
|
||||||
|
|
||||||
@Config("druid.indexer.maxNumInstancesToProvision")
|
|
||||||
@Default("1")
|
|
||||||
public abstract int getMaxNumInstancesToProvision();
|
|
||||||
|
|
||||||
@Config("druid.indexer.userDataFile")
|
|
||||||
public abstract String getUserDataFile();
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,15 +37,8 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
|
||||||
@Default("2012-01-01T00:55:00.000Z")
|
@Default("2012-01-01T00:55:00.000Z")
|
||||||
public abstract DateTime getTerminateResourcesOriginDateTime();
|
public abstract DateTime getTerminateResourcesOriginDateTime();
|
||||||
|
|
||||||
@Config("druid.indexer.minWorkerVersion")
|
|
||||||
public abstract String getMinWorkerVersion();
|
|
||||||
|
|
||||||
@Config("druid.indexer.minNumWorkers")
|
|
||||||
@Default("1")
|
|
||||||
public abstract int getMinNumWorkers();
|
|
||||||
|
|
||||||
@Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
|
@Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
|
||||||
@Default("1")
|
@Default("10000")
|
||||||
public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
|
public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
|
||||||
|
|
||||||
@Config("druid.indexer.maxScalingDuration")
|
@Config("druid.indexer.maxScalingDuration")
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.merger.coordinator.config;
|
||||||
|
|
||||||
|
import org.joda.time.Duration;
|
||||||
|
import org.skife.config.Config;
|
||||||
|
import org.skife.config.Default;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public abstract class WorkerSetupManagerConfig
|
||||||
|
{
|
||||||
|
@Config("druid.indexer.configTable")
|
||||||
|
public abstract String getConfigTable();
|
||||||
|
|
||||||
|
@Config("druid.indexer.workerSetupConfigName")
|
||||||
|
public abstract String getWorkerSetupConfigName();
|
||||||
|
|
||||||
|
@Config("druid.indexer.poll.duration")
|
||||||
|
@Default("PT1M")
|
||||||
|
public abstract Duration getPollDuration();
|
||||||
|
}
|
|
@ -47,6 +47,9 @@ import com.metamx.druid.initialization.Initialization;
|
||||||
import com.metamx.druid.initialization.ServerConfig;
|
import com.metamx.druid.initialization.ServerConfig;
|
||||||
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
|
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
|
||||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||||
|
import com.metamx.druid.loading.S3SegmentPusher;
|
||||||
|
import com.metamx.druid.loading.S3SegmentPusherConfig;
|
||||||
|
import com.metamx.druid.loading.SegmentPusher;
|
||||||
import com.metamx.druid.merger.common.TaskToolbox;
|
import com.metamx.druid.merger.common.TaskToolbox;
|
||||||
import com.metamx.druid.merger.common.config.IndexerZkConfig;
|
import com.metamx.druid.merger.common.config.IndexerZkConfig;
|
||||||
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
|
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
|
||||||
|
@ -66,12 +69,11 @@ import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
|
||||||
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
|
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
|
||||||
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
|
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
|
||||||
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
|
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
|
||||||
|
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
|
||||||
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
|
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
|
||||||
import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy;
|
import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy;
|
||||||
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
|
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
|
||||||
import com.metamx.druid.loading.S3SegmentPusher;
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
|
||||||
import com.metamx.druid.loading.S3SegmentPusherConfig;
|
|
||||||
import com.metamx.druid.loading.SegmentPusher;
|
|
||||||
import com.metamx.druid.utils.PropUtils;
|
import com.metamx.druid.utils.PropUtils;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import com.metamx.emitter.core.Emitters;
|
import com.metamx.emitter.core.Emitters;
|
||||||
|
@ -98,6 +100,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
|
||||||
import org.mortbay.jetty.servlet.FilterHolder;
|
import org.mortbay.jetty.servlet.FilterHolder;
|
||||||
import org.mortbay.jetty.servlet.ServletHolder;
|
import org.mortbay.jetty.servlet.ServletHolder;
|
||||||
import org.skife.config.ConfigurationObjectFactory;
|
import org.skife.config.ConfigurationObjectFactory;
|
||||||
|
import org.skife.jdbi.v2.DBI;
|
||||||
|
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -133,6 +136,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
|
||||||
private CuratorFramework curatorFramework = null;
|
private CuratorFramework curatorFramework = null;
|
||||||
private ScheduledExecutorFactory scheduledExecutorFactory = null;
|
private ScheduledExecutorFactory scheduledExecutorFactory = null;
|
||||||
private IndexerZkConfig indexerZkConfig;
|
private IndexerZkConfig indexerZkConfig;
|
||||||
|
private WorkerSetupManager workerSetupManager = null;
|
||||||
private TaskRunnerFactory taskRunnerFactory = null;
|
private TaskRunnerFactory taskRunnerFactory = null;
|
||||||
private TaskMaster taskMaster = null;
|
private TaskMaster taskMaster = null;
|
||||||
private Server server = null;
|
private Server server = null;
|
||||||
|
@ -160,14 +164,16 @@ public class IndexerCoordinatorNode extends RegisteringNode
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator)
|
public IndexerCoordinatorNode setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator)
|
||||||
{
|
{
|
||||||
this.mergerDBCoordinator = mergerDBCoordinator;
|
this.mergerDBCoordinator = mergerDBCoordinator;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setTaskQueue(TaskQueue taskQueue)
|
public IndexerCoordinatorNode setTaskQueue(TaskQueue taskQueue)
|
||||||
{
|
{
|
||||||
this.taskQueue = taskQueue;
|
this.taskQueue = taskQueue;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator)
|
public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator)
|
||||||
|
@ -182,9 +188,16 @@ public class IndexerCoordinatorNode extends RegisteringNode
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
|
public IndexerCoordinatorNode setWorkerSetupManager(WorkerSetupManager workerSetupManager)
|
||||||
|
{
|
||||||
|
this.workerSetupManager = workerSetupManager;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IndexerCoordinatorNode setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
|
||||||
{
|
{
|
||||||
this.taskRunnerFactory = taskRunnerFactory;
|
this.taskRunnerFactory = taskRunnerFactory;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void init() throws Exception
|
public void init() throws Exception
|
||||||
|
@ -202,6 +215,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
|
||||||
initializeJacksonSubtypes();
|
initializeJacksonSubtypes();
|
||||||
initializeCurator();
|
initializeCurator();
|
||||||
initializeIndexerZkConfig();
|
initializeIndexerZkConfig();
|
||||||
|
initializeWorkerSetupManager();
|
||||||
initializeTaskRunnerFactory();
|
initializeTaskRunnerFactory();
|
||||||
initializeTaskMaster();
|
initializeTaskMaster();
|
||||||
initializeServer();
|
initializeServer();
|
||||||
|
@ -220,7 +234,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
|
||||||
jsonMapper,
|
jsonMapper,
|
||||||
config,
|
config,
|
||||||
emitter,
|
emitter,
|
||||||
taskQueue
|
taskQueue,
|
||||||
|
workerSetupManager
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -447,6 +462,27 @@ public class IndexerCoordinatorNode extends RegisteringNode
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void initializeWorkerSetupManager()
|
||||||
|
{
|
||||||
|
if (workerSetupManager == null) {
|
||||||
|
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
|
||||||
|
final DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
|
||||||
|
final WorkerSetupManagerConfig workerSetupManagerConfig = configFactory.build(WorkerSetupManagerConfig.class);
|
||||||
|
|
||||||
|
DbConnector.createConfigTable(dbi, workerSetupManagerConfig.getConfigTable());
|
||||||
|
workerSetupManager = new WorkerSetupManager(
|
||||||
|
dbi, Executors.newScheduledThreadPool(
|
||||||
|
1,
|
||||||
|
new ThreadFactoryBuilder()
|
||||||
|
.setDaemon(true)
|
||||||
|
.setNameFormat("WorkerSetupManagerExec--%d")
|
||||||
|
.build()
|
||||||
|
), jsonMapper, workerSetupManagerConfig
|
||||||
|
);
|
||||||
|
}
|
||||||
|
lifecycle.addManagedInstance(workerSetupManager);
|
||||||
|
}
|
||||||
|
|
||||||
public void initializeTaskRunnerFactory()
|
public void initializeTaskRunnerFactory()
|
||||||
{
|
{
|
||||||
if (taskRunnerFactory == null) {
|
if (taskRunnerFactory == null) {
|
||||||
|
@ -476,7 +512,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
|
||||||
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
|
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
configFactory.build(EC2AutoScalingStrategyConfig.class)
|
configFactory.build(EC2AutoScalingStrategyConfig.class),
|
||||||
|
workerSetupManager
|
||||||
);
|
);
|
||||||
} else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
|
} else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
|
||||||
strategy = new NoopScalingStrategy();
|
strategy = new NoopScalingStrategy();
|
||||||
|
@ -491,7 +528,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
|
||||||
new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true),
|
new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true),
|
||||||
retryScheduledExec,
|
retryScheduledExec,
|
||||||
new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
|
new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
|
||||||
strategy
|
strategy,
|
||||||
|
workerSetupManager
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -28,6 +28,8 @@ import com.metamx.druid.merger.common.task.MergeTask;
|
||||||
import com.metamx.druid.merger.common.task.Task;
|
import com.metamx.druid.merger.common.task.Task;
|
||||||
import com.metamx.druid.merger.coordinator.TaskQueue;
|
import com.metamx.druid.merger.coordinator.TaskQueue;
|
||||||
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
|
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
|
||||||
import com.metamx.emitter.service.ServiceEmitter;
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
|
|
||||||
import javax.ws.rs.Consumes;
|
import javax.ws.rs.Consumes;
|
||||||
|
@ -48,18 +50,21 @@ public class IndexerCoordinatorResource
|
||||||
private final IndexerCoordinatorConfig config;
|
private final IndexerCoordinatorConfig config;
|
||||||
private final ServiceEmitter emitter;
|
private final ServiceEmitter emitter;
|
||||||
private final TaskQueue tasks;
|
private final TaskQueue tasks;
|
||||||
|
private final WorkerSetupManager workerSetupManager;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public IndexerCoordinatorResource(
|
public IndexerCoordinatorResource(
|
||||||
IndexerCoordinatorConfig config,
|
IndexerCoordinatorConfig config,
|
||||||
ServiceEmitter emitter,
|
ServiceEmitter emitter,
|
||||||
TaskQueue tasks
|
TaskQueue tasks,
|
||||||
|
WorkerSetupManager workerSetupManager
|
||||||
|
|
||||||
) throws Exception
|
) throws Exception
|
||||||
{
|
{
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.emitter = emitter;
|
this.emitter = emitter;
|
||||||
this.tasks = tasks;
|
this.tasks = tasks;
|
||||||
|
this.workerSetupManager = workerSetupManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@POST
|
@POST
|
||||||
|
@ -115,4 +120,25 @@ public class IndexerCoordinatorResource
|
||||||
{
|
{
|
||||||
return Response.ok(ImmutableMap.of("task", taskid)).build();
|
return Response.ok(ImmutableMap.of("task", taskid)).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Path("/worker/setup")
|
||||||
|
@Produces("application/json")
|
||||||
|
public Response getWorkerSetupData()
|
||||||
|
{
|
||||||
|
return Response.ok(workerSetupManager.getWorkerSetupData()).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@POST
|
||||||
|
@Path("/worker/setup")
|
||||||
|
@Consumes("application/json")
|
||||||
|
public Response setWorkerSetupData(
|
||||||
|
final WorkerSetupData workerSetupData
|
||||||
|
)
|
||||||
|
{
|
||||||
|
if (!workerSetupManager.setWorkerSetupData(workerSetupData)) {
|
||||||
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
||||||
|
}
|
||||||
|
return Response.ok().build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ package com.metamx.druid.merger.coordinator.http;
|
||||||
import com.google.inject.Provides;
|
import com.google.inject.Provides;
|
||||||
import com.metamx.druid.merger.coordinator.TaskQueue;
|
import com.metamx.druid.merger.coordinator.TaskQueue;
|
||||||
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
|
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
|
||||||
import com.metamx.emitter.service.ServiceEmitter;
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
import com.sun.jersey.guice.JerseyServletModule;
|
import com.sun.jersey.guice.JerseyServletModule;
|
||||||
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||||
|
@ -38,18 +39,21 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
|
||||||
private final IndexerCoordinatorConfig indexerCoordinatorConfig;
|
private final IndexerCoordinatorConfig indexerCoordinatorConfig;
|
||||||
private final ServiceEmitter emitter;
|
private final ServiceEmitter emitter;
|
||||||
private final TaskQueue tasks;
|
private final TaskQueue tasks;
|
||||||
|
private final WorkerSetupManager workerSetupManager;
|
||||||
|
|
||||||
public IndexerCoordinatorServletModule(
|
public IndexerCoordinatorServletModule(
|
||||||
ObjectMapper jsonMapper,
|
ObjectMapper jsonMapper,
|
||||||
IndexerCoordinatorConfig indexerCoordinatorConfig,
|
IndexerCoordinatorConfig indexerCoordinatorConfig,
|
||||||
ServiceEmitter emitter,
|
ServiceEmitter emitter,
|
||||||
TaskQueue tasks
|
TaskQueue tasks,
|
||||||
|
WorkerSetupManager workerSetupManager
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.jsonMapper = jsonMapper;
|
this.jsonMapper = jsonMapper;
|
||||||
this.indexerCoordinatorConfig = indexerCoordinatorConfig;
|
this.indexerCoordinatorConfig = indexerCoordinatorConfig;
|
||||||
this.emitter = emitter;
|
this.emitter = emitter;
|
||||||
this.tasks = tasks;
|
this.tasks = tasks;
|
||||||
|
this.workerSetupManager = workerSetupManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -60,6 +64,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
|
||||||
bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig);
|
bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig);
|
||||||
bind(ServiceEmitter.class).toInstance(emitter);
|
bind(ServiceEmitter.class).toInstance(emitter);
|
||||||
bind(TaskQueue.class).toInstance(tasks);
|
bind(TaskQueue.class).toInstance(tasks);
|
||||||
|
bind(WorkerSetupManager.class).toInstance(workerSetupManager);
|
||||||
|
|
||||||
serve("/*").with(GuiceContainer.class);
|
serve("/*").with(GuiceContainer.class);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
package com.metamx.druid.merger.coordinator.scaling;
|
package com.metamx.druid.merger.coordinator.scaling;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
|
@ -24,7 +24,6 @@ import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
|
||||||
import com.amazonaws.services.ec2.model.DescribeInstancesResult;
|
import com.amazonaws.services.ec2.model.DescribeInstancesResult;
|
||||||
import com.amazonaws.services.ec2.model.Filter;
|
import com.amazonaws.services.ec2.model.Filter;
|
||||||
import com.amazonaws.services.ec2.model.Instance;
|
import com.amazonaws.services.ec2.model.Instance;
|
||||||
import com.amazonaws.services.ec2.model.InstanceType;
|
|
||||||
import com.amazonaws.services.ec2.model.Reservation;
|
import com.amazonaws.services.ec2.model.Reservation;
|
||||||
import com.amazonaws.services.ec2.model.RunInstancesRequest;
|
import com.amazonaws.services.ec2.model.RunInstancesRequest;
|
||||||
import com.amazonaws.services.ec2.model.RunInstancesResult;
|
import com.amazonaws.services.ec2.model.RunInstancesResult;
|
||||||
|
@ -32,11 +31,14 @@ import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
|
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.io.File;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,31 +50,45 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
|
||||||
private final ObjectMapper jsonMapper;
|
private final ObjectMapper jsonMapper;
|
||||||
private final AmazonEC2Client amazonEC2Client;
|
private final AmazonEC2Client amazonEC2Client;
|
||||||
private final EC2AutoScalingStrategyConfig config;
|
private final EC2AutoScalingStrategyConfig config;
|
||||||
|
private final WorkerSetupManager workerSetupManager;
|
||||||
|
|
||||||
public EC2AutoScalingStrategy(
|
public EC2AutoScalingStrategy(
|
||||||
ObjectMapper jsonMapper,
|
ObjectMapper jsonMapper,
|
||||||
AmazonEC2Client amazonEC2Client,
|
AmazonEC2Client amazonEC2Client,
|
||||||
EC2AutoScalingStrategyConfig config
|
EC2AutoScalingStrategyConfig config,
|
||||||
|
WorkerSetupManager workerSetupManager
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.jsonMapper = jsonMapper;
|
this.jsonMapper = jsonMapper;
|
||||||
this.amazonEC2Client = amazonEC2Client;
|
this.amazonEC2Client = amazonEC2Client;
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
this.workerSetupManager = workerSetupManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AutoScalingData<Instance> provision()
|
public AutoScalingData<Instance> provision()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
WorkerSetupData setupData = workerSetupManager.getWorkerSetupData();
|
||||||
|
EC2NodeData workerConfig = setupData.getNodeData();
|
||||||
|
|
||||||
log.info("Creating new instance(s)...");
|
log.info("Creating new instance(s)...");
|
||||||
RunInstancesResult result = amazonEC2Client.runInstances(
|
RunInstancesResult result = amazonEC2Client.runInstances(
|
||||||
new RunInstancesRequest(
|
new RunInstancesRequest(
|
||||||
config.getAmiId(),
|
workerConfig.getAmiId(),
|
||||||
config.getMinNumInstancesToProvision(),
|
workerConfig.getMinInstances(),
|
||||||
config.getMaxNumInstancesToProvision()
|
workerConfig.getMaxInstances()
|
||||||
|
)
|
||||||
|
.withInstanceType(workerConfig.getInstanceType())
|
||||||
|
.withSecurityGroupIds(workerConfig.getSecurityGroupIds())
|
||||||
|
.withKeyName(workerConfig.getKeyName())
|
||||||
|
.withUserData(
|
||||||
|
Base64.encodeBase64String(
|
||||||
|
jsonMapper.writeValueAsBytes(
|
||||||
|
setupData.getUserData()
|
||||||
|
)
|
||||||
|
)
|
||||||
)
|
)
|
||||||
.withInstanceType(InstanceType.fromValue(config.getInstanceType()))
|
|
||||||
.withUserData(jsonMapper.writeValueAsString(new File(config.getUserDataFile())))
|
|
||||||
);
|
);
|
||||||
|
|
||||||
List<String> instanceIds = Lists.transform(
|
List<String> instanceIds = Lists.transform(
|
||||||
|
@ -80,7 +96,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
|
||||||
new Function<Instance, String>()
|
new Function<Instance, String>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public String apply(@Nullable Instance input)
|
public String apply(Instance input)
|
||||||
{
|
{
|
||||||
return input.getInstanceId();
|
return input.getInstanceId();
|
||||||
}
|
}
|
||||||
|
@ -95,9 +111,9 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
|
||||||
new Function<Instance, String>()
|
new Function<Instance, String>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public String apply(@Nullable Instance input)
|
public String apply(Instance input)
|
||||||
{
|
{
|
||||||
return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort());
|
return input.getInstanceId();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -112,12 +128,12 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AutoScalingData<Instance> terminate(List<String> nodeIds)
|
public AutoScalingData<Instance> terminate(List<String> ids)
|
||||||
{
|
{
|
||||||
DescribeInstancesResult result = amazonEC2Client.describeInstances(
|
DescribeInstancesResult result = amazonEC2Client.describeInstances(
|
||||||
new DescribeInstancesRequest()
|
new DescribeInstancesRequest()
|
||||||
.withFilters(
|
.withFilters(
|
||||||
new Filter("private-ip-address", nodeIds)
|
new Filter("private-ip-address", ids)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -135,7 +151,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
|
||||||
new Function<Instance, String>()
|
new Function<Instance, String>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public String apply(@Nullable Instance input)
|
public String apply(Instance input)
|
||||||
{
|
{
|
||||||
return input.getInstanceId();
|
return input.getInstanceId();
|
||||||
}
|
}
|
||||||
|
@ -146,13 +162,13 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
|
||||||
|
|
||||||
return new AutoScalingData<Instance>(
|
return new AutoScalingData<Instance>(
|
||||||
Lists.transform(
|
Lists.transform(
|
||||||
instances,
|
ids,
|
||||||
new Function<Instance, String>()
|
new Function<String, String>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public String apply(@Nullable Instance input)
|
public String apply(@Nullable String input)
|
||||||
{
|
{
|
||||||
return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort());
|
return String.format("%s:%s", input, config.getWorkerPort());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -165,4 +181,36 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> ipLookup(List<String> ips)
|
||||||
|
{
|
||||||
|
DescribeInstancesResult result = amazonEC2Client.describeInstances(
|
||||||
|
new DescribeInstancesRequest()
|
||||||
|
.withFilters(
|
||||||
|
new Filter("private-ip-address", ips)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
List<Instance> instances = Lists.newArrayList();
|
||||||
|
for (Reservation reservation : result.getReservations()) {
|
||||||
|
instances.addAll(reservation.getInstances());
|
||||||
|
}
|
||||||
|
|
||||||
|
List<String> retVal = Lists.transform(
|
||||||
|
instances,
|
||||||
|
new Function<Instance, String>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public String apply(Instance input)
|
||||||
|
{
|
||||||
|
return input.getInstanceId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
log.info("Performing lookup: %s --> %s", ips, retVal);
|
||||||
|
|
||||||
|
return retVal;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
package com.metamx.druid.merger.coordinator.scaling;
|
package com.metamx.druid.merger.coordinator.scaling;
|
||||||
|
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
|
@ -24,4 +43,11 @@ public class NoopScalingStrategy implements ScalingStrategy<String>
|
||||||
log.info("If I were a real strategy I'd terminate %s now", nodeIds);
|
log.info("If I were a real strategy I'd terminate %s now", nodeIds);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> ipLookup(List<String> ips)
|
||||||
|
{
|
||||||
|
log.info("I'm not a real strategy so I'm returning what I got %s", ips);
|
||||||
|
return ips;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,5 +27,12 @@ public interface ScalingStrategy<T>
|
||||||
{
|
{
|
||||||
public AutoScalingData<T> provision();
|
public AutoScalingData<T> provision();
|
||||||
|
|
||||||
public AutoScalingData<T> terminate(List<String> nodeIds);
|
public AutoScalingData<T> terminate(List<String> ids);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides a lookup of ip addresses to node ids
|
||||||
|
* @param ips
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public List<String> ipLookup(List<String> ips);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.merger.coordinator.setup;
|
||||||
|
|
||||||
|
import org.codehaus.jackson.annotate.JsonCreator;
|
||||||
|
import org.codehaus.jackson.annotate.JsonProperty;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class EC2NodeData
|
||||||
|
{
|
||||||
|
private final String amiId;
|
||||||
|
private final String instanceType;
|
||||||
|
private final int minInstances;
|
||||||
|
private final int maxInstances;
|
||||||
|
private final List<String> securityGroupIds;
|
||||||
|
private final String keyName;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public EC2NodeData(
|
||||||
|
@JsonProperty("amiId") String amiId,
|
||||||
|
@JsonProperty("instanceType") String instanceType,
|
||||||
|
@JsonProperty("minInstances") int minInstances,
|
||||||
|
@JsonProperty("maxInstances") int maxInstances,
|
||||||
|
@JsonProperty("securityGroupIds") List<String> securityGroupIds,
|
||||||
|
@JsonProperty("keyName") String keyName
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.amiId = amiId;
|
||||||
|
this.instanceType = instanceType;
|
||||||
|
this.minInstances = minInstances;
|
||||||
|
this.maxInstances = maxInstances;
|
||||||
|
this.securityGroupIds = securityGroupIds;
|
||||||
|
this.keyName = keyName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getAmiId()
|
||||||
|
{
|
||||||
|
return amiId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getInstanceType()
|
||||||
|
{
|
||||||
|
return instanceType;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public int getMinInstances()
|
||||||
|
{
|
||||||
|
return minInstances;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public int getMaxInstances()
|
||||||
|
{
|
||||||
|
return maxInstances;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public List<String> getSecurityGroupIds()
|
||||||
|
{
|
||||||
|
return securityGroupIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getKeyName()
|
||||||
|
{
|
||||||
|
return keyName;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,62 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.merger.coordinator.setup;
|
||||||
|
|
||||||
|
import org.codehaus.jackson.annotate.JsonCreator;
|
||||||
|
import org.codehaus.jackson.annotate.JsonProperty;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class GalaxyUserData
|
||||||
|
{
|
||||||
|
public final String env;
|
||||||
|
public final String version;
|
||||||
|
public final String type;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public GalaxyUserData(
|
||||||
|
@JsonProperty("env") String env,
|
||||||
|
@JsonProperty("version") String version,
|
||||||
|
@JsonProperty("type") String type
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.env = env;
|
||||||
|
this.version = version;
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getEnv()
|
||||||
|
{
|
||||||
|
return env;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getVersion()
|
||||||
|
{
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getType()
|
||||||
|
{
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,73 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.merger.coordinator.setup;
|
||||||
|
|
||||||
|
import org.codehaus.jackson.annotate.JsonCreator;
|
||||||
|
import org.codehaus.jackson.annotate.JsonProperty;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class WorkerSetupData
|
||||||
|
{
|
||||||
|
private final String minVersion;
|
||||||
|
private final int minNumWorkers;
|
||||||
|
private final EC2NodeData nodeData;
|
||||||
|
private final GalaxyUserData userData;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public WorkerSetupData(
|
||||||
|
@JsonProperty("minVersion") String minVersion,
|
||||||
|
@JsonProperty("minNumWorkers") int minNumWorkers,
|
||||||
|
@JsonProperty("nodeData") EC2NodeData nodeData,
|
||||||
|
@JsonProperty("userData") GalaxyUserData userData
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.minVersion = minVersion;
|
||||||
|
this.minNumWorkers = minNumWorkers;
|
||||||
|
this.nodeData = nodeData;
|
||||||
|
this.userData = userData;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getMinVersion()
|
||||||
|
{
|
||||||
|
return minVersion;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public int getMinNumWorkers()
|
||||||
|
{
|
||||||
|
return minNumWorkers;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public EC2NodeData getNodeData()
|
||||||
|
{
|
||||||
|
return nodeData;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public GalaxyUserData getUserData()
|
||||||
|
{
|
||||||
|
return userData;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,226 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.merger.coordinator.setup;
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.metamx.common.ISE;
|
||||||
|
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||||
|
import com.metamx.common.lifecycle.LifecycleStart;
|
||||||
|
import com.metamx.common.lifecycle.LifecycleStop;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
|
||||||
|
import org.apache.commons.collections.MapUtils;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.joda.time.Duration;
|
||||||
|
import org.skife.jdbi.v2.DBI;
|
||||||
|
import org.skife.jdbi.v2.FoldController;
|
||||||
|
import org.skife.jdbi.v2.Folder3;
|
||||||
|
import org.skife.jdbi.v2.Handle;
|
||||||
|
import org.skife.jdbi.v2.StatementContext;
|
||||||
|
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||||
|
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class WorkerSetupManager
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(WorkerSetupManager.class);
|
||||||
|
|
||||||
|
private final DBI dbi;
|
||||||
|
private final ObjectMapper jsonMapper;
|
||||||
|
private final ScheduledExecutorService exec;
|
||||||
|
private final WorkerSetupManagerConfig config;
|
||||||
|
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
|
private volatile AtomicReference<WorkerSetupData> workerSetupData = new AtomicReference<WorkerSetupData>(null);
|
||||||
|
private volatile boolean started = false;
|
||||||
|
|
||||||
|
public WorkerSetupManager(
|
||||||
|
DBI dbi,
|
||||||
|
ScheduledExecutorService exec,
|
||||||
|
ObjectMapper jsonMapper,
|
||||||
|
WorkerSetupManagerConfig config
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.dbi = dbi;
|
||||||
|
this.exec = exec;
|
||||||
|
this.jsonMapper = jsonMapper;
|
||||||
|
this.config = config;
|
||||||
|
}
|
||||||
|
|
||||||
|
@LifecycleStart
|
||||||
|
public void start()
|
||||||
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
if (started) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ScheduledExecutors.scheduleWithFixedDelay(
|
||||||
|
exec,
|
||||||
|
new Duration(0),
|
||||||
|
config.getPollDuration(),
|
||||||
|
new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
poll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
started = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@LifecycleStop
|
||||||
|
public void stop()
|
||||||
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
if (!started) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
started = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void poll()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
List<WorkerSetupData> setupDataList = dbi.withHandle(
|
||||||
|
new HandleCallback<List<WorkerSetupData>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public List<WorkerSetupData> withHandle(Handle handle) throws Exception
|
||||||
|
{
|
||||||
|
return handle.createQuery(
|
||||||
|
String.format(
|
||||||
|
"SELECT payload FROM %s WHERE name = :name",
|
||||||
|
config.getConfigTable()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.bind("name", config.getWorkerSetupConfigName())
|
||||||
|
.fold(
|
||||||
|
Lists.<WorkerSetupData>newArrayList(),
|
||||||
|
new Folder3<ArrayList<WorkerSetupData>, Map<String, Object>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public ArrayList<WorkerSetupData> fold(
|
||||||
|
ArrayList<WorkerSetupData> workerNodeConfigurations,
|
||||||
|
Map<String, Object> stringObjectMap,
|
||||||
|
FoldController foldController,
|
||||||
|
StatementContext statementContext
|
||||||
|
) throws SQLException
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
// stringObjectMap lowercases and jackson may fail serde
|
||||||
|
workerNodeConfigurations.add(
|
||||||
|
jsonMapper.readValue(
|
||||||
|
MapUtils.getString(stringObjectMap, "payload"),
|
||||||
|
WorkerSetupData.class
|
||||||
|
)
|
||||||
|
);
|
||||||
|
return workerNodeConfigurations;
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
if (setupDataList.isEmpty()) {
|
||||||
|
throw new ISE("WTF?! No configuration found for worker nodes!");
|
||||||
|
} else if (setupDataList.size() != 1) {
|
||||||
|
throw new ISE("WTF?! Found more than one configuration for worker nodes");
|
||||||
|
}
|
||||||
|
|
||||||
|
workerSetupData.set(setupDataList.get(0));
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error(e, "Exception while polling for worker setup data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public WorkerSetupData getWorkerSetupData()
|
||||||
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
if (!started) {
|
||||||
|
throw new ISE("Must start WorkerSetupManager first!");
|
||||||
|
}
|
||||||
|
|
||||||
|
return workerSetupData.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean setWorkerSetupData(final WorkerSetupData value)
|
||||||
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
try {
|
||||||
|
if (!started) {
|
||||||
|
throw new ISE("Must start WorkerSetupManager first!");
|
||||||
|
}
|
||||||
|
|
||||||
|
dbi.withHandle(
|
||||||
|
new HandleCallback<Void>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Void withHandle(Handle handle) throws Exception
|
||||||
|
{
|
||||||
|
handle.createStatement(
|
||||||
|
String.format(
|
||||||
|
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
|
||||||
|
config.getConfigTable()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.bind("name", config.getWorkerSetupConfigName())
|
||||||
|
.bind("payload", jsonMapper.writeValueAsString(value))
|
||||||
|
.execute();
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
workerSetupData.set(value);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error(e, "Exception updating worker config");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,6 +17,8 @@ import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
|
||||||
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
|
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
|
||||||
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
|
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
|
||||||
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
|
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
|
||||||
import com.metamx.druid.merger.worker.TaskMonitor;
|
import com.metamx.druid.merger.worker.TaskMonitor;
|
||||||
import com.metamx.druid.merger.worker.Worker;
|
import com.metamx.druid.merger.worker.Worker;
|
||||||
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
|
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
|
||||||
|
@ -62,6 +64,7 @@ public class RemoteTaskRunnerTest
|
||||||
private PathChildrenCache pathChildrenCache;
|
private PathChildrenCache pathChildrenCache;
|
||||||
private RemoteTaskRunner remoteTaskRunner;
|
private RemoteTaskRunner remoteTaskRunner;
|
||||||
private TaskMonitor taskMonitor;
|
private TaskMonitor taskMonitor;
|
||||||
|
private WorkerSetupManager workerSetupManager;
|
||||||
|
|
||||||
private ScheduledExecutorService scheduledExec;
|
private ScheduledExecutorService scheduledExec;
|
||||||
|
|
||||||
|
@ -69,7 +72,6 @@ public class RemoteTaskRunnerTest
|
||||||
|
|
||||||
private Worker worker1;
|
private Worker worker1;
|
||||||
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception
|
public void setUp() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -143,7 +145,8 @@ public class RemoteTaskRunnerTest
|
||||||
try {
|
try {
|
||||||
remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
|
remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
|
||||||
fail("ISE expected");
|
fail("ISE expected");
|
||||||
} catch (ISE expected) {
|
}
|
||||||
|
catch (ISE expected) {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -333,6 +336,17 @@ public class RemoteTaskRunnerTest
|
||||||
private void makeRemoteTaskRunner() throws Exception
|
private void makeRemoteTaskRunner() throws Exception
|
||||||
{
|
{
|
||||||
scheduledExec = EasyMock.createMock(ScheduledExecutorService.class);
|
scheduledExec = EasyMock.createMock(ScheduledExecutorService.class);
|
||||||
|
workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
|
||||||
|
|
||||||
|
EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(
|
||||||
|
new WorkerSetupData(
|
||||||
|
"0",
|
||||||
|
0,
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
)
|
||||||
|
);
|
||||||
|
EasyMock.replay(workerSetupManager);
|
||||||
|
|
||||||
remoteTaskRunner = new RemoteTaskRunner(
|
remoteTaskRunner = new RemoteTaskRunner(
|
||||||
jsonMapper,
|
jsonMapper,
|
||||||
|
@ -341,7 +355,8 @@ public class RemoteTaskRunnerTest
|
||||||
pathChildrenCache,
|
pathChildrenCache,
|
||||||
scheduledExec,
|
scheduledExec,
|
||||||
new RetryPolicyFactory(new TestRetryPolicyConfig()),
|
new RetryPolicyFactory(new TestRetryPolicyConfig()),
|
||||||
new TestScalingStrategy()
|
new TestScalingStrategy(),
|
||||||
|
workerSetupManager
|
||||||
);
|
);
|
||||||
|
|
||||||
// Create a single worker and wait for things for be ready
|
// Create a single worker and wait for things for be ready
|
||||||
|
@ -389,6 +404,12 @@ public class RemoteTaskRunnerTest
|
||||||
{
|
{
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> ipLookup(List<String> ips)
|
||||||
|
{
|
||||||
|
return ips;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
|
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
|
||||||
|
@ -405,18 +426,6 @@ public class RemoteTaskRunnerTest
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getMinWorkerVersion()
|
|
||||||
{
|
|
||||||
return "0";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getMinNumWorkers()
|
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getMaxWorkerIdleTimeMillisBeforeDeletion()
|
public int getMaxWorkerIdleTimeMillisBeforeDeletion()
|
||||||
{
|
{
|
||||||
|
|
|
@ -27,8 +27,13 @@ import com.amazonaws.services.ec2.model.Reservation;
|
||||||
import com.amazonaws.services.ec2.model.RunInstancesRequest;
|
import com.amazonaws.services.ec2.model.RunInstancesRequest;
|
||||||
import com.amazonaws.services.ec2.model.RunInstancesResult;
|
import com.amazonaws.services.ec2.model.RunInstancesResult;
|
||||||
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
|
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||||
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
|
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.GalaxyUserData;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
|
||||||
|
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
|
||||||
import org.easymock.EasyMock;
|
import org.easymock.EasyMock;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -52,6 +57,7 @@ public class EC2AutoScalingStrategyTest
|
||||||
private Reservation reservation;
|
private Reservation reservation;
|
||||||
private Instance instance;
|
private Instance instance;
|
||||||
private EC2AutoScalingStrategy strategy;
|
private EC2AutoScalingStrategy strategy;
|
||||||
|
private WorkerSetupManager workerSetupManager;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception
|
public void setUp() throws Exception
|
||||||
|
@ -60,6 +66,7 @@ public class EC2AutoScalingStrategyTest
|
||||||
runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
|
runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
|
||||||
describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class);
|
describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class);
|
||||||
reservation = EasyMock.createMock(Reservation.class);
|
reservation = EasyMock.createMock(Reservation.class);
|
||||||
|
workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
|
||||||
|
|
||||||
instance = new Instance()
|
instance = new Instance()
|
||||||
.withInstanceId(INSTANCE_ID)
|
.withInstanceId(INSTANCE_ID)
|
||||||
|
@ -69,44 +76,16 @@ public class EC2AutoScalingStrategyTest
|
||||||
|
|
||||||
strategy = new EC2AutoScalingStrategy(
|
strategy = new EC2AutoScalingStrategy(
|
||||||
new DefaultObjectMapper(),
|
new DefaultObjectMapper(),
|
||||||
amazonEC2Client, new EC2AutoScalingStrategyConfig()
|
amazonEC2Client,
|
||||||
|
new EC2AutoScalingStrategyConfig()
|
||||||
{
|
{
|
||||||
@Override
|
|
||||||
public String getAmiId()
|
|
||||||
{
|
|
||||||
return AMI_ID;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getWorkerPort()
|
public String getWorkerPort()
|
||||||
{
|
{
|
||||||
return "8080";
|
return "8080";
|
||||||
}
|
}
|
||||||
|
},
|
||||||
@Override
|
workerSetupManager
|
||||||
public String getInstanceType()
|
|
||||||
{
|
|
||||||
return "t1.micro";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getMinNumInstancesToProvision()
|
|
||||||
{
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getMaxNumInstancesToProvision()
|
|
||||||
{
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getUserDataFile()
|
|
||||||
{
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,11 +96,22 @@ public class EC2AutoScalingStrategyTest
|
||||||
EasyMock.verify(runInstancesResult);
|
EasyMock.verify(runInstancesResult);
|
||||||
EasyMock.verify(describeInstancesResult);
|
EasyMock.verify(describeInstancesResult);
|
||||||
EasyMock.verify(reservation);
|
EasyMock.verify(reservation);
|
||||||
|
EasyMock.verify(workerSetupManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testScale()
|
public void testScale()
|
||||||
{
|
{
|
||||||
|
EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(
|
||||||
|
new WorkerSetupData(
|
||||||
|
"0",
|
||||||
|
0,
|
||||||
|
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"),
|
||||||
|
new GalaxyUserData("env", "version", "type")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
EasyMock.replay(workerSetupManager);
|
||||||
|
|
||||||
EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn(
|
EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn(
|
||||||
runInstancesResult
|
runInstancesResult
|
||||||
);
|
);
|
||||||
|
@ -144,9 +134,9 @@ public class EC2AutoScalingStrategyTest
|
||||||
|
|
||||||
Assert.assertEquals(created.getNodeIds().size(), 1);
|
Assert.assertEquals(created.getNodeIds().size(), 1);
|
||||||
Assert.assertEquals(created.getNodes().size(), 1);
|
Assert.assertEquals(created.getNodes().size(), 1);
|
||||||
Assert.assertEquals(String.format("%s:8080", IP), created.getNodeIds().get(0));
|
Assert.assertEquals("theInstance", created.getNodeIds().get(0));
|
||||||
|
|
||||||
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyHost"));
|
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP"));
|
||||||
|
|
||||||
Assert.assertEquals(deleted.getNodeIds().size(), 1);
|
Assert.assertEquals(deleted.getNodeIds().size(), 1);
|
||||||
Assert.assertEquals(deleted.getNodes().size(), 1);
|
Assert.assertEquals(deleted.getNodes().size(), 1);
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -84,7 +84,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>commons-codec</groupId>
|
<groupId>commons-codec</groupId>
|
||||||
<artifactId>commons-codec</artifactId>
|
<artifactId>commons-codec</artifactId>
|
||||||
<version>1.3</version>
|
<version>1.7</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>commons-httpclient</groupId>
|
<groupId>commons-httpclient</groupId>
|
||||||
|
|
Loading…
Reference in New Issue