From af9ea08041b58895a66b8a7dc3d76b54ee92828d Mon Sep 17 00:00:00 2001 From: nishant Date: Tue, 26 May 2015 21:58:38 +0530 Subject: [PATCH] fix race described in 1360 review comments review comments review comments no need to remove fix test review comments --- .../indexing/overlord/RemoteTaskRunner.java | 173 ++++++++++----- .../ResourceManagementScheduler.java | 6 +- .../ResourceManagementStrategy.java | 5 +- .../SimpleResourceManagementStrategy.java | 49 ++--- .../overlord/RemoteTaskRunnerTest.java | 83 +++++++- .../SimpleResourceManagementStrategyTest.java | 198 +++++++++++------- 6 files changed, 340 insertions(+), 174 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 08a71d2808c..b8f259cb26a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -22,6 +22,7 @@ import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -70,7 +71,9 @@ import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -124,6 +127,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor(); + // Workers that have been marked as lazy. these workers are not running any tasks and can be terminated safely by the scaling policy. + private final ConcurrentMap lazyWorkers = new ConcurrentHashMap<>(); + + private final Object statusLock = new Object(); private volatile boolean started = false; @@ -545,7 +552,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer config, ImmutableMap.copyOf( Maps.transformEntries( - zkWorkers, + Maps.filterEntries( + zkWorkers, new Predicate>() + { + @Override + public boolean apply(Map.Entry input) + { + return !lazyWorkers.containsKey(input.getKey()); + } + } + ), new Maps.EntryTransformer() { @Override @@ -562,8 +578,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer ); if (immutableZkWorker.isPresent()) { final ZkWorker zkWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost()); - announceTask(task, zkWorker, taskRunnerWorkItem); - return true; + return announceTask(task, zkWorker, taskRunnerWorkItem); } else { log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); return false; @@ -577,58 +592,63 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer * * @param theZkWorker The worker the task is assigned to * @param taskRunnerWorkItem The task to be assigned + * + * @return boolean indicating whether the task was successfully assigned or not */ - private void announceTask( + private boolean announceTask( final Task task, final ZkWorker theZkWorker, final RemoteTaskRunnerWorkItem taskRunnerWorkItem ) throws Exception { Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id"); - final Worker theWorker = theZkWorker.getWorker(); - - log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId()); - - byte[] rawBytes = jsonMapper.writeValueAsBytes(task); - if (rawBytes.length > config.getMaxZnodeBytes()) { - throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes()); - } - - String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), theWorker.getHost(), task.getId()); - - if (cf.checkExists().forPath(taskPath) == null) { - cf.create() - .withMode(CreateMode.EPHEMERAL) - .forPath( - taskPath, rawBytes - ); - } - - RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId()); - if (workItem == null) { - log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!") - .addData("taskId", task.getId()) - .emit(); - return; - } - - RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theWorker); - runningTasks.put(task.getId(), newWorkItem); - log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost()); - - // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running - // on a worker - this avoids overflowing a worker with tasks - Stopwatch timeoutStopwatch = Stopwatch.createUnstarted(); - timeoutStopwatch.start(); + final String worker = theZkWorker.getWorker().getHost(); synchronized (statusLock) { - while (!isWorkerRunningTask(theWorker, task.getId())) { + if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) { + // the worker might got killed or has been marked as lazy. + log.info("Not assigning task to already removed worker[%s]", worker); + return false; + } + log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId()); + + byte[] rawBytes = jsonMapper.writeValueAsBytes(task); + if (rawBytes.length > config.getMaxZnodeBytes()) { + throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes()); + } + + String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()); + + if (cf.checkExists().forPath(taskPath) == null) { + cf.create() + .withMode(CreateMode.EPHEMERAL) + .forPath( + taskPath, rawBytes + ); + } + + RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId()); + if (workItem == null) { + log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!") + .addData("taskId", task.getId()) + .emit(); + return false; + } + + RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker()); + runningTasks.put(task.getId(), newWorkItem); + log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost()); + + // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running + // on a worker - this avoids overflowing a worker with tasks + Stopwatch timeoutStopwatch = Stopwatch.createStarted(); + while (!isWorkerRunningTask(theZkWorker.getWorker(), task.getId())) { final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis(); statusLock.wait(waitMs); long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS); if (elapsed >= waitMs) { log.error( "Something went wrong! [%s] never ran task [%s]! Timeout: (%s >= %s)!", - theWorker.getHost(), + worker, task.getId(), elapsed, config.getTaskAssignmentTimeout() @@ -637,6 +657,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer break; } } + return true; } } @@ -798,22 +819,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer final ZkWorker zkWorker = zkWorkers.get(worker.getHost()); if (zkWorker != null) { try { - List tasksToFail = Lists.newArrayList( - cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost())) - ); - log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size()); - - for (Map.Entry entry : runningTasks.entrySet()) { - if (entry.getValue() == null) { - log.error("Huh? null work item for [%s]", entry.getKey()); - } else if (entry.getValue().getWorker() == null) { - log.error("Huh? no worker for [%s]", entry.getKey()); - } else if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) { - log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey()); - tasksToFail.add(entry.getKey()); - } - } + List tasksToFail = getAssignedTasks(worker); for (String assignedTask : tasksToFail) { RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask); if (taskRunnerWorkItem != null) { @@ -842,6 +849,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer zkWorkers.remove(worker.getHost()); } } + lazyWorkers.remove(worker.getHost()); } private void taskComplete( @@ -872,4 +880,57 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer // Notify interested parties taskRunnerWorkItem.setResult(taskStatus); } + + public List markWokersLazy(Predicate isLazyWorker, int maxWorkers) + { + // status lock is used to prevent any tasks being assigned to the worker while we mark it lazy + synchronized (statusLock) { + Iterator iterator = zkWorkers.keySet().iterator(); + while (iterator.hasNext()) { + String worker = iterator.next(); + ZkWorker zkWorker = zkWorkers.get(worker); + try { + if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker)) { + log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost()); + lazyWorkers.put(worker, zkWorker); + if (lazyWorkers.size() == maxWorkers) { + // only mark excess workers as lazy and allow their cleanup + break; + } + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + return ImmutableList.copyOf(lazyWorkers.values()); + } + } + + private List getAssignedTasks(Worker worker) throws Exception + { + List assignedTasks = Lists.newArrayList( + cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost())) + ); + + for (Map.Entry entry : runningTasks.entrySet()) { + if (entry.getValue() == null) { + log.error( + "Huh? null work item for [%s]", entry.getKey() + ); + } else if (entry.getValue().getWorker() == null) { + log.error("Huh? no worker for [%s]", entry.getKey()); + } else if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) { + log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey()); + assignedTasks.add(entry.getKey()); + } + } + log.info("[%s]: Found %d tasks assigned", worker.getHost(), assignedTasks.size()); + return assignedTasks; + } + + public List getLazyWorkers() + { + return ImmutableList.copyOf(lazyWorkers.values()); + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementScheduler.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementScheduler.java index 3c101761248..15a75f5c988 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementScheduler.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementScheduler.java @@ -32,8 +32,6 @@ import java.util.concurrent.ScheduledExecutorService; /** * The ResourceManagementScheduler schedules a check for when worker nodes should potentially be created or destroyed. - * It uses a {@link TaskRunner} to return all pending tasks in the system and the status of the worker nodes in - * the system. * The ResourceManagementScheduler does not contain the logic to decide whether provision or termination should actually * occur. That decision is made in the {@link ResourceManagementStrategy}. */ @@ -80,7 +78,7 @@ public class ResourceManagementScheduler @Override public void run() { - resourceManagementStrategy.doProvision(taskRunner.getPendingTasks(), taskRunner.getWorkers()); + resourceManagementStrategy.doProvision(taskRunner); } } ); @@ -99,7 +97,7 @@ public class ResourceManagementScheduler @Override public void run() { - resourceManagementStrategy.doTerminate(taskRunner.getPendingTasks(), taskRunner.getWorkers()); + resourceManagementStrategy.doTerminate(taskRunner); } } ); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementStrategy.java index ff126b70b86..7a7e23d047c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementStrategy.java @@ -17,6 +17,7 @@ package io.druid.indexing.overlord.autoscaling; +import io.druid.indexing.overlord.RemoteTaskRunner; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.ZkWorker; @@ -28,9 +29,9 @@ import java.util.Collection; */ public interface ResourceManagementStrategy { - public boolean doProvision(Collection runningTasks, Collection zkWorkers); + public boolean doProvision(RemoteTaskRunner runner); - public boolean doTerminate(Collection runningTasks, Collection zkWorkers); + public boolean doTerminate(RemoteTaskRunner runner); public ScalingStats getStats(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java index faa57cdce2c..cc2cb953f6e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java @@ -22,13 +22,13 @@ import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.Collections2; -import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.emitter.EmittingLogger; +import io.druid.indexing.overlord.RemoteTaskRunner; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.ZkWorker; @@ -70,8 +70,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } @Override - public boolean doProvision(Collection pendingTasks, Collection zkWorkers) + public boolean doProvision(RemoteTaskRunner runner) { + Collection pendingTasks = runner.getPendingTasks(); + Collection zkWorkers = runner.getWorkers(); synchronized (lock) { boolean didProvision = false; final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); @@ -137,8 +139,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } @Override - public boolean doTerminate(Collection pendingTasks, Collection zkWorkers) + public boolean doTerminate(RemoteTaskRunner runner) { + Collection pendingTasks = runner.getPendingTasks(); synchronized (lock) { final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); if (workerConfig == null) { @@ -151,7 +154,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat workerConfig.getAutoScaler().ipToIdLookup( Lists.newArrayList( Iterables.transform( - zkWorkers, + runner.getLazyWorkers(), new Function() { @Override @@ -174,28 +177,26 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat currentlyTerminating.clear(); currentlyTerminating.addAll(stillExisting); - updateTargetWorkerCount(workerConfig, pendingTasks, zkWorkers); + Collection workers = runner.getWorkers(); + updateTargetWorkerCount(workerConfig, pendingTasks, workers); - final Predicate isLazyWorker = createLazyWorkerPredicate(config); if (currentlyTerminating.isEmpty()) { - final int excessWorkers = (zkWorkers.size() + currentlyProvisioning.size()) - targetWorkerCount; - if (excessWorkers > 0) { - final List laziestWorkerIps = - FluentIterable.from(zkWorkers) - .filter(isLazyWorker) - .limit(excessWorkers) - .transform( - new Function() - { - @Override - public String apply(ZkWorker zkWorker) - { - return zkWorker.getWorker().getIp(); - } - } - ) - .toList(); + final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount; + if (excessWorkers > 0) { + final Predicate isLazyWorker = createLazyWorkerPredicate(config); + final List laziestWorkerIps = + Lists.transform( + runner.markWokersLazy(isLazyWorker, excessWorkers), + new Function() + { + @Override + public String apply(ZkWorker zkWorker) + { + return zkWorker.getWorker().getIp(); + } + } + ); if (laziestWorkerIps.isEmpty()) { log.info("Wanted to terminate %,d workers, but couldn't find any lazy ones!", excessWorkers); } else { @@ -253,7 +254,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat { final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis() >= config.getWorkerIdleTimeout().toStandardDuration().getMillis(); - return worker.getRunningTasks().isEmpty() && (itHasBeenAWhile || !isValidWorker.apply(worker)); + return itHasBeenAWhile || !isValidWorker.apply(worker); } }; } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 19314a2192c..55050ad11fa 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -20,6 +20,7 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Joiner; +import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -55,6 +56,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.Collection; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -396,14 +398,16 @@ public class RemoteTaskRunnerTest remoteTaskRunner = new RemoteTaskRunner( jsonMapper, config, - new IndexerZkConfig(new ZkPathsConfig() - { - @Override - public String getBase() - { - return basePath; - } - },null,null,null,null,null), + new IndexerZkConfig( + new ZkPathsConfig() + { + @Override + public String getBase() + { + return basePath; + } + }, null, null, null, null, null + ), cf, new SimplePathChildrenCacheFactory.Builder().build(), null, @@ -492,4 +496,67 @@ public class RemoteTaskRunnerTest TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId())); cf.setData().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); } + + @Test + public void testFindLazyWorkerTaskRunning() throws Exception + { + doSetup(); + remoteTaskRunner.start(); + remoteTaskRunner.run(task); + Assert.assertTrue(taskAnnounced(task.getId())); + mockWorkerRunningTask(task); + Collection lazyworkers = remoteTaskRunner.markWokersLazy( + new Predicate() + { + @Override + public boolean apply(ZkWorker input) + { + return true; + } + }, 1 + ); + Assert.assertTrue(lazyworkers.isEmpty()); + Assert.assertTrue(remoteTaskRunner.getLazyWorkers().isEmpty()); + Assert.assertEquals(1, remoteTaskRunner.getWorkers().size()); + } + + @Test + public void testFindLazyWorkerForWorkerJustAssignedTask() throws Exception + { + doSetup(); + remoteTaskRunner.run(task); + Assert.assertTrue(taskAnnounced(task.getId())); + Collection lazyworkers = remoteTaskRunner.markWokersLazy( + new Predicate() + { + @Override + public boolean apply(ZkWorker input) + { + return true; + } + }, 1 + ); + Assert.assertTrue(lazyworkers.isEmpty()); + Assert.assertTrue(remoteTaskRunner.getLazyWorkers().isEmpty()); + Assert.assertEquals(1, remoteTaskRunner.getWorkers().size()); + } + + @Test + public void testFindLazyWorkerNotRunningAnyTask() throws Exception + { + doSetup(); + Collection lazyworkers = remoteTaskRunner.markWokersLazy( + new Predicate() + { + @Override + public boolean apply(ZkWorker input) + { + return true; + } + }, 1 + ); + Assert.assertEquals(1, lazyworkers.size()); + Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size()); + } + } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java index fc1fe34a8ca..f9a5fb96648 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java @@ -17,6 +17,7 @@ package io.druid.indexing.overlord.autoscaling; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -28,6 +29,7 @@ import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.RemoteTaskRunner; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.ZkWorker; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; @@ -46,6 +48,7 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -117,16 +120,21 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.provision()).andReturn( new AutoScalingData(Lists.newArrayList("aNode")) ); - EasyMock.replay(autoScaler); - - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTasks()).andReturn( + Arrays.asList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), + ) + ); + EasyMock.expect(runner.getWorkers()).andReturn( Arrays.asList( new TestZkWorker(testTask) ) ); + EasyMock.replay(runner); + EasyMock.replay(autoScaler); + + boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); Assert.assertTrue(provisionedSomething); Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); @@ -135,6 +143,7 @@ public class SimpleResourceManagementStrategyTest ); EasyMock.verify(autoScaler); + EasyMock.verify(runner); } @Test @@ -147,16 +156,21 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.provision()).andReturn( new AutoScalingData(Lists.newArrayList("fake")) ); - EasyMock.replay(autoScaler); - - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTasks()).andReturn( + Arrays.asList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), + ) + ).times(2); + EasyMock.expect(runner.getWorkers()).andReturn( Arrays.asList( new TestZkWorker(testTask) ) - ); + ).times(2); + EasyMock.replay(runner); + EasyMock.replay(autoScaler); + + boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); Assert.assertTrue(provisionedSomething); Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); @@ -165,14 +179,7 @@ public class SimpleResourceManagementStrategyTest simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION ); - provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), - Arrays.asList( - new TestZkWorker(testTask) - ) - ); + provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); Assert.assertFalse(provisionedSomething); Assert.assertTrue( @@ -184,6 +191,7 @@ public class SimpleResourceManagementStrategyTest ); EasyMock.verify(autoScaler); + EasyMock.verify(runner); } @Test @@ -205,15 +213,20 @@ public class SimpleResourceManagementStrategyTest new AutoScalingData(Lists.newArrayList("fake")) ); EasyMock.replay(autoScaler); - - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTasks()).andReturn( + Arrays.asList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), + ) + ).times(2); + EasyMock.expect(runner.getWorkers()).andReturn( Arrays.asList( new TestZkWorker(testTask) ) - ); + ).times(2); + EasyMock.replay(runner); + + boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); Assert.assertTrue(provisionedSomething); Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); @@ -224,14 +237,7 @@ public class SimpleResourceManagementStrategyTest Thread.sleep(2000); - provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), - Arrays.asList( - new TestZkWorker(testTask) - ) - ); + provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); Assert.assertFalse(provisionedSomething); Assert.assertTrue( @@ -244,6 +250,7 @@ public class SimpleResourceManagementStrategyTest EasyMock.verify(autoScaler); EasyMock.verify(emitter); + EasyMock.verify(runner); } @Test @@ -257,15 +264,26 @@ public class SimpleResourceManagementStrategyTest new AutoScalingData(Lists.newArrayList()) ); EasyMock.replay(autoScaler); - - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList( + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTasks()).andReturn( + Arrays.asList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), + ) + ).times(2); + EasyMock.expect(runner.getWorkers()).andReturn( Arrays.asList( - new TestZkWorker(null) + new TestZkWorker(testTask) + ) + ).times(2); + EasyMock.expect(runner.markWokersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())).andReturn( + Arrays.asList( + new TestZkWorker(testTask) ) ); + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); + EasyMock.replay(runner); + + boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); Assert.assertTrue(terminatedSomething); Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); @@ -288,14 +306,26 @@ public class SimpleResourceManagementStrategyTest ); EasyMock.replay(autoScaler); - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList( + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTasks()).andReturn( + Arrays.asList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), + ) + ).times(2); + EasyMock.expect(runner.getWorkers()).andReturn( Arrays.asList( - new TestZkWorker(null) + new TestZkWorker(testTask) + ) + ).times(2); + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()).times(2); + EasyMock.expect(runner.markWokersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())).andReturn( + Arrays.asList( + new TestZkWorker(testTask) ) ); + EasyMock.replay(runner); + + boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); Assert.assertTrue(terminatedSomething); Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); @@ -303,14 +333,7 @@ public class SimpleResourceManagementStrategyTest simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE ); - terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), - Arrays.asList( - new TestZkWorker(null) - ) - ); + terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); Assert.assertFalse(terminatedSomething); Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); @@ -319,6 +342,7 @@ public class SimpleResourceManagementStrategyTest ); EasyMock.verify(autoScaler); + EasyMock.verify(runner); } @Test @@ -331,15 +355,25 @@ public class SimpleResourceManagementStrategyTest .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList( + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTasks()).andReturn( + Arrays.asList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), + ) + ).times(2); + EasyMock.expect(runner.getWorkers()).andReturn( Arrays.asList( new TestZkWorker(NoopTask.create()), new TestZkWorker(NoopTask.create()) ) + ).times(2); + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); + EasyMock.expect(runner.markWokersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())).andReturn( + Collections.emptyList() ); + EasyMock.replay(runner); + + boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); Assert.assertFalse(terminatedSomething); EasyMock.verify(autoScaler); @@ -351,18 +385,11 @@ public class SimpleResourceManagementStrategyTest .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), - Arrays.asList( - new TestZkWorker(NoopTask.create()), - new TestZkWorker(NoopTask.create()) - ) - ); + boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); Assert.assertFalse(provisionedSomething); EasyMock.verify(autoScaler); + EasyMock.verify(runner); } @Test @@ -375,11 +402,23 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList(), + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTasks()).andReturn( + Arrays.asList() + ).times(3); + EasyMock.expect(runner.getWorkers()).andReturn( Arrays.asList( new TestZkWorker(NoopTask.create(), "h1", "i1", "0") ) + ).times(3); + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); + EasyMock.expect(runner.markWokersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())).andReturn( + Collections.emptyList() + ); + EasyMock.replay(runner); + + boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( + runner ); Assert.assertFalse(terminatedSomething); EasyMock.verify(autoScaler); @@ -392,10 +431,7 @@ public class SimpleResourceManagementStrategyTest .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList(), - Arrays.asList( - new TestZkWorker(NoopTask.create()) - ) + runner ); Assert.assertFalse(provisionedSomething); EasyMock.verify(autoScaler); @@ -415,13 +451,11 @@ public class SimpleResourceManagementStrategyTest ); EasyMock.replay(autoScaler); provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList(), - Arrays.asList( - new TestZkWorker(NoopTask.create(), "h1", "i1", "0") - ) + runner ); Assert.assertTrue(provisionedSomething); EasyMock.verify(autoScaler); + EasyMock.verify(runner); } @Test @@ -430,28 +464,32 @@ public class SimpleResourceManagementStrategyTest workerConfig.set(null); EasyMock.replay(autoScaler); - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList( + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTasks()).andReturn( + Arrays.asList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), + ) + ).times(2); + EasyMock.expect(runner.getWorkers()).andReturn( Arrays.asList( new TestZkWorker(null) ) + ).times(1); + EasyMock.replay(runner); + + boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( + runner ); boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) - ), - Arrays.asList( - new TestZkWorker(null) - ) + runner ); Assert.assertFalse(terminatedSomething); Assert.assertFalse(provisionedSomething); EasyMock.verify(autoScaler); + EasyMock.verify(runner); } private static class TestZkWorker extends ZkWorker