diff --git a/lucene/tools/forbiddenApis/solr.txt b/lucene/tools/forbiddenApis/solr.txt new file mode 100644 index 00000000000..a98d24bb0bd --- /dev/null +++ b/lucene/tools/forbiddenApis/solr.txt @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +@defaultMessage Spawns threads without MDC logging context; use Solr's ExecutorUtil methods to create thread pools +java.util.concurrent.Executors#newFixedThreadPool(int,java.util.concurrent.ThreadFactory) +java.util.concurrent.Executors#newSingleThreadExecutor(java.util.concurrent.ThreadFactory) +java.util.concurrent.Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory) + +### TODO - Suppress these for all classes inside Solr except for ExecutorUtil.MDCAwareThreadPoolExecutor +#java.util.concurrent.ThreadPoolExecutor#(int,int,long,java.util.concurrent.TimeUnit,java.util.concurrent.BlockingQueue,java.util.concurrent.ThreadFactory,java.util.concurrent.RejectedExecutionHandler) +#java.util.concurrent.ThreadPoolExecutor#(int,int,long,java.util.concurrent.TimeUnit,java.util.concurrent.BlockingQueue) +#java.util.concurrent.ThreadPoolExecutor#(int,int,long,java.util.concurrent.TimeUnit,java.util.concurrent.BlockingQueue,java.util.concurrent.ThreadFactory) +#java.util.concurrent.ThreadPoolExecutor#(int,int,long,java.util.concurrent.TimeUnit,java.util.concurrent.BlockingQueue,java.util.concurrent.RejectedExecutionHandler) \ No newline at end of file diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 7ea72018653..be206b2ee76 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -123,6 +123,9 @@ Other Changes * SOLR-7371: Make DocSet implement Accountable to estimate memory usage. (yonik, shalin) +* SOLR-7381: Improve logging by adding node name in MDC in SolrCloud mode and adding MDC to + all thread pools. (shalin) + ================== 5.1.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release diff --git a/solr/common-build.xml b/solr/common-build.xml index 66697adbfd3..84b8b755447 100644 --- a/solr/common-build.xml +++ b/solr/common-build.xml @@ -498,6 +498,7 @@ + diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java index e904a037238..05c5bb1fdd7 100644 --- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java +++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java @@ -36,6 +36,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import javax.servlet.DispatcherType; import javax.servlet.Filter; @@ -52,6 +53,7 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.EnumSet; import java.util.LinkedList; +import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.SortedMap; @@ -367,27 +369,37 @@ public class JettySolrRunner { * @throws Exception if an error occurs on startup */ public void start() throws Exception { - // if started before, make a new server - if (startedBefore) { - waitOnSolr = false; - init(lastPort); - } else { - startedBefore = true; - } + // Do not let Jetty/Solr pollute the MDC for this thread + Map prevContext = MDC.getCopyOfContextMap(); + MDC.clear(); + try { + // if started before, make a new server + if (startedBefore) { + waitOnSolr = false; + init(lastPort); + } else { + startedBefore = true; + } - if (!server.isRunning()) { - server.start(); - } - synchronized (JettySolrRunner.this) { - int cnt = 0; - while (!waitOnSolr) { - this.wait(100); - if (cnt++ == 5) { - throw new RuntimeException("Jetty/Solr unresponsive"); + if (!server.isRunning()) { + server.start(); + } + synchronized (JettySolrRunner.this) { + int cnt = 0; + while (!waitOnSolr) { + this.wait(100); + if (cnt++ == 5) { + throw new RuntimeException("Jetty/Solr unresponsive"); + } } } + } finally { + if (prevContext != null) { + MDC.setContextMap(prevContext); + } else { + MDC.clear(); + } } - } /** @@ -396,21 +408,31 @@ public class JettySolrRunner { * @throws Exception if an error occurs on shutdown */ public void stop() throws Exception { + // Do not let Jetty/Solr pollute the MDC for this thread + Map prevContext = MDC.getCopyOfContextMap(); + MDC.clear(); + try { + Filter filter = dispatchFilter.getFilter(); - Filter filter = dispatchFilter.getFilter(); + server.stop(); - server.stop(); - - if (server.getState().equals(Server.FAILED)) { - filter.destroy(); - if (extraFilters != null) { - for (FilterHolder f : extraFilters) { - f.getFilter().destroy(); + if (server.getState().equals(Server.FAILED)) { + filter.destroy(); + if (extraFilters != null) { + for (FilterHolder f : extraFilters) { + f.getFilter().destroy(); + } } } + + server.join(); + } finally { + if (prevContext != null) { + MDC.setContextMap(prevContext); + } else { + MDC.clear(); + } } - - server.join(); } /** diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index 60ddc17beda..8367f37177d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -78,6 +78,7 @@ import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; import org.apache.solr.common.params.MapSolrParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ShardParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.common.util.StrUtils; @@ -258,7 +259,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { // TODO: Make maxThreads configurable. - this.tpe = new ThreadPoolExecutor(5, 100, 0L, TimeUnit.MILLISECONDS, + this.tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 100, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue(), new DefaultSolrThreadFactory("OverseerThreadFactory")); try { diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 11e693deb8d..8cc5d7a6dad 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -37,6 +37,7 @@ import org.apache.solr.util.FileUtils; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import java.io.File; import java.util.ArrayList; @@ -48,9 +49,9 @@ import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP; /** @@ -245,6 +246,9 @@ public class CoreContainer { log.info("Node Name: " + hostName); zkSys.initZooKeeper(this, solrHome, cfg.getCloudConfig()); + if (isZooKeeperAware()) { + MDC.put(NODE_NAME_PROP, getZkController().getNodeName()); + } collectionsHandler = createHandler(cfg.getCollectionsHandlerClass(), CollectionsHandler.class); containerHandlers.put(COLLECTIONS_HANDLER_PATH, collectionsHandler); @@ -259,7 +263,7 @@ public class CoreContainer { // setup executor to load cores in parallel // do not limit the size of the executor in zk mode since cores may try and wait for each other. - ExecutorService coreLoadExecutor = Executors.newFixedThreadPool( + ExecutorService coreLoadExecutor = ExecutorUtil.newMDCAwareFixedThreadPool( ( zkSys.getZkController() == null ? cfg.getCoreLoadThreadCount() : Integer.MAX_VALUE ), new DefaultSolrThreadFactory("coreLoadExecutor") ); diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index b82b11dbef8..11b9c74efb8 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -1357,7 +1357,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { private final LinkedList> _searchers = new LinkedList<>(); private final LinkedList> _realtimeSearchers = new LinkedList<>(); - final ExecutorService searcherExecutor = Executors.newSingleThreadExecutor( + final ExecutorService searcherExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor( new DefaultSolrThreadFactory("searcherExecutor")); private int onDeckSearchers; // number of searchers preparing // Lock ordering: one can acquire the openSearcherLock and then the searcherLock, but not vice-versa. diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java index 0c59bf2bac0..5fb6186fe47 100644 --- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java +++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java @@ -47,7 +47,7 @@ public class ZkContainer { protected ZkController zkController; private SolrZkServer zkServer; - private ExecutorService coreZkRegister = Executors.newFixedThreadPool(Integer.MAX_VALUE, + private ExecutorService coreZkRegister = ExecutorUtil.newMDCAwareCachedThreadPool( new DefaultSolrThreadFactory("coreZkRegister") ); // see ZkController.zkRunOnly diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java index 1b7b17981b0..db3f1fba3a8 100644 --- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java +++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java @@ -71,6 +71,7 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.FastInputStream; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.DirectoryFactory; @@ -346,7 +347,7 @@ public class IndexFetcher { LOG.info("Number of files in latest index in master: " + filesToDownload.size()); // Create the sync service - fsyncService = Executors.newSingleThreadExecutor(new DefaultSolrThreadFactory("fsyncService")); + fsyncService = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("fsyncService")); // use a synchronized list because the list is read by other threads (to show details) filesDownloaded = Collections.synchronizedList(new ArrayList>()); // if the generation of master is older than that of the slave , it means they are not compatible to be copied diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java index fdae8f90949..7f4f386bcdd 100644 --- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java @@ -148,7 +148,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw private ReentrantLock indexFetchLock = new ReentrantLock(); - private ExecutorService restoreExecutor = Executors.newSingleThreadExecutor( + private ExecutorService restoreExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor( new DefaultSolrThreadFactory("restoreExecutor")); private volatile Future restoreFuture; diff --git a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java index bdae42f9e6d..adebe5af746 100644 --- a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java @@ -57,6 +57,7 @@ import org.apache.solr.common.params.MapSolrParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.StrUtils; import org.apache.solr.core.ConfigOverlay; @@ -661,7 +662,7 @@ public class SolrConfigHandler extends RequestHandlerBase { // use an executor service to invoke schema zk version requests in parallel with a max wait time int poolSize = Math.min(concurrentTasks.size(), 10); ExecutorService parallelExecutor = - Executors.newFixedThreadPool(poolSize, new DefaultSolrThreadFactory("solrHandlerExecutor")); + ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new DefaultSolrThreadFactory("solrHandlerExecutor")); try { List> results = parallelExecutor.invokeAll(concurrentTasks, maxWaitSecs, TimeUnit.SECONDS); @@ -700,8 +701,7 @@ public class SolrConfigHandler extends RequestHandlerBase { prop, expectedVersion, concurrentTasks.size(), collection)); Thread.currentThread().interrupt(); } finally { - if (!parallelExecutor.isShutdown()) - parallelExecutor.shutdownNow(); + ExecutorUtil.shutdownNowAndAwaitTermination(parallelExecutor); } long diffMs = (System.currentTimeMillis() - startMs); diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java index f67b9bf99a0..6902ba177d5 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java @@ -98,7 +98,7 @@ public class CoreAdminHandler extends RequestHandlerBase { protected final CoreContainer coreContainer; protected final Map> requestStatusMap; - protected final ExecutorService parallelExecutor = Executors.newFixedThreadPool(50, + protected final ExecutorService parallelExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(50, new DefaultSolrThreadFactory("parallelCoreAdminExecutor")); protected static int MAX_TRACKED_REQUESTS = 100; diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java index 04232d065e4..b8e165db317 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java @@ -58,7 +58,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org. // // Consider CallerRuns policy and a lower max threads to throttle // requests at some point (or should we simply return failure?) - private ThreadPoolExecutor commExecutor = new ThreadPoolExecutor( + private ThreadPoolExecutor commExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor( 0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, // terminate idle threads after 5 sec @@ -149,7 +149,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org. new SynchronousQueue(this.accessPolicy) : new ArrayBlockingQueue(this.queueSize, this.accessPolicy); - this.commExecutor = new ThreadPoolExecutor( + this.commExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor( this.corePoolSize, this.maximumPoolSize, this.keepAliveTime, TimeUnit.SECONDS, diff --git a/solr/core/src/java/org/apache/solr/request/SimpleFacets.java b/solr/core/src/java/org/apache/solr/request/SimpleFacets.java index 7dbb1f8b2ab..f1d631c13bd 100644 --- a/solr/core/src/java/org/apache/solr/request/SimpleFacets.java +++ b/solr/core/src/java/org/apache/solr/request/SimpleFacets.java @@ -50,6 +50,7 @@ import org.apache.solr.common.params.FacetParams.FacetRangeOther; import org.apache.solr.common.params.GroupParams; import org.apache.solr.common.params.RequiredSolrParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.common.util.StrUtils; @@ -567,7 +568,7 @@ public class SimpleFacets { } }; - static final Executor facetExecutor = new ThreadPoolExecutor( + static final Executor facetExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor( 0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, // terminate idle threads after 10 sec diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java index 666d6bc65b8..e2404ae487a 100644 --- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java +++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java @@ -40,6 +40,7 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.SolrConfig; import org.apache.solr.core.SolrResourceLoader; @@ -227,7 +228,7 @@ public final class ManagedIndexSchema extends IndexSchema { // use an executor service to invoke schema zk version requests in parallel with a max wait time int poolSize = Math.min(concurrentTasks.size(), 10); ExecutorService parallelExecutor = - Executors.newFixedThreadPool(poolSize, new DefaultSolrThreadFactory("managedSchemaExecutor")); + ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new DefaultSolrThreadFactory("managedSchemaExecutor")); try { List> results = parallelExecutor.invokeAll(concurrentTasks, maxWaitSecs, TimeUnit.SECONDS); diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index afc9c6cbaa3..335908e5f7d 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -100,6 +100,9 @@ import org.apache.solr.update.processor.DistributingUpdateProcessorFactory; import org.apache.solr.util.RTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP; /** * This filter looks at the incoming URL maps them to handlers defined in solrconfig.xml @@ -223,6 +226,10 @@ public class SolrDispatchFilter extends BaseSolrFilter { public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain, boolean retry) throws IOException, ServletException { MDCUtils.clearMDC(); + if (this.cores.isZooKeeperAware()) { + MDC.put(NODE_NAME_PROP, this.cores.getZkController().getNodeName()); + } + if (abortErrorMessage != null) { sendError((HttpServletResponse) response, 500, abortErrorMessage); return; diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java index b852eb62a05..3d9545d652a 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java @@ -1457,7 +1457,7 @@ public class UpdateLog implements PluginInfoInitialized { this.cancelApplyBufferUpdate = true; } - ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(0, + ThreadPoolExecutor recoveryExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue(), new DefaultSolrThreadFactory("recoveryExecutor")); diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java index a9e1025a828..c5da6fab734 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java @@ -39,7 +39,7 @@ public class UpdateShardHandler { private static Logger log = LoggerFactory.getLogger(UpdateShardHandler.class); - private ExecutorService updateExecutor = Executors.newCachedThreadPool( + private ExecutorService updateExecutor = ExecutorUtil.newMDCAwareCachedThreadPool( new SolrjNamedThreadFactory("updateExecutor")); private PoolingClientConnectionManager clientConnectionManager; diff --git a/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java b/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java index ff8d699acf5..71d4ffc429e 100644 --- a/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java +++ b/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java @@ -20,6 +20,7 @@ import org.slf4j.MDC; import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP; import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; @@ -157,11 +158,8 @@ public class SolrLogLayout extends Layout { sb.append(" name=" + core.getName()); sb.append(" " + core); } - - if (zkController == null) { - zkController = core.getCoreDescriptor().getCoreContainer() - .getZkController(); - } + + zkController = core.getCoreDescriptor().getCoreContainer().getZkController(); if (zkController != null) { if (info.url == null) { info.url = zkController.getBaseUrl() + "/" + core.getName(); @@ -182,7 +180,7 @@ public class SolrLogLayout extends Layout { if (sb.length() > 0) sb.append('\n'); sb.append(timeFromStart); - + // sb.append("\nL").append(record.getSequenceNumber()); // log number is // useful for sequencing when looking at multiple parts of a log file, but // ms since start should be fine. @@ -190,20 +188,16 @@ public class SolrLogLayout extends Layout { appendMDC(sb); + // todo: should be able to get port from core container for non zk tests + if (info != null) { sb.append(' ').append(info.shortId); // core } - if (zkController != null) { - sb.append(" P").append(zkController.getHostPort()); // todo: should be - // able to get this - // from core container - // for non zk tests - } - + if (shortClassName.length() > 0) { sb.append(' ').append(shortClassName); } - + if (event.getLevel() != Level.INFO) { sb.append(' ').append(event.getLevel()); } @@ -370,6 +364,9 @@ public class SolrLogLayout extends Layout { private void appendMDC(StringBuilder sb) { + if (!StringUtils.isEmpty(MDC.get(NODE_NAME_PROP))) { + sb.append(" N:").append(MDC.get(NODE_NAME_PROP)); + } if (!StringUtils.isEmpty(MDC.get(COLLECTION_PROP))) { sb.append(" C:").append(MDC.get(COLLECTION_PROP)); } diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java index efeece452ed..dbd3ae3ee79 100644 --- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java @@ -48,6 +48,7 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.update.DirectUpdateHandler2; import org.apache.solr.util.DefaultSolrThreadFactory; @@ -96,7 +97,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { private AtomicInteger nodeCounter = new AtomicInteger(); - ThreadPoolExecutor executor = new ThreadPoolExecutor(0, + ThreadPoolExecutor executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue(), new DefaultSolrThreadFactory("testExecutor")); @@ -497,7 +498,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { client.query(query).getResults().getNumFound()); } assertTrue("total numDocs <= 0, WTF? Test is useless", - 0 < totalShardNumDocs); + 0 < totalShardNumDocs); } @@ -507,16 +508,20 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { try (final HttpSolrClient httpSolrClient = new HttpSolrClient(url3)) { httpSolrClient.setConnectionTimeout(15000); httpSolrClient.setSoTimeout(60000); - ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, - 5, TimeUnit.SECONDS, new SynchronousQueue(), - new DefaultSolrThreadFactory("testExecutor")); - int cnt = 3; + ThreadPoolExecutor executor = null; + try { + executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, + 5, TimeUnit.SECONDS, new SynchronousQueue(), + new DefaultSolrThreadFactory("testExecutor")); + int cnt = 3; - // create the cores - createCores(httpSolrClient, executor, "multiunload2", 1, cnt); - - executor.shutdown(); - executor.awaitTermination(120, TimeUnit.SECONDS); + // create the cores + createCores(httpSolrClient, executor, "multiunload2", 1, cnt); + } finally { + if (executor != null) { + ExecutorUtil.shutdownAndAwaitTermination(executor, 120, TimeUnit.SECONDS); + } + } } ChaosMonkey.stop(cloudJettys.get(0).jetty); diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java index 2ce851ea37b..628d92c6021 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java @@ -336,8 +336,6 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase cusc.deleteById(delete); } catch (Exception e) { changeUrlOnError(e); - //System.err.println("REQUEST FAILED:"); - //e.printStackTrace(); fails.incrementAndGet(); } } @@ -356,8 +354,6 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase cusc.add(doc); } catch (Exception e) { changeUrlOnError(e); - //System.err.println("REQUEST FAILED:"); - //e.printStackTrace(); fails.incrementAndGet(); } diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java index f6389b14ef6..5bb11bc17fe 100644 --- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java @@ -49,6 +49,7 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.MapSolrParams; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.common.util.StrUtils; @@ -104,13 +105,6 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa private static final String DEFAULT_COLLECTION = "collection1"; private static final boolean DEBUG = false; - ThreadPoolExecutor executor = new ThreadPoolExecutor(0, - Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue(), - new DefaultSolrThreadFactory("testExecutor")); - - CompletionService completionService; - Set> pending; - // we randomly use a second config set rather than just one private boolean secondConfigSet = random().nextBoolean(); @@ -164,8 +158,6 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa public CollectionsAPIDistributedZkTest() { sliceCount = 2; - completionService = new ExecutorCompletionService<>(executor); - pending = new HashSet<>(); checkCreatedVsState = false; } diff --git a/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java index 063eca10e5e..312e77c7127 100644 --- a/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java @@ -37,6 +37,7 @@ import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.update.DirectUpdateHandler2; import org.apache.solr.util.DefaultSolrThreadFactory; import org.junit.BeforeClass; @@ -74,13 +75,6 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase { private static final String DEFAULT_COLLECTION = "collection1"; private static final boolean DEBUG = false; - ThreadPoolExecutor executor = new ThreadPoolExecutor(0, - Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue(), - new DefaultSolrThreadFactory("testExecutor")); - - CompletionService completionService; - Set> pending; - @BeforeClass public static void beforeThisClass2() throws Exception { } @@ -99,8 +93,6 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase { public CustomCollectionTest() { sliceCount = 2; - completionService = new ExecutorCompletionService<>(executor); - pending = new HashSet<>(); checkCreatedVsState = false; } diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java index f56ed3c1214..793cb406b78 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java @@ -45,6 +45,7 @@ import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.core.CloudConfig; import org.apache.solr.handler.component.HttpShardHandlerFactory; import org.apache.solr.update.UpdateShardHandler; @@ -384,7 +385,7 @@ public class OverseerTest extends SolrTestCaseJ4 { controllers[i] = new MockZKController(server.getZkAddress(), "node" + i); } for (int i = 0; i < nodeCount; i++) { - nodeExecutors[i] = Executors.newFixedThreadPool(1, new DefaultSolrThreadFactory("testShardAssignment")); + nodeExecutors[i] = ExecutorUtil.newMDCAwareFixedThreadPool(1, new DefaultSolrThreadFactory("testShardAssignment")); } final String[] ids = new String[coreCount]; diff --git a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java index be595468526..c34c76e9f55 100644 --- a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java @@ -44,6 +44,7 @@ import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.MapSolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.BadHdfsThreadsFilter; import org.junit.AfterClass; @@ -64,7 +65,7 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa private static final boolean DEBUG = true; private static MiniDFSCluster dfsCluster; - ThreadPoolExecutor executor = new ThreadPoolExecutor(0, + ThreadPoolExecutor executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue(), new DefaultSolrThreadFactory("testExecutor")); diff --git a/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java index 3e9097e40e8..138de11edb6 100644 --- a/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java @@ -30,6 +30,7 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.update.DirectUpdateHandler2; import org.apache.solr.util.DefaultSolrThreadFactory; import org.junit.Test; @@ -366,37 +367,40 @@ public class UnloadDistributedZkTest extends BasicDistributedZkTest { try (final HttpSolrClient adminClient = new HttpSolrClient(url3)) { adminClient.setConnectionTimeout(15000); adminClient.setSoTimeout(60000); - ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, + int cnt = atLeast(3); + ThreadPoolExecutor executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue(), new DefaultSolrThreadFactory("testExecutor")); - int cnt = atLeast(3); + try { + // create the cores + createCores(adminClient, executor, "multiunload", 2, cnt); + } finally { + ExecutorUtil.shutdownAndAwaitTermination(executor, 120, TimeUnit.SECONDS); + } - // create the cores - createCores(adminClient, executor, "multiunload", 2, cnt); - - executor.shutdown(); - executor.awaitTermination(120, TimeUnit.SECONDS); - executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, + executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue(), new DefaultSolrThreadFactory("testExecutor")); - for (int j = 0; j < cnt; j++) { - final int freezeJ = j; - executor.execute(new Runnable() { - @Override - public void run() { - Unload unloadCmd = new Unload(true); - unloadCmd.setCoreName("multiunload" + freezeJ); - try { - adminClient.request(unloadCmd); - } catch (SolrServerException | IOException e) { - throw new RuntimeException(e); + try { + for (int j = 0; j < cnt; j++) { + final int freezeJ = j; + executor.execute(new Runnable() { + @Override + public void run() { + Unload unloadCmd = new Unload(true); + unloadCmd.setCoreName("multiunload" + freezeJ); + try { + adminClient.request(unloadCmd); + } catch (SolrServerException | IOException e) { + throw new RuntimeException(e); + } } - } - }); - Thread.sleep(random().nextInt(50)); + }); + Thread.sleep(random().nextInt(50)); + } + } finally { + ExecutorUtil.shutdownAndAwaitTermination(executor, 120, TimeUnit.SECONDS); } - executor.shutdown(); - executor.awaitTermination(120, TimeUnit.SECONDS); } } diff --git a/solr/core/src/test/org/apache/solr/core/SolrCoreTest.java b/solr/core/src/test/org/apache/solr/core/SolrCoreTest.java index b78b8d71ffd..224bd835ae8 100644 --- a/solr/core/src/test/org/apache/solr/core/SolrCoreTest.java +++ b/solr/core/src/test/org/apache/solr/core/SolrCoreTest.java @@ -18,6 +18,7 @@ package org.apache.solr.core; import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.handler.RequestHandlerBase; import org.apache.solr.handler.component.QueryComponent; import org.apache.solr.handler.component.SpellCheckComponent; @@ -136,7 +137,7 @@ public class SolrCoreTest extends SolrTestCaseJ4 { final int LOOP = 100; final int MT = 16; - ExecutorService service = Executors.newFixedThreadPool(MT, new DefaultSolrThreadFactory("refCountMT")); + ExecutorService service = ExecutorUtil.newMDCAwareFixedThreadPool(MT, new DefaultSolrThreadFactory("refCountMT")); List> callees = new ArrayList<>(MT); final CoreContainer cores = h.getCoreContainer(); for (int i = 0; i < MT; ++i) { diff --git a/solr/core/src/test/org/apache/solr/update/AddBlockUpdateTest.java b/solr/core/src/test/org/apache/solr/update/AddBlockUpdateTest.java index 9da8f4eb252..7200d7d200f 100644 --- a/solr/core/src/test/org/apache/solr/update/AddBlockUpdateTest.java +++ b/solr/core/src/test/org/apache/solr/update/AddBlockUpdateTest.java @@ -33,6 +33,7 @@ import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.request.RequestWriter; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.JavaBinCodec; import org.apache.solr.handler.loader.XMLLoader; import org.apache.solr.search.SolrIndexSearcher; @@ -92,8 +93,8 @@ public class AddBlockUpdateTest extends SolrTestCaseJ4 { inputFactory = XMLInputFactory.newInstance(); exe = // Executors.newSingleThreadExecutor(); - rarely() ? Executors.newFixedThreadPool(atLeast(2), new DefaultSolrThreadFactory("AddBlockUpdateTest")) : Executors - .newCachedThreadPool(new DefaultSolrThreadFactory("AddBlockUpdateTest")); + rarely() ? ExecutorUtil.newMDCAwareFixedThreadPool(atLeast(2), new DefaultSolrThreadFactory("AddBlockUpdateTest")) : ExecutorUtil + .newMDCAwareCachedThreadPool(new DefaultSolrThreadFactory("AddBlockUpdateTest")); initCore("solrconfig.xml", "schema15.xml"); diff --git a/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java b/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java index 89e898c8adb..779e12cc481 100644 --- a/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java +++ b/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java @@ -20,6 +20,7 @@ package org.apache.solr.update; import org.apache.lucene.util.TestUtil; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.common.SolrException; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.util.DefaultSolrThreadFactory; import org.junit.Before; import org.junit.BeforeClass; @@ -359,7 +360,7 @@ public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 { public void testConcurrentAdds() throws Exception { final int NUM_DOCS = atLeast(50); final int MAX_CONCURENT = atLeast(10); - ExecutorService runner = Executors.newFixedThreadPool(MAX_CONCURENT, new DefaultSolrThreadFactory("TestDocBasedVersionConstraints")); + ExecutorService runner = ExecutorUtil.newMDCAwareFixedThreadPool(MAX_CONCURENT, new DefaultSolrThreadFactory("TestDocBasedVersionConstraints")); // runner = Executors.newFixedThreadPool(1); // to test single threaded try { for (int id = 0; id < NUM_DOCS; id++) { @@ -393,7 +394,7 @@ public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 { ,"/response/docs==["+expectedDoc+"]"); } } finally { - runner.shutdownNow(); + ExecutorUtil.shutdownAndAwaitTermination(runner); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index eae844382ed..7beb1678222 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -46,6 +46,7 @@ import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.Hash; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SolrjNamedThreadFactory; @@ -107,8 +108,8 @@ public class CloudSolrClient extends SolrClient { private final boolean updatesToLeaders; private boolean parallelUpdates = true; - private ExecutorService threadPool = Executors - .newCachedThreadPool(new SolrjNamedThreadFactory( + private ExecutorService threadPool = ExecutorUtil + .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory( "CloudSolrServer ThreadPool")); private String idField = "id"; public static final String STATE_VERSION = "_stateVer_"; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java index cc005fed429..65508e26955 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java @@ -36,6 +36,7 @@ import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SolrjNamedThreadFactory; @@ -103,7 +104,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient { public ConcurrentUpdateSolrClient(String solrServerUrl, HttpClient client, int queueSize, int threadCount) { - this(solrServerUrl, client, queueSize, threadCount, Executors.newCachedThreadPool( + this(solrServerUrl, client, queueSize, threadCount, ExecutorUtil.newMDCAwareCachedThreadPool( new SolrjNamedThreadFactory("concurrentUpdateScheduler"))); shutdownExecutor = true; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java index 78e39ca8532..4eef33faa02 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java @@ -52,6 +52,7 @@ import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.slf4j.Logger; @@ -259,7 +260,7 @@ public class HttpSolrClient extends SolrClient { public HttpUriRequestResponse httpUriRequest(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException { HttpUriRequestResponse mrr = new HttpUriRequestResponse(); final HttpRequestBase method = createMethod(request, null); - ExecutorService pool = Executors.newFixedThreadPool(1, new SolrjNamedThreadFactory("httpUriRequest")); + ExecutorService pool = ExecutorUtil.newMDCAwareFixedThreadPool(1, new SolrjNamedThreadFactory("httpUriRequest")); try { mrr.future = pool.submit(new Callable>(){ diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java index b99dac5402d..12fe66526ac 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java @@ -39,6 +39,7 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.SolrjNamedThreadFactory; /** @@ -197,7 +198,7 @@ public class CloudSolrStream extends TupleStream { } private void openStreams() throws IOException { - ExecutorService service = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("CloudSolrStream")); + ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("CloudSolrStream")); try { List> futures = new ArrayList(); for (TupleStream solrStream : solrStreams) { diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java index 8ef15df6be0..0d7b8952cc7 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java @@ -76,7 +76,7 @@ public class SolrZkClient implements Closeable { private ZkCmdExecutor zkCmdExecutor; - private final ExecutorService zkCallbackExecutor = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("zkCallback")); + private final ExecutorService zkCallbackExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("zkCallback")); private volatile boolean isClosed = false; private ZkClientConnectionStrategy zkClientConnectionStrategy; diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java index 0c058c94af4..0d5e704c647 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java @@ -17,11 +17,19 @@ package org.apache.solr.common.util; * limitations under the License. */ +import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; public class ExecutorUtil { @@ -46,12 +54,16 @@ public class ExecutorUtil { } public static void shutdownAndAwaitTermination(ExecutorService pool) { + shutdownAndAwaitTermination(pool, 60, TimeUnit.SECONDS); + } + + public static void shutdownAndAwaitTermination(ExecutorService pool, long timeout, TimeUnit timeUnit) { pool.shutdown(); // Disable new tasks from being submitted boolean shutdown = false; while (!shutdown) { try { // Wait a while for existing tasks to terminate - shutdown = pool.awaitTermination(60, TimeUnit.SECONDS); + shutdown = pool.awaitTermination(timeout, timeUnit); } catch (InterruptedException ie) { // Preserve interrupt status Thread.currentThread().interrupt(); @@ -61,4 +73,78 @@ public class ExecutorUtil { } } } + + /** + * See {@link java.util.concurrent.Executors#newFixedThreadPool(int, ThreadFactory)} + */ + public static ExecutorService newMDCAwareFixedThreadPool(int nThreads, ThreadFactory threadFactory) { + return new MDCAwareThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory); + } + + /** + * See {@link java.util.concurrent.Executors#newSingleThreadExecutor(ThreadFactory)} + */ + public static ExecutorService newMDCAwareSingleThreadExecutor(ThreadFactory threadFactory) { + return new MDCAwareThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory); + } + + /** + * See {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)} + */ + public static ExecutorService newMDCAwareCachedThreadPool(ThreadFactory threadFactory) { + return new MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue(), + threadFactory); + } + + public static class MDCAwareThreadPoolExecutor extends ThreadPoolExecutor { + + public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public MDCAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + @Override + public void execute(final Runnable command) { + final Map submitterContext = MDC.getCopyOfContextMap(); + super.execute(new Runnable() { + @Override + public void run() { + Map threadContext = MDC.getCopyOfContextMap(); + if (submitterContext != null) { + MDC.setContextMap(submitterContext); + } else { + MDC.clear(); + } + try { + command.run(); + } finally { + if (threadContext != null) { + MDC.setContextMap(threadContext); + } else { + MDC.clear(); + } + } + } + }); + } + } } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java index c8129a7d928..64c1788d836 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java @@ -23,6 +23,7 @@ import org.apache.solr.client.solrj.embedded.JettyConfig; import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.eclipse.jetty.servlet.ServletHolder; import org.junit.BeforeClass; @@ -167,7 +168,7 @@ public class ConcurrentUpdateSolrClientTest extends SolrJettyTestBase { concurrentClient.blockUntilFinished(); int poolSize = 5; - ExecutorService threadPool = Executors.newFixedThreadPool(poolSize, new SolrjNamedThreadFactory("testCUSS")); + ExecutorService threadPool = ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new SolrjNamedThreadFactory("testCUSS")); int numDocs = 100; int numRunnables = 5; diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java index 7075ffa1a70..d6663932025 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java @@ -30,6 +30,7 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.apache.zookeeper.KeeperException; @@ -67,7 +68,7 @@ public class MiniSolrCloudCluster { private final CloudSolrClient solrClient; private final JettyConfig jettyConfig; - private final ExecutorService executor = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("jetty-launcher")); + private final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("jetty-launcher")); /** * Create a MiniSolrCloudCluster diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java index 2551a3d1e0d..cd1a9a209fe 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java @@ -473,7 +473,7 @@ public class ZkTestServer { } else { this.clientPortAddress = new InetSocketAddress(clientPort); } - System.out.println("client port:" + this.clientPortAddress); + log.info("client port:" + this.clientPortAddress); } };