clean timeout listener task when removing the listener

This commit is contained in:
kimchy 2010-07-20 18:32:49 +03:00
parent 45e54c1705
commit 6915b39dc3
1 changed files with 11 additions and 5 deletions

View File

@ -63,7 +63,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>(); private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
private final Queue<Tuple<Timeout, TimeoutClusterStateListener>> onGoingTimeouts = new LinkedTransferQueue<Tuple<Timeout, TimeoutClusterStateListener>>(); private final Queue<Tuple<Timeout, NotifyTimeout>> onGoingTimeouts = new LinkedTransferQueue<Tuple<Timeout, NotifyTimeout>>();
private volatile ClusterState clusterState = newClusterStateBuilder().build(); private volatile ClusterState clusterState = newClusterStateBuilder().build();
@ -82,9 +82,9 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
for (Tuple<Timeout, TimeoutClusterStateListener> onGoingTimeout : onGoingTimeouts) { for (Tuple<Timeout, NotifyTimeout> onGoingTimeout : onGoingTimeouts) {
onGoingTimeout.v1().cancel(); onGoingTimeout.v1().cancel();
onGoingTimeout.v2().onClose(); onGoingTimeout.v2().listener.onClose();
} }
updateTasksExecutor.shutdown(); updateTasksExecutor.shutdown();
try { try {
@ -107,11 +107,17 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
public void remove(ClusterStateListener listener) { public void remove(ClusterStateListener listener) {
clusterStateListeners.remove(listener); clusterStateListeners.remove(listener);
for (Tuple<Timeout, NotifyTimeout> onGoingTimeout : onGoingTimeouts) {
if (onGoingTimeout.v2().listener == listener) {
onGoingTimeout.v1().cancel();
}
}
} }
public void add(TimeValue timeout, final TimeoutClusterStateListener listener) { public void add(TimeValue timeout, final TimeoutClusterStateListener listener) {
Timeout timerTimeout = timerService.newTimeout(new NotifyTimeout(listener, timeout), timeout, TimerService.ExecutionType.THREADED); NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
onGoingTimeouts.add(new Tuple<Timeout, TimeoutClusterStateListener>(timerTimeout, listener)); Timeout timerTimeout = timerService.newTimeout(notifyTimeout, timeout, TimerService.ExecutionType.THREADED);
onGoingTimeouts.add(new Tuple<Timeout, NotifyTimeout>(timerTimeout, notifyTimeout));
clusterStateListeners.add(listener); clusterStateListeners.add(listener);
// call the post added notification on the same event thread // call the post added notification on the same event thread
updateTasksExecutor.execute(new Runnable() { updateTasksExecutor.execute(new Runnable() {