SOLR-3658: Adding thousands of docs with one UpdateProcessorChain instance can briefly create spikes of threads in the thousands.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1364355 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2012-07-22 16:54:26 +00:00
parent 4ecb3975c3
commit a798414614
4 changed files with 71 additions and 16 deletions
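For context before the diffs: prior to this patch, SolrCmdDistributor's shared commExecutor was a ThreadPoolExecutor with corePoolSize 0, a maximum of Integer.MAX_VALUE, and a SynchronousQueue (see the removed lines in SolrCmdDistributor.java below). A SynchronousQueue has no capacity, so every execute() that finds no idle worker forces the pool to create another thread, which is how a burst of thousands of documents could briefly spawn thousands of threads. A minimal standalone sketch of that behavior (illustration only, not Solr code):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: a SynchronousQueue has no capacity, so every execute()
// that finds no idle worker makes the pool add another thread, up to the
// Integer.MAX_VALUE cap used before this patch.
public class UnboundedPoolSketch {
  public static void main(String[] args) throws InterruptedException {
    final ThreadPoolExecutor unbounded = new ThreadPoolExecutor(
        0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());

    final AtomicInteger peak = new AtomicInteger();
    final CountDownLatch done = new CountDownLatch(1000);
    for (int i = 0; i < 1000; i++) {
      unbounded.execute(new Runnable() {
        public void run() {
          // record the largest pool size observed by any task
          peak.accumulateAndGet(unbounded.getPoolSize(), Math::max);
          try {
            Thread.sleep(50);
          } catch (InterruptedException ignored) {
          }
          done.countDown();
        }
      });
    }
    done.await();
    // with 1000 near-simultaneous submissions the pool briefly grows to
    // on the order of a thousand threads
    System.out.println("peak pool size: " + peak.get());
    unbounded.shutdown();
  }
}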

CHANGES.txt

@@ -117,6 +117,9 @@ Bug Fixes
* SOLR-3660: Velocity: Link to admin page broken (janhoy)
* SOLR-3658: Adding thousands of docs with one UpdateProcessorChain instance can briefly create
spikes of threads in the thousands. (yonik, Mark Miller)
Other Changes
----------------------

SolrCmdDistributor.java

@@ -24,12 +24,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -49,17 +53,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SolrCmdDistributor {
private static final int MAX_RETRIES_ON_FORWARD = 6;
public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
// TODO: shut this thing down
// TODO: this cannot be per instance...
static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("cmdDistribExecutor"));
static BoundedExecutor commExecutor;
static final HttpClient client;
@@ -91,8 +91,22 @@ public class SolrCmdDistributor {
ModifiableSolrParams params;
}
public SolrCmdDistributor() {
public SolrCmdDistributor(int numHosts) {
BoundedExecutor executor = null;
synchronized (SolrCmdDistributor.class) {
if (commExecutor == null || commExecutor.getMaximumPoolSize() != numHosts) {
// we don't shutdown the previous because all its threads will die
int maxPoolSize = Math.max(8, (numHosts-1) * 8);
commExecutor = new BoundedExecutor(0, maxPoolSize, 5,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(maxPoolSize * 2),
new DefaultSolrThreadFactory("cmdDistribExecutor"));
}
executor = commExecutor;
}
completionService = new ExecutorCompletionService<Request>(executor);
pending = new HashSet<Future<Request>>();
}
public void finish() {
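The new constructor above lazily builds a shared BoundedExecutor whose thread cap is derived from the number of hosts, Math.max(8, (numHosts - 1) * 8), with an ArrayBlockingQueue sized at twice that cap. A small sketch of the sizing rule, evaluated for a few hypothetical host counts (illustration only, not part of the patch):

// Illustration only: the sizing rule used by the constructor above.
public class PoolSizingSketch {

  // at least 8 threads, otherwise 8 per host other than the local one
  static int maxPoolSize(int numHosts) {
    return Math.max(8, (numHosts - 1) * 8);
  }

  public static void main(String[] args) {
    for (int hosts : new int[] {1, 2, 4, 16}) {
      int cap = maxPoolSize(hosts);
      // the backing ArrayBlockingQueue is created with capacity cap * 2
      System.out.println(hosts + " hosts -> max " + cap
          + " threads, queue capacity " + (cap * 2));
    }
  }
}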
@@ -297,10 +311,7 @@ public class SolrCmdDistributor {
}
public void submit(final Request sreq) {
if (completionService == null) {
completionService = new ExecutorCompletionService<Request>(commExecutor);
pending = new HashSet<Future<Request>>();
}
final String url = sreq.node.getUrl();
Callable<Request> task = new Callable<Request>() {
@@ -502,4 +513,40 @@ public class SolrCmdDistributor {
return nodeProps;
}
}
public class BoundedExecutor extends ThreadPoolExecutor {
private final Semaphore semaphore;
public BoundedExecutor(int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.semaphore = new Semaphore(maximumPoolSize);
}
@Override
public void execute(final Runnable command) {
try {
semaphore.acquire();
} catch (InterruptedException e1) {
throw new RuntimeException();
}
try {
super.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
}
}
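The BoundedExecutor above caps outstanding work with a Semaphore sized to maximumPoolSize: execute() blocks the submitting thread until a permit is free, and the permit is released only when the task finishes (or on rejection), so a flood of documents throttles the producer instead of growing the thread count or overflowing the queue. A usage sketch with hypothetical parameters (a cap of 4 and 100 short tasks, not the values the patch computes), assuming a copy of the BoundedExecutor class above is on the classpath:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Usage sketch for the semaphore-bounding idea above; parameters are
// hypothetical and chosen for illustration.
public class BoundedExecutorDemo {
  public static void main(String[] args) throws InterruptedException {
    BoundedExecutor executor = new BoundedExecutor(
        4, 4, 5, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(8),
        Executors.defaultThreadFactory());

    final AtomicInteger running = new AtomicInteger();
    final AtomicInteger peak = new AtomicInteger();
    final CountDownLatch done = new CountDownLatch(100);

    for (int i = 0; i < 100; i++) {
      // once 4 tasks are outstanding, this call blocks in semaphore.acquire()
      // instead of spawning more threads or overflowing the queue
      executor.execute(new Runnable() {
        public void run() {
          int now = running.incrementAndGet();
          peak.accumulateAndGet(now, Math::max);
          try {
            Thread.sleep(20);
          } catch (InterruptedException ignored) {
          } finally {
            running.decrementAndGet();
            done.countDown();
          }
        }
      });
    }
    done.await();
    // expected: never more than 4 tasks in flight at once
    System.out.println("peak concurrent tasks: " + peak.get());
    executor.shutdown();
  }
}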

DistributedUpdateProcessor.java

@@ -130,6 +130,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private boolean forwardToLeader = false;
private List<Node> nodes;
private int numNodes;
public DistributedUpdateProcessor(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) {
@@ -164,7 +166,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
collection = cloudDesc.getCollectionName();
}
cmdDistrib = new SolrCmdDistributor();
cmdDistrib = new SolrCmdDistributor(numNodes);
}
private List<Node> setupRequest(int hash) {
@@ -172,6 +174,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// if we are in zk mode...
if (zkEnabled) {
// set num nodes
numNodes = zkController.getCloudState().getLiveNodes().size();
// the leader is...
// TODO: if there is no leader, wait and look again
// TODO: we are reading the leader from zk every time - we should cache

SolrCmdDistributorTest.java

@@ -82,7 +82,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
public void doTest() throws Exception {
//del("*:*");
SolrCmdDistributor cmdDistrib = new SolrCmdDistributor();
SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8);
ModifiableSolrParams params = new ModifiableSolrParams();
List<Node> nodes = new ArrayList<Node>();
@@ -116,7 +116,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
// add another 2 docs to control and 3 to client
cmdDistrib = new SolrCmdDistributor();
cmdDistrib = new SolrCmdDistributor(8);
cmd.solrDoc = sdoc("id", 2);
cmdDistrib.distribAdd(cmd, nodes, params);
@@ -149,7 +149,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
dcmd.id = "2";
cmdDistrib = new SolrCmdDistributor();
cmdDistrib = new SolrCmdDistributor(8);
cmdDistrib.distribDelete(dcmd, nodes, params);
cmdDistrib.distribCommit(ccmd, nodes, params);