Persistent tasks: require allocation id on task completion (#1107)

Persistent tasks should verify that completion notification is done for correct version of the task, otherwise a delayed notification from an old node can accidentally close a newly reassigned task.
This commit is contained in:
Igor Motov 2017-04-19 15:42:55 -04:00 committed by Martijn van Groningen
parent 76cd7b1eb2
commit a08e2d9e5e
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
8 changed files with 70 additions and 25 deletions

View File

@ -132,7 +132,7 @@ public class AllocatedPersistentTask extends CancellableTask {
private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
if (prevState == State.COMPLETED) {
logger.warn("attempt to complete task {} in the {} state", getPersistentTaskId(), prevState);
logger.warn("attempt to complete task [{}] with id [{}] in the [{}] state", getAction(), getPersistentTaskId(), prevState);
} else {
if (failure != null) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
@ -141,18 +141,20 @@ public class AllocatedPersistentTask extends CancellableTask {
try {
this.failure = failure;
if (prevState == State.STARTED) {
logger.trace("sending notification for completed task {}", getPersistentTaskId());
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), failure, new
logger.trace("sending notification for completed task [{}] with id [{}]", getAction(), getPersistentTaskId());
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), getAllocationId(), failure, new
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
logger.trace("notification for task {} was successful", getId());
logger.trace("notification for task [{}] with id [{}] was successful", getAction(),
getPersistentTaskId());
}
@Override
public void onFailure(Exception e) {
logger.warn((Supplier<?>) () ->
new ParameterizedMessage("notification for task {} failed", getPersistentTaskId()), e);
new ParameterizedMessage("notification for task [{}] with id [{}] failed",
getAction(), getPersistentTaskId()), e);
}
});
}

View File

@ -75,19 +75,23 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
private Exception exception;
private long allocationId = -1;
public Request() {
}
public Request(String taskId, Exception exception) {
public Request(String taskId, long allocationId, Exception exception) {
this.taskId = taskId;
this.exception = exception;
this.allocationId = allocationId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = in.readString();
allocationId = in.readLong();
exception = in.readException();
}
@ -95,6 +99,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(taskId);
out.writeLong(allocationId);
out.writeException(exception);
}
@ -104,6 +109,9 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
if (taskId == null) {
validationException = addValidationError("task id is missing", validationException);
}
if (allocationId < 0) {
validationException = addValidationError("allocation id is negative or missing", validationException);
}
return validationException;
}
@ -113,12 +121,13 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(taskId, request.taskId) &&
allocationId == request.allocationId &&
Objects.equals(exception, request.exception);
}
@Override
public int hashCode() {
return Objects.hash(taskId, exception);
return Objects.hash(taskId, allocationId, exception);
}
}
@ -163,7 +172,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.completePersistentTask(request.taskId, request.exception,
persistentTasksClusterService.completePersistentTask(request.taskId, request.allocationId, request.exception,
new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> task) {

View File

@ -59,7 +59,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* Creates a new persistent task on master node
*
* @param action the action name
* @param params params
* @param params params
* @param listener the listener that will be called when task is started
*/
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String action, @Nullable Params params,
@ -99,11 +99,12 @@ public class PersistentTasksClusterService extends AbstractComponent implements
/**
* Restarts a record about a running persistent task from cluster state
*
* @param id the id of a persistent task
* @param failure the reason for restarting the task or null if the task completed successfully
* @param listener the listener that will be called when task is removed
* @param id the id of the persistent task
* @param allocationId the allocation id of the persistent task
* @param failure the reason for restarting the task or null if the task completed successfully
* @param listener the listener that will be called when task is removed
*/
public void completePersistentTask(String id, Exception failure, ActionListener<PersistentTask<?>> listener) {
public void completePersistentTask(String id, long allocationId, Exception failure, ActionListener<PersistentTask<?>> listener) {
final String source;
if (failure != null) {
logger.warn("persistent task " + id + " failed", failure);
@ -115,13 +116,17 @@ public class PersistentTasksClusterService extends AbstractComponent implements
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
if (tasksInProgress.hasTask(id, allocationId)) {
tasksInProgress.finishTask(id);
return update(currentState, tasksInProgress);
} else {
// we don't send the error message back to the caller becase that would cause an infinite loop of notifications
logger.warn("The task {} wasn't found, status is not updated", id);
return currentState;
if (tasksInProgress.hasTask(id)) {
logger.warn("The task [{}] with id [{}] was found but it has a different allocation id [{}], status is not updated",
PersistentTasksCustomMetaData.getTaskWithId(currentState, id).getTaskName(), id, allocationId);
} else {
logger.warn("The task [{}] wasn't found, status is not updated", id);
}
throw new ResourceNotFoundException("the task with id [" + id + "] and allocation id [" + allocationId + "] not found");
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskId;
@ -70,6 +71,11 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise if the only master restarts
// we start cancelling all local tasks before cluster has a chance to recover.
return;
}
PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData previousTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
@ -120,11 +126,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
AllocatedPersistentTask task = runningTasks.get(id);
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
// Result was sent to the caller and the caller acknowledged acceptance of the result
logger.trace("Found completed persistent task [{}] with id [{}] and allocation id [{}] - removing",
task.getAction(), task.getPersistentTaskId(), task.getAllocationId());
runningTasks.remove(id);
} else {
// task is running locally, but master doesn't know about it - that means that the persistent task was removed
// cancel the task without notifying master
logger.trace("Found unregistered persistent task with id {} - cancelling ", id);
logger.trace("Found unregistered persistent task [{}] with id [{}] and allocation id [{}] - cancelling",
task.getAction(), task.getPersistentTaskId(), task.getAllocationId());
cancelTask(id);
}
}
@ -160,6 +169,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
boolean processed = false;
try {
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(),
task.getPersistentTaskId(), task.getAllocationId());
try {
runningTasks.put(taskInProgress.getAllocationId(), task);
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor);
@ -171,6 +182,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
} finally {
if (processed == false) {
// something went wrong - unregistering task
logger.warn("Persistent task [{}] with id [{}] and allocation id [{}] failed to create", task.getAction(),
task.getPersistentTaskId(), task.getAllocationId());
taskManager.unregister(task);
}
}
@ -187,14 +200,16 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener<CancelTasksResponse>() {
@Override
public void onResponse(CancelTasksResponse cancelTasksResponse) {
logger.trace("Persistent task with id {} was cancelled", task.getId());
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was cancelled", task.getAction(),
task.getPersistentTaskId(), task.getAllocationId());
}
@Override
public void onFailure(Exception e) {
// There is really nothing we can do in case of failure here
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e);
logger.warn((Supplier<?>) () ->
new ParameterizedMessage("failed to cancel task [{}] with id [{}] and allocation id [{}]", task.getAction(),
task.getPersistentTaskId(), task.getAllocationId()), e);
}
});
}

View File

@ -73,8 +73,9 @@ public class PersistentTasksService extends AbstractComponent {
/**
* Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure
*/
public void sendCompletionNotification(String taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure);
public void sendCompletionNotification(String taskId, long allocationId, Exception failure,
ActionListener<PersistentTask<?>> listener) {
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, allocationId, failure);
try {
client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest,
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));

View File

@ -122,6 +122,18 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(allocationId));
assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster"));
assertThat(firstRunningTask.getDescription(), equalTo("id=" + taskId));
if (randomBoolean()) {
logger.info("Simulating errant completion notification");
//try sending completion request with incorrect allocation id
PlainActionFuture<PersistentTask<?>> failedCompletionNotificationFuture = new PlainActionFuture<>();
persistentTasksService.sendCompletionNotification(taskId, Long.MAX_VALUE, null, failedCompletionNotificationFuture);
assertThrows(failedCompletionNotificationFuture, ResourceNotFoundException.class);
// Make sure that the task is still running
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.setDetailed(true).get().getTasks().size(), equalTo(1));
}
stopOrCancelTask(firstRunningTask.getTaskId());
}

View File

@ -184,7 +184,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
}
@Override
public void sendCompletionNotification(String taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
public void sendCompletionNotification(String taskId, long allocationId, Exception failure,
ActionListener<PersistentTask<?>> listener) {
fail("Shouldn't be called during Cluster State cancellation");
}
};

View File

@ -25,7 +25,7 @@ public class RestartPersistentTaskRequestTests extends AbstractStreamableTestCas
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLength(10), null);
return new Request(randomAlphaOfLength(10), randomLong(), null);
}
@Override