Catch AllocatedTask registration failures (#45300)
When a persistent task attempts to register an allocated task locally, this creates the Task object and starts tracking it locally. If there is a failure while initializing the task, this is handled by a catch and subsequent error handling (canceling, unregistering, etc). But if the task fails to be created because an exception is thrown in the tasks ctor, this is uncaught and fails the cluster update thread. The ramification is that a persistent task remains in the cluster state, but is unable to create the allocated task, and the exception prevents other tasks "after" the poisoned task from starting too. Because the allocated task is never created, the cancellation tools are not able to remove the persistent task and it is stuck as a zombie in the CS. This commit adds exception handling around the task creation, and attempts to notify the master if there is a failure (so the persistent task can be removed). Even if this notification fails, the exception handling means the rest of the uninitialized tasks can proceed as normal.
This commit is contained in:
parent
73e266b2fd
commit
cd441f6906
|
@ -30,11 +30,11 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskAwareRequest;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
@ -112,7 +112,13 @@ public class PersistentTasksNodeService implements ClusterStateListener {
|
|||
AllocatedPersistentTask persistentTask = runningTasks.get(allocationId);
|
||||
if (persistentTask == null) {
|
||||
// New task - let's start it
|
||||
try {
|
||||
startTask(taskInProgress);
|
||||
} catch (Exception e) {
|
||||
logger.error("Unable to start allocated task [" + taskInProgress.getTaskName()
|
||||
+ "] with id [" + taskInProgress.getId()
|
||||
+ "] and allocation id [" + taskInProgress.getAllocationId() + "]", e);
|
||||
}
|
||||
} else {
|
||||
// The task is still running
|
||||
notVisitedTasks.remove(allocationId);
|
||||
|
@ -163,8 +169,18 @@ public class PersistentTasksNodeService implements ClusterStateListener {
|
|||
return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers);
|
||||
}
|
||||
};
|
||||
AllocatedPersistentTask task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]",
|
||||
request);
|
||||
|
||||
AllocatedPersistentTask task;
|
||||
try {
|
||||
task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", request);
|
||||
} catch (Exception e) {
|
||||
logger.error("Fatal error registering persistent task [" + taskInProgress.getTaskName()
|
||||
+ "] with id [" + taskInProgress.getId() + "] and allocation id [" + taskInProgress.getAllocationId()
|
||||
+ "], removing from persistent tasks", e);
|
||||
notifyMasterOfFailedTask(taskInProgress, e);
|
||||
return;
|
||||
}
|
||||
|
||||
boolean processed = false;
|
||||
try {
|
||||
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
|
||||
|
@ -188,6 +204,25 @@ public class PersistentTasksNodeService implements ClusterStateListener {
|
|||
}
|
||||
}
|
||||
|
||||
private <Params extends PersistentTaskParams> void notifyMasterOfFailedTask(PersistentTask<Params> taskInProgress,
|
||||
Exception originalException) {
|
||||
persistentTasksService.sendCompletionRequest(taskInProgress.getId(), taskInProgress.getAllocationId(), originalException,
|
||||
new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
logger.trace("completion notification for failed task [{}] with id [{}] was successful", taskInProgress.getTaskName(),
|
||||
taskInProgress.getAllocationId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception notificationException) {
|
||||
notificationException.addSuppressed(originalException);
|
||||
logger.warn(new ParameterizedMessage("notification for task [{}] with id [{}] failed",
|
||||
taskInProgress.getTaskName(), taskInProgress.getAllocationId()), notificationException);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregisters and then cancels the locally running task using the task manager. No notification to master will be send upon
|
||||
* cancellation.
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
|
||||
|
@ -49,10 +51,13 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.hamcrest.core.IsEqual.equalTo;
|
||||
|
@ -309,6 +314,58 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
assertThat(taskManager.getTasks().values(), empty());
|
||||
}
|
||||
|
||||
public void testRegisterTaskFails() throws InterruptedException {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
final Client mockClient = mock(Client.class);
|
||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
||||
when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService());
|
||||
when(mockClient.threadPool()).thenReturn(threadPool);
|
||||
when(mockClient.settings()).thenReturn(Settings.EMPTY);
|
||||
|
||||
PersistentTasksService persistentTasksService = new PersistentTasksService(null, null, mockClient) {
|
||||
@Override
|
||||
public void sendCompletionRequest(String taskId, long taskAllocationId, Exception taskFailure,
|
||||
ActionListener<PersistentTask<?>> listener) {
|
||||
assertThat(taskFailure, instanceOf(RuntimeException.class));
|
||||
assertThat(taskFailure.getMessage(), equalTo("Something went wrong"));
|
||||
listener.onResponse(mock(PersistentTask.class));
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
|
||||
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
|
||||
when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME);
|
||||
when(action.createTask(anyLong(), anyString(), anyString(), any(), any(), any()))
|
||||
.thenThrow(new RuntimeException("Something went wrong"));
|
||||
|
||||
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Collections.singletonList(action));
|
||||
|
||||
MockExecutor executor = new MockExecutor();
|
||||
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(persistentTasksService,
|
||||
registry, new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), executor);
|
||||
|
||||
ClusterState state = createInitialClusterState(0, Settings.EMPTY);
|
||||
|
||||
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
|
||||
|
||||
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("this_param"),
|
||||
new Assignment("this_node", "test assignment on this node"));
|
||||
|
||||
MetaData.Builder metaData = MetaData.builder(state.metaData());
|
||||
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
|
||||
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
|
||||
|
||||
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
|
||||
|
||||
// Failed to start the task, make sure it wasn't invoked further
|
||||
assertThat(executor.executions.size(), equalTo(0));
|
||||
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private <Params extends PersistentTaskParams> ClusterState addTask(ClusterState state, String action, Params params,
|
||||
String node) {
|
||||
PersistentTasksCustomMetaData.Builder builder =
|
||||
|
|
Loading…
Reference in New Issue