ignore rejected exception when shutting down in cluster service

This commit is contained in:
Shay Banon 2013-09-05 16:52:57 +02:00
parent 81d70b1ef8
commit 623e340d4f
1 changed files with 42 additions and 29 deletions

View File

@ -36,10 +36,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.*;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.node.settings.NodeSettingsService;
@ -199,16 +196,24 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return; return;
} }
// call the post added notification on the same event thread // call the post added notification on the same event thread
updateTasksExecutor.execute(new PrioritizedRunnable(Priority.HIGH) { try {
@Override updateTasksExecutor.execute(new PrioritizedRunnable(Priority.HIGH) {
public void run() { @Override
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); public void run() {
notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout); NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
onGoingTimeouts.add(notifyTimeout); notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout);
clusterStateListeners.add(listener); onGoingTimeouts.add(notifyTimeout);
listener.postAdded(); clusterStateListeners.add(listener);
listener.postAdded();
}
});
} catch (EsRejectedExecutionException e) {
if (lifecycle.stoppedOrClosed()) {
listener.onClose();
} else {
throw e;
} }
}); }
} }
public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) { public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) {
@ -219,22 +224,30 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (!lifecycle.started()) { if (!lifecycle.started()) {
return; return;
} }
final UpdateTask task = new UpdateTask(source, priority, updateTask); try {
if (updateTask instanceof TimeoutClusterStateUpdateTask) { final UpdateTask task = new UpdateTask(source, priority, updateTask);
final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask; if (updateTask instanceof TimeoutClusterStateUpdateTask) {
updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() { final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask;
@Override updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() {
public void run() { @Override
threadPool.generic().execute(new Runnable() { public void run() {
@Override threadPool.generic().execute(new Runnable() {
public void run() { @Override
timeoutUpdateTask.onFailure(task.source, new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source)); public void run() {
} timeoutUpdateTask.onFailure(task.source, new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source));
}); }
} });
}); }
} else { });
updateTasksExecutor.execute(task); } else {
updateTasksExecutor.execute(task);
}
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (!lifecycle.stoppedOrClosed()) {
throw e;
}
} }
} }