additional improvements according to code review; unit tests; bug fixes for retry policies

This commit is contained in:
Fangjin Yang 2012-11-07 17:27:23 -08:00
parent 51cd361fbe
commit 52214d7355
16 changed files with 622 additions and 133 deletions

View File

@ -178,6 +178,10 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -88,7 +88,7 @@ public abstract class MergeTask extends AbstractTask
@Override
public boolean apply(@Nullable DataSegment segment)
{
return segment == null || !segment.getDataSource().equals(dataSource);
return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource);
}
}
)

View File

@ -25,7 +25,9 @@ import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
@ -54,6 +56,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
@ -95,6 +98,8 @@ public class RemoteTaskRunner implements TaskRunner
private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();
private final Object statusLock = new Object();
private volatile DateTime lastProvisionTime = new DateTime();
private volatile DateTime lastTerminateTime = new DateTime();
private volatile boolean started = false;
public RemoteTaskRunner(
@ -120,27 +125,31 @@ public class RemoteTaskRunner implements TaskRunner
public void start()
{
try {
workerPathCache.start();
workerPathCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
{
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
final Worker worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
log.info("New worker[%s] found!", worker.getHost());
addWorker(worker);
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
final Worker worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("Worker[%s] removed!", worker.getHost());
removeWorker(worker.getHost());
}
}
}
);
workerPathCache.start();
// Schedule termination of worker nodes periodically
Period period = new Period(config.getTerminateResourcesDuration());
@ -175,7 +184,7 @@ public class RemoteTaskRunner implements TaskRunner
{
return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
> config.getmaxWorkerIdleTimeMillisBeforeDeletion();
> config.getMaxWorkerIdleTimeMillisBeforeDeletion();
}
}
)
@ -195,8 +204,19 @@ public class RemoteTaskRunner implements TaskRunner
)
);
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
}
} else {
Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert(
"It has been %d millis since last scheduled termination but nodes remain",
durSinceLastTerminate.getMillis()
).emit();
}
log.info(
"[%s] still terminating. Wait for all nodes to terminate before trying again.",
currentlyTerminating
@ -233,36 +253,38 @@ public class RemoteTaskRunner implements TaskRunner
return started;
}
public int getNumWorkers()
{
return zkWorkers.size();
}
@Override
public void run(Task task, TaskContext context, TaskCallback callback)
{
assignTask(
new TaskWrapper(
if (tasks.contains(task.getId())) {
throw new ISE("Assigned a task[%s] that already exists, WTF is happening?!", task.getId());
}
TaskWrapper taskWrapper = new TaskWrapper(
task, context, callback, retryPolicyFactory.makeRetryPolicy()
)
);
tasks.put(taskWrapper.getTask().getId(), taskWrapper);
assignTask(taskWrapper);
}
private void assignTask(TaskWrapper taskWrapper)
{
tasks.putIfAbsent(taskWrapper.getTask().getId(), taskWrapper);
WorkerWrapper workerWrapper = findWorkerRunningTask(taskWrapper);
// If the task already exists, we don't need to announce it
if (workerWrapper != null) {
final Worker worker = workerWrapper.getWorker();
try {
log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskWrapper.getTask().getId());
TaskStatus taskStatus = jsonMapper.readValue(
workerWrapper.getStatusCache()
.getCurrentData(
JOINER.join(
config.getStatusPath(),
worker.getHost(),
taskWrapper.getTask().getId()
)
JOINER.join(config.getStatusPath(), worker.getHost(), taskWrapper.getTask().getId())
)
.getData(),
TaskStatus.class
@ -280,8 +302,9 @@ public class RemoteTaskRunner implements TaskRunner
log.error(e, "Task exists, but hit exception!");
retryTask(new CleanupPaths(worker.getHost(), taskWrapper.getTask().getId()), taskWrapper);
}
} else { // Announce the task
workerWrapper = getWorkerForTask();
} else {
// Announce the task or retry if there is not enough capacity
workerWrapper = findWorkerForTask();
if (workerWrapper != null) {
announceTask(workerWrapper.getWorker(), taskWrapper);
} else {
@ -328,7 +351,7 @@ public class RemoteTaskRunner implements TaskRunner
}
}
},
retryPolicy.getAndIncrementRetryDelay(),
retryPolicy.getAndIncrementRetryDelay().getMillis(),
TimeUnit.MILLISECONDS
);
}
@ -347,9 +370,9 @@ public class RemoteTaskRunner implements TaskRunner
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
final ConcurrentSkipListSet<String> runningTasks = new ConcurrentSkipListSet<String>(
Lists.transform(
statusCache.getCurrentData(),
final WorkerWrapper workerWrapper = new WorkerWrapper(
worker,
statusCache,
new Function<ChildData, String>()
{
@Override
@ -363,12 +386,6 @@ public class RemoteTaskRunner implements TaskRunner
}
}
}
)
);
final WorkerWrapper workerWrapper = new WorkerWrapper(
worker,
runningTasks,
statusCache
);
// Add status listener to the watcher for status changes
@ -385,10 +402,7 @@ public class RemoteTaskRunner implements TaskRunner
TaskStatus taskStatus = jsonMapper.readValue(
event.getData().getData(), TaskStatus.class
);
taskId = taskStatus.getId();
log.info("New status[%s] appeared!", taskId);
runningTasks.add(taskId);
log.info("New status[%s] appeared!", taskStatus.getId());
statusLock.notify();
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
String statusPath = event.getData().getPath();
@ -413,13 +427,13 @@ public class RemoteTaskRunner implements TaskRunner
callback.notify(taskStatus);
}
tasks.remove(taskId);
runningTasks.remove(taskId);
cf.delete().guaranteed().forPath(statusPath);
cf.delete().guaranteed().inBackground().forPath(statusPath);
}
}
}
}
catch (Exception e) {
log.error(e, "Exception in status listener");
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
}
}
@ -456,25 +470,27 @@ public class RemoteTaskRunner implements TaskRunner
WorkerWrapper workerWrapper = zkWorkers.get(workerId);
if (workerWrapper != null) {
for (String taskId : workerWrapper.getRunningTasks()) {
try {
Set<String> tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks());
tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), workerId)));
for (String taskId : tasksToRetry) {
TaskWrapper taskWrapper = tasks.get(taskId);
if (taskWrapper != null) {
retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId));
}
workerWrapper.removeTask(taskId);
}
try {
workerWrapper.getStatusCache().close();
}
catch (IOException e) {
log.error("Failed to close watcher associated with worker[%s]", workerWrapper.getWorker().getHost());
catch (Exception e) {
log.error(e, "Failed to cleanly remove worker[%s]");
}
}
zkWorkers.remove(workerId);
}
private WorkerWrapper getWorkerForTask()
private WorkerWrapper findWorkerForTask()
{
try {
final MinMaxPriorityQueue<WorkerWrapper> workerQueue = MinMaxPriorityQueue.<WorkerWrapper>orderedBy(
@ -504,11 +520,20 @@ public class RemoteTaskRunner implements TaskRunner
log.info("Worker nodes do not have capacity to run any more tasks!");
if (currentlyProvisioning.isEmpty()) {
AutoScalingData provisioned = strategy.provision();
AutoScalingData provisioned = strategy.provision(currentlyProvisioning.size());
if (provisioned != null) {
currentlyProvisioning.addAll(provisioned.getNodeIds());
lastProvisionTime = new DateTime();
}
} else {
Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert(
"It has been %d millis since last scheduled provision but nodes remain",
durSinceLastProvision.getMillis()
).emit();
}
log.info(
"[%s] still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
currentlyProvisioning
@ -552,8 +577,9 @@ public class RemoteTaskRunner implements TaskRunner
jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext))
);
// Syncing state with Zookeeper
while (findWorkerRunningTask(taskWrapper) == null) {
statusLock.wait();
statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis());
}
}
catch (Exception e) {

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.Duration;
/**
*/
@ -29,29 +30,25 @@ public class RetryPolicy
private static final EmittingLogger log = new EmittingLogger(RetryPolicy.class);
private final long MAX_NUM_RETRIES;
private final long MAX_RETRY_DELAY_MILLIS;
private final Duration MAX_RETRY_DURATION;
private volatile long currRetryDelay;
private volatile Duration currRetryDelay;
private volatile int retryCount;
public RetryPolicy(RetryPolicyConfig config)
{
this.MAX_NUM_RETRIES = config.getMaxRetryCount();
this.MAX_RETRY_DELAY_MILLIS = config.getRetryMaxMillis();
this.MAX_RETRY_DURATION = config.getRetryMaxDuration();
this.currRetryDelay = config.getRetryMinMillis();
this.currRetryDelay = config.getRetryMinDuration();
this.retryCount = 0;
}
public long getAndIncrementRetryDelay()
public Duration getAndIncrementRetryDelay()
{
long retVal = currRetryDelay;
if (currRetryDelay < MAX_RETRY_DELAY_MILLIS) {
currRetryDelay *= 2;
}
Duration retVal = new Duration(currRetryDelay);
currRetryDelay = new Duration(Math.min(currRetryDelay.getMillis() * 2, MAX_RETRY_DURATION.getMillis()));
retryCount++;
return retVal;
}

View File

@ -19,29 +19,33 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.merger.worker.Worker;
import com.netflix.curator.framework.recipes.cache.ChildData;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import org.joda.time.DateTime;
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
/**
*/
public class WorkerWrapper
public class WorkerWrapper implements Closeable
{
private final Worker worker;
private final ConcurrentSkipListSet<String> runningTasks;
private final PathChildrenCache statusCache;
private final Function<ChildData, String> cacheConverter;
private volatile DateTime lastCompletedTaskTime = new DateTime();
public WorkerWrapper(Worker worker, ConcurrentSkipListSet<String> runningTasks, PathChildrenCache statusCache)
public WorkerWrapper(Worker worker, PathChildrenCache statusCache, Function<ChildData, String> cacheConverter)
{
this.worker = worker;
this.runningTasks = runningTasks;
this.statusCache = statusCache;
this.cacheConverter = cacheConverter;
}
public Worker getWorker()
@ -51,7 +55,12 @@ public class WorkerWrapper
public Set<String> getRunningTasks()
{
return runningTasks;
return Sets.newHashSet(
Lists.transform(
statusCache.getCurrentData(),
cacheConverter
)
);
}
public PathChildrenCache getStatusCache()
@ -66,7 +75,7 @@ public class WorkerWrapper
public boolean isAtCapacity()
{
return runningTasks.size() >= worker.getCapacity();
return statusCache.getCurrentData().size() >= worker.getCapacity();
}
public void setLastCompletedTaskTime(DateTime completedTaskTime)
@ -74,11 +83,7 @@ public class WorkerWrapper
lastCompletedTaskTime = completedTaskTime;
}
public void removeTask(String taskId)
{
runningTasks.remove(taskId);
}
@Override
public void close() throws IOException
{
statusCache.close();

View File

@ -46,5 +46,13 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
@Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
@Default("1")
public abstract int getmaxWorkerIdleTimeMillisBeforeDeletion();
public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
@Config("druid.indexer.maxScalingDuration")
@Default("PT1H")
public abstract Duration getMaxScalingDuration();
@Config("druid.indexer.taskAssignmentTimeoutDuration")
@Default("PT5M")
public abstract Duration getTaskAssignmentTimeoutDuration();
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.merger.coordinator.config;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
@ -27,12 +28,12 @@ import org.skife.config.Default;
public abstract class RetryPolicyConfig
{
@Config("druid.indexer.retry.minWaitMillis")
@Default("60000") // 1 minute
public abstract long getRetryMinMillis();
@Default("PT1M") // 1 minute
public abstract Duration getRetryMinDuration();
@Config("druid.indexer.retry.maxWaitMillis")
@Default("600000") // 10 minutes
public abstract long getRetryMaxMillis();
@Default("PT10M") // 10 minutes
public abstract Duration getRetryMaxDuration();
@Config("druid.indexer.retry.maxRetryCount")
@Default("10")

View File

@ -56,7 +56,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
}
@Override
public AutoScalingData<Instance> provision()
public AutoScalingData<Instance> provision(long numUnassignedTasks)
{
try {
log.info("Creating new instance(s)...");

View File

@ -1,15 +1,8 @@
package com.metamx.druid.merger.coordinator.scaling;
import com.amazonaws.services.ec2.model.Instance;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
/**
* This class just logs when scaling should occur.
@ -19,7 +12,7 @@ public class NoopScalingStrategy implements ScalingStrategy<String>
private static final EmittingLogger log = new EmittingLogger(NoopScalingStrategy.class);
@Override
public AutoScalingData<String> provision()
public AutoScalingData<String> provision(long numUnassignedTasks)
{
log.info("If I were a real strategy I'd create something now");
return null;

View File

@ -25,7 +25,7 @@ import java.util.List;
*/
public interface ScalingStrategy<T>
{
public AutoScalingData<T> provision();
public AutoScalingData<T> provision(long numUnassignedTasks);
public AutoScalingData<T> terminate(List<String> nodeIds);
}

View File

@ -47,7 +47,6 @@ public class TaskMonitor
private final PathChildrenCache pathChildrenCache;
private final CuratorFramework cf;
private final ObjectMapper jsonMapper;
private final WorkerCuratorCoordinator workerCuratorCoordinator;
private final TaskToolbox toolbox;
private final ExecutorService exec;
@ -55,7 +54,6 @@ public class TaskMonitor
public TaskMonitor(
PathChildrenCache pathChildrenCache,
CuratorFramework cf,
ObjectMapper jsonMapper,
WorkerCuratorCoordinator workerCuratorCoordinator,
TaskToolbox toolbox,
ExecutorService exec
@ -63,7 +61,6 @@ public class TaskMonitor
{
this.pathChildrenCache = pathChildrenCache;
this.cf = cf;
this.jsonMapper = jsonMapper;
this.workerCuratorCoordinator = workerCuratorCoordinator;
this.toolbox = toolbox;
this.exec = exec;
@ -87,7 +84,7 @@ public class TaskMonitor
throws Exception
{
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
final TaskHolder taskHolder = jsonMapper.readValue(
final TaskHolder taskHolder = toolbox.getObjectMapper().readValue(
cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()),
TaskHolder.class
);

View File

@ -287,8 +287,8 @@ public class WorkerNode
if (taskToolbox == null) {
final RestS3Service s3Client = new RestS3Service(
new AWSCredentials(
props.getProperty("com.metamx.aws.accessKey"),
props.getProperty("com.metamx.aws.secretKey")
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
final SegmentPusher segmentPusher = new S3SegmentPusher(
@ -334,7 +334,6 @@ public class WorkerNode
taskMonitor = new TaskMonitor(
pathChildrenCache,
curatorFramework,
jsonMapper,
workerCuratorCoordinator,
taskToolbox,
workerExec

View File

@ -0,0 +1,433 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.task.DefaultMergeTask;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.worker.TaskMonitor;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.retry.ExponentialBackoffRetry;
import com.netflix.curator.test.TestingCluster;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.annotate.JsonTypeName;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.jsontype.NamedType;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class RemoteTaskRunnerTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final String basePath = "/test/druid/indexer";
private static final String announcementsPath = String.format("%s/announcements", basePath);
private static final String tasksPath = String.format("%s/tasks", basePath);
private static final String statusPath = String.format("%s/status", basePath);
private TestingCluster testingCluster;
private CuratorFramework cf;
private PathChildrenCache pathChildrenCache;
private RemoteTaskRunner remoteTaskRunner;
private TaskMonitor taskMonitor;
private ScheduledExecutorService scheduledExec;
private Task task1;
private Worker worker1;
@Before
public void setUp() throws Exception
{
testingCluster = new TestingCluster(1);
testingCluster.start();
cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.build();
cf.start();
cf.create().creatingParentsIfNeeded().forPath(announcementsPath);
cf.create().forPath(tasksPath);
cf.create().forPath(String.format("%s/worker1", tasksPath));
cf.create().forPath(statusPath);
cf.create().forPath(String.format("%s/worker1", statusPath));
pathChildrenCache = new PathChildrenCache(cf, announcementsPath, true);
worker1 = new Worker(
"worker1",
"localhost",
3,
"0"
);
makeRemoteTaskRunner();
makeTaskMonitor();
}
@After
public void tearDown() throws Exception
{
testingCluster.stop();
remoteTaskRunner.stop();
taskMonitor.stop();
}
@Test
public void testRunNoExistingTask() throws Exception
{
remoteTaskRunner.run(
task1,
new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()),
null
);
}
@Test
public void testRunWithExistingCompletedTask() throws Exception
{
cf.create().creatingParentsIfNeeded().forPath(
String.format("%s/worker1/task1", statusPath),
jsonMapper.writeValueAsBytes(
TaskStatus.success(
"task1",
Lists.<DataSegment>newArrayList()
)
)
);
// Really don't like this way of waiting for the task to appear
while (remoteTaskRunner.getNumWorkers() == 0) {
Thread.sleep(500);
}
final MutableBoolean callbackCalled = new MutableBoolean(false);
remoteTaskRunner.run(
task1,
null,
new TaskCallback()
{
@Override
public void notify(TaskStatus status)
{
callbackCalled.setValue(true);
}
}
);
Assert.assertTrue("TaskCallback was not called!", callbackCalled.booleanValue());
}
private void makeTaskMonitor() throws Exception
{
WorkerCuratorCoordinator workerCuratorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new IndexerZkConfig()
{
@Override
public String getAnnouncementPath()
{
return announcementsPath;
}
@Override
public String getTaskPath()
{
return tasksPath;
}
@Override
public String getStatusPath()
{
return statusPath;
}
},
cf,
worker1
);
workerCuratorCoordinator.start();
taskMonitor = new TaskMonitor(
new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true),
cf,
workerCuratorCoordinator,
new TaskToolbox(
new IndexerCoordinatorConfig()
{
@Override
public String getServerName()
{
return "worker1";
}
@Override
public String getLeaderLatchPath()
{
return null;
}
@Override
public int getNumLocalThreads()
{
return 1;
}
@Override
public String getRunnerImpl()
{
return null;
}
@Override
public String getStorageImpl()
{
return null;
}
@Override
public File getBaseTaskDir()
{
try {
return File.createTempFile("billy", "yay");
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public boolean isWhitelistEnabled()
{
return false;
}
@Override
public String getWhitelistDatasourcesString()
{
return null;
}
@Override
public long getRowFlushBoundary()
{
return 0;
}
}, null, null, null, jsonMapper
),
Executors.newSingleThreadExecutor()
);
jsonMapper.registerSubtypes(new NamedType(TestTask.class, "test"));
taskMonitor.start();
}
private void makeRemoteTaskRunner() throws Exception
{
scheduledExec = EasyMock.createMock(ScheduledExecutorService.class);
remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
new TestRemoteTaskRunnerConfig(),
cf,
pathChildrenCache,
scheduledExec,
new RetryPolicyFactory(new TestRetryPolicyConfig()),
new TestScalingStrategy()
);
task1 = new TestTask(
"task1",
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0
)
), Lists.<AggregatorFactory>newArrayList()
);
// Create a single worker and wait for things for be ready
remoteTaskRunner.start();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
String.format("%s/worker1", announcementsPath),
jsonMapper.writeValueAsBytes(worker1)
);
while (remoteTaskRunner.getNumWorkers() == 0) {
Thread.sleep(500);
}
}
private static class TestRetryPolicyConfig extends RetryPolicyConfig
{
@Override
public Duration getRetryMinDuration()
{
return null;
}
@Override
public Duration getRetryMaxDuration()
{
return null;
}
@Override
public long getMaxRetryCount()
{
return 0;
}
}
private static class TestScalingStrategy<T> implements ScalingStrategy<T>
{
@Override
public AutoScalingData provision(long numUnassignedTasks)
{
return null;
}
@Override
public AutoScalingData terminate(List<String> nodeIds)
{
return null;
}
}
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
@Override
public Duration getTerminateResourcesDuration()
{
return null;
}
@Override
public DateTime getTerminateResourcesOriginDateTime()
{
return null;
}
@Override
public String getMinWorkerVersion()
{
return "0";
}
@Override
public int getMinNumWorkers()
{
return 0;
}
@Override
public int getMaxWorkerIdleTimeMillisBeforeDeletion()
{
return 0;
}
@Override
public Duration getMaxScalingDuration()
{
return null;
}
@Override
public String getAnnouncementPath()
{
return announcementsPath;
}
@Override
public String getTaskPath()
{
return tasksPath;
}
@Override
public String getStatusPath()
{
return statusPath;
}
@Override
public Duration getTaskAssignmentTimeoutDuration()
{
return new Duration(60000);
}
}
@JsonTypeName("test")
private static class TestTask extends DefaultMergeTask
{
private final String id;
public TestTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
)
{
super(dataSource, segments, aggregators);
this.id = id;
}
@Override
@JsonProperty
public String getId()
{
return id;
}
@Override
public Type getType()
{
return Type.TEST;
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
{
return TaskStatus.success("task1", Lists.<DataSegment>newArrayList());
}
}
}

View File

@ -0,0 +1,45 @@
package com.metamx.druid.merger.coordinator;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import junit.framework.Assert;
import org.joda.time.Duration;
import org.junit.Test;
/**
*/
public class RetryPolicyTest
{
@Test
public void testGetAndIncrementRetryDelay() throws Exception
{
RetryPolicy retryPolicy = new RetryPolicy(
new RetryPolicyConfig()
{
@Override
public Duration getRetryMinDuration()
{
return new Duration("PT1S");
}
@Override
public Duration getRetryMaxDuration()
{
return new Duration("PT10S");
}
@Override
public long getMaxRetryCount()
{
return 10;
}
}
);
Assert.assertEquals(new Duration("PT1S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT2S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT4S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT8S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT10S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT10S"), retryPolicy.getAndIncrementRetryDelay());
}
}

View File

@ -27,12 +27,8 @@ import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.collect.Maps;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.worker.Worker;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -40,8 +36,6 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
/**
*/
@ -56,7 +50,6 @@ public class EC2AutoScalingStrategyTest
private DescribeInstancesResult describeInstancesResult;
private Reservation reservation;
private Instance instance;
private WorkerWrapper worker;
private EC2AutoScalingStrategy strategy;
@Before
@ -73,12 +66,6 @@ public class EC2AutoScalingStrategyTest
.withImageId(AMI_ID)
.withPrivateIpAddress(IP);
worker = new WorkerWrapper(
new Worker("dummyHost", IP, 2, "0"),
new ConcurrentSkipListSet<String>(),
null
);
worker.setLastCompletedTaskTime(new DateTime(0));
strategy = new EC2AutoScalingStrategy(
amazonEC2Client, new EC2AutoScalingStrategyConfig()
{
@ -145,23 +132,12 @@ public class EC2AutoScalingStrategyTest
EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).atLeastOnce();
EasyMock.replay(reservation);
worker.getRunningTasks().add("task1");
Assert.assertFalse(worker.isAtCapacity());
worker.getRunningTasks().add("task2");
Assert.assertTrue(worker.isAtCapacity());
AutoScalingData created = strategy.provision();
AutoScalingData created = strategy.provision(0);
Assert.assertEquals(created.getNodeIds().size(), 1);
Assert.assertEquals(created.getNodes().size(), 1);
Assert.assertEquals(String.format("%s:8080", IP), created.getNodeIds().get(0));
worker.getRunningTasks().remove("task1");
worker.getRunningTasks().remove("task2");
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyHost"));
Assert.assertEquals(deleted.getNodeIds().size(), 1);

View File

@ -131,6 +131,11 @@
<artifactId>curator-x-discovery</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-test</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>