From 52214d735578630f4af6efc397d4136a0bf3d5ee Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 7 Nov 2012 17:27:23 -0800 Subject: [PATCH] additional improvements according to code review; unit tests; bug fixes for retry policies --- merger/pom.xml | 6 +- .../druid/merger/common/task/MergeTask.java | 2 +- .../merger/coordinator/RemoteTaskRunner.java | 146 +++--- .../druid/merger/coordinator/RetryPolicy.java | 19 +- .../merger/coordinator/WorkerWrapper.java | 29 +- .../config/RemoteTaskRunnerConfig.java | 10 +- .../coordinator/config/RetryPolicyConfig.java | 9 +- .../scaling/EC2AutoScalingStrategy.java | 2 +- .../scaling/NoopScalingStrategy.java | 9 +- .../coordinator/scaling/ScalingStrategy.java | 2 +- .../druid/merger/worker/TaskMonitor.java | 5 +- .../druid/merger/worker/http/WorkerNode.java | 5 +- .../coordinator/RemoteTaskRunnerTest.java | 433 ++++++++++++++++++ .../merger/coordinator/RetryPolicyTest.java | 45 ++ .../scaling/EC2AutoScalingStrategyTest.java | 26 +- pom.xml | 7 +- 16 files changed, 622 insertions(+), 133 deletions(-) create mode 100644 merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java create mode 100644 merger/src/test/java/com/metamx/druid/merger/coordinator/RetryPolicyTest.java diff --git a/merger/pom.xml b/merger/pom.xml index e5fc6f99f69..c60bd7e2bd5 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -19,7 +19,7 @@ --> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 com.metamx.druid druid-merger @@ -178,6 +178,10 @@ easymock test + + com.netflix.curator + curator-test + diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java index db5ff0594da..859352c2a7d 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java @@ -88,7 +88,7 @@ public abstract class MergeTask extends AbstractTask @Override public boolean apply(@Nullable DataSegment segment) { - return segment == null || !segment.getDataSource().equals(dataSource); + return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource); } } ) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 84f71360404..a2bc0c3688e 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -25,7 +25,9 @@ import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import com.metamx.common.ISE; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStart; @@ -54,6 +56,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ScheduledExecutorService; @@ -95,6 +98,8 @@ public class RemoteTaskRunner implements TaskRunner private final ConcurrentSkipListSet currentlyTerminating = new ConcurrentSkipListSet(); private final Object statusLock = new Object(); + private volatile DateTime lastProvisionTime = new DateTime(); + private volatile DateTime lastTerminateTime = new DateTime(); private volatile boolean started = false; public RemoteTaskRunner( @@ -120,27 +125,31 @@ public class RemoteTaskRunner implements TaskRunner public void start() { try { - workerPathCache.start(); workerPathCache.getListenable().addListener( new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception { - final Worker worker = jsonMapper.readValue( - event.getData().getData(), - Worker.class - ); if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { + final Worker worker = jsonMapper.readValue( + event.getData().getData(), + Worker.class + ); log.info("New worker[%s] found!", worker.getHost()); addWorker(worker); } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { + final Worker worker = jsonMapper.readValue( + event.getData().getData(), + Worker.class + ); log.info("Worker[%s] removed!", worker.getHost()); removeWorker(worker.getHost()); } } } ); + workerPathCache.start(); // Schedule termination of worker nodes periodically Period period = new Period(config.getTerminateResourcesDuration()); @@ -175,7 +184,7 @@ public class RemoteTaskRunner implements TaskRunner { return input.getRunningTasks().isEmpty() && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis() - > config.getmaxWorkerIdleTimeMillisBeforeDeletion(); + > config.getMaxWorkerIdleTimeMillisBeforeDeletion(); } } ) @@ -195,8 +204,19 @@ public class RemoteTaskRunner implements TaskRunner ) ); - currentlyTerminating.addAll(terminated.getNodeIds()); + if (terminated != null) { + currentlyTerminating.addAll(terminated.getNodeIds()); + lastTerminateTime = new DateTime(); + } } else { + Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime); + if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) { + log.makeAlert( + "It has been %d millis since last scheduled termination but nodes remain", + durSinceLastTerminate.getMillis() + ).emit(); + } + log.info( "[%s] still terminating. Wait for all nodes to terminate before trying again.", currentlyTerminating @@ -233,36 +253,38 @@ public class RemoteTaskRunner implements TaskRunner return started; } + public int getNumWorkers() + { + return zkWorkers.size(); + } + @Override public void run(Task task, TaskContext context, TaskCallback callback) { - assignTask( - new TaskWrapper( - task, context, callback, retryPolicyFactory.makeRetryPolicy() - ) + if (tasks.contains(task.getId())) { + throw new ISE("Assigned a task[%s] that already exists, WTF is happening?!", task.getId()); + } + TaskWrapper taskWrapper = new TaskWrapper( + task, context, callback, retryPolicyFactory.makeRetryPolicy() ); + tasks.put(taskWrapper.getTask().getId(), taskWrapper); + assignTask(taskWrapper); } private void assignTask(TaskWrapper taskWrapper) { - tasks.putIfAbsent(taskWrapper.getTask().getId(), taskWrapper); WorkerWrapper workerWrapper = findWorkerRunningTask(taskWrapper); // If the task already exists, we don't need to announce it if (workerWrapper != null) { final Worker worker = workerWrapper.getWorker(); try { - log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskWrapper.getTask().getId()); TaskStatus taskStatus = jsonMapper.readValue( workerWrapper.getStatusCache() .getCurrentData( - JOINER.join( - config.getStatusPath(), - worker.getHost(), - taskWrapper.getTask().getId() - ) + JOINER.join(config.getStatusPath(), worker.getHost(), taskWrapper.getTask().getId()) ) .getData(), TaskStatus.class @@ -280,8 +302,9 @@ public class RemoteTaskRunner implements TaskRunner log.error(e, "Task exists, but hit exception!"); retryTask(new CleanupPaths(worker.getHost(), taskWrapper.getTask().getId()), taskWrapper); } - } else { // Announce the task - workerWrapper = getWorkerForTask(); + } else { + // Announce the task or retry if there is not enough capacity + workerWrapper = findWorkerForTask(); if (workerWrapper != null) { announceTask(workerWrapper.getWorker(), taskWrapper); } else { @@ -328,7 +351,7 @@ public class RemoteTaskRunner implements TaskRunner } } }, - retryPolicy.getAndIncrementRetryDelay(), + retryPolicy.getAndIncrementRetryDelay().getMillis(), TimeUnit.MILLISECONDS ); } @@ -347,28 +370,22 @@ public class RemoteTaskRunner implements TaskRunner final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost()); final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true); - final ConcurrentSkipListSet runningTasks = new ConcurrentSkipListSet( - Lists.transform( - statusCache.getCurrentData(), - new Function() - { - @Override - public String apply(@Nullable ChildData input) - { - try { - return jsonMapper.readValue(input.getData(), TaskStatus.class).getId(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - ) - ); final WorkerWrapper workerWrapper = new WorkerWrapper( worker, - runningTasks, - statusCache + statusCache, + new Function() + { + @Override + public String apply(@Nullable ChildData input) + { + try { + return jsonMapper.readValue(input.getData(), TaskStatus.class).getId(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } ); // Add status listener to the watcher for status changes @@ -385,10 +402,7 @@ public class RemoteTaskRunner implements TaskRunner TaskStatus taskStatus = jsonMapper.readValue( event.getData().getData(), TaskStatus.class ); - taskId = taskStatus.getId(); - - log.info("New status[%s] appeared!", taskId); - runningTasks.add(taskId); + log.info("New status[%s] appeared!", taskStatus.getId()); statusLock.notify(); } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { String statusPath = event.getData().getPath(); @@ -413,13 +427,13 @@ public class RemoteTaskRunner implements TaskRunner callback.notify(taskStatus); } tasks.remove(taskId); - runningTasks.remove(taskId); - cf.delete().guaranteed().forPath(statusPath); + cf.delete().guaranteed().inBackground().forPath(statusPath); } } } } catch (Exception e) { + log.error(e, "Exception in status listener"); retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId)); } } @@ -456,25 +470,27 @@ public class RemoteTaskRunner implements TaskRunner WorkerWrapper workerWrapper = zkWorkers.get(workerId); if (workerWrapper != null) { - for (String taskId : workerWrapper.getRunningTasks()) { - TaskWrapper taskWrapper = tasks.get(taskId); - if (taskWrapper != null) { - retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId)); - } - workerWrapper.removeTask(taskId); - } - try { + Set tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks()); + tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), workerId))); + + for (String taskId : tasksToRetry) { + TaskWrapper taskWrapper = tasks.get(taskId); + if (taskWrapper != null) { + retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId)); + } + } + workerWrapper.getStatusCache().close(); } - catch (IOException e) { - log.error("Failed to close watcher associated with worker[%s]", workerWrapper.getWorker().getHost()); + catch (Exception e) { + log.error(e, "Failed to cleanly remove worker[%s]"); } } zkWorkers.remove(workerId); } - private WorkerWrapper getWorkerForTask() + private WorkerWrapper findWorkerForTask() { try { final MinMaxPriorityQueue workerQueue = MinMaxPriorityQueue.orderedBy( @@ -504,11 +520,20 @@ public class RemoteTaskRunner implements TaskRunner log.info("Worker nodes do not have capacity to run any more tasks!"); if (currentlyProvisioning.isEmpty()) { - AutoScalingData provisioned = strategy.provision(); + AutoScalingData provisioned = strategy.provision(currentlyProvisioning.size()); if (provisioned != null) { currentlyProvisioning.addAll(provisioned.getNodeIds()); + lastProvisionTime = new DateTime(); } } else { + Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime); + if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) { + log.makeAlert( + "It has been %d millis since last scheduled provision but nodes remain", + durSinceLastProvision.getMillis() + ).emit(); + } + log.info( "[%s] still provisioning. Wait for all provisioned nodes to complete before requesting new worker.", currentlyProvisioning @@ -552,8 +577,9 @@ public class RemoteTaskRunner implements TaskRunner jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext)) ); + // Syncing state with Zookeeper while (findWorkerRunningTask(taskWrapper) == null) { - statusLock.wait(); + statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis()); } } catch (Exception e) { diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RetryPolicy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RetryPolicy.java index 24ee54290bb..fbb27d76c18 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RetryPolicy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RetryPolicy.java @@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator; import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; import com.metamx.emitter.EmittingLogger; +import org.joda.time.Duration; /** */ @@ -29,29 +30,25 @@ public class RetryPolicy private static final EmittingLogger log = new EmittingLogger(RetryPolicy.class); private final long MAX_NUM_RETRIES; - private final long MAX_RETRY_DELAY_MILLIS; + private final Duration MAX_RETRY_DURATION; - private volatile long currRetryDelay; + private volatile Duration currRetryDelay; private volatile int retryCount; public RetryPolicy(RetryPolicyConfig config) { this.MAX_NUM_RETRIES = config.getMaxRetryCount(); - this.MAX_RETRY_DELAY_MILLIS = config.getRetryMaxMillis(); + this.MAX_RETRY_DURATION = config.getRetryMaxDuration(); - this.currRetryDelay = config.getRetryMinMillis(); + this.currRetryDelay = config.getRetryMinDuration(); this.retryCount = 0; } - public long getAndIncrementRetryDelay() + public Duration getAndIncrementRetryDelay() { - long retVal = currRetryDelay; - if (currRetryDelay < MAX_RETRY_DELAY_MILLIS) { - currRetryDelay *= 2; - } - + Duration retVal = new Duration(currRetryDelay); + currRetryDelay = new Duration(Math.min(currRetryDelay.getMillis() * 2, MAX_RETRY_DURATION.getMillis())); retryCount++; - return retVal; } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/WorkerWrapper.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/WorkerWrapper.java index 99e330c0a3a..68d4f0a128c 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/WorkerWrapper.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/WorkerWrapper.java @@ -19,29 +19,33 @@ package com.metamx.druid.merger.coordinator; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.metamx.druid.merger.worker.Worker; +import com.netflix.curator.framework.recipes.cache.ChildData; import com.netflix.curator.framework.recipes.cache.PathChildrenCache; import org.joda.time.DateTime; +import java.io.Closeable; import java.io.IOException; import java.util.Set; -import java.util.concurrent.ConcurrentSkipListSet; /** */ -public class WorkerWrapper +public class WorkerWrapper implements Closeable { private final Worker worker; - private final ConcurrentSkipListSet runningTasks; private final PathChildrenCache statusCache; + private final Function cacheConverter; private volatile DateTime lastCompletedTaskTime = new DateTime(); - public WorkerWrapper(Worker worker, ConcurrentSkipListSet runningTasks, PathChildrenCache statusCache) + public WorkerWrapper(Worker worker, PathChildrenCache statusCache, Function cacheConverter) { this.worker = worker; - this.runningTasks = runningTasks; this.statusCache = statusCache; + this.cacheConverter = cacheConverter; } public Worker getWorker() @@ -51,7 +55,12 @@ public class WorkerWrapper public Set getRunningTasks() { - return runningTasks; + return Sets.newHashSet( + Lists.transform( + statusCache.getCurrentData(), + cacheConverter + ) + ); } public PathChildrenCache getStatusCache() @@ -66,7 +75,7 @@ public class WorkerWrapper public boolean isAtCapacity() { - return runningTasks.size() >= worker.getCapacity(); + return statusCache.getCurrentData().size() >= worker.getCapacity(); } public void setLastCompletedTaskTime(DateTime completedTaskTime) @@ -74,11 +83,7 @@ public class WorkerWrapper lastCompletedTaskTime = completedTaskTime; } - public void removeTask(String taskId) - { - runningTasks.remove(taskId); - } - + @Override public void close() throws IOException { statusCache.close(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java index 5118e6e7c59..00b869ea6da 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java @@ -46,5 +46,13 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion") @Default("1") - public abstract int getmaxWorkerIdleTimeMillisBeforeDeletion(); + public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion(); + + @Config("druid.indexer.maxScalingDuration") + @Default("PT1H") + public abstract Duration getMaxScalingDuration(); + + @Config("druid.indexer.taskAssignmentTimeoutDuration") + @Default("PT5M") + public abstract Duration getTaskAssignmentTimeoutDuration(); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RetryPolicyConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RetryPolicyConfig.java index 044706b67ed..47c8eaf4d1a 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RetryPolicyConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RetryPolicyConfig.java @@ -19,6 +19,7 @@ package com.metamx.druid.merger.coordinator.config; +import org.joda.time.Duration; import org.skife.config.Config; import org.skife.config.Default; @@ -27,12 +28,12 @@ import org.skife.config.Default; public abstract class RetryPolicyConfig { @Config("druid.indexer.retry.minWaitMillis") - @Default("60000") // 1 minute - public abstract long getRetryMinMillis(); + @Default("PT1M") // 1 minute + public abstract Duration getRetryMinDuration(); @Config("druid.indexer.retry.maxWaitMillis") - @Default("600000") // 10 minutes - public abstract long getRetryMaxMillis(); + @Default("PT10M") // 10 minutes + public abstract Duration getRetryMaxDuration(); @Config("druid.indexer.retry.maxRetryCount") @Default("10") diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java index cd94b70d3ce..a85c3ade8fd 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java @@ -56,7 +56,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy } @Override - public AutoScalingData provision() + public AutoScalingData provision(long numUnassignedTasks) { try { log.info("Creating new instance(s)..."); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java index 0331022082c..923de463870 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java @@ -1,15 +1,8 @@ package com.metamx.druid.merger.coordinator.scaling; -import com.amazonaws.services.ec2.model.Instance; -import com.google.common.collect.MinMaxPriorityQueue; -import com.metamx.druid.merger.coordinator.WorkerWrapper; -import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.emitter.EmittingLogger; -import org.joda.time.DateTime; -import java.util.Comparator; import java.util.List; -import java.util.Map; /** * This class just logs when scaling should occur. @@ -19,7 +12,7 @@ public class NoopScalingStrategy implements ScalingStrategy private static final EmittingLogger log = new EmittingLogger(NoopScalingStrategy.class); @Override - public AutoScalingData provision() + public AutoScalingData provision(long numUnassignedTasks) { log.info("If I were a real strategy I'd create something now"); return null; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java index 9b7da8fb3a4..ec71d856301 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java @@ -25,7 +25,7 @@ import java.util.List; */ public interface ScalingStrategy { - public AutoScalingData provision(); + public AutoScalingData provision(long numUnassignedTasks); public AutoScalingData terminate(List nodeIds); } diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java b/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java index 938bda933fc..5fc49788fcd 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java @@ -47,7 +47,6 @@ public class TaskMonitor private final PathChildrenCache pathChildrenCache; private final CuratorFramework cf; - private final ObjectMapper jsonMapper; private final WorkerCuratorCoordinator workerCuratorCoordinator; private final TaskToolbox toolbox; private final ExecutorService exec; @@ -55,7 +54,6 @@ public class TaskMonitor public TaskMonitor( PathChildrenCache pathChildrenCache, CuratorFramework cf, - ObjectMapper jsonMapper, WorkerCuratorCoordinator workerCuratorCoordinator, TaskToolbox toolbox, ExecutorService exec @@ -63,7 +61,6 @@ public class TaskMonitor { this.pathChildrenCache = pathChildrenCache; this.cf = cf; - this.jsonMapper = jsonMapper; this.workerCuratorCoordinator = workerCuratorCoordinator; this.toolbox = toolbox; this.exec = exec; @@ -87,7 +84,7 @@ public class TaskMonitor throws Exception { if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { - final TaskHolder taskHolder = jsonMapper.readValue( + final TaskHolder taskHolder = toolbox.getObjectMapper().readValue( cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()), TaskHolder.class ); diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index 210a55bcf5f..cc30b914367 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -287,8 +287,8 @@ public class WorkerNode if (taskToolbox == null) { final RestS3Service s3Client = new RestS3Service( new AWSCredentials( - props.getProperty("com.metamx.aws.accessKey"), - props.getProperty("com.metamx.aws.secretKey") + PropUtils.getProperty(props, "com.metamx.aws.accessKey"), + PropUtils.getProperty(props, "com.metamx.aws.secretKey") ) ); final SegmentPusher segmentPusher = new S3SegmentPusher( @@ -334,7 +334,6 @@ public class WorkerNode taskMonitor = new TaskMonitor( pathChildrenCache, curatorFramework, - jsonMapper, workerCuratorCoordinator, taskToolbox, workerExec diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java new file mode 100644 index 00000000000..e219c2984a0 --- /dev/null +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -0,0 +1,433 @@ +package com.metamx.druid.merger.coordinator; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.merger.common.TaskStatus; +import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.config.IndexerZkConfig; +import com.metamx.druid.merger.common.task.DefaultMergeTask; +import com.metamx.druid.merger.common.task.Task; +import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; +import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; +import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; +import com.metamx.druid.merger.coordinator.scaling.AutoScalingData; +import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; +import com.metamx.druid.merger.worker.TaskMonitor; +import com.metamx.druid.merger.worker.Worker; +import com.metamx.druid.merger.worker.WorkerCuratorCoordinator; +import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.framework.CuratorFrameworkFactory; +import com.netflix.curator.framework.recipes.cache.PathChildrenCache; +import com.netflix.curator.retry.ExponentialBackoffRetry; +import com.netflix.curator.test.TestingCluster; +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.zookeeper.CreateMode; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.annotate.JsonTypeName; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.jsontype.NamedType; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +/** + */ +public class RemoteTaskRunnerTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private static final String basePath = "/test/druid/indexer"; + private static final String announcementsPath = String.format("%s/announcements", basePath); + private static final String tasksPath = String.format("%s/tasks", basePath); + private static final String statusPath = String.format("%s/status", basePath); + + private TestingCluster testingCluster; + private CuratorFramework cf; + private PathChildrenCache pathChildrenCache; + private RemoteTaskRunner remoteTaskRunner; + private TaskMonitor taskMonitor; + + private ScheduledExecutorService scheduledExec; + + private Task task1; + + private Worker worker1; + + + @Before + public void setUp() throws Exception + { + testingCluster = new TestingCluster(1); + testingCluster.start(); + + cf = CuratorFrameworkFactory.builder() + .connectString(testingCluster.getConnectString()) + .retryPolicy(new ExponentialBackoffRetry(1, 10)) + .build(); + cf.start(); + + cf.create().creatingParentsIfNeeded().forPath(announcementsPath); + cf.create().forPath(tasksPath); + cf.create().forPath(String.format("%s/worker1", tasksPath)); + cf.create().forPath(statusPath); + cf.create().forPath(String.format("%s/worker1", statusPath)); + + pathChildrenCache = new PathChildrenCache(cf, announcementsPath, true); + + worker1 = new Worker( + "worker1", + "localhost", + 3, + "0" + ); + + makeRemoteTaskRunner(); + makeTaskMonitor(); + } + + @After + public void tearDown() throws Exception + { + testingCluster.stop(); + remoteTaskRunner.stop(); + taskMonitor.stop(); + } + + @Test + public void testRunNoExistingTask() throws Exception + { + remoteTaskRunner.run( + task1, + new TaskContext(new DateTime().toString(), Sets.newHashSet()), + null + ); + } + + @Test + public void testRunWithExistingCompletedTask() throws Exception + { + cf.create().creatingParentsIfNeeded().forPath( + String.format("%s/worker1/task1", statusPath), + jsonMapper.writeValueAsBytes( + TaskStatus.success( + "task1", + Lists.newArrayList() + ) + ) + ); + + // Really don't like this way of waiting for the task to appear + while (remoteTaskRunner.getNumWorkers() == 0) { + Thread.sleep(500); + } + + final MutableBoolean callbackCalled = new MutableBoolean(false); + remoteTaskRunner.run( + task1, + null, + new TaskCallback() + { + @Override + public void notify(TaskStatus status) + { + callbackCalled.setValue(true); + } + } + ); + + Assert.assertTrue("TaskCallback was not called!", callbackCalled.booleanValue()); + } + + private void makeTaskMonitor() throws Exception + { + WorkerCuratorCoordinator workerCuratorCoordinator = new WorkerCuratorCoordinator( + jsonMapper, + new IndexerZkConfig() + { + @Override + public String getAnnouncementPath() + { + return announcementsPath; + } + + @Override + public String getTaskPath() + { + return tasksPath; + } + + @Override + public String getStatusPath() + { + return statusPath; + } + }, + cf, + worker1 + ); + workerCuratorCoordinator.start(); + + taskMonitor = new TaskMonitor( + new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true), + cf, + workerCuratorCoordinator, + new TaskToolbox( + new IndexerCoordinatorConfig() + { + @Override + public String getServerName() + { + return "worker1"; + } + + @Override + public String getLeaderLatchPath() + { + return null; + } + + @Override + public int getNumLocalThreads() + { + return 1; + } + + @Override + public String getRunnerImpl() + { + return null; + } + + @Override + public String getStorageImpl() + { + return null; + } + + @Override + public File getBaseTaskDir() + { + try { + return File.createTempFile("billy", "yay"); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public boolean isWhitelistEnabled() + { + return false; + } + + @Override + public String getWhitelistDatasourcesString() + { + return null; + } + + @Override + public long getRowFlushBoundary() + { + return 0; + } + }, null, null, null, jsonMapper + ), + Executors.newSingleThreadExecutor() + ); + jsonMapper.registerSubtypes(new NamedType(TestTask.class, "test")); + taskMonitor.start(); + } + + private void makeRemoteTaskRunner() throws Exception + { + scheduledExec = EasyMock.createMock(ScheduledExecutorService.class); + + remoteTaskRunner = new RemoteTaskRunner( + jsonMapper, + new TestRemoteTaskRunnerConfig(), + cf, + pathChildrenCache, + scheduledExec, + new RetryPolicyFactory(new TestRetryPolicyConfig()), + new TestScalingStrategy() + ); + + task1 = new TestTask( + "task1", + "dummyDs", + Lists.newArrayList( + new DataSegment( + "dummyDs", + new Interval(new DateTime(), new DateTime()), + new DateTime().toString(), + null, + null, + null, + null, + 0 + ) + ), Lists.newArrayList() + ); + + // Create a single worker and wait for things for be ready + remoteTaskRunner.start(); + cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( + String.format("%s/worker1", announcementsPath), + jsonMapper.writeValueAsBytes(worker1) + ); + while (remoteTaskRunner.getNumWorkers() == 0) { + Thread.sleep(500); + } + } + + private static class TestRetryPolicyConfig extends RetryPolicyConfig + { + @Override + public Duration getRetryMinDuration() + { + return null; + } + + @Override + public Duration getRetryMaxDuration() + { + return null; + } + + @Override + public long getMaxRetryCount() + { + return 0; + } + } + + private static class TestScalingStrategy implements ScalingStrategy + { + @Override + public AutoScalingData provision(long numUnassignedTasks) + { + return null; + } + + @Override + public AutoScalingData terminate(List nodeIds) + { + return null; + } + } + + private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig + { + @Override + public Duration getTerminateResourcesDuration() + { + return null; + } + + @Override + public DateTime getTerminateResourcesOriginDateTime() + { + return null; + } + + @Override + public String getMinWorkerVersion() + { + return "0"; + } + + @Override + public int getMinNumWorkers() + { + return 0; + } + + @Override + public int getMaxWorkerIdleTimeMillisBeforeDeletion() + { + return 0; + } + + @Override + public Duration getMaxScalingDuration() + { + return null; + } + + @Override + public String getAnnouncementPath() + { + return announcementsPath; + } + + @Override + public String getTaskPath() + { + return tasksPath; + } + + @Override + public String getStatusPath() + { + return statusPath; + } + + @Override + public Duration getTaskAssignmentTimeoutDuration() + { + return new Duration(60000); + } + } + + @JsonTypeName("test") + private static class TestTask extends DefaultMergeTask + { + private final String id; + + public TestTask( + @JsonProperty("id") String id, + @JsonProperty("dataSource") String dataSource, + @JsonProperty("segments") List segments, + @JsonProperty("aggregations") List aggregators + ) + { + super(dataSource, segments, aggregators); + + this.id = id; + } + + @Override + @JsonProperty + public String getId() + { + return id; + } + + @Override + public Type getType() + { + return Type.TEST; + } + + @Override + public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception + { + return TaskStatus.success("task1", Lists.newArrayList()); + } + } +} diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RetryPolicyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RetryPolicyTest.java new file mode 100644 index 00000000000..5445c05e7dd --- /dev/null +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RetryPolicyTest.java @@ -0,0 +1,45 @@ +package com.metamx.druid.merger.coordinator; + +import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; +import junit.framework.Assert; +import org.joda.time.Duration; +import org.junit.Test; + +/** + */ +public class RetryPolicyTest +{ + @Test + public void testGetAndIncrementRetryDelay() throws Exception + { + RetryPolicy retryPolicy = new RetryPolicy( + new RetryPolicyConfig() + { + @Override + public Duration getRetryMinDuration() + { + return new Duration("PT1S"); + } + + @Override + public Duration getRetryMaxDuration() + { + return new Duration("PT10S"); + } + + @Override + public long getMaxRetryCount() + { + return 10; + } + } + ); + + Assert.assertEquals(new Duration("PT1S"), retryPolicy.getAndIncrementRetryDelay()); + Assert.assertEquals(new Duration("PT2S"), retryPolicy.getAndIncrementRetryDelay()); + Assert.assertEquals(new Duration("PT4S"), retryPolicy.getAndIncrementRetryDelay()); + Assert.assertEquals(new Duration("PT8S"), retryPolicy.getAndIncrementRetryDelay()); + Assert.assertEquals(new Duration("PT10S"), retryPolicy.getAndIncrementRetryDelay()); + Assert.assertEquals(new Duration("PT10S"), retryPolicy.getAndIncrementRetryDelay()); + } +} diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java index f5d682ba781..11a837196cc 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java @@ -27,12 +27,8 @@ import com.amazonaws.services.ec2.model.Reservation; import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.TerminateInstancesRequest; -import com.google.common.collect.Maps; -import com.metamx.druid.merger.coordinator.WorkerWrapper; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; -import com.metamx.druid.merger.worker.Worker; import org.easymock.EasyMock; -import org.joda.time.DateTime; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -40,8 +36,6 @@ import org.junit.Test; import java.util.Arrays; import java.util.Date; -import java.util.Map; -import java.util.concurrent.ConcurrentSkipListSet; /** */ @@ -56,7 +50,6 @@ public class EC2AutoScalingStrategyTest private DescribeInstancesResult describeInstancesResult; private Reservation reservation; private Instance instance; - private WorkerWrapper worker; private EC2AutoScalingStrategy strategy; @Before @@ -73,12 +66,6 @@ public class EC2AutoScalingStrategyTest .withImageId(AMI_ID) .withPrivateIpAddress(IP); - worker = new WorkerWrapper( - new Worker("dummyHost", IP, 2, "0"), - new ConcurrentSkipListSet(), - null - ); - worker.setLastCompletedTaskTime(new DateTime(0)); strategy = new EC2AutoScalingStrategy( amazonEC2Client, new EC2AutoScalingStrategyConfig() { @@ -145,23 +132,12 @@ public class EC2AutoScalingStrategyTest EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).atLeastOnce(); EasyMock.replay(reservation); - worker.getRunningTasks().add("task1"); - - Assert.assertFalse(worker.isAtCapacity()); - - worker.getRunningTasks().add("task2"); - - Assert.assertTrue(worker.isAtCapacity()); - - AutoScalingData created = strategy.provision(); + AutoScalingData created = strategy.provision(0); Assert.assertEquals(created.getNodeIds().size(), 1); Assert.assertEquals(created.getNodes().size(), 1); Assert.assertEquals(String.format("%s:8080", IP), created.getNodeIds().get(0)); - worker.getRunningTasks().remove("task1"); - worker.getRunningTasks().remove("task2"); - AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyHost")); Assert.assertEquals(deleted.getNodeIds().size(), 1); diff --git a/pom.xml b/pom.xml index cbeb399f8ed..8a4b8b3d650 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ --> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 com.metamx druid @@ -131,6 +131,11 @@ curator-x-discovery 1.2.2 + + com.netflix.curator + curator-test + 1.2.2 + it.uniroma3.mat extendedset