fix race described in 1360

review comments

review comments

review comments

no need to remove

fix test

review comments
This commit is contained in:
nishant 2015-05-26 21:58:38 +05:30
parent 6ae4ecc7d4
commit af9ea08041
6 changed files with 340 additions and 174 deletions

View File

@ -22,6 +22,7 @@ import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
@ -70,7 +71,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@ -124,6 +127,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor();
// Workers that have been marked as lazy. these workers are not running any tasks and can be terminated safely by the scaling policy.
private final ConcurrentMap<String, ZkWorker> lazyWorkers = new ConcurrentHashMap<>();
private final Object statusLock = new Object();
private volatile boolean started = false;
@ -545,7 +552,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
config,
ImmutableMap.copyOf(
Maps.transformEntries(
zkWorkers,
Maps.filterEntries(
zkWorkers, new Predicate<Map.Entry<String, ZkWorker>>()
{
@Override
public boolean apply(Map.Entry<String, ZkWorker> input)
{
return !lazyWorkers.containsKey(input.getKey());
}
}
),
new Maps.EntryTransformer<String, ZkWorker, ImmutableZkWorker>()
{
@Override
@ -562,8 +578,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
);
if (immutableZkWorker.isPresent()) {
final ZkWorker zkWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost());
announceTask(task, zkWorker, taskRunnerWorkItem);
return true;
return announceTask(task, zkWorker, taskRunnerWorkItem);
} else {
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return false;
@ -577,24 +592,31 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
*
* @param theZkWorker The worker the task is assigned to
* @param taskRunnerWorkItem The task to be assigned
*
* @return boolean indicating whether the task was successfully assigned or not
*/
private void announceTask(
private boolean announceTask(
final Task task,
final ZkWorker theZkWorker,
final RemoteTaskRunnerWorkItem taskRunnerWorkItem
) throws Exception
{
Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
final Worker theWorker = theZkWorker.getWorker();
log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId());
final String worker = theZkWorker.getWorker().getHost();
synchronized (statusLock) {
if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) {
// the worker might got killed or has been marked as lazy.
log.info("Not assigning task to already removed worker[%s]", worker);
return false;
}
log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId());
byte[] rawBytes = jsonMapper.writeValueAsBytes(task);
if (rawBytes.length > config.getMaxZnodeBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes());
}
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), theWorker.getHost(), task.getId());
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId());
if (cf.checkExists().forPath(taskPath) == null) {
cf.create()
@ -609,26 +631,24 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!")
.addData("taskId", task.getId())
.emit();
return;
return false;
}
RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theWorker);
RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker());
runningTasks.put(task.getId(), newWorkItem);
log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost());
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks
Stopwatch timeoutStopwatch = Stopwatch.createUnstarted();
timeoutStopwatch.start();
synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker, task.getId())) {
Stopwatch timeoutStopwatch = Stopwatch.createStarted();
while (!isWorkerRunningTask(theZkWorker.getWorker(), task.getId())) {
final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
statusLock.wait(waitMs);
long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
if (elapsed >= waitMs) {
log.error(
"Something went wrong! [%s] never ran task [%s]! Timeout: (%s >= %s)!",
theWorker.getHost(),
worker,
task.getId(),
elapsed,
config.getTaskAssignmentTimeout()
@ -637,6 +657,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
break;
}
}
return true;
}
}
@ -798,22 +819,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {
List<String> tasksToFail = Lists.newArrayList(
cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost()))
);
log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size());
for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : runningTasks.entrySet()) {
if (entry.getValue() == null) {
log.error("Huh? null work item for [%s]", entry.getKey());
} else if (entry.getValue().getWorker() == null) {
log.error("Huh? no worker for [%s]", entry.getKey());
} else if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey());
tasksToFail.add(entry.getKey());
}
}
List<String> tasksToFail = getAssignedTasks(worker);
for (String assignedTask : tasksToFail) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
@ -842,6 +849,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
zkWorkers.remove(worker.getHost());
}
}
lazyWorkers.remove(worker.getHost());
}
private void taskComplete(
@ -872,4 +880,57 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
// Notify interested parties
taskRunnerWorkItem.setResult(taskStatus);
}
public List<ZkWorker> markWokersLazy(Predicate<ZkWorker> isLazyWorker, int maxWorkers)
{
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
Iterator<String> iterator = zkWorkers.keySet().iterator();
while (iterator.hasNext()) {
String worker = iterator.next();
ZkWorker zkWorker = zkWorkers.get(worker);
try {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker)) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker, zkWorker);
if (lazyWorkers.size() == maxWorkers) {
// only mark excess workers as lazy and allow their cleanup
break;
}
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
return ImmutableList.copyOf(lazyWorkers.values());
}
}
private List<String> getAssignedTasks(Worker worker) throws Exception
{
List<String> assignedTasks = Lists.newArrayList(
cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost()))
);
for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : runningTasks.entrySet()) {
if (entry.getValue() == null) {
log.error(
"Huh? null work item for [%s]", entry.getKey()
);
} else if (entry.getValue().getWorker() == null) {
log.error("Huh? no worker for [%s]", entry.getKey());
} else if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey());
assignedTasks.add(entry.getKey());
}
}
log.info("[%s]: Found %d tasks assigned", worker.getHost(), assignedTasks.size());
return assignedTasks;
}
public List<ZkWorker> getLazyWorkers()
{
return ImmutableList.copyOf(lazyWorkers.values());
}
}

View File

@ -32,8 +32,6 @@ import java.util.concurrent.ScheduledExecutorService;
/**
* The ResourceManagementScheduler schedules a check for when worker nodes should potentially be created or destroyed.
* It uses a {@link TaskRunner} to return all pending tasks in the system and the status of the worker nodes in
* the system.
* The ResourceManagementScheduler does not contain the logic to decide whether provision or termination should actually
* occur. That decision is made in the {@link ResourceManagementStrategy}.
*/
@ -80,7 +78,7 @@ public class ResourceManagementScheduler
@Override
public void run()
{
resourceManagementStrategy.doProvision(taskRunner.getPendingTasks(), taskRunner.getWorkers());
resourceManagementStrategy.doProvision(taskRunner);
}
}
);
@ -99,7 +97,7 @@ public class ResourceManagementScheduler
@Override
public void run()
{
resourceManagementStrategy.doTerminate(taskRunner.getPendingTasks(), taskRunner.getWorkers());
resourceManagementStrategy.doTerminate(taskRunner);
}
}
);

View File

@ -17,6 +17,7 @@
package io.druid.indexing.overlord.autoscaling;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
@ -28,9 +29,9 @@ import java.util.Collection;
*/
public interface ResourceManagementStrategy
{
public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers);
public boolean doProvision(RemoteTaskRunner runner);
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers);
public boolean doTerminate(RemoteTaskRunner runner);
public ScalingStats getStats();
}

View File

@ -22,13 +22,13 @@ import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
@ -70,8 +70,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
@Override
public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
public boolean doProvision(RemoteTaskRunner runner)
{
Collection<RemoteTaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
Collection<ZkWorker> zkWorkers = runner.getWorkers();
synchronized (lock) {
boolean didProvision = false;
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
@ -137,8 +139,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
@Override
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
public boolean doTerminate(RemoteTaskRunner runner)
{
Collection<RemoteTaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
synchronized (lock) {
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null) {
@ -151,7 +154,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
workerConfig.getAutoScaler().ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
zkWorkers,
runner.getLazyWorkers(),
new Function<ZkWorker, String>()
{
@Override
@ -174,17 +177,17 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);
updateTargetWorkerCount(workerConfig, pendingTasks, zkWorkers);
Collection<ZkWorker> workers = runner.getWorkers();
updateTargetWorkerCount(workerConfig, pendingTasks, workers);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
if (currentlyTerminating.isEmpty()) {
final int excessWorkers = (zkWorkers.size() + currentlyProvisioning.size()) - targetWorkerCount;
final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount;
if (excessWorkers > 0) {
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
final List<String> laziestWorkerIps =
FluentIterable.from(zkWorkers)
.filter(isLazyWorker)
.limit(excessWorkers)
.transform(
Lists.transform(
runner.markWokersLazy(isLazyWorker, excessWorkers),
new Function<ZkWorker, String>()
{
@Override
@ -193,9 +196,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return zkWorker.getWorker().getIp();
}
}
)
.toList();
);
if (laziestWorkerIps.isEmpty()) {
log.info("Wanted to terminate %,d workers, but couldn't find any lazy ones!", excessWorkers);
} else {
@ -253,7 +254,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
{
final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
return worker.getRunningTasks().isEmpty() && (itHasBeenAWhile || !isValidWorker.apply(worker));
return itHasBeenAWhile || !isValidWorker.apply(worker);
}
};
}

View File

@ -20,6 +20,7 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@ -55,6 +56,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -396,14 +398,16 @@ public class RemoteTaskRunnerTest
remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
config,
new IndexerZkConfig(new ZkPathsConfig()
new IndexerZkConfig(
new ZkPathsConfig()
{
@Override
public String getBase()
{
return basePath;
}
},null,null,null,null,null),
}, null, null, null, null, null
),
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
null,
@ -492,4 +496,67 @@ public class RemoteTaskRunnerTest
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId()));
cf.setData().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
}
@Test
public void testFindLazyWorkerTaskRunning() throws Exception
{
doSetup();
remoteTaskRunner.start();
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWokersLazy(
new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker input)
{
return true;
}
}, 1
);
Assert.assertTrue(lazyworkers.isEmpty());
Assert.assertTrue(remoteTaskRunner.getLazyWorkers().isEmpty());
Assert.assertEquals(1, remoteTaskRunner.getWorkers().size());
}
@Test
public void testFindLazyWorkerForWorkerJustAssignedTask() throws Exception
{
doSetup();
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWokersLazy(
new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker input)
{
return true;
}
}, 1
);
Assert.assertTrue(lazyworkers.isEmpty());
Assert.assertTrue(remoteTaskRunner.getLazyWorkers().isEmpty());
Assert.assertEquals(1, remoteTaskRunner.getWorkers().size());
}
@Test
public void testFindLazyWorkerNotRunningAnyTask() throws Exception
{
doSetup();
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWokersLazy(
new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker input)
{
return true;
}
}, 1
);
Assert.assertEquals(1, lazyworkers.size());
Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size());
}
}

View File

@ -17,6 +17,7 @@
package io.druid.indexing.overlord.autoscaling;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -28,6 +29,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
@ -46,6 +48,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@ -117,16 +120,21 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("aNode"))
);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
)
);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
EasyMock.replay(runner);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
@ -135,6 +143,7 @@ public class SimpleResourceManagementStrategyTest
);
EasyMock.verify(autoScaler);
EasyMock.verify(runner);
}
@Test
@ -147,16 +156,21 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"))
);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
).times(2);
EasyMock.replay(runner);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
@ -165,14 +179,7 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
Assert.assertFalse(provisionedSomething);
Assert.assertTrue(
@ -184,6 +191,7 @@ public class SimpleResourceManagementStrategyTest
);
EasyMock.verify(autoScaler);
EasyMock.verify(runner);
}
@Test
@ -205,15 +213,20 @@ public class SimpleResourceManagementStrategyTest
new AutoScalingData(Lists.<String>newArrayList("fake"))
);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
).times(2);
EasyMock.replay(runner);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
@ -224,14 +237,7 @@ public class SimpleResourceManagementStrategyTest
Thread.sleep(2000);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
Assert.assertFalse(provisionedSomething);
Assert.assertTrue(
@ -244,6 +250,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScaler);
EasyMock.verify(emitter);
EasyMock.verify(runner);
}
@Test
@ -257,15 +264,26 @@ public class SimpleResourceManagementStrategyTest
new AutoScalingData(Lists.<String>newArrayList())
);
EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
new TestZkWorker(testTask)
)
).times(2);
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner);
Assert.assertTrue(terminatedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
@ -288,14 +306,26 @@ public class SimpleResourceManagementStrategyTest
);
EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
new TestZkWorker(testTask)
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList()).times(2);
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner);
Assert.assertTrue(terminatedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
@ -303,14 +333,7 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner);
Assert.assertFalse(terminatedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
@ -319,6 +342,7 @@ public class SimpleResourceManagementStrategyTest
);
EasyMock.verify(autoScaler);
EasyMock.verify(runner);
}
@Test
@ -331,15 +355,25 @@ public class SimpleResourceManagementStrategyTest
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList()
);
EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScaler);
@ -351,18 +385,11 @@ public class SimpleResourceManagementStrategyTest
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScaler);
EasyMock.verify(runner);
}
@Test
@ -375,11 +402,23 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.<RemoteTaskRunnerWorkItem>asList()
).times(3);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0")
)
).times(3);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList()
);
EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
runner
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScaler);
@ -392,10 +431,7 @@ public class SimpleResourceManagementStrategyTest
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create())
)
runner
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScaler);
@ -415,13 +451,11 @@ public class SimpleResourceManagementStrategyTest
);
EasyMock.replay(autoScaler);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0")
)
runner
);
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScaler);
EasyMock.verify(runner);
}
@Test
@ -430,28 +464,32 @@ public class SimpleResourceManagementStrategyTest
workerConfig.set(null);
EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
).times(1);
EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
runner
);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
runner
);
Assert.assertFalse(terminatedSomething);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScaler);
EasyMock.verify(runner);
}
private static class TestZkWorker extends ZkWorker