From fba5c09e3d4c4bcc9f4792d66ad887416bcb1649 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 23 Mar 2017 12:56:48 -0400 Subject: [PATCH] Persistent Tasks: remove task restart on failure (elastic/x-pack-elasticsearch#815) If a persistent task throws an exception, the persistent tasks framework will no longer try to restart the task. This is a temporary measure to prevent threshing the cluster with endless restart attempt. We will revisit this in the future version to make the restart process more robust. Please note, however, that if node executing the task goes down, the task will still be restarted on another node. Original commit: elastic/x-pack-elasticsearch@30712e0fbfe91c08d1f6d73135dd001cdf9cab3f --- .../CompletionPersistentTaskAction.java | 2 +- .../PersistentTasksClusterService.java | 15 +++++---------- .../PersistentTasksCustomMetaData.java | 19 ++----------------- .../PersistentTasksCustomMetaDataTests.java | 6 +----- .../persistent/PersistentTasksExecutorIT.java | 18 +----------------- 5 files changed, 10 insertions(+), 50 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java index 7f0d39da328..cb3fe39a1c3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java @@ -178,7 +178,7 @@ public class CompletionPersistentTaskAction extends Action listener) { - persistentTasksClusterService.completeOrRestartPersistentTask(request.taskId, request.exception, new ActionListener() { + persistentTasksClusterService.completePersistentTask(request.taskId, request.exception, new ActionListener() { @Override public void onResponse(Empty empty) { listener.onResponse(newResponse()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java index 7a63bae2b1b..68091359b70 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java @@ -85,25 +85,20 @@ public class PersistentTasksClusterService extends AbstractComponent implements * @param failure the reason for restarting the task or null if the task completed successfully * @param listener the listener that will be called when task is removed */ - public void completeOrRestartPersistentTask(long id, Exception failure, ActionListener listener) { + public void completePersistentTask(long id, Exception failure, ActionListener listener) { final String source; if (failure != null) { - logger.warn("persistent task " + id + " failed, restarting", failure); - source = "restart persistent task"; + logger.warn("persistent task " + id + " failed", failure); + source = "finish persistent task (failed)"; } else { - source = "finish persistent task"; + source = "finish persistent task (success)"; } clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id)) { - if (failure != null) { - // If the task failed - we need to restart it on another node, otherwise we just remove it - tasksInProgress.reassignTask(id, (action, request) -> getAssignement(action, currentState, request)); - } else { - tasksInProgress.finishTask(id); - } + tasksInProgress.finishTask(id); return update(currentState, tasksInProgress); } else { // we don't send the error message back to the caller becase that would cause an infinite loop of notifications diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java index f8decae706f..0b7727fca01 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java @@ -159,7 +159,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable - * The operation is only performed if the task is not currently assigned to any nodes. To force assignment use - * {@link #reassignTask(long, BiFunction)} instead + * The operation is only performed if the task is not currently assigned to any nodes. */ @SuppressWarnings("unchecked") public Builder assignTask(long taskId, @@ -605,20 +604,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable Builder reassignTask(long taskId, - BiFunction executorNodeFunc) { - PersistentTask taskInProgress = (PersistentTask) tasks.get(taskId); - if (taskInProgress != null) { - changed = true; - Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request); - tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment)); - } - return this; - } /** * Updates the task status if the task exist diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java index 4a89dd4d246..9408fca0e75 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java @@ -212,11 +212,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ if (builder.hasTask(lastKnownTask)) { changed = true; } - if (randomBoolean()) { - builder.reassignTask(lastKnownTask, randomAssignment()); - } else { - builder.reassignTask(lastKnownTask, (s, request) -> randomAssignment()); - } + builder.reassignTask(lastKnownTask, randomAssignment()); break; case 2: if (builder.hasTask(lastKnownTask)) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java index e25d31d9a18..b0bbf2c85c3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java @@ -63,7 +63,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { } } - public void testPersistentActionRestart() throws Exception { + public void testPersistentActionFailure() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); @@ -85,22 +85,6 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertThat(new TestTasksRequestBuilder(client()).setOperation("fail").setTaskId(firstRunningTask.getTaskId()) .get().getTasks().size(), equalTo(1)); - assertBusy(() -> { - // Wait for the task to restart - List tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks(); - logger.info("Found {} tasks", tasks.size()); - assertThat(tasks.size(), equalTo(1)); - // Make sure that restarted task is different - assertThat(tasks.get(0).getTaskId(), not(equalTo(firstRunningTask.getTaskId()))); - }); - - logger.info("Removing persistent task with id {}", firstRunningTask.getId()); - // Remove the persistent task - PersistentTaskOperationFuture removeFuture = new PersistentTaskOperationFuture(); - persistentTasksService.removeTask(taskId, removeFuture); - assertEquals(removeFuture.get(), (Long) taskId); - logger.info("Waiting for persistent task with id {} to disappear", firstRunningTask.getId()); assertBusy(() -> { // Wait for the task to disappear completely