diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index ce5c0f3e258..6d69d57ad1f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -70,6 +70,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -114,7 +115,7 @@ public class ClusterService extends AbstractLifecycleComponent { private final Collection priorityClusterStateListeners = new CopyOnWriteArrayList<>(); private final Collection clusterStateListeners = new CopyOnWriteArrayList<>(); private final Collection lastClusterStateListeners = new CopyOnWriteArrayList<>(); - private final Map> updateTasksPerExecutor = new HashMap<>(); + final Map> updateTasksPerExecutor = new HashMap<>(); // TODO this is rather frequently changing I guess a Synced Set would be better here and a dedicated remove API private final Collection postAppliedListeners = new CopyOnWriteArrayList<>(); private final Iterable preAppliedListeners = Iterables.concat(priorityClusterStateListeners, @@ -454,7 +455,8 @@ public class ClusterService extends AbstractLifecycleComponent { ).collect(Collectors.toList()); synchronized (updateTasksPerExecutor) { - List existingTasks = updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>()); + LinkedHashSet existingTasks = updateTasksPerExecutor.computeIfAbsent(executor, + k -> new LinkedHashSet<>(updateTasks.size())); for (@SuppressWarnings("unchecked") UpdateTask existing : existingTasks) { if (tasksIdentity.containsKey(existing.task)) { throw new IllegalStateException("task [" + executor.describeTasks(Collections.singletonList(existing.task)) + @@ -466,12 +468,29 @@ public class ClusterService extends AbstractLifecycleComponent { final UpdateTask firstTask = updateTasks.get(0); - if (config.timeout() != null) { - updateTasksExecutor.execute(firstTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> { + final TimeValue timeout = config.timeout(); + if (timeout != null) { + updateTasksExecutor.execute(firstTask, threadPool.scheduler(), timeout, () -> threadPool.generic().execute(() -> { + final ArrayList> toRemove = new ArrayList<>(); for (UpdateTask task : updateTasks) { if (task.processed.getAndSet(true) == false) { - logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout()); - task.listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source)); + logger.debug("cluster state update task [{}] timed out after [{}]", source, timeout); + toRemove.add(task); + } + } + if (toRemove.isEmpty() == false) { + ClusterStateTaskExecutor clusterStateTaskExecutor = toRemove.get(0).executor; + synchronized (updateTasksPerExecutor) { + LinkedHashSet existingTasks = updateTasksPerExecutor.get(clusterStateTaskExecutor); + if (existingTasks != null) { + existingTasks.removeAll(toRemove); + if (existingTasks.isEmpty()) { + updateTasksPerExecutor.remove(clusterStateTaskExecutor); + } + } + } + for (UpdateTask task : toRemove) { + task.listener.onFailure(source, new ProcessClusterEventTimeoutException(timeout, source)); } } })); @@ -567,7 +586,7 @@ public class ClusterService extends AbstractLifecycleComponent { final ArrayList> toExecute = new ArrayList<>(); final Map> processTasksBySource = new HashMap<>(); synchronized (updateTasksPerExecutor) { - List pending = updateTasksPerExecutor.remove(executor); + LinkedHashSet pending = updateTasksPerExecutor.remove(executor); if (pending != null) { for (UpdateTask task : pending) { if (task.processed.getAndSet(true) == false) { diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java index bede01ed21b..026d3a16185 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java @@ -148,6 +148,51 @@ public class ClusterServiceTests extends ESTestCase { return timedClusterService; } + public void testTimedOutUpdateTaskCleanedUp() throws Exception { + final CountDownLatch block = new CountDownLatch(1); + clusterService.submitStateUpdateTask("block-task", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + try { + block.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + throw new RuntimeException(e); + } + }); + + final CountDownLatch block2 = new CountDownLatch(1); + clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + block2.countDown(); + return currentState; + } + + @Override + public TimeValue timeout() { + return TimeValue.ZERO; + } + + @Override + public void onFailure(String source, Exception e) { + block2.countDown(); + } + }); + block.countDown(); + block2.await(); + synchronized (clusterService.updateTasksPerExecutor) { + assertTrue("expected empty map but was " + clusterService.updateTasksPerExecutor, + clusterService.updateTasksPerExecutor.isEmpty()); + } + } + public void testTimeoutUpdateTask() throws Exception { final CountDownLatch block = new CountDownLatch(1); clusterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {