Persistent tasks: require allocation id on task completion (elastic/x-pack-elasticsearch#1107)
Persistent tasks should verify that completion notification is done for correct version of the task, otherwise a delayed notification from an old node can accidentally close a newly reassigned task. Original commit: elastic/x-pack-elasticsearch@478bb6e730
This commit is contained in:
parent
a0099cace6
commit
7656e4a67b
|
@ -119,7 +119,7 @@ public class AllocatedPersistentTask extends CancellableTask {
|
|||
private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
|
||||
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
|
||||
if (prevState == State.COMPLETED) {
|
||||
logger.warn("attempt to complete task {} in the {} state", getPersistentTaskId(), prevState);
|
||||
logger.warn("attempt to complete task [{}] with id [{}] in the [{}] state", getAction(), getPersistentTaskId(), prevState);
|
||||
} else {
|
||||
if (failure != null) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
|
||||
|
@ -128,18 +128,20 @@ public class AllocatedPersistentTask extends CancellableTask {
|
|||
try {
|
||||
this.failure = failure;
|
||||
if (prevState == State.STARTED) {
|
||||
logger.trace("sending notification for completed task {}", getPersistentTaskId());
|
||||
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), failure, new
|
||||
logger.trace("sending notification for completed task [{}] with id [{}]", getAction(), getPersistentTaskId());
|
||||
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), getAllocationId(), failure, new
|
||||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
|
||||
logger.trace("notification for task {} was successful", getId());
|
||||
logger.trace("notification for task [{}] with id [{}] was successful", getAction(),
|
||||
getPersistentTaskId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn((Supplier<?>) () ->
|
||||
new ParameterizedMessage("notification for task {} failed", getPersistentTaskId()), e);
|
||||
new ParameterizedMessage("notification for task [{}] with id [{}] failed",
|
||||
getAction(), getPersistentTaskId()), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -62,19 +62,23 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
|
|||
|
||||
private Exception exception;
|
||||
|
||||
private long allocationId = -1;
|
||||
|
||||
public Request() {
|
||||
|
||||
}
|
||||
|
||||
public Request(String taskId, Exception exception) {
|
||||
public Request(String taskId, long allocationId, Exception exception) {
|
||||
this.taskId = taskId;
|
||||
this.exception = exception;
|
||||
this.allocationId = allocationId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
taskId = in.readString();
|
||||
allocationId = in.readLong();
|
||||
exception = in.readException();
|
||||
}
|
||||
|
||||
|
@ -82,6 +86,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
|
|||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(taskId);
|
||||
out.writeLong(allocationId);
|
||||
out.writeException(exception);
|
||||
}
|
||||
|
||||
|
@ -91,6 +96,9 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
|
|||
if (taskId == null) {
|
||||
validationException = addValidationError("task id is missing", validationException);
|
||||
}
|
||||
if (allocationId < 0) {
|
||||
validationException = addValidationError("allocation id is negative or missing", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
|
@ -100,12 +108,13 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
|
|||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Request request = (Request) o;
|
||||
return Objects.equals(taskId, request.taskId) &&
|
||||
allocationId == request.allocationId &&
|
||||
Objects.equals(exception, request.exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(taskId, exception);
|
||||
return Objects.hash(taskId, allocationId, exception);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -149,7 +158,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
|
|||
|
||||
@Override
|
||||
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<PersistentTaskResponse> listener) {
|
||||
persistentTasksClusterService.completePersistentTask(request.taskId, request.exception,
|
||||
persistentTasksClusterService.completePersistentTask(request.taskId, request.allocationId, request.exception,
|
||||
new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> task) {
|
||||
|
|
|
@ -85,11 +85,12 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
/**
|
||||
* Restarts a record about a running persistent task from cluster state
|
||||
*
|
||||
* @param id the id of a persistent task
|
||||
* @param id the id of the persistent task
|
||||
* @param allocationId the allocation id of the persistent task
|
||||
* @param failure the reason for restarting the task or null if the task completed successfully
|
||||
* @param listener the listener that will be called when task is removed
|
||||
*/
|
||||
public void completePersistentTask(String id, Exception failure, ActionListener<PersistentTask<?>> listener) {
|
||||
public void completePersistentTask(String id, long allocationId, Exception failure, ActionListener<PersistentTask<?>> listener) {
|
||||
final String source;
|
||||
if (failure != null) {
|
||||
logger.warn("persistent task " + id + " failed", failure);
|
||||
|
@ -101,13 +102,17 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
|
||||
if (tasksInProgress.hasTask(id)) {
|
||||
if (tasksInProgress.hasTask(id, allocationId)) {
|
||||
tasksInProgress.finishTask(id);
|
||||
return update(currentState, tasksInProgress);
|
||||
} else {
|
||||
// we don't send the error message back to the caller becase that would cause an infinite loop of notifications
|
||||
logger.warn("The task {} wasn't found, status is not updated", id);
|
||||
return currentState;
|
||||
if (tasksInProgress.hasTask(id)) {
|
||||
logger.warn("The task [{}] with id [{}] was found but it has a different allocation id [{}], status is not updated",
|
||||
PersistentTasksCustomMetaData.getTaskWithId(currentState, id).getTaskName(), id, allocationId);
|
||||
} else {
|
||||
logger.warn("The task [{}] wasn't found, status is not updated", id);
|
||||
}
|
||||
throw new ResourceNotFoundException("the task with id [" + id + "] and allocation id [" + allocationId + "] not found");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskAwareRequest;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
@ -57,6 +58,11 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||
// wait until the gateway has recovered from disk, otherwise if the only master restarts
|
||||
// we start cancelling all local tasks before cluster has a chance to recover.
|
||||
return;
|
||||
}
|
||||
PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
PersistentTasksCustomMetaData previousTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
|
||||
|
@ -107,11 +113,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
AllocatedPersistentTask task = runningTasks.get(id);
|
||||
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
|
||||
// Result was sent to the caller and the caller acknowledged acceptance of the result
|
||||
logger.trace("Found completed persistent task [{}] with id [{}] and allocation id [{}] - removing",
|
||||
task.getAction(), task.getPersistentTaskId(), task.getAllocationId());
|
||||
runningTasks.remove(id);
|
||||
} else {
|
||||
// task is running locally, but master doesn't know about it - that means that the persistent task was removed
|
||||
// cancel the task without notifying master
|
||||
logger.trace("Found unregistered persistent task with id {} - cancelling ", id);
|
||||
logger.trace("Found unregistered persistent task [{}] with id [{}] and allocation id [{}] - cancelling",
|
||||
task.getAction(), task.getPersistentTaskId(), task.getAllocationId());
|
||||
cancelTask(id);
|
||||
}
|
||||
}
|
||||
|
@ -147,6 +156,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
boolean processed = false;
|
||||
try {
|
||||
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
|
||||
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(),
|
||||
task.getPersistentTaskId(), task.getAllocationId());
|
||||
try {
|
||||
runningTasks.put(taskInProgress.getAllocationId(), task);
|
||||
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor);
|
||||
|
@ -158,6 +169,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
} finally {
|
||||
if (processed == false) {
|
||||
// something went wrong - unregistering task
|
||||
logger.warn("Persistent task [{}] with id [{}] and allocation id [{}] failed to create", task.getAction(),
|
||||
task.getPersistentTaskId(), task.getAllocationId());
|
||||
taskManager.unregister(task);
|
||||
}
|
||||
}
|
||||
|
@ -174,14 +187,16 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener<CancelTasksResponse>() {
|
||||
@Override
|
||||
public void onResponse(CancelTasksResponse cancelTasksResponse) {
|
||||
logger.trace("Persistent task with id {} was cancelled", task.getId());
|
||||
|
||||
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was cancelled", task.getAction(),
|
||||
task.getPersistentTaskId(), task.getAllocationId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// There is really nothing we can do in case of failure here
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e);
|
||||
logger.warn((Supplier<?>) () ->
|
||||
new ParameterizedMessage("failed to cancel task [{}] with id [{}] and allocation id [{}]", task.getAction(),
|
||||
task.getPersistentTaskId(), task.getAllocationId()), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -60,8 +60,9 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
/**
|
||||
* Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure
|
||||
*/
|
||||
public void sendCompletionNotification(String taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
|
||||
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure);
|
||||
public void sendCompletionNotification(String taskId, long allocationId, Exception failure,
|
||||
ActionListener<PersistentTask<?>> listener) {
|
||||
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, allocationId, failure);
|
||||
try {
|
||||
client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest,
|
||||
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
|
||||
|
|
|
@ -108,6 +108,18 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(allocationId));
|
||||
assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster"));
|
||||
assertThat(firstRunningTask.getDescription(), equalTo("id=" + taskId));
|
||||
|
||||
if (randomBoolean()) {
|
||||
logger.info("Simulating errant completion notification");
|
||||
//try sending completion request with incorrect allocation id
|
||||
PlainActionFuture<PersistentTask<?>> failedCompletionNotificationFuture = new PlainActionFuture<>();
|
||||
persistentTasksService.sendCompletionNotification(taskId, Long.MAX_VALUE, null, failedCompletionNotificationFuture);
|
||||
assertThrows(failedCompletionNotificationFuture, ResourceNotFoundException.class);
|
||||
// Make sure that the task is still running
|
||||
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.setDetailed(true).get().getTasks().size(), equalTo(1));
|
||||
}
|
||||
|
||||
stopOrCancelTask(firstRunningTask.getTaskId());
|
||||
}
|
||||
|
||||
|
|
|
@ -170,7 +170,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendCompletionNotification(String taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
|
||||
public void sendCompletionNotification(String taskId, long allocationId, Exception failure,
|
||||
ActionListener<PersistentTask<?>> listener) {
|
||||
fail("Shouldn't be called during Cluster State cancellation");
|
||||
}
|
||||
};
|
||||
|
|
|
@ -12,7 +12,7 @@ public class RestartPersistentTaskRequestTests extends AbstractStreamableTestCas
|
|||
|
||||
@Override
|
||||
protected Request createTestInstance() {
|
||||
return new Request(randomAlphaOfLength(10), null);
|
||||
return new Request(randomAlphaOfLength(10), randomLong(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue