From 4e099f3571df9871148aad1bd300f78e53a11eed Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Thu, 2 Aug 2012 21:58:50 +0000 Subject: [PATCH] SOLR-3658: SolrCmdDistributor can briefly create spikes of threads in the thousands - second pass git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1368725 13f79535-47bb-0310-9956-ffa450edef68 --- .../solr/update/SolrCmdDistributor.java | 71 ++++++++++------- .../processor/DistributedUpdateProcessor.java | 9 ++- .../apache/solr/util/AdjustableSemaphore.java | 79 +++++++++++++++++++ 3 files changed, 127 insertions(+), 32 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index 77a32e0b298..6e5ee0d64b2 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -24,7 +24,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; @@ -33,6 +32,7 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -48,6 +48,7 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.SolrCore; +import org.apache.solr.util.AdjustableSemaphore; import org.apache.solr.util.DefaultSolrThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,11 +63,12 @@ public class SolrCmdDistributor { static BoundedExecutor commExecutor; static final HttpClient client; + static AdjustableSemaphore semaphore; static { ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 200); - params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 8); + params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 500); + params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 16); client = HttpClientUtil.createClient(params); } @@ -92,14 +94,22 @@ public class SolrCmdDistributor { } public SolrCmdDistributor(int numHosts) { - + int maxPoolSize = Math.max(8, (numHosts-1) * 8); BoundedExecutor executor = null; synchronized (SolrCmdDistributor.class) { - if (commExecutor == null || commExecutor.getMaximumPoolSize() != numHosts) { + if (semaphore == null) { + semaphore = new AdjustableSemaphore(maxPoolSize); + } + + if (maxPoolSize != semaphore.getMaxPermits()) { + // raise the permits to match maxPoolSize + semaphore.setMaxPermits(maxPoolSize); + } + + if (commExecutor == null || commExecutor.getMaximumPoolSize() != maxPoolSize) { // we don't shutdown the previous because all it's threads will die - int maxPoolSize = Math.max(8, (numHosts-1) * 8); commExecutor = new BoundedExecutor(0, maxPoolSize, 5, - TimeUnit.SECONDS, new ArrayBlockingQueue(maxPoolSize * 2), + TimeUnit.SECONDS, new SynchronousQueue(), new DefaultSolrThreadFactory("cmdDistribExecutor")); } executor = commExecutor; @@ -343,11 +353,17 @@ public class SolrCmdDistributor { } else { clonedRequest.rspCode = -1; } + } finally { + semaphore.release(); } return clonedRequest; } }; - + try { + semaphore.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(); + } pending.add(completionService.submit(task)); } @@ -517,36 +533,33 @@ public class SolrCmdDistributor { public class BoundedExecutor extends ThreadPoolExecutor { private final Semaphore semaphore; - public BoundedExecutor(int corePoolSize, - int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue workQueue, ThreadFactory threadFactory) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + public BoundedExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, + ThreadFactory threadFactory) { + super(corePoolSize, Integer.MAX_VALUE, keepAliveTime, unit, workQueue, + threadFactory); this.semaphore = new Semaphore(maximumPoolSize); } - + @Override public void execute(final Runnable command) { +// try { +// System.out.println("semaphore aq:" + semaphore.availablePermits()); +// semaphore.acquire(); +// System.out.println("aquired"); +// } catch (InterruptedException e1) { +// throw new RuntimeException(); +// } try { - semaphore.acquire(); - } catch (InterruptedException e1) { - throw new RuntimeException(); - } - try { - super.execute(new Runnable() { - public void run() { - try { - command.run(); - } finally { - semaphore.release(); - } - } - }); + super.execute(command); } catch (RejectedExecutionException e) { - semaphore.release(); throw e; + } finally { +// semaphore.release(); +// System.out.println("semaphore re:" + semaphore.availablePermits()); } } -} + } } diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 1a92c7c1ff3..434a32a0fe8 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -158,17 +158,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { CoreDescriptor coreDesc = req.getCore().getCoreDescriptor(); this.zkEnabled = coreDesc.getCoreContainer().isZooKeeperAware(); + zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController(); + if (zkEnabled) { + numNodes = zkController.getZkStateReader().getCloudState().getLiveNodes().size(); + } //this.rsp = reqInfo != null ? reqInfo.getRsp() : null; - - zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController(); + cloudDesc = coreDesc.getCloudDescriptor(); if (cloudDesc != null) { collection = cloudDesc.getCollectionName(); } - + cmdDistrib = new SolrCmdDistributor(numNodes); } diff --git a/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java b/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java new file mode 100644 index 00000000000..97b0495f929 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java @@ -0,0 +1,79 @@ +package org.apache.solr.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.concurrent.Semaphore; + +final public class AdjustableSemaphore { + + private final ResizeableSemaphore semaphore; + + private int maxPermits = 0; + + public AdjustableSemaphore(int size) { + semaphore = new ResizeableSemaphore(size); + } + + public synchronized void setMaxPermits(int newMax) { + if (newMax < 1) { + throw new IllegalArgumentException("Semaphore size must be at least 1," + + " was " + newMax); + } + + int delta = newMax - this.maxPermits; + + if (delta == 0) { + return; + } else if (delta > 0) { + this.semaphore.release(delta); + } else { + delta *= -1; + this.semaphore.reducePermits(delta); + } + + this.maxPermits = newMax; + } + + public void release() { + this.semaphore.release(); + } + + public void release(int numPermits) { + this.semaphore.release(numPermits); + } + + public void acquire() throws InterruptedException { + this.semaphore.acquire(); + } + + public int getMaxPermits() { + return maxPermits; + } + + private static final class ResizeableSemaphore extends Semaphore { + + ResizeableSemaphore(int size) { + super(size); + } + + @Override + protected void reducePermits(int reduction) { + super.reducePermits(reduction); + } + } +} \ No newline at end of file