diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java index 7f0d39da328..cb3fe39a1c3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java @@ -178,7 +178,7 @@ public class CompletionPersistentTaskAction extends Action listener) { - persistentTasksClusterService.completeOrRestartPersistentTask(request.taskId, request.exception, new ActionListener() { + persistentTasksClusterService.completePersistentTask(request.taskId, request.exception, new ActionListener() { @Override public void onResponse(Empty empty) { listener.onResponse(newResponse()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java index 7a63bae2b1b..68091359b70 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java @@ -85,25 +85,20 @@ public class PersistentTasksClusterService extends AbstractComponent implements * @param failure the reason for restarting the task or null if the task completed successfully * @param listener the listener that will be called when task is removed */ - public void completeOrRestartPersistentTask(long id, Exception failure, ActionListener listener) { + public void completePersistentTask(long id, Exception failure, ActionListener listener) { final String source; if (failure != null) { - logger.warn("persistent task " + id + " failed, restarting", failure); - source = "restart persistent task"; + logger.warn("persistent task " + id + " failed", failure); + source = "finish persistent task (failed)"; } else { - source = "finish persistent task"; + source = "finish persistent task (success)"; } clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id)) { - if (failure != null) { - // If the task failed - we need to restart it on another node, otherwise we just remove it - tasksInProgress.reassignTask(id, (action, request) -> getAssignement(action, currentState, request)); - } else { - tasksInProgress.finishTask(id); - } + tasksInProgress.finishTask(id); return update(currentState, tasksInProgress); } else { // we don't send the error message back to the caller becase that would cause an infinite loop of notifications diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java index f8decae706f..0b7727fca01 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java @@ -159,7 +159,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable - * The operation is only performed if the task is not currently assigned to any nodes. To force assignment use - * {@link #reassignTask(long, BiFunction)} instead + * The operation is only performed if the task is not currently assigned to any nodes. */ @SuppressWarnings("unchecked") public Builder assignTask(long taskId, @@ -605,20 +604,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable Builder reassignTask(long taskId, - BiFunction executorNodeFunc) { - PersistentTask taskInProgress = (PersistentTask) tasks.get(taskId); - if (taskInProgress != null) { - changed = true; - Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request); - tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment)); - } - return this; - } /** * Updates the task status if the task exist diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java index 4a89dd4d246..9408fca0e75 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java @@ -212,11 +212,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ if (builder.hasTask(lastKnownTask)) { changed = true; } - if (randomBoolean()) { - builder.reassignTask(lastKnownTask, randomAssignment()); - } else { - builder.reassignTask(lastKnownTask, (s, request) -> randomAssignment()); - } + builder.reassignTask(lastKnownTask, randomAssignment()); break; case 2: if (builder.hasTask(lastKnownTask)) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java index e25d31d9a18..b0bbf2c85c3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java @@ -63,7 +63,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { } } - public void testPersistentActionRestart() throws Exception { + public void testPersistentActionFailure() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); @@ -85,22 +85,6 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertThat(new TestTasksRequestBuilder(client()).setOperation("fail").setTaskId(firstRunningTask.getTaskId()) .get().getTasks().size(), equalTo(1)); - assertBusy(() -> { - // Wait for the task to restart - List tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks(); - logger.info("Found {} tasks", tasks.size()); - assertThat(tasks.size(), equalTo(1)); - // Make sure that restarted task is different - assertThat(tasks.get(0).getTaskId(), not(equalTo(firstRunningTask.getTaskId()))); - }); - - logger.info("Removing persistent task with id {}", firstRunningTask.getId()); - // Remove the persistent task - PersistentTaskOperationFuture removeFuture = new PersistentTaskOperationFuture(); - persistentTasksService.removeTask(taskId, removeFuture); - assertEquals(removeFuture.get(), (Long) taskId); - logger.info("Waiting for persistent task with id {} to disappear", firstRunningTask.getId()); assertBusy(() -> { // Wait for the task to disappear completely