mirror of https://github.com/apache/lucene.git
SOLR-11880: Avoid creating new exceptions for every request made to MDCAwareThreadPoolExecutor by distributed search and update operations
This commit is contained in:
parent
0f14e7fe5c
commit
5a47ed4209
|
@ -248,6 +248,9 @@ Optimizations
|
||||||
|
|
||||||
* SOLR-12333: Removed redundant lines for handling lists in JSON reponse writers. (David Smiley via Mikhail Khludnev)
|
* SOLR-12333: Removed redundant lines for handling lists in JSON reponse writers. (David Smiley via Mikhail Khludnev)
|
||||||
|
|
||||||
|
* SOLR-11880: Avoid creating new exceptions for every request made to MDCAwareThreadPoolExecutor by distributed
|
||||||
|
search and update operations. (Varun Thacker, shalin)
|
||||||
|
|
||||||
Other Changes
|
Other Changes
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -81,8 +81,11 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
||||||
0,
|
0,
|
||||||
Integer.MAX_VALUE,
|
Integer.MAX_VALUE,
|
||||||
5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
|
5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
|
||||||
new SynchronousQueue<Runnable>(), // directly hand off tasks
|
new SynchronousQueue<>(), // directly hand off tasks
|
||||||
new DefaultSolrThreadFactory("httpShardExecutor")
|
new DefaultSolrThreadFactory("httpShardExecutor"),
|
||||||
|
// the Runnable added to this executor handles all exceptions so we disable stack trace collection as an optimization
|
||||||
|
// see SOLR-11880 for more details
|
||||||
|
false
|
||||||
);
|
);
|
||||||
|
|
||||||
protected InstrumentedPoolingHttpClientConnectionManager clientConnectionManager;
|
protected InstrumentedPoolingHttpClientConnectionManager clientConnectionManager;
|
||||||
|
|
|
@ -20,7 +20,9 @@ import java.lang.invoke.MethodHandles;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import com.codahale.metrics.MetricRegistry;
|
import com.codahale.metrics.MetricRegistry;
|
||||||
import org.apache.http.client.HttpClient;
|
import org.apache.http.client.HttpClient;
|
||||||
|
@ -52,8 +54,13 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
|
||||||
* and then undetected shard inconsistency as a result.
|
* and then undetected shard inconsistency as a result.
|
||||||
* Therefore this thread pool is left unbounded. See SOLR-8205
|
* Therefore this thread pool is left unbounded. See SOLR-8205
|
||||||
*/
|
*/
|
||||||
private ExecutorService updateExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
|
private ExecutorService updateExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE,
|
||||||
new SolrjNamedThreadFactory("updateExecutor"));
|
60L, TimeUnit.SECONDS,
|
||||||
|
new SynchronousQueue<>(),
|
||||||
|
new SolrjNamedThreadFactory("updateExecutor"),
|
||||||
|
// the Runnable added to this executor handles all exceptions so we disable stack trace collection as an optimization
|
||||||
|
// see SOLR-11880 for more details
|
||||||
|
false);
|
||||||
|
|
||||||
private ExecutorService recoveryExecutor;
|
private ExecutorService recoveryExecutor;
|
||||||
|
|
||||||
|
|
|
@ -105,7 +105,7 @@ public class ExecutorUtil {
|
||||||
public static ExecutorService newMDCAwareSingleThreadExecutor(ThreadFactory threadFactory) {
|
public static ExecutorService newMDCAwareSingleThreadExecutor(ThreadFactory threadFactory) {
|
||||||
return new MDCAwareThreadPoolExecutor(1, 1,
|
return new MDCAwareThreadPoolExecutor(1, 1,
|
||||||
0L, TimeUnit.MILLISECONDS,
|
0L, TimeUnit.MILLISECONDS,
|
||||||
new LinkedBlockingQueue<Runnable>(),
|
new LinkedBlockingQueue<>(),
|
||||||
threadFactory);
|
threadFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,7 +122,7 @@ public class ExecutorUtil {
|
||||||
public static ExecutorService newMDCAwareCachedThreadPool(ThreadFactory threadFactory) {
|
public static ExecutorService newMDCAwareCachedThreadPool(ThreadFactory threadFactory) {
|
||||||
return new MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE,
|
return new MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE,
|
||||||
60L, TimeUnit.SECONDS,
|
60L, TimeUnit.SECONDS,
|
||||||
new SynchronousQueue<Runnable>(),
|
new SynchronousQueue<>(),
|
||||||
threadFactory);
|
threadFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,20 +131,30 @@ public class ExecutorUtil {
|
||||||
|
|
||||||
private static final int MAX_THREAD_NAME_LEN = 512;
|
private static final int MAX_THREAD_NAME_LEN = 512;
|
||||||
|
|
||||||
|
private final boolean enableSubmitterStackTrace;
|
||||||
|
|
||||||
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
|
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
|
||||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
|
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
|
||||||
|
this.enableSubmitterStackTrace = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
|
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
|
||||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
|
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
|
||||||
|
this.enableSubmitterStackTrace = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
|
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
|
||||||
|
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, boolean enableSubmitterStackTrace) {
|
||||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
|
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
|
||||||
|
this.enableSubmitterStackTrace = enableSubmitterStackTrace;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
|
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
|
||||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
|
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
|
||||||
|
this.enableSubmitterStackTrace = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -164,7 +174,7 @@ public class ExecutorUtil {
|
||||||
|
|
||||||
String ctxStr = contextString.toString().replace("/", "//");
|
String ctxStr = contextString.toString().replace("/", "//");
|
||||||
final String submitterContextStr = ctxStr.length() <= MAX_THREAD_NAME_LEN ? ctxStr : ctxStr.substring(0, MAX_THREAD_NAME_LEN);
|
final String submitterContextStr = ctxStr.length() <= MAX_THREAD_NAME_LEN ? ctxStr : ctxStr.substring(0, MAX_THREAD_NAME_LEN);
|
||||||
final Exception submitterStackTrace = new Exception("Submitter stack trace");
|
final Exception submitterStackTrace = enableSubmitterStackTrace ? new Exception("Submitter stack trace") : null;
|
||||||
final List<InheritableThreadLocalProvider> providersCopy = providers;
|
final List<InheritableThreadLocalProvider> providersCopy = providers;
|
||||||
final ArrayList<AtomicReference> ctx = providersCopy.isEmpty() ? null : new ArrayList<>(providersCopy.size());
|
final ArrayList<AtomicReference> ctx = providersCopy.isEmpty() ? null : new ArrayList<>(providersCopy.size());
|
||||||
if (ctx != null) {
|
if (ctx != null) {
|
||||||
|
@ -194,7 +204,11 @@ public class ExecutorUtil {
|
||||||
if (t instanceof OutOfMemoryError) {
|
if (t instanceof OutOfMemoryError) {
|
||||||
throw t;
|
throw t;
|
||||||
}
|
}
|
||||||
|
if (enableSubmitterStackTrace) {
|
||||||
log.error("Uncaught exception {} thrown by thread: {}", t, currentThread.getName(), submitterStackTrace);
|
log.error("Uncaught exception {} thrown by thread: {}", t, currentThread.getName(), submitterStackTrace);
|
||||||
|
} else {
|
||||||
|
log.error("Uncaught exception {} thrown by thread: {}", t, currentThread.getName());
|
||||||
|
}
|
||||||
throw t;
|
throw t;
|
||||||
} finally {
|
} finally {
|
||||||
isServerPool.remove();
|
isServerPool.remove();
|
||||||
|
|
Loading…
Reference in New Issue