From 6915b39dc38a481962094dae31174adcedc9371d Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 20 Jul 2010 18:32:49 +0300 Subject: [PATCH] clean timeout listener task when removing the listener --- .../cluster/service/InternalClusterService.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index b782b9c5c73..e5c6c41fdfa 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -63,7 +63,7 @@ public class InternalClusterService extends AbstractLifecycleComponent clusterStateListeners = new CopyOnWriteArrayList(); - private final Queue> onGoingTimeouts = new LinkedTransferQueue>(); + private final Queue> onGoingTimeouts = new LinkedTransferQueue>(); private volatile ClusterState clusterState = newClusterStateBuilder().build(); @@ -82,9 +82,9 @@ public class InternalClusterService extends AbstractLifecycleComponent onGoingTimeout : onGoingTimeouts) { + for (Tuple onGoingTimeout : onGoingTimeouts) { onGoingTimeout.v1().cancel(); - onGoingTimeout.v2().onClose(); + onGoingTimeout.v2().listener.onClose(); } updateTasksExecutor.shutdown(); try { @@ -107,11 +107,17 @@ public class InternalClusterService extends AbstractLifecycleComponent onGoingTimeout : onGoingTimeouts) { + if (onGoingTimeout.v2().listener == listener) { + onGoingTimeout.v1().cancel(); + } + } } public void add(TimeValue timeout, final TimeoutClusterStateListener listener) { - Timeout timerTimeout = timerService.newTimeout(new NotifyTimeout(listener, timeout), timeout, TimerService.ExecutionType.THREADED); - onGoingTimeouts.add(new Tuple(timerTimeout, listener)); + NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); + Timeout timerTimeout = timerService.newTimeout(notifyTimeout, timeout, TimerService.ExecutionType.THREADED); + onGoingTimeouts.add(new Tuple(timerTimeout, notifyTimeout)); clusterStateListeners.add(listener); // call the post added notification on the same event thread updateTasksExecutor.execute(new Runnable() {