From cd441f690648b3eb9aeaef2cdbbe4b64f5a647c6 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Thu, 15 Aug 2019 14:58:35 -0400 Subject: [PATCH] Catch AllocatedTask registration failures (#45300) When a persistent task attempts to register an allocated task locally, this creates the Task object and starts tracking it locally. If there is a failure while initializing the task, this is handled by a catch and subsequent error handling (canceling, unregistering, etc). But if the task fails to be created because an exception is thrown in the tasks ctor, this is uncaught and fails the cluster update thread. The ramification is that a persistent task remains in the cluster state, but is unable to create the allocated task, and the exception prevents other tasks "after" the poisoned task from starting too. Because the allocated task is never created, the cancellation tools are not able to remove the persistent task and it is stuck as a zombie in the CS. This commit adds exception handling around the task creation, and attempts to notify the master if there is a failure (so the persistent task can be removed). Even if this notification fails, the exception handling means the rest of the uninitialized tasks can proceed as normal. --- .../PersistentTasksNodeService.java | 43 ++++++++++++-- .../PersistentTasksNodeServiceTests.java | 57 +++++++++++++++++++ 2 files changed, 96 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index 260fabc67cd..9b811a079ef 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -30,11 +30,11 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.HashMap; @@ -112,7 +112,13 @@ public class PersistentTasksNodeService implements ClusterStateListener { AllocatedPersistentTask persistentTask = runningTasks.get(allocationId); if (persistentTask == null) { // New task - let's start it - startTask(taskInProgress); + try { + startTask(taskInProgress); + } catch (Exception e) { + logger.error("Unable to start allocated task [" + taskInProgress.getTaskName() + + "] with id [" + taskInProgress.getId() + + "] and allocation id [" + taskInProgress.getAllocationId() + "]", e); + } } else { // The task is still running notVisitedTasks.remove(allocationId); @@ -163,8 +169,18 @@ public class PersistentTasksNodeService implements ClusterStateListener { return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers); } }; - AllocatedPersistentTask task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", - request); + + AllocatedPersistentTask task; + try { + task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", request); + } catch (Exception e) { + logger.error("Fatal error registering persistent task [" + taskInProgress.getTaskName() + + "] with id [" + taskInProgress.getId() + "] and allocation id [" + taskInProgress.getAllocationId() + + "], removing from persistent tasks", e); + notifyMasterOfFailedTask(taskInProgress, e); + return; + } + boolean processed = false; try { task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId()); @@ -188,6 +204,25 @@ public class PersistentTasksNodeService implements ClusterStateListener { } } + private void notifyMasterOfFailedTask(PersistentTask taskInProgress, + Exception originalException) { + persistentTasksService.sendCompletionRequest(taskInProgress.getId(), taskInProgress.getAllocationId(), originalException, + new ActionListener>() { + @Override + public void onResponse(PersistentTask persistentTask) { + logger.trace("completion notification for failed task [{}] with id [{}] was successful", taskInProgress.getTaskName(), + taskInProgress.getAllocationId()); + } + + @Override + public void onFailure(Exception notificationException) { + notificationException.addSuppressed(originalException); + logger.warn(new ParameterizedMessage("notification for task [{}] with id [{}] failed", + taskInProgress.getTaskName(), taskInProgress.getAllocationId()), notificationException); + } + }); + } + /** * Unregisters and then cancels the locally running task using the task manager. No notification to master will be send upon * cancellation. diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index 4f91ba9322b..8ac41f49f7e 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -32,6 +32,8 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; @@ -49,10 +51,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.core.IsEqual.equalTo; @@ -309,6 +314,58 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { assertThat(taskManager.getTasks().values(), empty()); } + public void testRegisterTaskFails() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + final Client mockClient = mock(Client.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService()); + when(mockClient.threadPool()).thenReturn(threadPool); + when(mockClient.settings()).thenReturn(Settings.EMPTY); + + PersistentTasksService persistentTasksService = new PersistentTasksService(null, null, mockClient) { + @Override + public void sendCompletionRequest(String taskId, long taskAllocationId, Exception taskFailure, + ActionListener> listener) { + assertThat(taskFailure, instanceOf(RuntimeException.class)); + assertThat(taskFailure.getMessage(), equalTo("Something went wrong")); + listener.onResponse(mock(PersistentTask.class)); + latch.countDown(); + } + }; + + @SuppressWarnings("unchecked") PersistentTasksExecutor action = mock(PersistentTasksExecutor.class); + when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); + when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME); + when(action.createTask(anyLong(), anyString(), anyString(), any(), any(), any())) + .thenThrow(new RuntimeException("Something went wrong")); + + PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Collections.singletonList(action)); + + MockExecutor executor = new MockExecutor(); + PersistentTasksNodeService coordinator = new PersistentTasksNodeService(persistentTasksService, + registry, new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), executor); + + ClusterState state = createInitialClusterState(0, Settings.EMPTY); + + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); + + tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("this_param"), + new Assignment("this_node", "test assignment on this node")); + + MetaData.Builder metaData = MetaData.builder(state.metaData()); + metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); + ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build(); + + coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); + + // Failed to start the task, make sure it wasn't invoked further + assertThat(executor.executions.size(), equalTo(0)); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + private ClusterState addTask(ClusterState state, String action, Params params, String node) { PersistentTasksCustomMetaData.Builder builder =