diff --git a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java index dee95ac9814..131d7727ce0 100644 --- a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java +++ b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java @@ -132,7 +132,7 @@ public class AllocatedPersistentTask extends CancellableTask { private void completeAndNotifyIfNeeded(@Nullable Exception failure) { State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED); if (prevState == State.COMPLETED) { - logger.warn("attempt to complete task {} in the {} state", getPersistentTaskId(), prevState); + logger.warn("attempt to complete task [{}] with id [{}] in the [{}] state", getAction(), getPersistentTaskId(), prevState); } else { if (failure != null) { logger.warn((Supplier) () -> new ParameterizedMessage( @@ -141,18 +141,20 @@ public class AllocatedPersistentTask extends CancellableTask { try { this.failure = failure; if (prevState == State.STARTED) { - logger.trace("sending notification for completed task {}", getPersistentTaskId()); - persistentTasksService.sendCompletionNotification(getPersistentTaskId(), failure, new + logger.trace("sending notification for completed task [{}] with id [{}]", getAction(), getPersistentTaskId()); + persistentTasksService.sendCompletionNotification(getPersistentTaskId(), getAllocationId(), failure, new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - logger.trace("notification for task {} was successful", getId()); + logger.trace("notification for task [{}] with id [{}] was successful", getAction(), + getPersistentTaskId()); } @Override public void onFailure(Exception e) { logger.warn((Supplier) () -> - new ParameterizedMessage("notification for task {} failed", getPersistentTaskId()), e); + new ParameterizedMessage("notification for task [{}] with id [{}] failed", + getAction(), getPersistentTaskId()), e); } }); } diff --git a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java index f8042b97d0b..c4bffeeb44d 100644 --- a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java @@ -75,19 +75,23 @@ public class CompletionPersistentTaskAction extends Action listener) { - persistentTasksClusterService.completePersistentTask(request.taskId, request.exception, + persistentTasksClusterService.completePersistentTask(request.taskId, request.allocationId, request.exception, new ActionListener>() { @Override public void onResponse(PersistentTask task) { diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 57b370398f1..9bed14d3274 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -59,7 +59,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements * Creates a new persistent task on master node * * @param action the action name - * @param params params + * @param params params * @param listener the listener that will be called when task is started */ public void createPersistentTask(String taskId, String action, @Nullable Params params, @@ -99,11 +99,12 @@ public class PersistentTasksClusterService extends AbstractComponent implements /** * Restarts a record about a running persistent task from cluster state * - * @param id the id of a persistent task - * @param failure the reason for restarting the task or null if the task completed successfully - * @param listener the listener that will be called when task is removed + * @param id the id of the persistent task + * @param allocationId the allocation id of the persistent task + * @param failure the reason for restarting the task or null if the task completed successfully + * @param listener the listener that will be called when task is removed */ - public void completePersistentTask(String id, Exception failure, ActionListener> listener) { + public void completePersistentTask(String id, long allocationId, Exception failure, ActionListener> listener) { final String source; if (failure != null) { logger.warn("persistent task " + id + " failed", failure); @@ -115,13 +116,17 @@ public class PersistentTasksClusterService extends AbstractComponent implements @Override public ClusterState execute(ClusterState currentState) throws Exception { PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); - if (tasksInProgress.hasTask(id)) { + if (tasksInProgress.hasTask(id, allocationId)) { tasksInProgress.finishTask(id); return update(currentState, tasksInProgress); } else { - // we don't send the error message back to the caller becase that would cause an infinite loop of notifications - logger.warn("The task {} wasn't found, status is not updated", id); - return currentState; + if (tasksInProgress.hasTask(id)) { + logger.warn("The task [{}] with id [{}] was found but it has a different allocation id [{}], status is not updated", + PersistentTasksCustomMetaData.getTaskWithId(currentState, id).getTaskName(), id, allocationId); + } else { + logger.warn("The task [{}] wasn't found, status is not updated", id); + } + throw new ResourceNotFoundException("the task with id [" + id + "] and allocation id [" + allocationId + "] not found"); } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index 4222c49107f..111041fcd8b 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskId; @@ -70,6 +71,11 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu @Override public void clusterChanged(ClusterChangedEvent event) { + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // wait until the gateway has recovered from disk, otherwise if the only master restarts + // we start cancelling all local tasks before cluster has a chance to recover. + return; + } PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData previousTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); @@ -120,11 +126,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu AllocatedPersistentTask task = runningTasks.get(id); if (task.getState() == AllocatedPersistentTask.State.COMPLETED) { // Result was sent to the caller and the caller acknowledged acceptance of the result + logger.trace("Found completed persistent task [{}] with id [{}] and allocation id [{}] - removing", + task.getAction(), task.getPersistentTaskId(), task.getAllocationId()); runningTasks.remove(id); } else { // task is running locally, but master doesn't know about it - that means that the persistent task was removed // cancel the task without notifying master - logger.trace("Found unregistered persistent task with id {} - cancelling ", id); + logger.trace("Found unregistered persistent task [{}] with id [{}] and allocation id [{}] - cancelling", + task.getAction(), task.getPersistentTaskId(), task.getAllocationId()); cancelTask(id); } } @@ -160,6 +169,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu boolean processed = false; try { task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId()); + logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(), + task.getPersistentTaskId(), task.getAllocationId()); try { runningTasks.put(taskInProgress.getAllocationId(), task); nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor); @@ -171,6 +182,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu } finally { if (processed == false) { // something went wrong - unregistering task + logger.warn("Persistent task [{}] with id [{}] and allocation id [{}] failed to create", task.getAction(), + task.getPersistentTaskId(), task.getAllocationId()); taskManager.unregister(task); } } @@ -187,14 +200,16 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener() { @Override public void onResponse(CancelTasksResponse cancelTasksResponse) { - logger.trace("Persistent task with id {} was cancelled", task.getId()); - + logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was cancelled", task.getAction(), + task.getPersistentTaskId(), task.getAllocationId()); } @Override public void onFailure(Exception e) { // There is really nothing we can do in case of failure here - logger.warn((Supplier) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e); + logger.warn((Supplier) () -> + new ParameterizedMessage("failed to cancel task [{}] with id [{}] and allocation id [{}]", task.getAction(), + task.getPersistentTaskId(), task.getAllocationId()), e); } }); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index d7788ae5af9..cf416623bcb 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -73,8 +73,9 @@ public class PersistentTasksService extends AbstractComponent { /** * Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure */ - public void sendCompletionNotification(String taskId, Exception failure, ActionListener> listener) { - CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure); + public void sendCompletionNotification(String taskId, long allocationId, Exception failure, + ActionListener> listener) { + CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, allocationId, failure); try { client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java index 7790de710bf..b5dd910fa42 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java @@ -122,6 +122,18 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(allocationId)); assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster")); assertThat(firstRunningTask.getDescription(), equalTo("id=" + taskId)); + + if (randomBoolean()) { + logger.info("Simulating errant completion notification"); + //try sending completion request with incorrect allocation id + PlainActionFuture> failedCompletionNotificationFuture = new PlainActionFuture<>(); + persistentTasksService.sendCompletionNotification(taskId, Long.MAX_VALUE, null, failedCompletionNotificationFuture); + assertThrows(failedCompletionNotificationFuture, ResourceNotFoundException.class); + // Make sure that the task is still running + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") + .setDetailed(true).get().getTasks().size(), equalTo(1)); + } + stopOrCancelTask(firstRunningTask.getTaskId()); } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index 95b0ec87669..ea9e8f69dcf 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -184,7 +184,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { } @Override - public void sendCompletionNotification(String taskId, Exception failure, ActionListener> listener) { + public void sendCompletionNotification(String taskId, long allocationId, Exception failure, + ActionListener> listener) { fail("Shouldn't be called during Cluster State cancellation"); } }; diff --git a/server/src/test/java/org/elasticsearch/persistent/RestartPersistentTaskRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/RestartPersistentTaskRequestTests.java index 3fc8f804903..3ce29d543d4 100644 --- a/server/src/test/java/org/elasticsearch/persistent/RestartPersistentTaskRequestTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/RestartPersistentTaskRequestTests.java @@ -25,7 +25,7 @@ public class RestartPersistentTaskRequestTests extends AbstractStreamableTestCas @Override protected Request createTestInstance() { - return new Request(randomAlphaOfLength(10), null); + return new Request(randomAlphaOfLength(10), randomLong(), null); } @Override