SOLR-5496: We should share an http connection manager across non search HttpClients and ensure all http connection managers get shutdown.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1544899 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-11-24 01:59:56 +00:00
parent aa253e949f
commit 739dae21da
21 changed files with 171 additions and 153 deletions

View File

@ -147,6 +147,10 @@ Bug Fixes
a delta-import is run first before any full-imports.
(Sebastien Lorber, Arcadius Ahouansou via shalin)
* SOLR-5496: We should share an http connection manager across non search
HttpClients and ensure all http connection managers get shutdown.
(Mark Miller)
Optimizations
----------------------

View File

@ -118,9 +118,9 @@ class ShardLeaderElectionContextBase extends ElectionContext {
final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
private static Logger log = LoggerFactory.getLogger(ShardLeaderElectionContext.class);
private ZkController zkController;
private CoreContainer cc;
private SyncStrategy syncStrategy = new SyncStrategy();
private final ZkController zkController;
private final CoreContainer cc;
private final SyncStrategy syncStrategy;
private volatile boolean isClosed = false;
@ -131,6 +131,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
zkController.getZkStateReader());
this.zkController = zkController;
this.cc = cc;
syncStrategy = new SyncStrategy(cc.getUpdateShardHandler());
}
@Override

View File

@ -20,13 +20,10 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.common.SolrException;
@ -35,7 +32,6 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
@ -48,7 +44,7 @@ import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.PeerSync;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.update.UpdateShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,26 +54,18 @@ public class SyncStrategy {
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
private final ShardHandler shardHandler;
private ThreadPoolExecutor recoveryCmdExecutor = new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new DefaultSolrThreadFactory(
"recoveryCmdExecutor"));
private volatile boolean isClosed;
private final HttpClient client;
{
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
params.set(HttpClientUtil.PROP_USE_RETRY, false);
client = HttpClientUtil.createClient(params);
}
private final ExecutorService updateExecutor;
public SyncStrategy() {
public SyncStrategy(UpdateShardHandler updateShardHandler) {
client = updateShardHandler.getHttpClient();
shardHandler = new HttpShardHandlerFactory().getShardHandler(client);
updateExecutor = updateShardHandler.getUpdateExecutor();
}
private static class ShardCoreRequest extends ShardRequest {
@ -268,18 +256,6 @@ public class SyncStrategy {
public void close() {
this.isClosed = true;
try {
ExecutorUtil.shutdownAndAwaitTermination(recoveryCmdExecutor);
} catch (Throwable e) {
SolrException.log(log, e);
}
// we must close connection manager *after* shutting down executor
try {
client.getConnectionManager().shutdown();
} catch (Throwable e) {
SolrException.log(log, e);
}
}
private void requestRecovery(final ZkNodeProps leaderProps, final String baseUrl, final String coreName) throws SolrServerException, IOException {
@ -305,7 +281,7 @@ public class SyncStrategy {
}
}
};
recoveryCmdExecutor.execute(thread);
updateExecutor.execute(thread);
}
public static ModifiableSolrParams params(String... params) {

View File

@ -172,11 +172,9 @@ public final class ZkController {
private int clientTimeout;
private volatile boolean isClosed;
private UpdateShardHandler updateShardHandler;
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
String localHostContext, int leaderVoteWait, boolean genericCoreNodeNames, int distribUpdateConnTimeout, int distribUpdateSoTimeout, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
String localHostContext, int leaderVoteWait, boolean genericCoreNodeNames, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
TimeoutException, IOException {
if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
this.cc = cc;
@ -187,8 +185,6 @@ public final class ZkController {
// which means the default of "solr"
localHostContext = trimLeadingAndTrailingSlashes(localHostContext);
updateShardHandler = new UpdateShardHandler(distribUpdateConnTimeout, distribUpdateSoTimeout);
this.zkServerAddress = zkServerAddress;
this.localHostPort = locaHostPort;
this.localHostContext = localHostContext;
@ -411,13 +407,6 @@ public final class ZkController {
log.error("Error closing zkClient", t);
}
if (updateShardHandler != null) {
try {
updateShardHandler.close();
} catch(Throwable t) {
log.error("Error closing updateShardHandler", t);
}
}
}
/**
@ -1548,11 +1537,6 @@ public final class ZkController {
return clientTimeout;
}
// may return null if not in zk mode
public UpdateShardHandler getUpdateShardHandler() {
return updateShardHandler;
}
public Overseer getOverseer() {
return overseer;
}

View File

@ -156,6 +156,14 @@ public abstract class ConfigSolr {
public int getDistributedSocketTimeout() {
return getInt(CfgProp.SOLR_DISTRIBUPDATESOTIMEOUT, 0);
}
public int getMaxUpdateConnections() {
return getInt(CfgProp.SOLR_MAXUPDATECONNECTIONS, 10000);
}
public int getMaxUpdateConnectionsPerHost() {
return getInt(CfgProp.SOLR_MAXUPDATECONNECTIONSPERHOST, 100);
}
public int getCoreLoadThreadCount() {
return getInt(ConfigSolr.CfgProp.SOLR_CORELOADTHREADS, DEFAULT_CORE_LOAD_THREADS);
@ -207,6 +215,8 @@ public abstract class ConfigSolr {
SOLR_COREROOTDIRECTORY,
SOLR_DISTRIBUPDATECONNTIMEOUT,
SOLR_DISTRIBUPDATESOTIMEOUT,
SOLR_MAXUPDATECONNECTIONS,
SOLR_MAXUPDATECONNECTIONSPERHOST,
SOLR_HOST,
SOLR_HOSTCONTEXT,
SOLR_HOSTPORT,

View File

@ -107,6 +107,8 @@ public class ConfigSolrXml extends ConfigSolr {
propMap.put(CfgProp.SOLR_COREROOTDIRECTORY, doSub("solr/str[@name='coreRootDirectory']"));
propMap.put(CfgProp.SOLR_DISTRIBUPDATECONNTIMEOUT, doSub("solr/solrcloud/int[@name='distribUpdateConnTimeout']"));
propMap.put(CfgProp.SOLR_DISTRIBUPDATESOTIMEOUT, doSub("solr/solrcloud/int[@name='distribUpdateSoTimeout']"));
propMap.put(CfgProp.SOLR_MAXUPDATECONNECTIONS, doSub("solr/solrcloud/int[@name='maxUpdateConnections']"));
propMap.put(CfgProp.SOLR_MAXUPDATECONNECTIONSPERHOST, doSub("solr/solrcloud/int[@name='maxUpdateConnectionsPerHost']"));
propMap.put(CfgProp.SOLR_HOST, doSub("solr/solrcloud/str[@name='host']"));
propMap.put(CfgProp.SOLR_HOSTCONTEXT, doSub("solr/solrcloud/str[@name='hostContext']"));
propMap.put(CfgProp.SOLR_HOSTPORT, doSub("solr/solrcloud/int[@name='hostPort']"));

View File

@ -145,6 +145,10 @@ public class ConfigSolrXmlOld extends ConfigSolr {
config.getVal("solr/cores/@distribUpdateConnTimeout", false));
propMap.put(CfgProp.SOLR_DISTRIBUPDATESOTIMEOUT,
config.getVal("solr/cores/@distribUpdateSoTimeout", false));
propMap.put(CfgProp.SOLR_MAXUPDATECONNECTIONS,
config.getVal("solr/cores/@maxUpdateConnections", false));
propMap.put(CfgProp.SOLR_MAXUPDATECONNECTIONSPERHOST,
config.getVal("solr/cores/@maxUpdateConnectionsPerHost", false));
propMap.put(CfgProp.SOLR_HOST, config.getVal("solr/cores/@host", false));
propMap.put(CfgProp.SOLR_HOSTCONTEXT,
config.getVal("solr/cores/@hostContext", false));

View File

@ -24,6 +24,7 @@ import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@ -40,6 +41,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
@ -54,6 +58,7 @@ import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.logging.LogWatcher;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.IndexSchemaFactory;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.zookeeper.KeeperException;
@ -88,8 +93,7 @@ public class CoreContainer {
protected ZkContainer zkSys = new ZkContainer();
private ShardHandlerFactory shardHandlerFactory;
private ExecutorService updateExecutor = Executors.newCachedThreadPool(
new SolrjNamedThreadFactory("updateExecutor"));
private UpdateShardHandler updateShardHandler;
protected LogWatcher logging = null;
@ -102,6 +106,8 @@ public class CoreContainer {
protected final CoresLocator coresLocator;
private String hostName;
// private ClientConnectionManager clientConnectionManager = new PoolingClientConnectionManager();
{
log.info("New CoreContainer " + System.identityHashCode(this));
@ -196,6 +202,8 @@ public class CoreContainer {
}
shardHandlerFactory = ShardHandlerFactory.newInstance(cfg.getShardHandlerFactoryPluginInfo(), loader);
updateShardHandler = new UpdateShardHandler(cfg);
solrCores.allocateLazyCores(cfg.getTransientCacheSize(), loader);
@ -358,7 +366,6 @@ public class CoreContainer {
cancelCoreRecoveries();
}
try {
// First wake up the closer thread, it'll terminate almost immediately since it checks isShutDown.
synchronized (solrCores.getModifyLock()) {
@ -384,16 +391,20 @@ public class CoreContainer {
}
} finally {
if (shardHandlerFactory != null) {
shardHandlerFactory.close();
try {
if (shardHandlerFactory != null) {
shardHandlerFactory.close();
}
} finally {
try {
if (updateShardHandler != null) {
updateShardHandler.close();
}
} finally {
// we want to close zk stuff last
zkSys.close();
}
}
ExecutorUtil.shutdownAndAwaitTermination(updateExecutor);
// we want to close zk stuff last
zkSys.close();
}
org.apache.lucene.util.IOUtils.closeWhileHandlingException(loader); // best effort
}
@ -866,7 +877,7 @@ public class CoreContainer {
public InfoHandler getInfoHandler() {
return infoHandler;
}
/**
* the default core name, or null if there is no default core name
*/
@ -952,8 +963,8 @@ public class CoreContainer {
return shardHandlerFactory;
}
public ExecutorService getUpdateExecutor() {
return updateExecutor;
public UpdateShardHandler getUpdateShardHandler() {
return updateShardHandler;
}
// Just to tidy up the code where it did this in-line.
@ -969,8 +980,6 @@ public class CoreContainer {
String getCoreToOrigName(SolrCore core) {
return solrCores.getCoreToOrigName(core);
}
}
class CloserThread extends Thread {

View File

@ -51,8 +51,6 @@ public class ZkContainer {
private String host;
private int leaderVoteWait;
private Boolean genericCoreNodeNames;
private int distribUpdateConnTimeout;
private int distribUpdateSoTimeout;
public ZkContainer() {
@ -67,13 +65,11 @@ public class ZkContainer {
initZooKeeper(cc, solrHome,
config.getZkHost(), config.getZkClientTimeout(), config.getZkHostPort(), config.getZkHostContext(),
config.getHost(), config.getLeaderVoteWait(), config.getGenericCoreNodeNames(),
config.getDistributedConnectionTimeout(), config.getDistributedSocketTimeout());
config.getHost(), config.getLeaderVoteWait(), config.getGenericCoreNodeNames());
}
public void initZooKeeper(final CoreContainer cc, String solrHome, String zkHost, int zkClientTimeout, String hostPort,
String hostContext, String host, int leaderVoteWait, boolean genericCoreNodeNames,
int distribUpdateConnTimeout, int distribUpdateSoTimeout) {
String hostContext, String host, int leaderVoteWait, boolean genericCoreNodeNames) {
ZkController zkController = null;
// if zkHost sys property is not set, we are not using ZooKeeper
@ -92,8 +88,6 @@ public class ZkContainer {
this.host = host;
this.leaderVoteWait = leaderVoteWait;
this.genericCoreNodeNames = genericCoreNodeNames;
this.distribUpdateConnTimeout = distribUpdateConnTimeout;
this.distribUpdateSoTimeout = distribUpdateSoTimeout;
if (zkRun == null && zookeeperHost == null)
return; // not in zk mode
@ -147,7 +141,7 @@ public class ZkContainer {
}
zkController = new ZkController(cc, zookeeperHost, zkClientTimeout,
zkClientConnectTimeout, host, hostPort, hostContext,
leaderVoteWait, genericCoreNodeNames, distribUpdateConnTimeout, distribUpdateSoTimeout,
leaderVoteWait, genericCoreNodeNames,
new CurrentCoreDescriptorProvider() {
@Override

View File

@ -163,25 +163,18 @@ public class SnapPuller {
*/
private AtomicBoolean pollDisabled = new AtomicBoolean(false);
// HttpClient shared by all cores (used if timeout is not specified for a core)
private static HttpClient client;
// HttpClient for this instance if connectionTimeout or readTimeout has been specified
private final HttpClient myHttpClient;
private static synchronized HttpClient createHttpClient(String connTimeout, String readTimeout, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) {
if (connTimeout == null && readTimeout == null && client != null) return client;
private static HttpClient createHttpClient(SolrCore core, String connTimeout, String readTimeout, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) {
final ModifiableSolrParams httpClientParams = new ModifiableSolrParams();
httpClientParams.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connTimeout != null ? connTimeout : "5000");
httpClientParams.set(HttpClientUtil.PROP_SO_TIMEOUT, readTimeout != null ? readTimeout : "20000");
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser);
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
// Keeping a very high number so that if you have a large number of cores
// no requests are kept waiting for an idle connection.
httpClientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
httpClientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 10000);
HttpClient httpClient = HttpClientUtil.createClient(httpClientParams);
if (client == null && connTimeout == null && readTimeout == null) client = httpClient;
HttpClient httpClient = HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
return httpClient;
}
@ -208,7 +201,7 @@ public class SnapPuller {
String readTimeout = (String) initArgs.get(HttpClientUtil.PROP_SO_TIMEOUT);
String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
myHttpClient = createHttpClient(connTimeout, readTimeout, httpBasicAuthUser, httpBasicAuthPassword, useExternal);
myHttpClient = createHttpClient(solrCore, connTimeout, readTimeout, httpBasicAuthUser, httpBasicAuthPassword, useExternal);
if (pollInterval != null && pollInterval > 0) {
startExecutorService();
} else {

View File

@ -805,7 +805,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
try {
core = coreContainer.getCore(cname);
if (core != null) {
syncStrategy = new SyncStrategy();
syncStrategy = new SyncStrategy(core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler());
Map<String,Object> props = new HashMap<String,Object>();
props.put(ZkStateReader.BASE_URL_PROP, zkController.getBaseUrl());

View File

@ -79,15 +79,7 @@ public class PeerSync {
private long ourHighThreshold; // 80th percentile
private boolean cantReachIsSuccess;
private boolean getNoVersionsIsSuccess;
private static final HttpClient client;
static {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
params.set(HttpClientUtil.PROP_USE_RETRY, false);
client = HttpClientUtil.createClient(params);
}
private final HttpClient client;
// comparator that sorts by absolute value, putting highest first
private static Comparator<Long> absComparator = new Comparator<Long>() {
@ -137,7 +129,7 @@ public class PeerSync {
this.maxUpdates = nUpdates;
this.cantReachIsSuccess = cantReachIsSuccess;
this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
this.client = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
uhandler = core.getUpdateHandler();
ulog = uhandler.getUpdateLog();

View File

@ -20,7 +20,6 @@ package org.apache.solr.update;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
@ -51,8 +50,8 @@ public class SolrCmdDistributor {
public boolean abortCheck();
}
public SolrCmdDistributor(ExecutorService updateExecutor) {
servers = new StreamingSolrServers(updateExecutor);
public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
servers = new StreamingSolrServers(updateShardHandler);
}
public void finish() {

View File

@ -43,24 +43,22 @@ import org.slf4j.LoggerFactory;
public class StreamingSolrServers {
public static Logger log = LoggerFactory.getLogger(StreamingSolrServers.class);
private static HttpClient httpClient;
static {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, false);
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
params.set(HttpClientUtil.PROP_USE_RETRY, false);
httpClient = HttpClientUtil.createClient(params);
}
private HttpClient httpClient;
private Map<String,ConcurrentUpdateSolrServer> solrServers = new HashMap<String,ConcurrentUpdateSolrServer>();
private List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
private ExecutorService updateExecutor;
public StreamingSolrServers(ExecutorService updateExecutor) {
this.updateExecutor = updateExecutor;
public StreamingSolrServers(UpdateShardHandler updateShardHandler) {
this.updateExecutor = updateShardHandler.getUpdateExecutor();
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, false);
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
params.set(HttpClientUtil.PROP_USE_RETRY, false);
httpClient = updateShardHandler.getHttpClient();
}
public List<Error> getErrors() {

View File

@ -17,16 +17,18 @@ package org.apache.solr.update;
* limitations under the License.
*/
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.core.ConfigSolr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,36 +36,48 @@ public class UpdateShardHandler {
private static Logger log = LoggerFactory.getLogger(UpdateShardHandler.class);
private ThreadPoolExecutor cmdDistribExecutor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("cmdDistribExecutor"));
private ExecutorService updateExecutor = Executors.newCachedThreadPool(
new SolrjNamedThreadFactory("updateExecutor"));
private PoolingClientConnectionManager clientConnectionManager;
private final HttpClient client;
public UpdateShardHandler(int distribUpdateConnTimeout, int distribUpdateSoTimeout) {
public UpdateShardHandler(ConfigSolr cfg) {
clientConnectionManager = new PoolingClientConnectionManager();
clientConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnections());
clientConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost());
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 500);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 16);
params.set(HttpClientUtil.PROP_SO_TIMEOUT, distribUpdateSoTimeout);
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, distribUpdateConnTimeout);
client = HttpClientUtil.createClient(params);
params.set(HttpClientUtil.PROP_SO_TIMEOUT, cfg.getDistributedSocketTimeout());
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, cfg.getDistributedConnectionTimeout());
params.set(HttpClientUtil.PROP_USE_RETRY, false);
client = HttpClientUtil.createClient(params, clientConnectionManager);
}
public HttpClient getHttpClient() {
return client;
}
public ClientConnectionManager getConnectionManager() {
return clientConnectionManager;
}
public ThreadPoolExecutor getCmdDistribExecutor() {
return cmdDistribExecutor;
public ExecutorService getUpdateExecutor() {
return updateExecutor;
}
public void close() {
try {
ExecutorUtil.shutdownNowAndAwaitTermination(cmdDistribExecutor);
ExecutorUtil.shutdownAndAwaitTermination(updateExecutor);
} catch (Throwable e) {
SolrException.log(log, e);
} finally {
clientConnectionManager.shutdown();
}
client.getConnectionManager().shutdown();
}
}

View File

@ -185,7 +185,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
this.zkEnabled = coreDesc.getCoreContainer().isZooKeeperAware();
zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
if (zkEnabled) {
cmdDistrib = new SolrCmdDistributor(coreDesc.getCoreContainer().getUpdateExecutor());
cmdDistrib = new SolrCmdDistributor(coreDesc.getCoreContainer().getUpdateShardHandler());
}
//this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
@ -667,7 +667,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
};
ExecutorService executor = req.getCore().getCoreDescriptor().getCoreContainer().getUpdateExecutor();
ExecutorService executor = req.getCore().getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getUpdateExecutor();
executor.execute(thread);
}

View File

@ -27,6 +27,8 @@
<solrcloud>
<int name="distribUpdateConnTimeout">22</int>
<int name="distribUpdateSoTimeout">33</int>
<int name="maxUpdateConnections">3</int>
<int name="maxUpdateConnectionsPerHost">37</int>
<int name="leaderVoteWait">55</int>
<str name="host">testHost</str>
<str name="hostContext">testHostContext</str>

View File

@ -179,7 +179,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
cc = getCoreContainer();
ZkController zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, 10000,
"127.0.0.1", "8983", "solr", 0, true, 10000, 10000, new CurrentCoreDescriptorProvider() {
"127.0.0.1", "8983", "solr", 0, true, new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
@ -219,7 +219,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
cc = getCoreContainer();
zkController = new ZkController(cc, server.getZkAddress(),
TIMEOUT, 10000, "127.0.0.1", "8983", "solr", 0, true, 10000, 10000, new CurrentCoreDescriptorProvider() {
TIMEOUT, 10000, "127.0.0.1", "8983", "solr", 0, true, new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {

View File

@ -51,6 +51,8 @@ public class TestSolrXml extends SolrTestCaseJ4 {
assertEquals("Did not find expected value", cfg.get(ConfigSolr.CfgProp.SOLR_COREROOTDIRECTORY, null), "testCoreRootDirectory");
assertEquals("Did not find expected value", cfg.getInt(ConfigSolr.CfgProp.SOLR_DISTRIBUPDATECONNTIMEOUT, 0), 22);
assertEquals("Did not find expected value", cfg.getInt(ConfigSolr.CfgProp.SOLR_DISTRIBUPDATESOTIMEOUT, 0), 33);
assertEquals("Did not find expected value", cfg.getInt(ConfigSolr.CfgProp.SOLR_MAXUPDATECONNECTIONS, 0), 3);
assertEquals("Did not find expected value", cfg.getInt(ConfigSolr.CfgProp.SOLR_MAXUPDATECONNECTIONSPERHOST, 0), 37);
assertEquals("Did not find expected value", cfg.get(ConfigSolr.CfgProp.SOLR_HOST, null), "testHost");
assertEquals("Did not find expected value", cfg.get(ConfigSolr.CfgProp.SOLR_HOSTCONTEXT, null), "testHostContext");
assertEquals("Did not find expected value", cfg.getInt(ConfigSolr.CfgProp.SOLR_HOSTPORT, 0), 44);

View File

@ -18,13 +18,14 @@ package org.apache.solr.update;
*/
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.SolrQuery;
@ -37,10 +38,10 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.core.ConfigSolr;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoresLocator;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener;
import org.apache.solr.search.SolrIndexSearcher;
@ -51,6 +52,7 @@ import org.apache.solr.update.SolrCmdDistributor.RetryNode;
import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.junit.BeforeClass;
import org.xml.sax.SAXException;
public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
@BeforeClass
@ -60,10 +62,26 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
// being able to call optimize to have all deletes expunged.
System.setProperty("solr.tests.mergePolicy", LogDocMergePolicy.class.getName());
}
private ExecutorService updateExecutor = Executors.newCachedThreadPool(
new SolrjNamedThreadFactory("updateExecutor"));
private UpdateShardHandler updateShardHandler;
public SolrCmdDistributorTest() {
public SolrCmdDistributorTest() throws ParserConfigurationException, IOException, SAXException {
updateShardHandler = new UpdateShardHandler(new ConfigSolr() {
@Override
public CoresLocator getCoresLocator() {
return null;
}
@Override
protected String getShardHandlerFactoryConfigPath() {
return null;
}
@Override
public boolean isPersistent() {
return false;
}});
fixShardCount = true;
shardCount = 4;
stress = 0;
@ -107,7 +125,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
public void doTest() throws Exception {
del("*:*");
SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateExecutor);
SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler);
ModifiableSolrParams params = new ModifiableSolrParams();
@ -147,7 +165,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
// add another 2 docs to control and 3 to client
cmdDistrib = new SolrCmdDistributor(updateExecutor);
cmdDistrib = new SolrCmdDistributor(updateShardHandler);
cmd.solrDoc = sdoc("id", 2);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
@ -190,7 +208,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
cmdDistrib = new SolrCmdDistributor(updateExecutor);
cmdDistrib = new SolrCmdDistributor(updateShardHandler);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
@ -223,7 +241,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
int id = 5;
cmdDistrib = new SolrCmdDistributor(updateExecutor);
cmdDistrib = new SolrCmdDistributor(updateShardHandler);
int cnt = atLeast(303);
for (int i = 0; i < cnt; i++) {
@ -295,7 +313,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
}
// Test RetryNode
cmdDistrib = new SolrCmdDistributor(updateExecutor);
cmdDistrib = new SolrCmdDistributor(updateShardHandler);
final HttpSolrServer solrclient = (HttpSolrServer) clients.get(0);
long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
@ -343,7 +361,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
@Override
public void tearDown() throws Exception {
ExecutorUtil.shutdownNowAndAwaitTermination(updateExecutor);
updateShardHandler.close();
super.tearDown();
}
}

View File

@ -33,12 +33,13 @@ import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.HttpClient;
import org.apache.http.client.params.ClientParamBean;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.entity.HttpEntityWrapper;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.SystemDefaultHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager; // jdoc
import org.apache.http.impl.client.SystemDefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager; // jdoc
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.protocol.HttpContext;
import org.apache.solr.common.params.ModifiableSolrParams;
@ -91,6 +92,7 @@ public class HttpClientUtil {
public static void setConfigurer(HttpClientConfigurer newConfigurer) {
configurer = newConfigurer;
}
/**
* Creates new http client by using the provided configuration.
*
@ -107,6 +109,20 @@ public class HttpClientUtil {
configureClient(httpClient, config);
return httpClient;
}
/**
* Creates new http client by using the provided configuration.
*
*/
public static HttpClient createClient(final SolrParams params, ClientConnectionManager cm) {
final ModifiableSolrParams config = new ModifiableSolrParams(params);
if (logger.isDebugEnabled()) {
logger.debug("Creating new http client, config:" + config);
}
final DefaultHttpClient httpClient = new DefaultHttpClient(cm);
configureClient(httpClient, config);
return httpClient;
}
/**
* Configures {@link DefaultHttpClient}, only sets parameters if they are