Persistent Tasks: require correct allocation id for status updates (elastic/x-pack-elasticsearch#923)

In order to prevent tasks state updates by stale executors, this commit adds a check for correct allocation id during status update operation.

Original commit: elastic/x-pack-elasticsearch@b94eb0e863
This commit is contained in:
Igor Motov 2017-04-01 18:17:07 -04:00 committed by GitHub
parent b5a285fd83
commit 4115336f5a
11 changed files with 64 additions and 47 deletions

View File

@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicReference;
*/ */
public class AllocatedPersistentTask extends CancellableTask { public class AllocatedPersistentTask extends CancellableTask {
private long persistentTaskId; private long persistentTaskId;
private long allocationId;
private final AtomicReference<State> state; private final AtomicReference<State> state;
@Nullable @Nullable
@ -57,16 +58,17 @@ public class AllocatedPersistentTask extends CancellableTask {
* This doesn't affect the status of this allocated task. * This doesn't affect the status of this allocated task.
*/ */
public void updatePersistentStatus(Task.Status status, PersistentTasksService.PersistentTaskOperationListener listener) { public void updatePersistentStatus(Task.Status status, PersistentTasksService.PersistentTaskOperationListener listener) {
persistentTasksService.updateStatus(persistentTaskId, status, listener); persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener);
} }
public long getPersistentTaskId() { public long getPersistentTaskId() {
return persistentTaskId; return persistentTaskId;
} }
void init(PersistentTasksService persistentTasksService, long persistentTaskId) { void init(PersistentTasksService persistentTasksService, long persistentTaskId, long allocationId) {
this.persistentTasksService = persistentTasksService; this.persistentTasksService = persistentTasksService;
this.persistentTaskId = persistentTaskId; this.persistentTaskId = persistentTaskId;
this.allocationId = allocationId;
} }
public Exception getFailure() { public Exception getFailure() {

View File

@ -12,6 +12,8 @@ import org.elasticsearch.transport.TransportResponse.Empty;
/** /**
* This component is responsible for execution of persistent tasks. * This component is responsible for execution of persistent tasks.
*
* It abstracts away the execution of tasks and greatly simplifies testing of PersistentTasksNodeService
*/ */
public class NodePersistentTasksExecutor { public class NodePersistentTasksExecutor {
private final ThreadPool threadPool; private final ThreadPool threadPool;

View File

@ -178,18 +178,24 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* Update task status * Update task status
* *
* @param id the id of a persistent task * @param id the id of a persistent task
* @param allocationId the expected allocation id of the persistent task
* @param status new status * @param status new status
* @param listener the listener that will be called when task is removed * @param listener the listener that will be called when task is removed
*/ */
public void updatePersistentTaskStatus(long id, Task.Status status, ActionListener<Empty> listener) { public void updatePersistentTaskStatus(long id, long allocationId, Task.Status status, ActionListener<Empty> listener) {
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) { if (tasksInProgress.hasTask(id, allocationId)) {
return update(currentState, tasksInProgress.updateTaskStatus(id, status)); return update(currentState, tasksInProgress.updateTaskStatus(id, status));
} else { } else {
throw new ResourceNotFoundException("the task with id {} doesn't exist", id); if (tasksInProgress.hasTask(id)) {
logger.warn("trying to update status on task {} with unexpected allocation id {}", id, allocationId);
} else {
logger.warn("trying to update status on non-existing task {}", id);
}
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", id, allocationId);
} }
} }

View File

@ -621,6 +621,17 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return tasks.containsKey(taskId); return tasks.containsKey(taskId);
} }
/**
* Checks if the task is currently present in the list and has the right allocation id
*/
public boolean hasTask(long taskId, long allocationId) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
return taskInProgress.getAllocationId() == allocationId;
}
return false;
}
/** /**
* Returns the id of the last added task * Returns the id of the last added task
*/ */

View File

@ -87,27 +87,6 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
} }
/**
* Updates the persistent task status in the cluster state.
* <p>
* The status can be used to store the current progress of the task or provide an insight for the
* task allocator about the state of the currently running tasks.
*/
protected void updatePersistentTaskStatus(AllocatedPersistentTask task, Task.Status status, ActionListener<Empty> listener) {
persistentTasksService.updateStatus(task.getPersistentTaskId(), status,
new PersistentTaskOperationListener() {
@Override
public void onResponse(long taskId) {
listener.onResponse(Empty.INSTANCE);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
/** /**
* This operation will be executed on the executor node. * This operation will be executed on the executor node.
* <p> * <p>

View File

@ -118,7 +118,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
taskInProgress.getRequest()); taskInProgress.getRequest());
boolean processed = false; boolean processed = false;
try { try {
task.init(persistentTasksService, taskInProgress.getId()); task.init(persistentTasksService, taskInProgress.getId(), taskInProgress.getAllocationId());
PersistentTaskListener listener = new PersistentTaskListener(task); PersistentTaskListener listener = new PersistentTaskListener(task);
try { try {
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), task); runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), task);

View File

@ -87,10 +87,14 @@ public class PersistentTasksService extends AbstractComponent {
} }
/** /**
* Updates status of the persistent task * Updates status of the persistent task.
*
* Persistent task implementers shouldn't call this method directly and use
* {@link AllocatedPersistentTask#updatePersistentStatus} instead
*/ */
public void updateStatus(long taskId, Task.Status status, PersistentTaskOperationListener listener) { void updateStatus(long taskId, long allocationId, Task.Status status, PersistentTaskOperationListener listener) {
UpdatePersistentTaskStatusAction.Request updateStatusRequest = new UpdatePersistentTaskStatusAction.Request(taskId, status); UpdatePersistentTaskStatusAction.Request updateStatusRequest =
new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status);
try { try {
client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, ActionListener.wrap( client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, ActionListener.wrap(
o -> listener.onResponse(taskId), listener::onFailure)); o -> listener.onResponse(taskId), listener::onFailure));

View File

@ -56,14 +56,17 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
private long taskId; private long taskId;
private long allocationId;
private Task.Status status; private Task.Status status;
public Request() { public Request() {
} }
public Request(long taskId, Task.Status status) { public Request(long taskId, long allocationId, Task.Status status) {
this.taskId = taskId; this.taskId = taskId;
this.allocationId = allocationId;
this.status = status; this.status = status;
} }
@ -71,6 +74,10 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
this.taskId = taskId; this.taskId = taskId;
} }
public void setAllocationId(long allocationId) {
this.allocationId = allocationId;
}
public void setStatus(Task.Status status) { public void setStatus(Task.Status status) {
this.status = status; this.status = status;
} }
@ -79,6 +86,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
taskId = in.readLong(); taskId = in.readLong();
allocationId = in.readLong();
status = in.readOptionalNamedWriteable(Task.Status.class); status = in.readOptionalNamedWriteable(Task.Status.class);
} }
@ -86,6 +94,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeLong(taskId); out.writeLong(taskId);
out.writeLong(allocationId);
out.writeOptionalNamedWriteable(status); out.writeOptionalNamedWriteable(status);
} }
@ -99,13 +108,13 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o; Request request = (Request) o;
return taskId == request.taskId && return taskId == request.taskId && allocationId == request.allocationId &&
Objects.equals(status, request.status); Objects.equals(status, request.status);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(taskId, status); return Objects.hash(taskId, allocationId, status);
} }
} }
@ -194,7 +203,8 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
@Override @Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) { protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
persistentTasksClusterService.updatePersistentTaskStatus(request.taskId, request.status, new ActionListener<Empty>() { persistentTasksClusterService.updatePersistentTaskStatus(request.taskId, request.allocationId, request.status,
new ActionListener<Empty>() {
@Override @Override
public void onResponse(Empty empty) { public void onResponse(Empty empty) {
listener.onResponse(new Response(true)); listener.onResponse(new Response(true));

View File

@ -5,14 +5,16 @@
*/ */
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.Status;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder;
@ -23,6 +25,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@ -49,17 +52,11 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertNoRunningTasks(); assertNoRunningTasks();
} }
public static class PersistentTaskOperationFuture extends BaseFuture<Long> implements WaitForPersistentTaskStatusListener { public static class PersistentTaskOperationFuture extends PlainActionFuture<Long> implements WaitForPersistentTaskStatusListener {
@Override @Override
public void onResponse(long taskId) { public void onResponse(long taskId) {
set(taskId); set(taskId);
} }
@Override
public void onFailure(Exception e) {
setException(e);
}
} }
public void testPersistentActionFailure() throws Exception { public void testPersistentActionFailure() throws Exception {
@ -186,7 +183,12 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
persistentTasksService.waitForPersistentTaskStatus(taskId, persistentTasksService.waitForPersistentTaskStatus(taskId,
task -> false, TimeValue.timeValueMillis(10), future1); task -> false, TimeValue.timeValueMillis(10), future1);
expectThrows(Exception.class, future1::get); assertThrows(future1, IllegalStateException.class, "timed out after 10ms");
PersistentTaskOperationFuture failedUpdateFuture = new PersistentTaskOperationFuture();
persistentTasksService.updateStatus(taskId, -1, new Status("should fail"), failedUpdateFuture);
assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId +
" and allocation id -1 doesn't exist");
// Wait for the task to disappear // Wait for the task to disappear
PersistentTaskOperationFuture future2 = new PersistentTaskOperationFuture(); PersistentTaskOperationFuture future2 = new PersistentTaskOperationFuture();

View File

@ -49,6 +49,7 @@ import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -351,9 +352,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Status status = new Status("phase " + phase.incrementAndGet()); Status status = new Status("phase " + phase.incrementAndGet());
logger.info("updating the task status to {}", status); logger.info("updating the task status to {}", status);
updatePersistentTaskStatus(task, status, new ActionListener<Empty>() { task.updatePersistentStatus(status, new PersistentTaskOperationListener() {
@Override @Override
public void onResponse(Empty empty) { public void onResponse(long taskId) {
logger.info("updating was successful"); logger.info("updating was successful");
latch.countDown(); latch.countDown();
} }

View File

@ -17,7 +17,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase
@Override @Override
protected Request createTestInstance() { protected Request createTestInstance() {
return new Request(randomLong(), new Status(randomAsciiOfLength(10))); return new Request(randomLong(), randomLong(), new Status(randomAsciiOfLength(10)));
} }
@Override @Override