Fix Overlord leader election when task lock re-acquisition fails (#13172)

Overlord leader election can sometimes fail when task locks cannot be re-acquired during the sync from storage.
This commit fixes the issue by failing the affected tasks and clearing all of their locks instead of aborting leader election, as sketched below.
Author: AmatyaAvadhanula
Date:   2022-10-17 15:23:16 +05:30 (committed by GitHub)
Parent: 3bbb76f17b
Commit: b88e1c21ea
6 changed files with 225 additions and 43 deletions
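
In short, the lockbox sync no longer throws when a lock cannot be re-acquired; it reports the affected tasks so the task queue can fail them and then remove their leftover locks from storage. The following is a minimal, self-contained sketch of that flow, assuming simplified stand-ins for the real classes: LockboxSketch, QueueSketch, StorageSketch and becomeLeader are illustrative names, not the actual Druid APIs (TaskLockbox, TaskQueue, TaskStorage).

import java.util.List;
import java.util.Set;

interface LockboxSketch
{
  /** Re-sync lock state from storage; report tasks whose group failed lock re-acquisition. */
  Set<String> syncFromStorage();
}

interface StorageSketch
{
  List<String> getLockIds(String taskId);

  void removeLock(String taskId, String lockId);
}

interface QueueSketch
{
  void shutdown(String taskId, String reason);
}

class LeaderStartSketch
{
  static void becomeLeader(LockboxSketch lockbox, QueueSketch queue, StorageSketch storage)
  {
    // 1) Sync the lockbox. Instead of throwing (which previously aborted leader election),
    //    it now returns the tasks that could not re-acquire their locks.
    Set<String> tasksToFail = lockbox.syncFromStorage();

    // 2) Fail those tasks so the Overlord can still become the leader.
    for (String taskId : tasksToFail) {
      queue.shutdown(taskId, "failed to reacquire lock while becoming leader");
    }

    // 3) Clear any locks that were never re-acquired so they do not linger in storage.
    for (String taskId : tasksToFail) {
      for (String lockId : storage.getLockIds(taskId)) {
        storage.removeLock(taskId, lockId);
      }
    }
  }
}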

TaskLockbox.java

@@ -108,17 +108,19 @@ public class TaskLockbox
   /**
    * Wipe out our current in-memory state and resync it from our bundled {@link TaskStorage}.
+   *
+   * @return SyncResult which needs to be processed by the caller
    */
-  public void syncFromStorage()
+  public TaskLockboxSyncResult syncFromStorage()
   {
     giant.lock();
     try {
       // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks.
-      final Set<String> storedActiveTasks = new HashSet<>();
+      final Set<Task> storedActiveTasks = new HashSet<>();
       final List<Pair<Task, TaskLock>> storedLocks = new ArrayList<>();
       for (final Task task : taskStorage.getActiveTasks()) {
-        storedActiveTasks.add(task.getId());
+        storedActiveTasks.add(task);
         for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) {
           storedLocks.add(Pair.of(task, taskLock));
         }
@@ -138,7 +140,12 @@ public class TaskLockbox
       };
       running.clear();
       activeTasks.clear();
-      activeTasks.addAll(storedActiveTasks);
+      activeTasks.addAll(storedActiveTasks.stream()
+                                          .map(Task::getId)
+                                          .collect(Collectors.toSet())
+      );
+      // Set of task groups in which at least one task failed to re-acquire a lock
+      final Set<String> failedToReacquireLockTaskGroups = new HashSet<>();
       // Bookkeeping for a log message at the end
       int taskLockCount = 0;
       for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
@@ -183,20 +190,39 @@ public class TaskLockbox
             );
           }
         } else {
-          throw new ISE(
-              "Could not reacquire lock on interval[%s] version[%s] for task: %s",
+          failedToReacquireLockTaskGroups.add(task.getGroupId());
+          log.error(
+              "Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.",
              savedTaskLockWithPriority.getInterval(),
              savedTaskLockWithPriority.getVersion(),
-              task.getId()
+              task.getId(),
+              task.getGroupId()
          );
+          continue;
        }
      }
+
+      Set<Task> tasksToFail = new HashSet<>();
+      for (Task task : storedActiveTasks) {
+        if (failedToReacquireLockTaskGroups.contains(task.getGroupId())) {
+          tasksToFail.add(task);
+          activeTasks.remove(task.getId());
+        }
+      }
+
      log.info(
          "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).",
          taskLockCount,
          activeTasks.size(),
          storedLocks.size() - taskLockCount
      );
+
+      if (!failedToReacquireLockTaskGroups.isEmpty()) {
+        log.warn("Marking all tasks from task groups[%s] to be failed "
+                 + "as they failed to reacquire at least one lock.", failedToReacquireLockTaskGroups);
+      }
+
+      return new TaskLockboxSyncResult(tasksToFail);
    }
    finally {
      giant.unlock();
@@ -207,7 +233,8 @@ public class TaskLockbox
   * This method is called only in {@link #syncFromStorage()} and verifies the given task and the taskLock have the same
   * groupId, dataSource, and priority.
   */
-  private TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock)
+  @VisibleForTesting
+  protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock)
  {
    giant.lock();

TaskLockboxSyncResult.java (new file)

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import org.apache.druid.indexing.common.task.Task;
+
+import java.util.Set;
+
+/**
+ * Result of TaskLockbox#syncFromStorage()
+ * Contains tasks which need to be forcefully failed to let the overlord become the leader
+ */
+class TaskLockboxSyncResult
+{
+  private final Set<Task> tasksToFail;
+
+  TaskLockboxSyncResult(Set<Task> tasksToFail)
+  {
+    this.tasksToFail = tasksToFail;
+  }
+
+  /**
+   * Return set of tasks which need to be forcefully failed due to lock re-acquisition failure
+   */
+  Set<Task> getTasksToFail()
+  {
+    return tasksToFail;
+  }
+}

TaskMaster.java

@@ -113,7 +113,6 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsProvider
         log.info("By the power of Grayskull, I have the power!");

         try {
-          taskLockbox.syncFromStorage();
           taskRunner = runnerFactory.build();
           taskQueue = new TaskQueue(
               taskLockConfig,

TaskQueue.java

@@ -34,6 +34,7 @@ import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.Counters;
+import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
@@ -173,6 +174,13 @@ public class TaskQueue
       Preconditions.checkState(!active, "queue must be stopped");
       active = true;
       syncFromStorage();
+      // Mark these tasks as failed as they could not reacquire the lock
+      // Clean up needs to happen after tasks have been synced from storage
+      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+      for (Task task : tasksToFail) {
+        shutdown(task.getId(),
+                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+      }
       managerExec.submit(
           new Runnable()
           {
@@ -228,6 +236,13 @@ public class TaskQueue
         }
       );
       requestManagement();
+      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+      for (Task task : tasksToFail) {
+        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+          taskStorage.removeLock(task.getId(), lock);
+        }
+      }
     }
     finally {
       giant.unlock();

TaskLockboxTest.java

@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.LockGranularity;
@@ -1258,6 +1259,47 @@ public class TaskLockboxTest
     );
   }

+  @Test
+  public void testFailedToReacquireTaskLock() throws Exception
+  {
+    // Tasks to be failed have a group id with the substring "FailingLockAcquisition"
+    // Please refer to NullLockPosseTaskLockbox
+    final Task taskWithFailingLockAcquisition0 = NoopTask.withGroupId("FailingLockAcquisition");
+    final Task taskWithFailingLockAcquisition1 = NoopTask.withGroupId("FailingLockAcquisition");
+    final Task taskWithSuccessfulLockAcquisition = NoopTask.create();
+    taskStorage.insert(taskWithFailingLockAcquisition0, TaskStatus.running(taskWithFailingLockAcquisition0.getId()));
+    taskStorage.insert(taskWithFailingLockAcquisition1, TaskStatus.running(taskWithFailingLockAcquisition1.getId()));
+    taskStorage.insert(taskWithSuccessfulLockAcquisition, TaskStatus.running(taskWithSuccessfulLockAcquisition.getId()));
+
+    TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator);
+    testLockbox.add(taskWithFailingLockAcquisition0);
+    testLockbox.add(taskWithFailingLockAcquisition1);
+    testLockbox.add(taskWithSuccessfulLockAcquisition);
+
+    testLockbox.tryLock(taskWithFailingLockAcquisition0,
+                        new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+                                                 taskWithFailingLockAcquisition0,
+                                                 Intervals.of("2017-07-01/2017-08-01"),
+                                                 null
+                        )
+    );
+
+    testLockbox.tryLock(taskWithSuccessfulLockAcquisition,
+                        new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+                                                 taskWithSuccessfulLockAcquisition,
+                                                 Intervals.of("2017-07-01/2017-08-01"),
+                                                 null
+                        )
+    );
+
+    Assert.assertEquals(3, taskStorage.getActiveTasks().size());
+
+    // The tasks must be marked for failure
+    TaskLockboxSyncResult result = testLockbox.syncFromStorage();
+    Assert.assertEquals(ImmutableSet.of(taskWithFailingLockAcquisition0, taskWithFailingLockAcquisition1),
+                        result.getTasksToFail());
+  }
+
   private Set<TaskLock> getAllLocks(List<Task> tasks)
   {
     return tasks.stream()
@@ -1383,4 +1425,25 @@ public class TaskLockboxTest
       return TaskStatus.failure("how?", "Dummy task status err msg");
     }
   }
+
+  /**
+   * Extends TaskLockbox to return a null TaskLockPosse when the task's group name contains "FailingLockAcquisition".
+   */
+  private static class NullLockPosseTaskLockbox extends TaskLockbox
+  {
+    public NullLockPosseTaskLockbox(
+        TaskStorage taskStorage,
+        IndexerMetadataStorageCoordinator metadataStorageCoordinator
+    )
+    {
+      super(taskStorage, metadataStorageCoordinator);
+    }
+
+    @Override
+    protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock)
+    {
+      return task.getGroupId()
+                 .contains("FailingLockAcquisition") ? null : super.verifyAndCreateOrFindLockPosse(task, taskLock);
+    }
+  }
 }

OverlordTest.java

@@ -38,12 +38,15 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TimeChunkLock;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskLockbox;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskRunner;
@@ -59,6 +62,8 @@ import org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
+import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -82,6 +87,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -103,11 +109,15 @@ public class OverlordTest
   private CountDownLatch announcementLatch;
   private DruidNode druidNode;
   private OverlordResource overlordResource;
-  private CountDownLatch[] taskCompletionCountDownLatches;
-  private CountDownLatch[] runTaskCountDownLatches;
+  private Map<String, CountDownLatch> taskCompletionCountDownLatches;
+  private Map<String, CountDownLatch> runTaskCountDownLatches;
   private HttpServletRequest req;
   private SupervisorManager supervisorManager;

+  // Bad task's id must be lexicographically greater than the good task's
+  private final String goodTaskId = "aaa";
+  private final String badTaskId = "zzz";
+
   private void setupServerAndCurator() throws Exception
   {
     server = new TestingServer();
@@ -140,38 +150,52 @@ public class OverlordTest
     req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
     EasyMock.expectLastCall().anyTimes();
     supervisorManager = EasyMock.createMock(SupervisorManager.class);
-    taskLockbox = EasyMock.createStrictMock(TaskLockbox.class);
-    taskLockbox.syncFromStorage();
-    EasyMock.expectLastCall().atLeastOnce();
-    taskLockbox.add(EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-    taskLockbox.remove(EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-
-    // for second Noop Task directly added to deep storage.
-    taskLockbox.add(EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-    taskLockbox.remove(EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
     taskActionClientFactory = EasyMock.createStrictMock(TaskActionClientFactory.class);
     EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject()))
             .andReturn(null).anyTimes();
-    EasyMock.replay(taskLockbox, taskActionClientFactory, req);
+    EasyMock.replay(taskActionClientFactory, req);

     taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
-    runTaskCountDownLatches = new CountDownLatch[2];
-    runTaskCountDownLatches[0] = new CountDownLatch(1);
-    runTaskCountDownLatches[1] = new CountDownLatch(1);
-    taskCompletionCountDownLatches = new CountDownLatch[2];
-    taskCompletionCountDownLatches[0] = new CountDownLatch(1);
-    taskCompletionCountDownLatches[1] = new CountDownLatch(1);
+    IndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
+    taskLockbox = new TaskLockbox(taskStorage, mdc);
+
+    runTaskCountDownLatches = new HashMap<>();
+    runTaskCountDownLatches.put("0", new CountDownLatch(1));
+    runTaskCountDownLatches.put("1", new CountDownLatch(1));
+    taskCompletionCountDownLatches = new HashMap<>();
+    taskCompletionCountDownLatches.put("0", new CountDownLatch(1));
+    taskCompletionCountDownLatches.put("1", new CountDownLatch(1));
     announcementLatch = new CountDownLatch(1);
     setupServerAndCurator();
     curator.start();
     curator.blockUntilConnected();
     druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
     ServiceEmitter serviceEmitter = new NoopServiceEmitter();
+
+    // Add two tasks with conflicting locks
+    // The bad task (the one with the lexicographically larger name) must be failed
+    Task badTask = new NoopTask(badTaskId, badTaskId, "datasource", 10_000, 0, null, null, null);
+    TaskLock badLock = new TimeChunkLock(null, badTaskId, "datasource", Intervals.ETERNITY, "version1", 50);
+    Task goodTask = new NoopTask(goodTaskId, goodTaskId, "datasource", 0, 0, null, null, null);
+    TaskLock goodLock = new TimeChunkLock(null, goodTaskId, "datasource", Intervals.ETERNITY, "version0", 50);
+    taskStorage.insert(goodTask, TaskStatus.running(goodTaskId));
+    taskStorage.insert(badTask, TaskStatus.running(badTaskId));
+    taskStorage.addLock(badTaskId, badLock);
+    taskStorage.addLock(goodTaskId, goodLock);
+    runTaskCountDownLatches.put(badTaskId, new CountDownLatch(1));
+    runTaskCountDownLatches.put(goodTaskId, new CountDownLatch(1));
+    taskCompletionCountDownLatches.put(badTaskId, new CountDownLatch(1));
+    taskCompletionCountDownLatches.put(goodTaskId, new CountDownLatch(1));
+
+    TaskRunnerFactory taskRunnerFactory = (TaskRunnerFactory<MockTaskRunner>) () ->
+        new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches);
+    taskRunnerFactory.build().run(badTask);
+    taskRunnerFactory.build().run(goodTask);
+
     taskMaster = new TaskMaster(
         new TaskLockConfig(),
         new TaskQueueConfig(null, new Period(1), null, new Period(10)),
@@ -180,8 +204,7 @@
         taskStorage,
         taskActionClientFactory,
         druidNode,
-        (TaskRunnerFactory<MockTaskRunner>) () ->
-            new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches),
+        taskRunnerFactory,
         new LatchableServiceAnnouncer(announcementLatch, null),
         new CoordinatorOverlordServiceConfig(null, null),
         serviceEmitter,
@@ -222,6 +245,13 @@
     Response response = overlordResource.getLeader();
     Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity());

+    // BadTask must fail due to null task lock
+    waitForTaskStatus(badTaskId, TaskState.FAILED);
+
+    // GoodTask must successfully run
+    taskCompletionCountDownLatches.get(goodTaskId).countDown();
+    waitForTaskStatus(goodTaskId, TaskState.SUCCESS);
+
     final String taskId_0 = "0";
     NoopTask task_0 = NoopTask.create(taskId_0, 0);
     response = overlordResource.taskPost(task_0, req);
@@ -249,7 +279,7 @@
     );

     // Simulate completion of task_0
-    taskCompletionCountDownLatches[Integer.parseInt(taskId_0)].countDown();
+    taskCompletionCountDownLatches.get(taskId_0).countDown();
     // Wait for taskQueue to handle success status of task_0
     waitForTaskStatus(taskId_0, TaskState.SUCCESS);
@@ -259,7 +289,7 @@
     NoopTask task_1 = NoopTask.create(taskId_1, 0);
     taskStorage.insert(task_1, TaskStatus.running(taskId_1));
     // Wait for task runner to run task_1
-    runTaskCountDownLatches[Integer.parseInt(taskId_1)].await();
+    runTaskCountDownLatches.get(taskId_1).await();
     response = overlordResource.getRunningTasks(null, req);

     // 1 task that was manually inserted should be in running state
@@ -270,19 +300,20 @@
     Assert.assertEquals(TASK_LOCATION, taskResponseObject.getLocation());

     // Simulate completion of task_1
-    taskCompletionCountDownLatches[Integer.parseInt(taskId_1)].countDown();
+    taskCompletionCountDownLatches.get(taskId_1).countDown();
     // Wait for taskQueue to handle success status of task_1
     waitForTaskStatus(taskId_1, TaskState.SUCCESS);

     // should return number of tasks which are not in running state
     response = overlordResource.getCompleteTasks(null, req);
-    Assert.assertEquals(2, (((List) response.getEntity()).size()));
+    Assert.assertEquals(4, (((List) response.getEntity()).size()));

     response = overlordResource.getCompleteTasks(1, req);
     Assert.assertEquals(1, (((List) response.getEntity()).size()));
     taskMaster.stop();
     Assert.assertFalse(taskMaster.isLeader());
-    EasyMock.verify(taskLockbox, taskActionClientFactory);
+    EasyMock.verify(taskActionClientFactory);
   }

   /* Wait until the task with given taskId has the given Task Status
@@ -308,12 +339,12 @@
   public static class MockTaskRunner implements TaskRunner
   {
-    private CountDownLatch[] completionLatches;
-    private CountDownLatch[] runLatches;
+    private Map<String, CountDownLatch> completionLatches;
+    private Map<String, CountDownLatch> runLatches;
     private ConcurrentHashMap<String, TaskRunnerWorkItem> taskRunnerWorkItems;
     private List<String> runningTasks;

-    public MockTaskRunner(CountDownLatch[] runLatches, CountDownLatch[] completionLatches)
+    public MockTaskRunner(Map<String, CountDownLatch> runLatches, Map<String, CountDownLatch> completionLatches)
     {
       this.runLatches = runLatches;
       this.completionLatches = completionLatches;
@@ -367,11 +398,11 @@
       // this is equivalent of getting process holder to run task in ForkingTaskRunner
       runningTasks.add(taskId);
       if (runLatches != null) {
-        runLatches[Integer.parseInt(taskId)].countDown();
+        runLatches.get(taskId).countDown();
       }
       // Wait for completion count down
       if (completionLatches != null) {
-        completionLatches[Integer.parseInt(taskId)].await();
+        completionLatches.get(taskId).await();
       }
       taskRunnerWorkItems.remove(taskId);
       runningTasks.remove(taskId);
@@ -407,6 +438,7 @@
     @Override
     public void shutdown(String taskid, String reason)
     {
+      runningTasks.remove(taskid);
     }

     @Override