SOLR-7381: Improve logging by adding node name in MDC in SolrCloud mode and adding MDC to all thread pools

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1673116 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2015-04-13 06:09:53 +00:00
parent e186b220d3
commit 2c9ae41b78
38 changed files with 281 additions and 132 deletions

View File

@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@defaultMessage Spawns threads without MDC logging context; use Solr's ExecutorUtil methods to create thread pools
java.util.concurrent.Executors#newFixedThreadPool(int,java.util.concurrent.ThreadFactory)
java.util.concurrent.Executors#newSingleThreadExecutor(java.util.concurrent.ThreadFactory)
java.util.concurrent.Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)
### TODO - Suppress these for all classes inside Solr except for ExecutorUtil.MDCAwareThreadPoolExecutor
#java.util.concurrent.ThreadPoolExecutor#<init>(int,int,long,java.util.concurrent.TimeUnit,java.util.concurrent.BlockingQueue,java.util.concurrent.ThreadFactory,java.util.concurrent.RejectedExecutionHandler)
#java.util.concurrent.ThreadPoolExecutor#<init>(int,int,long,java.util.concurrent.TimeUnit,java.util.concurrent.BlockingQueue)
#java.util.concurrent.ThreadPoolExecutor#<init>(int,int,long,java.util.concurrent.TimeUnit,java.util.concurrent.BlockingQueue,java.util.concurrent.ThreadFactory)
#java.util.concurrent.ThreadPoolExecutor#<init>(int,int,long,java.util.concurrent.TimeUnit,java.util.concurrent.BlockingQueue,java.util.concurrent.RejectedExecutionHandler)

View File

@ -123,6 +123,9 @@ Other Changes
* SOLR-7371: Make DocSet implement Accountable to estimate memory usage. (yonik, shalin)
* SOLR-7381: Improve logging by adding node name in MDC in SolrCloud mode and adding MDC to
all thread pools. (shalin)
================== 5.1.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release

View File

@ -498,6 +498,7 @@
<signaturesFileSet dir="${common.dir}/tools/forbiddenApis">
<include name="base.txt" />
<include name="servlet-api.txt" />
<include name="solr.txt" />
</signaturesFileSet>
<fileset dir="${build.dir}/classes/java" excludes="${forbidden-base-excludes}"/>
<fileset dir="${build.dir}/classes/test" excludes="${forbidden-tests-excludes}" erroronmissingdir="false"/>

View File

@ -36,6 +36,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import javax.servlet.DispatcherType;
import javax.servlet.Filter;
@ -52,6 +53,7 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.SortedMap;
@ -367,27 +369,37 @@ public class JettySolrRunner {
* @throws Exception if an error occurs on startup
*/
public void start() throws Exception {
// if started before, make a new server
if (startedBefore) {
waitOnSolr = false;
init(lastPort);
} else {
startedBefore = true;
}
// Do not let Jetty/Solr pollute the MDC for this thread
Map<String, String> prevContext = MDC.getCopyOfContextMap();
MDC.clear();
try {
// if started before, make a new server
if (startedBefore) {
waitOnSolr = false;
init(lastPort);
} else {
startedBefore = true;
}
if (!server.isRunning()) {
server.start();
}
synchronized (JettySolrRunner.this) {
int cnt = 0;
while (!waitOnSolr) {
this.wait(100);
if (cnt++ == 5) {
throw new RuntimeException("Jetty/Solr unresponsive");
if (!server.isRunning()) {
server.start();
}
synchronized (JettySolrRunner.this) {
int cnt = 0;
while (!waitOnSolr) {
this.wait(100);
if (cnt++ == 5) {
throw new RuntimeException("Jetty/Solr unresponsive");
}
}
}
} finally {
if (prevContext != null) {
MDC.setContextMap(prevContext);
} else {
MDC.clear();
}
}
}
/**
@ -396,21 +408,31 @@ public class JettySolrRunner {
* @throws Exception if an error occurs on shutdown
*/
public void stop() throws Exception {
// Do not let Jetty/Solr pollute the MDC for this thread
Map<String, String> prevContext = MDC.getCopyOfContextMap();
MDC.clear();
try {
Filter filter = dispatchFilter.getFilter();
Filter filter = dispatchFilter.getFilter();
server.stop();
server.stop();
if (server.getState().equals(Server.FAILED)) {
filter.destroy();
if (extraFilters != null) {
for (FilterHolder f : extraFilters) {
f.getFilter().destroy();
if (server.getState().equals(Server.FAILED)) {
filter.destroy();
if (extraFilters != null) {
for (FilterHolder f : extraFilters) {
f.getFilter().destroy();
}
}
}
server.join();
} finally {
if (prevContext != null) {
MDC.setContextMap(prevContext);
} else {
MDC.clear();
}
}
server.join();
}
/**

View File

@ -78,6 +78,7 @@ import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
@ -258,7 +259,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
// TODO: Make maxThreads configurable.
this.tpe = new ThreadPoolExecutor(5, 100, 0L, TimeUnit.MILLISECONDS,
this.tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 100, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("OverseerThreadFactory"));
try {

View File

@ -37,6 +37,7 @@ import org.apache.solr.util.FileUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.io.File;
import java.util.ArrayList;
@ -48,9 +49,9 @@ import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
/**
@ -245,6 +246,9 @@ public class CoreContainer {
log.info("Node Name: " + hostName);
zkSys.initZooKeeper(this, solrHome, cfg.getCloudConfig());
if (isZooKeeperAware()) {
MDC.put(NODE_NAME_PROP, getZkController().getNodeName());
}
collectionsHandler = createHandler(cfg.getCollectionsHandlerClass(), CollectionsHandler.class);
containerHandlers.put(COLLECTIONS_HANDLER_PATH, collectionsHandler);
@ -259,7 +263,7 @@ public class CoreContainer {
// setup executor to load cores in parallel
// do not limit the size of the executor in zk mode since cores may try and wait for each other.
ExecutorService coreLoadExecutor = Executors.newFixedThreadPool(
ExecutorService coreLoadExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(
( zkSys.getZkController() == null ? cfg.getCoreLoadThreadCount() : Integer.MAX_VALUE ),
new DefaultSolrThreadFactory("coreLoadExecutor") );

View File

@ -1357,7 +1357,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
private final LinkedList<RefCounted<SolrIndexSearcher>> _searchers = new LinkedList<>();
private final LinkedList<RefCounted<SolrIndexSearcher>> _realtimeSearchers = new LinkedList<>();
final ExecutorService searcherExecutor = Executors.newSingleThreadExecutor(
final ExecutorService searcherExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(
new DefaultSolrThreadFactory("searcherExecutor"));
private int onDeckSearchers; // number of searchers preparing
// Lock ordering: one can acquire the openSearcherLock and then the searcherLock, but not vice-versa.

View File

@ -47,7 +47,7 @@ public class ZkContainer {
protected ZkController zkController;
private SolrZkServer zkServer;
private ExecutorService coreZkRegister = Executors.newFixedThreadPool(Integer.MAX_VALUE,
private ExecutorService coreZkRegister = ExecutorUtil.newMDCAwareCachedThreadPool(
new DefaultSolrThreadFactory("coreZkRegister") );
// see ZkController.zkRunOnly

View File

@ -71,6 +71,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.DirectoryFactory;
@ -346,7 +347,7 @@ public class IndexFetcher {
LOG.info("Number of files in latest index in master: " + filesToDownload.size());
// Create the sync service
fsyncService = Executors.newSingleThreadExecutor(new DefaultSolrThreadFactory("fsyncService"));
fsyncService = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("fsyncService"));
// use a synchronized list because the list is read by other threads (to show details)
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the generation of master is older than that of the slave , it means they are not compatible to be copied

View File

@ -148,7 +148,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private ReentrantLock indexFetchLock = new ReentrantLock();
private ExecutorService restoreExecutor = Executors.newSingleThreadExecutor(
private ExecutorService restoreExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(
new DefaultSolrThreadFactory("restoreExecutor"));
private volatile Future<Boolean> restoreFuture;

View File

@ -57,6 +57,7 @@ import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.ConfigOverlay;
@ -661,7 +662,7 @@ public class SolrConfigHandler extends RequestHandlerBase {
// use an executor service to invoke schema zk version requests in parallel with a max wait time
int poolSize = Math.min(concurrentTasks.size(), 10);
ExecutorService parallelExecutor =
Executors.newFixedThreadPool(poolSize, new DefaultSolrThreadFactory("solrHandlerExecutor"));
ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new DefaultSolrThreadFactory("solrHandlerExecutor"));
try {
List<Future<Boolean>> results =
parallelExecutor.invokeAll(concurrentTasks, maxWaitSecs, TimeUnit.SECONDS);
@ -700,8 +701,7 @@ public class SolrConfigHandler extends RequestHandlerBase {
prop, expectedVersion, concurrentTasks.size(), collection));
Thread.currentThread().interrupt();
} finally {
if (!parallelExecutor.isShutdown())
parallelExecutor.shutdownNow();
ExecutorUtil.shutdownNowAndAwaitTermination(parallelExecutor);
}
long diffMs = (System.currentTimeMillis() - startMs);

View File

@ -98,7 +98,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
protected final CoreContainer coreContainer;
protected final Map<String, Map<String, TaskObject>> requestStatusMap;
protected final ExecutorService parallelExecutor = Executors.newFixedThreadPool(50,
protected final ExecutorService parallelExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(50,
new DefaultSolrThreadFactory("parallelCoreAdminExecutor"));
protected static int MAX_TRACKED_REQUESTS = 100;

View File

@ -58,7 +58,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
//
// Consider CallerRuns policy and a lower max threads to throttle
// requests at some point (or should we simply return failure?)
private ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(
private ThreadPoolExecutor commExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(
0,
Integer.MAX_VALUE,
5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
@ -149,7 +149,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
new SynchronousQueue<Runnable>(this.accessPolicy) :
new ArrayBlockingQueue<Runnable>(this.queueSize, this.accessPolicy);
this.commExecutor = new ThreadPoolExecutor(
this.commExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(
this.corePoolSize,
this.maximumPoolSize,
this.keepAliveTime, TimeUnit.SECONDS,

View File

@ -50,6 +50,7 @@ import org.apache.solr.common.params.FacetParams.FacetRangeOther;
import org.apache.solr.common.params.GroupParams;
import org.apache.solr.common.params.RequiredSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
@ -567,7 +568,7 @@ public class SimpleFacets {
}
};
static final Executor facetExecutor = new ThreadPoolExecutor(
static final Executor facetExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(
0,
Integer.MAX_VALUE,
10, TimeUnit.SECONDS, // terminate idle threads after 10 sec

View File

@ -40,6 +40,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrResourceLoader;
@ -227,7 +228,7 @@ public final class ManagedIndexSchema extends IndexSchema {
// use an executor service to invoke schema zk version requests in parallel with a max wait time
int poolSize = Math.min(concurrentTasks.size(), 10);
ExecutorService parallelExecutor =
Executors.newFixedThreadPool(poolSize, new DefaultSolrThreadFactory("managedSchemaExecutor"));
ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new DefaultSolrThreadFactory("managedSchemaExecutor"));
try {
List<Future<Integer>> results =
parallelExecutor.invokeAll(concurrentTasks, maxWaitSecs, TimeUnit.SECONDS);

View File

@ -100,6 +100,9 @@ import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
import org.apache.solr.util.RTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
/**
* This filter looks at the incoming URL maps them to handlers defined in solrconfig.xml
@ -223,6 +226,10 @@ public class SolrDispatchFilter extends BaseSolrFilter {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain, boolean retry) throws IOException, ServletException {
MDCUtils.clearMDC();
if (this.cores.isZooKeeperAware()) {
MDC.put(NODE_NAME_PROP, this.cores.getZkController().getNodeName());
}
if (abortErrorMessage != null) {
sendError((HttpServletResponse) response, 500, abortErrorMessage);
return;

View File

@ -1457,7 +1457,7 @@ public class UpdateLog implements PluginInfoInitialized {
this.cancelApplyBufferUpdate = true;
}
ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(0,
ThreadPoolExecutor recoveryExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0,
Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("recoveryExecutor"));

View File

@ -39,7 +39,7 @@ public class UpdateShardHandler {
private static Logger log = LoggerFactory.getLogger(UpdateShardHandler.class);
private ExecutorService updateExecutor = Executors.newCachedThreadPool(
private ExecutorService updateExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
new SolrjNamedThreadFactory("updateExecutor"));
private PoolingClientConnectionManager clientConnectionManager;

View File

@ -20,6 +20,7 @@ import org.slf4j.MDC;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
@ -157,11 +158,8 @@ public class SolrLogLayout extends Layout {
sb.append(" name=" + core.getName());
sb.append(" " + core);
}
if (zkController == null) {
zkController = core.getCoreDescriptor().getCoreContainer()
.getZkController();
}
zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
if (zkController != null) {
if (info.url == null) {
info.url = zkController.getBaseUrl() + "/" + core.getName();
@ -182,7 +180,7 @@ public class SolrLogLayout extends Layout {
if (sb.length() > 0) sb.append('\n');
sb.append(timeFromStart);
// sb.append("\nL").append(record.getSequenceNumber()); // log number is
// useful for sequencing when looking at multiple parts of a log file, but
// ms since start should be fine.
@ -190,20 +188,16 @@ public class SolrLogLayout extends Layout {
appendMDC(sb);
// todo: should be able to get port from core container for non zk tests
if (info != null) {
sb.append(' ').append(info.shortId); // core
}
if (zkController != null) {
sb.append(" P").append(zkController.getHostPort()); // todo: should be
// able to get this
// from core container
// for non zk tests
}
if (shortClassName.length() > 0) {
sb.append(' ').append(shortClassName);
}
if (event.getLevel() != Level.INFO) {
sb.append(' ').append(event.getLevel());
}
@ -370,6 +364,9 @@ public class SolrLogLayout extends Layout {
private void appendMDC(StringBuilder sb) {
if (!StringUtils.isEmpty(MDC.get(NODE_NAME_PROP))) {
sb.append(" N:").append(MDC.get(NODE_NAME_PROP));
}
if (!StringUtils.isEmpty(MDC.get(COLLECTION_PROP))) {
sb.append(" C:").append(MDC.get(COLLECTION_PROP));
}

View File

@ -48,6 +48,7 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.DefaultSolrThreadFactory;
@ -96,7 +97,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
private AtomicInteger nodeCounter = new AtomicInteger();
ThreadPoolExecutor executor = new ThreadPoolExecutor(0,
ThreadPoolExecutor executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0,
Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));
@ -497,7 +498,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
client.query(query).getResults().getNumFound());
}
assertTrue("total numDocs <= 0, WTF? Test is useless",
0 < totalShardNumDocs);
0 < totalShardNumDocs);
}
@ -507,16 +508,20 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
try (final HttpSolrClient httpSolrClient = new HttpSolrClient(url3)) {
httpSolrClient.setConnectionTimeout(15000);
httpSolrClient.setSoTimeout(60000);
ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));
int cnt = 3;
ThreadPoolExecutor executor = null;
try {
executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE,
5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));
int cnt = 3;
// create the cores
createCores(httpSolrClient, executor, "multiunload2", 1, cnt);
executor.shutdown();
executor.awaitTermination(120, TimeUnit.SECONDS);
// create the cores
createCores(httpSolrClient, executor, "multiunload2", 1, cnt);
} finally {
if (executor != null) {
ExecutorUtil.shutdownAndAwaitTermination(executor, 120, TimeUnit.SECONDS);
}
}
}
ChaosMonkey.stop(cloudJettys.get(0).jetty);

View File

@ -336,8 +336,6 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
cusc.deleteById(delete);
} catch (Exception e) {
changeUrlOnError(e);
//System.err.println("REQUEST FAILED:");
//e.printStackTrace();
fails.incrementAndGet();
}
}
@ -356,8 +354,6 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
cusc.add(doc);
} catch (Exception e) {
changeUrlOnError(e);
//System.err.println("REQUEST FAILED:");
//e.printStackTrace();
fails.incrementAndGet();
}

View File

@ -49,6 +49,7 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
@ -104,13 +105,6 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
private static final String DEFAULT_COLLECTION = "collection1";
private static final boolean DEBUG = false;
ThreadPoolExecutor executor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));
CompletionService<Object> completionService;
Set<Future<Object>> pending;
// we randomly use a second config set rather than just one
private boolean secondConfigSet = random().nextBoolean();
@ -164,8 +158,6 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
public CollectionsAPIDistributedZkTest() {
sliceCount = 2;
completionService = new ExecutorCompletionService<>(executor);
pending = new HashSet<>();
checkCreatedVsState = false;
}

View File

@ -37,6 +37,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.BeforeClass;
@ -74,13 +75,6 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
private static final String DEFAULT_COLLECTION = "collection1";
private static final boolean DEBUG = false;
ThreadPoolExecutor executor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));
CompletionService<Object> completionService;
Set<Future<Object>> pending;
@BeforeClass
public static void beforeThisClass2() throws Exception {
}
@ -99,8 +93,6 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
public CustomCollectionTest() {
sliceCount = 2;
completionService = new ExecutorCompletionService<>(executor);
pending = new HashSet<>();
checkCreatedVsState = false;
}

View File

@ -45,6 +45,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.update.UpdateShardHandler;
@ -384,7 +385,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
controllers[i] = new MockZKController(server.getZkAddress(), "node" + i);
}
for (int i = 0; i < nodeCount; i++) {
nodeExecutors[i] = Executors.newFixedThreadPool(1, new DefaultSolrThreadFactory("testShardAssignment"));
nodeExecutors[i] = ExecutorUtil.newMDCAwareFixedThreadPool(1, new DefaultSolrThreadFactory("testShardAssignment"));
}
final String[] ids = new String[coreCount];

View File

@ -44,6 +44,7 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.BadHdfsThreadsFilter;
import org.junit.AfterClass;
@ -64,7 +65,7 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
private static final boolean DEBUG = true;
private static MiniDFSCluster dfsCluster;
ThreadPoolExecutor executor = new ThreadPoolExecutor(0,
ThreadPoolExecutor executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0,
Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));

View File

@ -30,6 +30,7 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Test;
@ -366,37 +367,40 @@ public class UnloadDistributedZkTest extends BasicDistributedZkTest {
try (final HttpSolrClient adminClient = new HttpSolrClient(url3)) {
adminClient.setConnectionTimeout(15000);
adminClient.setSoTimeout(60000);
ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
int cnt = atLeast(3);
ThreadPoolExecutor executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE,
5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));
int cnt = atLeast(3);
try {
// create the cores
createCores(adminClient, executor, "multiunload", 2, cnt);
} finally {
ExecutorUtil.shutdownAndAwaitTermination(executor, 120, TimeUnit.SECONDS);
}
// create the cores
createCores(adminClient, executor, "multiunload", 2, cnt);
executor.shutdown();
executor.awaitTermination(120, TimeUnit.SECONDS);
executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));
for (int j = 0; j < cnt; j++) {
final int freezeJ = j;
executor.execute(new Runnable() {
@Override
public void run() {
Unload unloadCmd = new Unload(true);
unloadCmd.setCoreName("multiunload" + freezeJ);
try {
adminClient.request(unloadCmd);
} catch (SolrServerException | IOException e) {
throw new RuntimeException(e);
try {
for (int j = 0; j < cnt; j++) {
final int freezeJ = j;
executor.execute(new Runnable() {
@Override
public void run() {
Unload unloadCmd = new Unload(true);
unloadCmd.setCoreName("multiunload" + freezeJ);
try {
adminClient.request(unloadCmd);
} catch (SolrServerException | IOException e) {
throw new RuntimeException(e);
}
}
}
});
Thread.sleep(random().nextInt(50));
});
Thread.sleep(random().nextInt(50));
}
} finally {
ExecutorUtil.shutdownAndAwaitTermination(executor, 120, TimeUnit.SECONDS);
}
executor.shutdown();
executor.awaitTermination(120, TimeUnit.SECONDS);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.solr.core;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.component.QueryComponent;
import org.apache.solr.handler.component.SpellCheckComponent;
@ -136,7 +137,7 @@ public class SolrCoreTest extends SolrTestCaseJ4 {
final int LOOP = 100;
final int MT = 16;
ExecutorService service = Executors.newFixedThreadPool(MT, new DefaultSolrThreadFactory("refCountMT"));
ExecutorService service = ExecutorUtil.newMDCAwareFixedThreadPool(MT, new DefaultSolrThreadFactory("refCountMT"));
List<Callable<Integer>> callees = new ArrayList<>(MT);
final CoreContainer cores = h.getCoreContainer();
for (int i = 0; i < MT; ++i) {

View File

@ -33,6 +33,7 @@ import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.handler.loader.XMLLoader;
import org.apache.solr.search.SolrIndexSearcher;
@ -92,8 +93,8 @@ public class AddBlockUpdateTest extends SolrTestCaseJ4 {
inputFactory = XMLInputFactory.newInstance();
exe = // Executors.newSingleThreadExecutor();
rarely() ? Executors.newFixedThreadPool(atLeast(2), new DefaultSolrThreadFactory("AddBlockUpdateTest")) : Executors
.newCachedThreadPool(new DefaultSolrThreadFactory("AddBlockUpdateTest"));
rarely() ? ExecutorUtil.newMDCAwareFixedThreadPool(atLeast(2), new DefaultSolrThreadFactory("AddBlockUpdateTest")) : ExecutorUtil
.newMDCAwareCachedThreadPool(new DefaultSolrThreadFactory("AddBlockUpdateTest"));
initCore("solrconfig.xml", "schema15.xml");

View File

@ -20,6 +20,7 @@ package org.apache.solr.update;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Before;
import org.junit.BeforeClass;
@ -359,7 +360,7 @@ public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 {
public void testConcurrentAdds() throws Exception {
final int NUM_DOCS = atLeast(50);
final int MAX_CONCURENT = atLeast(10);
ExecutorService runner = Executors.newFixedThreadPool(MAX_CONCURENT, new DefaultSolrThreadFactory("TestDocBasedVersionConstraints"));
ExecutorService runner = ExecutorUtil.newMDCAwareFixedThreadPool(MAX_CONCURENT, new DefaultSolrThreadFactory("TestDocBasedVersionConstraints"));
// runner = Executors.newFixedThreadPool(1); // to test single threaded
try {
for (int id = 0; id < NUM_DOCS; id++) {
@ -393,7 +394,7 @@ public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 {
,"/response/docs==["+expectedDoc+"]");
}
} finally {
runner.shutdownNow();
ExecutorUtil.shutdownAndAwaitTermination(runner);
}
}

View File

@ -46,6 +46,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
@ -107,8 +108,8 @@ public class CloudSolrClient extends SolrClient {
private final boolean updatesToLeaders;
private boolean parallelUpdates = true;
private ExecutorService threadPool = Executors
.newCachedThreadPool(new SolrjNamedThreadFactory(
private ExecutorService threadPool = ExecutorUtil
.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
"CloudSolrServer ThreadPool"));
private String idField = "id";
public static final String STATE_VERSION = "_stateVer_";

View File

@ -36,6 +36,7 @@ import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
@ -103,7 +104,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
public ConcurrentUpdateSolrClient(String solrServerUrl,
HttpClient client, int queueSize, int threadCount) {
this(solrServerUrl, client, queueSize, threadCount, Executors.newCachedThreadPool(
this(solrServerUrl, client, queueSize, threadCount, ExecutorUtil.newMDCAwareCachedThreadPool(
new SolrjNamedThreadFactory("concurrentUpdateScheduler")));
shutdownExecutor = true;
}

View File

@ -52,6 +52,7 @@ import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.slf4j.Logger;
@ -259,7 +260,7 @@ public class HttpSolrClient extends SolrClient {
public HttpUriRequestResponse httpUriRequest(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException {
HttpUriRequestResponse mrr = new HttpUriRequestResponse();
final HttpRequestBase method = createMethod(request, null);
ExecutorService pool = Executors.newFixedThreadPool(1, new SolrjNamedThreadFactory("httpUriRequest"));
ExecutorService pool = ExecutorUtil.newMDCAwareFixedThreadPool(1, new SolrjNamedThreadFactory("httpUriRequest"));
try {
mrr.future = pool.submit(new Callable<NamedList<Object>>(){

View File

@ -39,6 +39,7 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
/**
@ -197,7 +198,7 @@ public class CloudSolrStream extends TupleStream {
}
private void openStreams() throws IOException {
ExecutorService service = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("CloudSolrStream"));
ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("CloudSolrStream"));
try {
List<Future<TupleWrapper>> futures = new ArrayList();
for (TupleStream solrStream : solrStreams) {

View File

@ -76,7 +76,7 @@ public class SolrZkClient implements Closeable {
private ZkCmdExecutor zkCmdExecutor;
private final ExecutorService zkCallbackExecutor = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("zkCallback"));
private final ExecutorService zkCallbackExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("zkCallback"));
private volatile boolean isClosed = false;
private ZkClientConnectionStrategy zkClientConnectionStrategy;

View File

@ -17,11 +17,19 @@ package org.apache.solr.common.util;
* limitations under the License.
*/
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
public class ExecutorUtil {
@ -46,12 +54,16 @@ public class ExecutorUtil {
}
public static void shutdownAndAwaitTermination(ExecutorService pool) {
shutdownAndAwaitTermination(pool, 60, TimeUnit.SECONDS);
}
public static void shutdownAndAwaitTermination(ExecutorService pool, long timeout, TimeUnit timeUnit) {
pool.shutdown(); // Disable new tasks from being submitted
boolean shutdown = false;
while (!shutdown) {
try {
// Wait a while for existing tasks to terminate
shutdown = pool.awaitTermination(60, TimeUnit.SECONDS);
shutdown = pool.awaitTermination(timeout, timeUnit);
} catch (InterruptedException ie) {
// Preserve interrupt status
Thread.currentThread().interrupt();
@ -61,4 +73,78 @@ public class ExecutorUtil {
}
}
}
/**
* See {@link java.util.concurrent.Executors#newFixedThreadPool(int, ThreadFactory)}
*/
public static ExecutorService newMDCAwareFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new MDCAwareThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
/**
* See {@link java.util.concurrent.Executors#newSingleThreadExecutor(ThreadFactory)}
*/
public static ExecutorService newMDCAwareSingleThreadExecutor(ThreadFactory threadFactory) {
return new MDCAwareThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
/**
* See {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)}
*/
public static ExecutorService newMDCAwareCachedThreadPool(ThreadFactory threadFactory) {
return new MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static class MDCAwareThreadPoolExecutor extends ThreadPoolExecutor {
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
@Override
public void execute(final Runnable command) {
final Map<String, String> submitterContext = MDC.getCopyOfContextMap();
super.execute(new Runnable() {
@Override
public void run() {
Map<String, String> threadContext = MDC.getCopyOfContextMap();
if (submitterContext != null) {
MDC.setContextMap(submitterContext);
} else {
MDC.clear();
}
try {
command.run();
} finally {
if (threadContext != null) {
MDC.setContextMap(threadContext);
} else {
MDC.clear();
}
}
}
});
}
}
}

View File

@ -23,6 +23,7 @@ import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.BeforeClass;
@ -167,7 +168,7 @@ public class ConcurrentUpdateSolrClientTest extends SolrJettyTestBase {
concurrentClient.blockUntilFinished();
int poolSize = 5;
ExecutorService threadPool = Executors.newFixedThreadPool(poolSize, new SolrjNamedThreadFactory("testCUSS"));
ExecutorService threadPool = ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new SolrjNamedThreadFactory("testCUSS"));
int numDocs = 100;
int numRunnables = 5;

View File

@ -30,6 +30,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.zookeeper.KeeperException;
@ -67,7 +68,7 @@ public class MiniSolrCloudCluster {
private final CloudSolrClient solrClient;
private final JettyConfig jettyConfig;
private final ExecutorService executor = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("jetty-launcher"));
private final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("jetty-launcher"));
/**
* Create a MiniSolrCloudCluster

View File

@ -473,7 +473,7 @@ public class ZkTestServer {
} else {
this.clientPortAddress = new InetSocketAddress(clientPort);
}
System.out.println("client port:" + this.clientPortAddress);
log.info("client port:" + this.clientPortAddress);
}
};