Persistent Tasks: remove task restart on failure (#815)

If a persistent task throws an exception, the persistent tasks framework will no longer try to restart the task. This is a temporary measure to prevent threshing the cluster with endless restart attempt. We will revisit this in the future version to make the restart process more robust. Please note, however, that if node executing the task goes down, the task will still be restarted on another node.
This commit is contained in:
Igor Motov 2017-03-23 12:56:48 -04:00 committed by Martijn van Groningen
parent 9bd24418d5
commit 19f39fd392
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
8 changed files with 23 additions and 51 deletions

View File

@ -191,7 +191,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
@Override @Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) { protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
persistentTasksClusterService.completeOrRestartPersistentTask(request.taskId, request.exception, new ActionListener<Empty>() { persistentTasksClusterService.completePersistentTask(request.taskId, request.exception, new ActionListener<Empty>() {
@Override @Override
public void onResponse(Empty empty) { public void onResponse(Empty empty) {
listener.onResponse(newResponse()); listener.onResponse(newResponse());

View File

@ -99,25 +99,20 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param failure the reason for restarting the task or null if the task completed successfully * @param failure the reason for restarting the task or null if the task completed successfully
* @param listener the listener that will be called when task is removed * @param listener the listener that will be called when task is removed
*/ */
public void completeOrRestartPersistentTask(long id, Exception failure, ActionListener<Empty> listener) { public void completePersistentTask(long id, Exception failure, ActionListener<Empty> listener) {
final String source; final String source;
if (failure != null) { if (failure != null) {
logger.warn("persistent task " + id + " failed, restarting", failure); logger.warn("persistent task " + id + " failed", failure);
source = "restart persistent task"; source = "finish persistent task (failed)";
} else { } else {
source = "finish persistent task"; source = "finish persistent task (success)";
} }
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) { if (tasksInProgress.hasTask(id)) {
if (failure != null) {
// If the task failed - we need to restart it on another node, otherwise we just remove it
tasksInProgress.reassignTask(id, (action, request) -> getAssignement(action, currentState, request));
} else {
tasksInProgress.finishTask(id); tasksInProgress.finishTask(id);
}
return update(currentState, tasksInProgress); return update(currentState, tasksInProgress);
} else { } else {
// we don't send the error message back to the caller becase that would cause an infinite loop of notifications // we don't send the error message back to the caller becase that would cause an infinite loop of notifications

View File

@ -172,7 +172,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override @Override
public Version getMinimalSupportedVersion() { public Version getMinimalSupportedVersion() {
return Version.V_5_3_0_UNRELEASED; return Version.V_5_4_0_UNRELEASED;
} }
@Override @Override
@ -601,8 +601,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
/** /**
* Assigns the task to another node if the task exist and not currently assigned * Assigns the task to another node if the task exist and not currently assigned
* <p> * <p>
* The operation is only performed if the task is not currently assigned to any nodes. To force assignment use * The operation is only performed if the task is not currently assigned to any nodes.
* {@link #reassignTask(long, BiFunction)} instead
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <Request extends PersistentTaskRequest> Builder assignTask(long taskId, public <Request extends PersistentTaskRequest> Builder assignTask(long taskId,
@ -618,20 +617,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return this; return this;
} }
/**
* Reassigns the task to another node if the task exist
*/
@SuppressWarnings("unchecked")
public <Request extends PersistentTaskRequest> Builder reassignTask(long taskId,
BiFunction<String, Request, Assignment> executorNodeFunc) {
PersistentTask<Request> taskInProgress = (PersistentTask<Request>) tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request);
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
}
return this;
}
/** /**
* Updates the task status if the task exist * Updates the task status if the task exist

View File

@ -11,6 +11,10 @@
* *
* Unless required by applicable law or agreed to in writing, * Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an * software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/ */
package org.elasticsearch.persistent; package org.elasticsearch.persistent;

View File

@ -225,11 +225,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
if (builder.hasTask(lastKnownTask)) { if (builder.hasTask(lastKnownTask)) {
changed = true; changed = true;
} }
if (randomBoolean()) {
builder.reassignTask(lastKnownTask, randomAssignment()); builder.reassignTask(lastKnownTask, randomAssignment());
} else {
builder.reassignTask(lastKnownTask, (s, request) -> randomAssignment());
}
break; break;
case 2: case 2:
if (builder.hasTask(lastKnownTask)) { if (builder.hasTask(lastKnownTask)) {

View File

@ -11,6 +11,10 @@
* *
* Unless required by applicable law or agreed to in writing, * Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an * software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/ */
package org.elasticsearch.persistent; package org.elasticsearch.persistent;

View File

@ -77,7 +77,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
} }
} }
public void testPersistentActionRestart() throws Exception { public void testPersistentActionFailure() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
@ -99,22 +99,6 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertThat(new TestTasksRequestBuilder(client()).setOperation("fail").setTaskId(firstRunningTask.getTaskId()) assertThat(new TestTasksRequestBuilder(client()).setOperation("fail").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1)); .get().getTasks().size(), equalTo(1));
assertBusy(() -> {
// Wait for the task to restart
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks();
logger.info("Found {} tasks", tasks.size());
assertThat(tasks.size(), equalTo(1));
// Make sure that restarted task is different
assertThat(tasks.get(0).getTaskId(), not(equalTo(firstRunningTask.getTaskId())));
});
logger.info("Removing persistent task with id {}", firstRunningTask.getId());
// Remove the persistent task
PersistentTaskOperationFuture removeFuture = new PersistentTaskOperationFuture();
persistentTasksService.removeTask(taskId, removeFuture);
assertEquals(removeFuture.get(), (Long) taskId);
logger.info("Waiting for persistent task with id {} to disappear", firstRunningTask.getId()); logger.info("Waiting for persistent task with id {} to disappear", firstRunningTask.getId());
assertBusy(() -> { assertBusy(() -> {
// Wait for the task to disappear completely // Wait for the task to disappear completely

View File

@ -11,6 +11,10 @@
* *
* Unless required by applicable law or agreed to in writing, * Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an * software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/ */
package org.elasticsearch.persistent; package org.elasticsearch.persistent;