diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index f844ba8e93f..a935190e68d 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -54,7 +54,7 @@ public class EsExecutors { } public static EsThreadPoolExecutor newCached(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { - return new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue(), threadFactory); + return new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue(), threadFactory, new EsAbortPolicy()); } public static EsThreadPoolExecutor newFixed(int size, BlockingQueue queue, ThreadFactory threadFactory) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 846ff83c2f3..45c2ba68d13 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -186,15 +186,11 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen transportService.disconnectFromNode(node); } listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); - } catch (RejectedExecutionException ex) { - logger.debug("Ping execution rejected", ex); } catch (EsRejectedExecutionException ex) { logger.debug("Ping execution rejected", ex); } } }); - } catch (RejectedExecutionException ex) { - logger.debug("Ping execution rejected", ex); } catch (EsRejectedExecutionException ex) { logger.debug("Ping execution rejected", ex); } diff --git a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java index 3ed090c5c17..1de146b3c90 100644 --- a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java +++ b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.node.settings.NodeSettingsService; @@ -103,7 +104,6 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList size, new ByteSizeValue(sizeInBytes), expire, cleanInterval); nodeSettingsService.addListener(new ApplySettings()); - threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, new ReaderCleaner()); } @@ -169,34 +169,46 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList return; } if (readersKeysToClean.isEmpty()) { - threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this); + schedule(); return; } - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - Recycler.V> keys = cacheRecycler.hashSet(-1); - try { - for (Iterator it = readersKeysToClean.iterator(); it.hasNext(); ) { - keys.v().add(it.next()); - it.remove(); - } - cache.cleanUp(); - if (!keys.v().isEmpty()) { - for (Iterator it = cache.asMap().keySet().iterator(); it.hasNext(); ) { - WeightedFilterCache.FilterCacheKey filterCacheKey = it.next(); - if (keys.v().contains(filterCacheKey.readerKey())) { - // same as invalidate - it.remove(); + try { + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + Recycler.V> keys = cacheRecycler.hashSet(-1); + try { + for (Iterator it = readersKeysToClean.iterator(); it.hasNext(); ) { + keys.v().add(it.next()); + it.remove(); + } + cache.cleanUp(); + if (!keys.v().isEmpty()) { + for (Iterator it = cache.asMap().keySet().iterator(); it.hasNext(); ) { + WeightedFilterCache.FilterCacheKey filterCacheKey = it.next(); + if (keys.v().contains(filterCacheKey.readerKey())) { + // same as invalidate + it.remove(); + } } } + schedule(); + } finally { + keys.release(); } - threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, ReaderCleaner.this); - } finally { - keys.release(); } - } - }); + }); + } catch (EsRejectedExecutionException ex) { + logger.debug("Can not run ReaderCleaner - execution rejected", ex); + } + } + + private void schedule() { + try { + threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this); + } catch (EsRejectedExecutionException ex) { + logger.debug("Can not schedule ReaderCleaner - execution rejected", ex); + } } } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index 57274347a83..5f5397891d2 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -39,7 +39,6 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicLong; @@ -315,8 +314,6 @@ public class TransportService extends AbstractLifecycleComponent