diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java index 25a1bb1aeae..965917fb127 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java @@ -16,54 +16,56 @@ package org.apache.solr.handler.component; * limitations under the License. */ -import java.net.MalformedURLException; -import java.util.Random; -import java.util.concurrent.Executor; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.solr.client.solrj.impl.LBHttpSolrServer; import org.apache.solr.common.SolrException; +import org.apache.solr.common.util.NamedList; import org.apache.solr.core.PluginInfo; -import org.apache.solr.core.SolrCore; import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.plugin.PluginInfoInitialized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.MalformedURLException; +import java.util.Random; +import java.util.concurrent.*; -public class HttpShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized{ +public class HttpShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized { protected static Logger log = LoggerFactory.getLogger(HttpShardHandlerFactory.class); - // We want an executor that doesn't take up any resources if + // We want an executor that doesn't take up any resources if // it's not used, so it could be created statically for // the distributed search component if desired. // // Consider CallerRuns policy and a lower max threads to throttle // requests at some point (or should we simply return failure?) - ThreadPoolExecutor commExecutor = new ThreadPoolExecutor( - 0, - Integer.MAX_VALUE, - 5, TimeUnit.SECONDS, // terminate idle threads after 5 sec - new SynchronousQueue(), // directly hand off tasks - new DefaultSolrThreadFactory("httpShardExecutor") - ); - + ThreadPoolExecutor commExecutor = new ThreadPoolExecutor( + 0, + Integer.MAX_VALUE, + 5, TimeUnit.SECONDS, // terminate idle threads after 5 sec + new SynchronousQueue(), // directly hand off tasks + new DefaultSolrThreadFactory("httpShardExecutor") + ); HttpClient client; Random r = new Random(); LBHttpSolrServer loadbalancer; int soTimeout = 0; //current default values int connectionTimeout = 0; //current default values + int maxConnectionsPerHost = 20; + int corePoolSize = 0; + int maximumPoolSize = 10; + int keepAliveTime = 5; + int queueSize = 1; + boolean accessPolicy = true; + public String scheme = "http://"; //current default values private MultiThreadedHttpConnectionManager mgr; - // socket timeout measured in ms, closes a socket if read + // socket timeout measured in ms, closes a socket if read // takes longer than x ms to complete. throws // java.net.SocketTimeoutException: Read timed out exception static final String INIT_SO_TIMEOUT = "socketTimeout"; @@ -76,42 +78,63 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements Plug // URL scheme to be used in distributed search. static final String INIT_URL_SCHEME = "urlScheme"; + // Maximum connections allowed per host + static final String INIT_MAX_CONNECTION_PER_HOST = "maxConnectionsPerHost"; + // The core size of the threadpool servicing requests + static final String INIT_CORE_POOL_SIZE = "corePoolSize"; - public ShardHandler getShardHandler(){ + // The maximum size of the threadpool servicing requests + static final String INIT_MAX_POOL_SIZE = "maximumPoolSize"; + + // The amount of time idle threads persist for in the queue, before being killed + static final String MAX_THREAD_IDLE_TIME = "maxThreadIdleTime"; + + // If the threadpool uses a backing queue, what is its maximum size (-1) to use direct handoff + static final String INIT_SIZE_OF_QUEUE = "sizeOfQueue"; + + // Configure if the threadpool favours fairness over throughput + static final String INIT_FAIRNESS_POLICY = "fairnessPolicy"; + + public ShardHandler getShardHandler() { return getShardHandler(null); } - - public ShardHandler getShardHandler(HttpClient httpClient){ + public ShardHandler getShardHandler(HttpClient httpClient) { return new HttpShardHandler(this, httpClient); } public void init(PluginInfo info) { + NamedList args = info.initArgs; + this.soTimeout = getParameter(args, INIT_SO_TIMEOUT, 0); - if (info.initArgs != null) { - Object so = info.initArgs.get(INIT_SO_TIMEOUT); - if (so != null) { - soTimeout = (Integer) so; - log.info("Setting socketTimeout to: " + soTimeout); - } + this.scheme = getParameter(args, INIT_URL_SCHEME, "http://"); + this.scheme = (this.scheme.endsWith("://")) ? this.scheme : this.scheme + "://"; + this.connectionTimeout = getParameter(args, INIT_CONNECTION_TIMEOUT, 0); + this.maxConnectionsPerHost = getParameter(args, INIT_MAX_CONNECTION_PER_HOST, 20); + this.corePoolSize = getParameter(args, INIT_CORE_POOL_SIZE, 0); + this.maximumPoolSize = getParameter(args, INIT_MAX_POOL_SIZE, Integer.MAX_VALUE); + this.keepAliveTime = getParameter(args, MAX_THREAD_IDLE_TIME, 5); + this.queueSize = getParameter(args, INIT_SIZE_OF_QUEUE, -1); + this.accessPolicy = getParameter(args, INIT_FAIRNESS_POLICY, false); + + BlockingQueue blockingQueue = (this.queueSize == -1) ? + new SynchronousQueue(this.accessPolicy) : + new ArrayBlockingQueue(this.queueSize, this.accessPolicy); + + this.commExecutor = new ThreadPoolExecutor( + this.corePoolSize, + this.maximumPoolSize, + this.keepAliveTime, TimeUnit.SECONDS, + blockingQueue, + new DefaultSolrThreadFactory("httpShardExecutor") + ); - Object urlScheme = info.initArgs.get(INIT_URL_SCHEME); - if (urlScheme != null) { - scheme = urlScheme + "://"; - log.info("Setting urlScheme to: " + urlScheme); - } - Object co = info.initArgs.get(INIT_CONNECTION_TIMEOUT); - if (co != null) { - connectionTimeout = (Integer) co; - log.info("Setting shard-connection-timeout to: " + connectionTimeout); - } - } mgr = new MultiThreadedHttpConnectionManager(); - mgr.getParams().setDefaultMaxConnectionsPerHost(256); + mgr.getParams().setDefaultMaxConnectionsPerHost(this.maxConnectionsPerHost); mgr.getParams().setMaxTotalConnections(10000); - mgr.getParams().setConnectionTimeout(connectionTimeout); - mgr.getParams().setSoTimeout(soTimeout); + mgr.getParams().setConnectionTimeout(this.connectionTimeout); + mgr.getParams().setSoTimeout(this.soTimeout); // mgr.getParams().setStaleCheckingEnabled(false); client = new HttpClient(mgr); @@ -124,11 +147,22 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements Plug loadbalancer = new LBHttpSolrServer(client); } catch (MalformedURLException e) { // should be impossible since we're not passing any URLs here - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,e); + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); } } + private T getParameter(NamedList initArgs, String configKey, T defaultValue) { + T toReturn = defaultValue; + if (initArgs != null) { + T temp = (T) initArgs.get(configKey); + toReturn = (temp != null) ? temp : defaultValue; + } + log.info("Setting {} to: {}", configKey, soTimeout); + return toReturn; + } + + @Override public void close() { try {