mirror of https://github.com/apache/lucene.git
Fix for SOLR-3221: "Make Shard handler threadpool configurable"
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1300160 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8452214c35
commit
3f82d831db
|
@ -16,54 +16,56 @@ package org.apache.solr.handler.component;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
|
||||
import org.apache.commons.httpclient.HttpClient;
|
||||
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
|
||||
import org.apache.commons.httpclient.params.HttpMethodParams;
|
||||
import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.PluginInfo;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.solr.util.plugin.PluginInfoInitialized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
public class HttpShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized{
|
||||
public class HttpShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized {
|
||||
protected static Logger log = LoggerFactory.getLogger(HttpShardHandlerFactory.class);
|
||||
|
||||
// We want an executor that doesn't take up any resources if
|
||||
// We want an executor that doesn't take up any resources if
|
||||
// it's not used, so it could be created statically for
|
||||
// the distributed search component if desired.
|
||||
//
|
||||
// Consider CallerRuns policy and a lower max threads to throttle
|
||||
// requests at some point (or should we simply return failure?)
|
||||
ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(
|
||||
0,
|
||||
Integer.MAX_VALUE,
|
||||
5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
|
||||
new SynchronousQueue<Runnable>(), // directly hand off tasks
|
||||
new DefaultSolrThreadFactory("httpShardExecutor")
|
||||
);
|
||||
|
||||
ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(
|
||||
0,
|
||||
Integer.MAX_VALUE,
|
||||
5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
|
||||
new SynchronousQueue<Runnable>(), // directly hand off tasks
|
||||
new DefaultSolrThreadFactory("httpShardExecutor")
|
||||
);
|
||||
|
||||
HttpClient client;
|
||||
Random r = new Random();
|
||||
LBHttpSolrServer loadbalancer;
|
||||
int soTimeout = 0; //current default values
|
||||
int connectionTimeout = 0; //current default values
|
||||
int maxConnectionsPerHost = 20;
|
||||
int corePoolSize = 0;
|
||||
int maximumPoolSize = 10;
|
||||
int keepAliveTime = 5;
|
||||
int queueSize = 1;
|
||||
boolean accessPolicy = true;
|
||||
|
||||
public String scheme = "http://"; //current default values
|
||||
|
||||
private MultiThreadedHttpConnectionManager mgr;
|
||||
// socket timeout measured in ms, closes a socket if read
|
||||
// socket timeout measured in ms, closes a socket if read
|
||||
// takes longer than x ms to complete. throws
|
||||
// java.net.SocketTimeoutException: Read timed out exception
|
||||
static final String INIT_SO_TIMEOUT = "socketTimeout";
|
||||
|
@ -76,42 +78,63 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements Plug
|
|||
// URL scheme to be used in distributed search.
|
||||
static final String INIT_URL_SCHEME = "urlScheme";
|
||||
|
||||
// Maximum connections allowed per host
|
||||
static final String INIT_MAX_CONNECTION_PER_HOST = "maxConnectionsPerHost";
|
||||
|
||||
// The core size of the threadpool servicing requests
|
||||
static final String INIT_CORE_POOL_SIZE = "corePoolSize";
|
||||
|
||||
public ShardHandler getShardHandler(){
|
||||
// The maximum size of the threadpool servicing requests
|
||||
static final String INIT_MAX_POOL_SIZE = "maximumPoolSize";
|
||||
|
||||
// The amount of time idle threads persist for in the queue, before being killed
|
||||
static final String MAX_THREAD_IDLE_TIME = "maxThreadIdleTime";
|
||||
|
||||
// If the threadpool uses a backing queue, what is its maximum size (-1) to use direct handoff
|
||||
static final String INIT_SIZE_OF_QUEUE = "sizeOfQueue";
|
||||
|
||||
// Configure if the threadpool favours fairness over throughput
|
||||
static final String INIT_FAIRNESS_POLICY = "fairnessPolicy";
|
||||
|
||||
public ShardHandler getShardHandler() {
|
||||
return getShardHandler(null);
|
||||
}
|
||||
|
||||
|
||||
public ShardHandler getShardHandler(HttpClient httpClient){
|
||||
public ShardHandler getShardHandler(HttpClient httpClient) {
|
||||
return new HttpShardHandler(this, httpClient);
|
||||
}
|
||||
|
||||
public void init(PluginInfo info) {
|
||||
NamedList args = info.initArgs;
|
||||
this.soTimeout = getParameter(args, INIT_SO_TIMEOUT, 0);
|
||||
|
||||
if (info.initArgs != null) {
|
||||
Object so = info.initArgs.get(INIT_SO_TIMEOUT);
|
||||
if (so != null) {
|
||||
soTimeout = (Integer) so;
|
||||
log.info("Setting socketTimeout to: " + soTimeout);
|
||||
}
|
||||
this.scheme = getParameter(args, INIT_URL_SCHEME, "http://");
|
||||
this.scheme = (this.scheme.endsWith("://")) ? this.scheme : this.scheme + "://";
|
||||
this.connectionTimeout = getParameter(args, INIT_CONNECTION_TIMEOUT, 0);
|
||||
this.maxConnectionsPerHost = getParameter(args, INIT_MAX_CONNECTION_PER_HOST, 20);
|
||||
this.corePoolSize = getParameter(args, INIT_CORE_POOL_SIZE, 0);
|
||||
this.maximumPoolSize = getParameter(args, INIT_MAX_POOL_SIZE, Integer.MAX_VALUE);
|
||||
this.keepAliveTime = getParameter(args, MAX_THREAD_IDLE_TIME, 5);
|
||||
this.queueSize = getParameter(args, INIT_SIZE_OF_QUEUE, -1);
|
||||
this.accessPolicy = getParameter(args, INIT_FAIRNESS_POLICY, false);
|
||||
|
||||
BlockingQueue<Runnable> blockingQueue = (this.queueSize == -1) ?
|
||||
new SynchronousQueue<Runnable>(this.accessPolicy) :
|
||||
new ArrayBlockingQueue<Runnable>(this.queueSize, this.accessPolicy);
|
||||
|
||||
this.commExecutor = new ThreadPoolExecutor(
|
||||
this.corePoolSize,
|
||||
this.maximumPoolSize,
|
||||
this.keepAliveTime, TimeUnit.SECONDS,
|
||||
blockingQueue,
|
||||
new DefaultSolrThreadFactory("httpShardExecutor")
|
||||
);
|
||||
|
||||
Object urlScheme = info.initArgs.get(INIT_URL_SCHEME);
|
||||
if (urlScheme != null) {
|
||||
scheme = urlScheme + "://";
|
||||
log.info("Setting urlScheme to: " + urlScheme);
|
||||
}
|
||||
Object co = info.initArgs.get(INIT_CONNECTION_TIMEOUT);
|
||||
if (co != null) {
|
||||
connectionTimeout = (Integer) co;
|
||||
log.info("Setting shard-connection-timeout to: " + connectionTimeout);
|
||||
}
|
||||
}
|
||||
mgr = new MultiThreadedHttpConnectionManager();
|
||||
mgr.getParams().setDefaultMaxConnectionsPerHost(256);
|
||||
mgr.getParams().setDefaultMaxConnectionsPerHost(this.maxConnectionsPerHost);
|
||||
mgr.getParams().setMaxTotalConnections(10000);
|
||||
mgr.getParams().setConnectionTimeout(connectionTimeout);
|
||||
mgr.getParams().setSoTimeout(soTimeout);
|
||||
mgr.getParams().setConnectionTimeout(this.connectionTimeout);
|
||||
mgr.getParams().setSoTimeout(this.soTimeout);
|
||||
// mgr.getParams().setStaleCheckingEnabled(false);
|
||||
|
||||
client = new HttpClient(mgr);
|
||||
|
@ -124,11 +147,22 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements Plug
|
|||
loadbalancer = new LBHttpSolrServer(client);
|
||||
} catch (MalformedURLException e) {
|
||||
// should be impossible since we're not passing any URLs here
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,e);
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private <T> T getParameter(NamedList initArgs, String configKey, T defaultValue) {
|
||||
T toReturn = defaultValue;
|
||||
if (initArgs != null) {
|
||||
T temp = (T) initArgs.get(configKey);
|
||||
toReturn = (temp != null) ? temp : defaultValue;
|
||||
}
|
||||
log.info("Setting {} to: {}", configKey, soTimeout);
|
||||
return toReturn;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue