From b88e1c21eaec38d7c2d61ddbe59051a746ab78a9 Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Mon, 17 Oct 2022 15:23:16 +0530 Subject: [PATCH] Fix Overlord leader election when task lock re-acquisition fails (#13172) Overlord leader election can sometimes fail due to task lock re-acquisition issues. This commit solves the issue by failing such tasks and clearing all their locks. --- .../druid/indexing/overlord/TaskLockbox.java | 43 ++++++-- .../overlord/TaskLockboxSyncResult.java | 46 ++++++++ .../druid/indexing/overlord/TaskMaster.java | 1 - .../druid/indexing/overlord/TaskQueue.java | 15 +++ .../indexing/overlord/TaskLockboxTest.java | 63 +++++++++++ .../indexing/overlord/http/OverlordTest.java | 100 ++++++++++++------ 6 files changed, 225 insertions(+), 43 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index a53af645913..41caf0620fb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -108,17 +108,19 @@ public class TaskLockbox /** * Wipe out our current in-memory state and resync it from our bundled {@link TaskStorage}. + * + * @return SyncResult which needs to be processed by the caller */ - public void syncFromStorage() + public TaskLockboxSyncResult syncFromStorage() { giant.lock(); try { // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks. 
- final Set<String> storedActiveTasks = new HashSet<>(); + final Set<Task> storedActiveTasks = new HashSet<>(); final List<Pair<Task, TaskLock>> storedLocks = new ArrayList<>(); for (final Task task : taskStorage.getActiveTasks()) { - storedActiveTasks.add(task.getId()); + storedActiveTasks.add(task); for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) { storedLocks.add(Pair.of(task, taskLock)); } @@ -138,7 +140,12 @@ public class TaskLockbox }; running.clear(); activeTasks.clear(); - activeTasks.addAll(storedActiveTasks); + activeTasks.addAll(storedActiveTasks.stream() + .map(Task::getId) + .collect(Collectors.toSet()) + ); + // Set of task groups in which at least one task failed to re-acquire a lock + final Set<String> failedToReacquireLockTaskGroups = new HashSet<>(); // Bookkeeping for a log message at the end int taskLockCount = 0; for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) { @@ -183,20 +190,39 @@ public class TaskLockbox ); } } else { - throw new ISE( - "Could not reacquire lock on interval[%s] version[%s] for task: %s", + failedToReacquireLockTaskGroups.add(task.getGroupId()); + log.error( + "Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.", savedTaskLockWithPriority.getInterval(), savedTaskLockWithPriority.getVersion(), - task.getId() + task.getId(), + task.getGroupId() ); + continue; } } + + Set<Task> tasksToFail = new HashSet<>(); + for (Task task : storedActiveTasks) { + if (failedToReacquireLockTaskGroups.contains(task.getGroupId())) { + tasksToFail.add(task); + activeTasks.remove(task.getId()); + } + } + log.info( "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).", taskLockCount, activeTasks.size(), storedLocks.size() - taskLockCount ); + + if (!failedToReacquireLockTaskGroups.isEmpty()) { + log.warn("Marking all tasks from task groups[%s] to be failed " + + "as they failed to reacquire at least one lock.", failedToReacquireLockTaskGroups); + } + + return new TaskLockboxSyncResult(tasksToFail); } 
finally { giant.unlock(); @@ -207,7 +233,8 @@ public class TaskLockbox * This method is called only in {@link #syncFromStorage()} and verifies the given task and the taskLock have the same * groupId, dataSource, and priority. */ - private TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) + @VisibleForTesting + protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) { giant.lock(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java new file mode 100644 index 00000000000..b7273b6bdef --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.overlord; + +import org.apache.druid.indexing.common.task.Task; + +import java.util.Set; + +/** + * Result of TaskLockbox#syncFromStorage() + * Contains tasks which need to be forcefully failed to let the overlord become the leader + */ +class TaskLockboxSyncResult +{ + private final Set<Task> tasksToFail; + + TaskLockboxSyncResult(Set<Task> tasksToFail) + { + this.tasksToFail = tasksToFail; + } + + /** + * Return set of tasks which need to be forcefully failed due to lock re-acquisition failure + */ + Set<Task> getTasksToFail() + { + return tasksToFail; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index b52a3c5c03e..7b9101cf1f2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -113,7 +113,6 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro log.info("By the power of Grayskull, I have the power!"); try { - taskLockbox.syncFromStorage(); taskRunner = runnerFactory.build(); taskQueue = new TaskQueue( taskLockConfig, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 07f136c7237..c508876f0ce 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -34,6 +34,7 @@ import org.apache.druid.common.utils.IdUtils; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; +import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import 
org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; @@ -173,6 +174,13 @@ public class TaskQueue Preconditions.checkState(!active, "queue must be stopped"); active = true; syncFromStorage(); + // Mark these tasks as failed as they could not reacquire the lock + // Clean up needs to happen after tasks have been synced from storage + Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail(); + for (Task task : tasksToFail) { + shutdown(task.getId(), + "Shutting down forcefully as task failed to reacquire lock while becoming leader"); + } managerExec.submit( new Runnable() { @@ -228,6 +236,13 @@ public class TaskQueue } ); requestManagement(); + // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired) + // This is called after requesting management as locks need to be cleared after notifyStatus is processed + for (Task task : tasksToFail) { + for (TaskLock lock : taskStorage.getLocks(task.getId())) { + taskStorage.removeLock(task.getId(), lock); + } + } } finally { giant.unlock(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 1da1ae18f84..834bdf9429d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.LockGranularity; @@ -1258,6 +1259,47 @@ public class 
TaskLockboxTest ); } + @Test + public void testFailedToReacquireTaskLock() throws Exception + { + // Tasks to be failed have a group id with the substring "FailingLockAcquisition" + // Please refer to NullLockPosseTaskLockbox + final Task taskWithFailingLockAcquisition0 = NoopTask.withGroupId("FailingLockAcquisition"); + final Task taskWithFailingLockAcquisition1 = NoopTask.withGroupId("FailingLockAcquisition"); + final Task taskWithSuccessfulLockAcquisition = NoopTask.create(); + taskStorage.insert(taskWithFailingLockAcquisition0, TaskStatus.running(taskWithFailingLockAcquisition0.getId())); + taskStorage.insert(taskWithFailingLockAcquisition1, TaskStatus.running(taskWithFailingLockAcquisition1.getId())); + taskStorage.insert(taskWithSuccessfulLockAcquisition, TaskStatus.running(taskWithSuccessfulLockAcquisition.getId())); + + TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator); + testLockbox.add(taskWithFailingLockAcquisition0); + testLockbox.add(taskWithFailingLockAcquisition1); + testLockbox.add(taskWithSuccessfulLockAcquisition); + + testLockbox.tryLock(taskWithFailingLockAcquisition0, + new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + taskWithFailingLockAcquisition0, + Intervals.of("2017-07-01/2017-08-01"), + null + ) + ); + + testLockbox.tryLock(taskWithSuccessfulLockAcquisition, + new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + taskWithSuccessfulLockAcquisition, + Intervals.of("2017-07-01/2017-08-01"), + null + ) + ); + + Assert.assertEquals(3, taskStorage.getActiveTasks().size()); + + // The tasks must be marked for failure + TaskLockboxSyncResult result = testLockbox.syncFromStorage(); + Assert.assertEquals(ImmutableSet.of(taskWithFailingLockAcquisition0, taskWithFailingLockAcquisition1), + result.getTasksToFail()); + } + private Set getAllLocks(List tasks) { return tasks.stream() @@ -1383,4 +1425,25 @@ public class TaskLockboxTest return TaskStatus.failure("how?", "Dummy task status err msg"); } } + + /** 
+ * Extends TaskLockbox to return a null TaskLockPosse when the task's group name contains "FailingLockAcquisition". + */ + private static class NullLockPosseTaskLockbox extends TaskLockbox + { + public NullLockPosseTaskLockbox( + TaskStorage taskStorage, + IndexerMetadataStorageCoordinator metadataStorageCoordinator + ) + { + super(taskStorage, metadataStorageCoordinator); + } + + @Override + protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) + { + return task.getGroupId() + .contains("FailingLockAcquisition") ? null : super.verifyAndCreateOrFindLockPosse(task, taskLock); + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index d0652d3aa60..54aacc5dfe0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -38,12 +38,15 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskRunner; @@ -59,6 +62,8 @@ 
import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; +import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -82,6 +87,7 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -103,11 +109,15 @@ public class OverlordTest private CountDownLatch announcementLatch; private DruidNode druidNode; private OverlordResource overlordResource; - private CountDownLatch[] taskCompletionCountDownLatches; - private CountDownLatch[] runTaskCountDownLatches; + private Map taskCompletionCountDownLatches; + private Map runTaskCountDownLatches; private HttpServletRequest req; private SupervisorManager supervisorManager; + // Bad task's id must be lexicographically greater than the good task's + private final String goodTaskId = "aaa"; + private final String badTaskId = "zzz"; + private void setupServerAndCurator() throws Exception { server = new TestingServer(); @@ -140,38 +150,52 @@ public class OverlordTest req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); EasyMock.expectLastCall().anyTimes(); supervisorManager = EasyMock.createMock(SupervisorManager.class); - taskLockbox = EasyMock.createStrictMock(TaskLockbox.class); - taskLockbox.syncFromStorage(); - EasyMock.expectLastCall().atLeastOnce(); - taskLockbox.add(EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - taskLockbox.remove(EasyMock.anyObject()); 
- EasyMock.expectLastCall().atLeastOnce(); - - // for second Noop Task directly added to deep storage. - taskLockbox.add(EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - taskLockbox.remove(EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); taskActionClientFactory = EasyMock.createStrictMock(TaskActionClientFactory.class); EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())) .andReturn(null).anyTimes(); - EasyMock.replay(taskLockbox, taskActionClientFactory, req); + EasyMock.replay(taskActionClientFactory, req); taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); - runTaskCountDownLatches = new CountDownLatch[2]; - runTaskCountDownLatches[0] = new CountDownLatch(1); - runTaskCountDownLatches[1] = new CountDownLatch(1); - taskCompletionCountDownLatches = new CountDownLatch[2]; - taskCompletionCountDownLatches[0] = new CountDownLatch(1); - taskCompletionCountDownLatches[1] = new CountDownLatch(1); + + IndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + + taskLockbox = new TaskLockbox(taskStorage, mdc); + + runTaskCountDownLatches = new HashMap<>(); + runTaskCountDownLatches.put("0", new CountDownLatch(1)); + runTaskCountDownLatches.put("1", new CountDownLatch(1)); + taskCompletionCountDownLatches = new HashMap<>(); + taskCompletionCountDownLatches.put("0", new CountDownLatch(1)); + taskCompletionCountDownLatches.put("1", new CountDownLatch(1)); announcementLatch = new CountDownLatch(1); setupServerAndCurator(); curator.start(); curator.blockUntilConnected(); druidNode = new DruidNode("hey", "what", false, 1234, null, true, false); ServiceEmitter serviceEmitter = new NoopServiceEmitter(); + + // Add two tasks with conflicting locks + // The bad task (The one with a lexicographically larger name) must be failed + Task badTask = new NoopTask(badTaskId, badTaskId, "datasource", 10_000, 0, null, null, null); + TaskLock badLock = new TimeChunkLock(null, badTaskId, 
"datasource", Intervals.ETERNITY, "version1", 50); + Task goodTask = new NoopTask(goodTaskId, goodTaskId, "datasource", 0, 0, null, null, null); + TaskLock goodLock = new TimeChunkLock(null, goodTaskId, "datasource", Intervals.ETERNITY, "version0", 50); + taskStorage.insert(goodTask, TaskStatus.running(goodTaskId)); + taskStorage.insert(badTask, TaskStatus.running(badTaskId)); + taskStorage.addLock(badTaskId, badLock); + taskStorage.addLock(goodTaskId, goodLock); + runTaskCountDownLatches.put(badTaskId, new CountDownLatch(1)); + runTaskCountDownLatches.put(goodTaskId, new CountDownLatch(1)); + taskCompletionCountDownLatches.put(badTaskId, new CountDownLatch(1)); + taskCompletionCountDownLatches.put(goodTaskId, new CountDownLatch(1)); + + TaskRunnerFactory taskRunnerFactory = (TaskRunnerFactory) () -> + new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches); + + taskRunnerFactory.build().run(badTask); + taskRunnerFactory.build().run(goodTask); + taskMaster = new TaskMaster( new TaskLockConfig(), new TaskQueueConfig(null, new Period(1), null, new Period(10)), @@ -180,8 +204,7 @@ public class OverlordTest taskStorage, taskActionClientFactory, druidNode, - (TaskRunnerFactory) () -> - new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches), + taskRunnerFactory, new LatchableServiceAnnouncer(announcementLatch, null), new CoordinatorOverlordServiceConfig(null, null), serviceEmitter, @@ -222,6 +245,13 @@ public class OverlordTest Response response = overlordResource.getLeader(); Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity()); + // BadTask must fail due to null task lock + waitForTaskStatus(badTaskId, TaskState.FAILED); + + // GoodTask must successfully run + taskCompletionCountDownLatches.get(goodTaskId).countDown(); + waitForTaskStatus(goodTaskId, TaskState.SUCCESS); + final String taskId_0 = "0"; NoopTask task_0 = NoopTask.create(taskId_0, 0); response = overlordResource.taskPost(task_0, req); @@ -249,7 
+279,7 @@ public class OverlordTest ); // Simulate completion of task_0 - taskCompletionCountDownLatches[Integer.parseInt(taskId_0)].countDown(); + taskCompletionCountDownLatches.get(taskId_0).countDown(); // Wait for taskQueue to handle success status of task_0 waitForTaskStatus(taskId_0, TaskState.SUCCESS); @@ -259,7 +289,7 @@ public class OverlordTest NoopTask task_1 = NoopTask.create(taskId_1, 0); taskStorage.insert(task_1, TaskStatus.running(taskId_1)); // Wait for task runner to run task_1 - runTaskCountDownLatches[Integer.parseInt(taskId_1)].await(); + runTaskCountDownLatches.get(taskId_1).await(); response = overlordResource.getRunningTasks(null, req); // 1 task that was manually inserted should be in running state @@ -270,19 +300,20 @@ public class OverlordTest Assert.assertEquals(TASK_LOCATION, taskResponseObject.getLocation()); // Simulate completion of task_1 - taskCompletionCountDownLatches[Integer.parseInt(taskId_1)].countDown(); + taskCompletionCountDownLatches.get(taskId_1).countDown(); // Wait for taskQueue to handle success status of task_1 waitForTaskStatus(taskId_1, TaskState.SUCCESS); // should return number of tasks which are not in running state response = overlordResource.getCompleteTasks(null, req); - Assert.assertEquals(2, (((List) response.getEntity()).size())); + Assert.assertEquals(4, (((List) response.getEntity()).size())); response = overlordResource.getCompleteTasks(1, req); Assert.assertEquals(1, (((List) response.getEntity()).size())); + taskMaster.stop(); Assert.assertFalse(taskMaster.isLeader()); - EasyMock.verify(taskLockbox, taskActionClientFactory); + EasyMock.verify(taskActionClientFactory); } /* Wait until the task with given taskId has the given Task Status @@ -308,12 +339,12 @@ public class OverlordTest public static class MockTaskRunner implements TaskRunner { - private CountDownLatch[] completionLatches; - private CountDownLatch[] runLatches; + private Map completionLatches; + private Map runLatches; private 
ConcurrentHashMap taskRunnerWorkItems; private List runningTasks; - public MockTaskRunner(CountDownLatch[] runLatches, CountDownLatch[] completionLatches) + public MockTaskRunner(Map runLatches, Map completionLatches) { this.runLatches = runLatches; this.completionLatches = completionLatches; @@ -367,11 +398,11 @@ public class OverlordTest // this is equivalent of getting process holder to run task in ForkingTaskRunner runningTasks.add(taskId); if (runLatches != null) { - runLatches[Integer.parseInt(taskId)].countDown(); + runLatches.get(taskId).countDown(); } // Wait for completion count down if (completionLatches != null) { - completionLatches[Integer.parseInt(taskId)].await(); + completionLatches.get(taskId).await(); } taskRunnerWorkItems.remove(taskId); runningTasks.remove(taskId); @@ -407,6 +438,7 @@ public class OverlordTest @Override public void shutdown(String taskid, String reason) { + runningTasks.remove(taskid); } @Override