Add ESAbortPolicy to cached pools

All ES ThreadPools / Executors should use the ESAbortPolicy or at least
one that throws the ESRejectedExecutionException.
This commit is contained in:
Simon Willnauer 2013-08-23 10:02:13 +02:00
parent 7c76819040
commit 71ebb14b58
4 changed files with 36 additions and 31 deletions

View File

@ -54,7 +54,7 @@ public class EsExecutors {
}
public static EsThreadPoolExecutor newCached(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
return new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory);
return new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new EsAbortPolicy());
}
public static EsThreadPoolExecutor newFixed(int size, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {

View File

@ -186,15 +186,11 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
transportService.disconnectFromNode(node);
}
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
} catch (RejectedExecutionException ex) {
logger.debug("Ping execution rejected", ex);
} catch (EsRejectedExecutionException ex) {
logger.debug("Ping execution rejected", ex);
}
}
});
} catch (RejectedExecutionException ex) {
logger.debug("Ping execution rejected", ex);
} catch (EsRejectedExecutionException ex) {
logger.debug("Ping execution rejected", ex);
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.settings.NodeSettingsService;
@ -103,7 +104,6 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
size, new ByteSizeValue(sizeInBytes), expire, cleanInterval);
nodeSettingsService.addListener(new ApplySettings());
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, new ReaderCleaner());
}
@ -169,34 +169,46 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
return;
}
if (readersKeysToClean.isEmpty()) {
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this);
schedule();
return;
}
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
Recycler.V<THashSet<Object>> keys = cacheRecycler.hashSet(-1);
try {
for (Iterator<Object> it = readersKeysToClean.iterator(); it.hasNext(); ) {
keys.v().add(it.next());
it.remove();
}
cache.cleanUp();
if (!keys.v().isEmpty()) {
for (Iterator<WeightedFilterCache.FilterCacheKey> it = cache.asMap().keySet().iterator(); it.hasNext(); ) {
WeightedFilterCache.FilterCacheKey filterCacheKey = it.next();
if (keys.v().contains(filterCacheKey.readerKey())) {
// same as invalidate
it.remove();
try {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
Recycler.V<THashSet<Object>> keys = cacheRecycler.hashSet(-1);
try {
for (Iterator<Object> it = readersKeysToClean.iterator(); it.hasNext(); ) {
keys.v().add(it.next());
it.remove();
}
cache.cleanUp();
if (!keys.v().isEmpty()) {
for (Iterator<WeightedFilterCache.FilterCacheKey> it = cache.asMap().keySet().iterator(); it.hasNext(); ) {
WeightedFilterCache.FilterCacheKey filterCacheKey = it.next();
if (keys.v().contains(filterCacheKey.readerKey())) {
// same as invalidate
it.remove();
}
}
}
schedule();
} finally {
keys.release();
}
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, ReaderCleaner.this);
} finally {
keys.release();
}
}
});
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not run ReaderCleaner - execution rejected", ex);
}
}
private void schedule() {
try {
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this);
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not schedule ReaderCleaner - execution rejected", ex);
}
}
}
}

View File

@ -39,7 +39,6 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
@ -315,8 +314,6 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
}
}
} catch (RejectedExecutionException ex) {
logger.debug("Rejected execution on NodeDisconnected", ex);
} catch (EsRejectedExecutionException ex) {
logger.debug("Rejected execution on NodeDisconnected", ex);
}