Remove cluster update task when task times out (#21578)

Fixes an issue where the cluster service does not remove an update task from its internal data structures that are used for batching cluster state updates.

* review comments

* checkstyle
This commit is contained in:
Yannick Welsch 2016-11-15 21:38:58 +01:00 committed by GitHub
parent 17a2fffc9b
commit 40e0162e61
2 changed files with 71 additions and 7 deletions

View File

@ -70,6 +70,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -114,7 +115,7 @@ public class ClusterService extends AbstractLifecycleComponent {
private final Collection<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<>();
private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Collection<ClusterStateListener> lastClusterStateListeners = new CopyOnWriteArrayList<>();
private final Map<ClusterStateTaskExecutor, List<UpdateTask>> updateTasksPerExecutor = new HashMap<>();
final Map<ClusterStateTaskExecutor, LinkedHashSet<UpdateTask>> updateTasksPerExecutor = new HashMap<>();
// TODO this is rather frequently changing I guess a Synced Set would be better here and a dedicated remove API
private final Collection<ClusterStateListener> postAppliedListeners = new CopyOnWriteArrayList<>();
private final Iterable<ClusterStateListener> preAppliedListeners = Iterables.concat(priorityClusterStateListeners,
@ -454,7 +455,8 @@ public class ClusterService extends AbstractLifecycleComponent {
).collect(Collectors.toList());
synchronized (updateTasksPerExecutor) {
List<UpdateTask> existingTasks = updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>());
LinkedHashSet<UpdateTask> existingTasks = updateTasksPerExecutor.computeIfAbsent(executor,
k -> new LinkedHashSet<>(updateTasks.size()));
for (@SuppressWarnings("unchecked") UpdateTask<T> existing : existingTasks) {
if (tasksIdentity.containsKey(existing.task)) {
throw new IllegalStateException("task [" + executor.describeTasks(Collections.singletonList(existing.task)) +
@ -466,12 +468,29 @@ public class ClusterService extends AbstractLifecycleComponent {
final UpdateTask<T> firstTask = updateTasks.get(0);
if (config.timeout() != null) {
updateTasksExecutor.execute(firstTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> {
final TimeValue timeout = config.timeout();
if (timeout != null) {
updateTasksExecutor.execute(firstTask, threadPool.scheduler(), timeout, () -> threadPool.generic().execute(() -> {
final ArrayList<UpdateTask<T>> toRemove = new ArrayList<>();
for (UpdateTask<T> task : updateTasks) {
if (task.processed.getAndSet(true) == false) {
logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout());
task.listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source));
logger.debug("cluster state update task [{}] timed out after [{}]", source, timeout);
toRemove.add(task);
}
}
if (toRemove.isEmpty() == false) {
ClusterStateTaskExecutor<T> clusterStateTaskExecutor = toRemove.get(0).executor;
synchronized (updateTasksPerExecutor) {
LinkedHashSet<UpdateTask> existingTasks = updateTasksPerExecutor.get(clusterStateTaskExecutor);
if (existingTasks != null) {
existingTasks.removeAll(toRemove);
if (existingTasks.isEmpty()) {
updateTasksPerExecutor.remove(clusterStateTaskExecutor);
}
}
}
for (UpdateTask<T> task : toRemove) {
task.listener.onFailure(source, new ProcessClusterEventTimeoutException(timeout, source));
}
}
}));
@ -567,7 +586,7 @@ public class ClusterService extends AbstractLifecycleComponent {
final ArrayList<UpdateTask<T>> toExecute = new ArrayList<>();
final Map<String, ArrayList<T>> processTasksBySource = new HashMap<>();
synchronized (updateTasksPerExecutor) {
List<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
LinkedHashSet<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
if (pending != null) {
for (UpdateTask<T> task : pending) {
if (task.processed.getAndSet(true) == false) {

View File

@ -148,6 +148,51 @@ public class ClusterServiceTests extends ESTestCase {
return timedClusterService;
}
public void testTimedOutUpdateTaskCleanedUp() throws Exception {
final CountDownLatch block = new CountDownLatch(1);
clusterService.submitStateUpdateTask("block-task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
block.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return currentState;
}
@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e);
}
});
final CountDownLatch block2 = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
block2.countDown();
return currentState;
}
@Override
public TimeValue timeout() {
return TimeValue.ZERO;
}
@Override
public void onFailure(String source, Exception e) {
block2.countDown();
}
});
block.countDown();
block2.await();
synchronized (clusterService.updateTasksPerExecutor) {
assertTrue("expected empty map but was " + clusterService.updateTasksPerExecutor,
clusterService.updateTasksPerExecutor.isEmpty());
}
}
public void testTimeoutUpdateTask() throws Exception {
final CountDownLatch block = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {