Persistent Tasks: remove task restart on failure (elastic/x-pack-elasticsearch#815)

If a persistent task throws an exception, the persistent tasks framework will no longer try to restart the task. This is a temporary measure to prevent threshing the cluster with endless restart attempt. We will revisit this in the future version to make the restart process more robust. Please note, however, that if node executing the task goes down, the task will still be restarted on another node.

Original commit: elastic/x-pack-elasticsearch@30712e0fbf
This commit is contained in:
Igor Motov 2017-03-23 12:56:48 -04:00 committed by GitHub
parent 6f7f466fa3
commit fba5c09e3d
5 changed files with 10 additions and 50 deletions

View File

@ -178,7 +178,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
@Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
persistentTasksClusterService.completeOrRestartPersistentTask(request.taskId, request.exception, new ActionListener<Empty>() {
persistentTasksClusterService.completePersistentTask(request.taskId, request.exception, new ActionListener<Empty>() {
@Override
public void onResponse(Empty empty) {
listener.onResponse(newResponse());

View File

@ -85,25 +85,20 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param failure the reason for restarting the task or null if the task completed successfully
* @param listener the listener that will be called when task is removed
*/
public void completeOrRestartPersistentTask(long id, Exception failure, ActionListener<Empty> listener) {
public void completePersistentTask(long id, Exception failure, ActionListener<Empty> listener) {
final String source;
if (failure != null) {
logger.warn("persistent task " + id + " failed, restarting", failure);
source = "restart persistent task";
logger.warn("persistent task " + id + " failed", failure);
source = "finish persistent task (failed)";
} else {
source = "finish persistent task";
source = "finish persistent task (success)";
}
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
if (failure != null) {
// If the task failed - we need to restart it on another node, otherwise we just remove it
tasksInProgress.reassignTask(id, (action, request) -> getAssignement(action, currentState, request));
} else {
tasksInProgress.finishTask(id);
}
return update(currentState, tasksInProgress);
} else {
// we don't send the error message back to the caller becase that would cause an infinite loop of notifications

View File

@ -159,7 +159,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override
public Version getMinimalSupportedVersion() {
return Version.V_5_3_0_UNRELEASED;
return Version.V_5_4_0_UNRELEASED;
}
@Override
@ -588,8 +588,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
/**
* Assigns the task to another node if the task exist and not currently assigned
* <p>
* The operation is only performed if the task is not currently assigned to any nodes. To force assignment use
* {@link #reassignTask(long, BiFunction)} instead
* The operation is only performed if the task is not currently assigned to any nodes.
*/
@SuppressWarnings("unchecked")
public <Request extends PersistentTaskRequest> Builder assignTask(long taskId,
@ -605,20 +604,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return this;
}
/**
* Reassigns the task to another node if the task exist
*/
@SuppressWarnings("unchecked")
public <Request extends PersistentTaskRequest> Builder reassignTask(long taskId,
BiFunction<String, Request, Assignment> executorNodeFunc) {
PersistentTask<Request> taskInProgress = (PersistentTask<Request>) tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request);
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
}
return this;
}
/**
* Updates the task status if the task exist

View File

@ -212,11 +212,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
if (builder.hasTask(lastKnownTask)) {
changed = true;
}
if (randomBoolean()) {
builder.reassignTask(lastKnownTask, randomAssignment());
} else {
builder.reassignTask(lastKnownTask, (s, request) -> randomAssignment());
}
break;
case 2:
if (builder.hasTask(lastKnownTask)) {

View File

@ -63,7 +63,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
}
}
public void testPersistentActionRestart() throws Exception {
public void testPersistentActionFailure() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
@ -85,22 +85,6 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertThat(new TestTasksRequestBuilder(client()).setOperation("fail").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
assertBusy(() -> {
// Wait for the task to restart
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks();
logger.info("Found {} tasks", tasks.size());
assertThat(tasks.size(), equalTo(1));
// Make sure that restarted task is different
assertThat(tasks.get(0).getTaskId(), not(equalTo(firstRunningTask.getTaskId())));
});
logger.info("Removing persistent task with id {}", firstRunningTask.getId());
// Remove the persistent task
PersistentTaskOperationFuture removeFuture = new PersistentTaskOperationFuture();
persistentTasksService.removeTask(taskId, removeFuture);
assertEquals(removeFuture.get(), (Long) taskId);
logger.info("Waiting for persistent task with id {} to disappear", firstRunningTask.getId());
assertBusy(() -> {
// Wait for the task to disappear completely