From a798414614c91aca4b8f8808828bdea310d70189 Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Sun, 22 Jul 2012 16:54:26 +0000 Subject: [PATCH] SOLR-3658: Adding thousands of docs with one UpdateProcessorChain instance can briefly create spikes of threads in the thousands. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1364355 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 3 + .../solr/update/SolrCmdDistributor.java | 71 +++++++++++++++---- .../processor/DistributedUpdateProcessor.java | 7 +- .../solr/update/SolrCmdDistributorTest.java | 6 +- 4 files changed, 71 insertions(+), 16 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 8909fe9c292..4e3fdd867ec 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -117,6 +117,9 @@ Bug Fixes * SOLR-3660: Velocity: Link to admin page broken (janhoy) +* SOLR-3658: Adding thousands of docs with one UpdateProcessorChain instance can briefly create + spikes of threads in the thousands. (yonik, Mark Miller) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index a79e663cb30..e941546db02 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -24,12 +24,16 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; -import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -49,17 +53,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - - public class SolrCmdDistributor { private static final int MAX_RETRIES_ON_FORWARD = 6; public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class); // TODO: shut this thing down // TODO: this cannot be per instance... - static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, - Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue(), - new DefaultSolrThreadFactory("cmdDistribExecutor")); + static BoundedExecutor commExecutor; static final HttpClient client; @@ -91,8 +91,22 @@ public class SolrCmdDistributor { ModifiableSolrParams params; } - public SolrCmdDistributor() { - + public SolrCmdDistributor(int numHosts) { + + BoundedExecutor executor = null; + synchronized (SolrCmdDistributor.class) { + if (commExecutor == null || commExecutor.getMaximumPoolSize() != numHosts) { + // we don't shutdown the previous because all it's threads will die + int maxPoolSize = Math.max(8, (numHosts-1) * 8); + commExecutor = new BoundedExecutor(0, maxPoolSize, 5, + TimeUnit.SECONDS, new ArrayBlockingQueue(maxPoolSize * 2), + new DefaultSolrThreadFactory("cmdDistribExecutor")); + } + executor = commExecutor; + } + + completionService = new ExecutorCompletionService(executor); + pending = new HashSet>(); } public void finish() { @@ -297,10 +311,7 @@ public class SolrCmdDistributor { } public void submit(final Request sreq) { - if (completionService == null) { - completionService = new ExecutorCompletionService(commExecutor); - pending = new HashSet>(); - } + final String url = sreq.node.getUrl(); Callable task = new Callable() { @@ -502,4 +513,40 @@ public class SolrCmdDistributor { return nodeProps; } } + + public class BoundedExecutor extends ThreadPoolExecutor { + private final Semaphore semaphore; + + public BoundedExecutor(int corePoolSize, + int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + this.semaphore = new Semaphore(maximumPoolSize); + } + + @Override + public void execute(final Runnable command) { + try { + semaphore.acquire(); + } catch (InterruptedException e1) { + throw new RuntimeException(); + } + try { + super.execute(new Runnable() { + public void run() { + try { + command.run(); + } finally { + semaphore.release(); + } + } + }); + } catch (RejectedExecutionException e) { + semaphore.release(); + throw e; + } + } } +} + + diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index f7323c9f5ff..3ab86395223 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -130,6 +130,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { private boolean forwardToLeader = false; private List nodes; + private int numNodes; + public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { @@ -164,7 +166,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { collection = cloudDesc.getCollectionName(); } - cmdDistrib = new SolrCmdDistributor(); + cmdDistrib = new SolrCmdDistributor(numNodes); } private List setupRequest(int hash) { @@ -172,6 +174,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // if we are in zk mode... if (zkEnabled) { + // set num nodes + numNodes = zkController.getCloudState().getLiveNodes().size(); + // the leader is... // TODO: if there is no leader, wait and look again // TODO: we are reading the leader from zk every time - we should cache diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java index 730f5c46c0f..16a8077c88b 100644 --- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java +++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java @@ -82,7 +82,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { public void doTest() throws Exception { //del("*:*"); - SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(); + SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8); ModifiableSolrParams params = new ModifiableSolrParams(); List nodes = new ArrayList(); @@ -116,7 +116,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps))); // add another 2 docs to control and 3 to client - cmdDistrib = new SolrCmdDistributor(); + cmdDistrib = new SolrCmdDistributor(8); cmd.solrDoc = sdoc("id", 2); cmdDistrib.distribAdd(cmd, nodes, params); @@ -149,7 +149,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null); dcmd.id = "2"; - cmdDistrib = new SolrCmdDistributor(); + cmdDistrib = new SolrCmdDistributor(8); cmdDistrib.distribDelete(dcmd, nodes, params); cmdDistrib.distribCommit(ccmd, nodes, params);