mirror of https://github.com/apache/lucene.git
SOLR-9290: TCP-connections in CLOSE_WAIT spike during heavy indexing and do not decrease
This commit is contained in:
parent
83bbd32f4d
commit
bb7742ebc7
|
@ -113,6 +113,9 @@ Bug Fixes
|
|||
* SOLR-9287: Including 'score' in the 'fl' param when doing an RTG no longer causes an NPE
|
||||
(hossman, Ishan Chattopadhyaya)
|
||||
|
||||
* SOLR-9290: TCP-connections in CLOSE_WAIT spike during heavy indexing and do not decrease.
|
||||
(Mads Tomasgård Bjørgan, Anshum Gupta, Shai Erera, hossman, Mark Miller, yonik, shalin)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -281,6 +281,8 @@ public class SolrXmlConfig {
|
|||
int maxUpdateConnectionsPerHost = UpdateShardHandlerConfig.DEFAULT_MAXUPDATECONNECTIONSPERHOST;
|
||||
int distributedSocketTimeout = UpdateShardHandlerConfig.DEFAULT_DISTRIBUPDATESOTIMEOUT;
|
||||
int distributedConnectionTimeout = UpdateShardHandlerConfig.DEFAULT_DISTRIBUPDATECONNTIMEOUT;
|
||||
int updateConnectionsEvictorSleepDelay = UpdateShardHandlerConfig.DEFAULT_UPDATECONNECTIONSEVICTORSLEEPDELAY;
|
||||
int maxUpdateConnectionIdleTime = UpdateShardHandlerConfig.DEFAULT_MAXUPDATECONNECTIONIDLETIME;
|
||||
|
||||
Object muc = nl.remove("maxUpdateConnections");
|
||||
if (muc != null) {
|
||||
|
@ -306,10 +308,23 @@ public class SolrXmlConfig {
|
|||
defined = true;
|
||||
}
|
||||
|
||||
Object ucesd = nl.remove("updateConnectionsEvictorSleepDelay");
|
||||
if (ucesd != null) {
|
||||
updateConnectionsEvictorSleepDelay = parseInt("updateConnectionsEvictorSleepDelay", ucesd.toString());
|
||||
defined = true;
|
||||
}
|
||||
|
||||
Object mucit = nl.remove("maxUpdateConnectionIdleTime");
|
||||
if (mucit != null) {
|
||||
maxUpdateConnectionIdleTime = parseInt("maxUpdateConnectionIdleTime", mucit.toString());
|
||||
defined = true;
|
||||
}
|
||||
|
||||
if (!defined && !alwaysDefine)
|
||||
return null;
|
||||
|
||||
return new UpdateShardHandlerConfig(maxUpdateConnections, maxUpdateConnectionsPerHost, distributedSocketTimeout, distributedConnectionTimeout);
|
||||
return new UpdateShardHandlerConfig(maxUpdateConnections, maxUpdateConnectionsPerHost, distributedSocketTimeout,
|
||||
distributedConnectionTimeout, updateConnectionsEvictorSleepDelay, maxUpdateConnectionIdleTime);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -17,8 +17,11 @@
|
|||
package org.apache.solr.handler.component;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.DefaultHttpClient;
|
||||
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
|
||||
import org.apache.http.impl.conn.PoolingClientConnectionManager;
|
||||
import org.apache.http.impl.conn.SchemeRegistryFactory;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.HttpClientConfigurer;
|
||||
import org.apache.solr.client.solrj.impl.HttpClientUtil;
|
||||
|
@ -27,10 +30,12 @@ import org.apache.solr.client.solrj.impl.LBHttpSolrClient.Builder;
|
|||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.common.util.IOUtils;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.common.util.URLUtil;
|
||||
import org.apache.solr.core.PluginInfo;
|
||||
import org.apache.solr.update.UpdateShardHandler;
|
||||
import org.apache.solr.update.UpdateShardHandlerConfig;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -68,7 +73,8 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
|||
new DefaultSolrThreadFactory("httpShardExecutor")
|
||||
);
|
||||
|
||||
protected HttpClient defaultClient;
|
||||
protected PoolingClientConnectionManager clientConnectionManager;
|
||||
protected CloseableHttpClient defaultClient;
|
||||
private LBHttpSolrClient loadbalancer;
|
||||
//default values:
|
||||
int soTimeout = UpdateShardHandlerConfig.DEFAULT_DISTRIBUPDATESOTIMEOUT;
|
||||
|
@ -81,6 +87,10 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
|||
int queueSize = -1;
|
||||
boolean accessPolicy = false;
|
||||
boolean useRetries = false;
|
||||
int maxConnectionIdleTime = UpdateShardHandlerConfig.DEFAULT_MAXUPDATECONNECTIONIDLETIME;
|
||||
int connectionsEvictorSleepDelay = UpdateShardHandlerConfig.DEFAULT_UPDATECONNECTIONSEVICTORSLEEPDELAY;
|
||||
|
||||
protected UpdateShardHandler.IdleConnectionsEvictor idleConnectionsEvictor;
|
||||
|
||||
private String scheme = null;
|
||||
|
||||
|
@ -108,6 +118,10 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
|||
// due to connection pooling limitations / races
|
||||
static final String USE_RETRIES = "useRetries";
|
||||
|
||||
static final String CONNECTIONS_EVICTOR_SLEEP_DELAY = "connectionsEvictorSleepDelay";
|
||||
|
||||
static final String MAX_CONNECTION_IDLE_TIME = "maxConnectionIdleTime";
|
||||
|
||||
/**
|
||||
* Get {@link ShardHandler} that uses the default http client.
|
||||
*/
|
||||
|
@ -142,6 +156,9 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
|||
this.queueSize = getParameter(args, INIT_SIZE_OF_QUEUE, queueSize,sb);
|
||||
this.accessPolicy = getParameter(args, INIT_FAIRNESS_POLICY, accessPolicy,sb);
|
||||
this.useRetries = getParameter(args, USE_RETRIES, useRetries,sb);
|
||||
this.connectionsEvictorSleepDelay = getParameter(args, CONNECTIONS_EVICTOR_SLEEP_DELAY, connectionsEvictorSleepDelay, sb);
|
||||
this.maxConnectionIdleTime = getParameter(args, MAX_CONNECTION_IDLE_TIME, maxConnectionIdleTime, sb);
|
||||
|
||||
log.info("created with {}",sb);
|
||||
|
||||
// magic sysprop to make tests reproducible: set by SolrTestCaseJ4.
|
||||
|
@ -164,7 +181,13 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
|||
|
||||
ModifiableSolrParams clientParams = getClientParams();
|
||||
|
||||
this.defaultClient = HttpClientUtil.createClient(clientParams);
|
||||
this.clientConnectionManager = new PoolingClientConnectionManager(SchemeRegistryFactory.createSystemDefault());
|
||||
clientConnectionManager.setDefaultMaxPerRoute(maxConnectionsPerHost);
|
||||
clientConnectionManager.setMaxTotal(maxConnections);
|
||||
this.defaultClient = HttpClientUtil.createClient(clientParams, clientConnectionManager);
|
||||
this.idleConnectionsEvictor = new UpdateShardHandler.IdleConnectionsEvictor(clientConnectionManager,
|
||||
connectionsEvictorSleepDelay, TimeUnit.MILLISECONDS, maxConnectionIdleTime, TimeUnit.MILLISECONDS);
|
||||
idleConnectionsEvictor.start();
|
||||
|
||||
// must come after createClient
|
||||
if (useRetries) {
|
||||
|
@ -178,8 +201,6 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
|||
|
||||
protected ModifiableSolrParams getClientParams() {
|
||||
ModifiableSolrParams clientParams = new ModifiableSolrParams();
|
||||
clientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, maxConnectionsPerHost);
|
||||
clientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections);
|
||||
clientParams.set(HttpClientUtil.PROP_SO_TIMEOUT, soTimeout);
|
||||
clientParams.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout);
|
||||
if (!useRetries) {
|
||||
|
@ -226,8 +247,12 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
|||
ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
|
||||
} finally {
|
||||
try {
|
||||
if (defaultClient != null) {
|
||||
defaultClient.getConnectionManager().shutdown();
|
||||
if (idleConnectionsEvictor != null) {
|
||||
idleConnectionsEvictor.shutdown();
|
||||
}
|
||||
IOUtils.closeQuietly(defaultClient);
|
||||
if (clientConnectionManager != null) {
|
||||
clientConnectionManager.shutdown();
|
||||
}
|
||||
} finally {
|
||||
|
||||
|
|
|
@ -16,12 +16,18 @@
|
|||
*/
|
||||
package org.apache.solr.update;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.conn.ClientConnectionManager;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.DefaultHttpClient;
|
||||
import org.apache.http.impl.conn.PoolingClientConnectionManager;
|
||||
import org.apache.http.impl.conn.SchemeRegistryFactory;
|
||||
import org.apache.http.util.Args;
|
||||
import org.apache.solr.client.solrj.impl.HttpClientConfigurer;
|
||||
import org.apache.solr.client.solrj.impl.HttpClientUtil;
|
||||
import org.apache.solr.cloud.RecoveryStrategy;
|
||||
|
@ -33,9 +39,6 @@ import org.apache.solr.common.util.SolrjNamedThreadFactory;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
public class UpdateShardHandler {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
@ -59,6 +62,8 @@ public class UpdateShardHandler {
|
|||
|
||||
private final UpdateShardHandlerConfig cfg;
|
||||
|
||||
private IdleConnectionsEvictor idleConnectionsEvictor;
|
||||
|
||||
public UpdateShardHandler(UpdateShardHandlerConfig cfg) {
|
||||
this.cfg = cfg;
|
||||
clientConnectionManager = new PoolingClientConnectionManager(SchemeRegistryFactory.createSystemDefault());
|
||||
|
@ -70,6 +75,13 @@ public class UpdateShardHandler {
|
|||
ModifiableSolrParams clientParams = getClientParams();
|
||||
log.info("Creating UpdateShardHandler HTTP client with params: {}", clientParams);
|
||||
client = HttpClientUtil.createClient(clientParams, clientConnectionManager);
|
||||
|
||||
if (cfg != null) {
|
||||
idleConnectionsEvictor = new IdleConnectionsEvictor(clientConnectionManager,
|
||||
cfg.getUpdateConnectionsEvictorSleepDelay(), TimeUnit.MILLISECONDS,
|
||||
cfg.getMaxUpdateConnectionIdleTime(), TimeUnit.MILLISECONDS);
|
||||
idleConnectionsEvictor.start();
|
||||
}
|
||||
}
|
||||
|
||||
protected ModifiableSolrParams getClientParams() {
|
||||
|
@ -127,6 +139,9 @@ public class UpdateShardHandler {
|
|||
// we interrupt on purpose here, but this executor should not run threads that do disk IO!
|
||||
ExecutorUtil.shutdownWithInterruptAndAwaitTermination(updateExecutor);
|
||||
ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor);
|
||||
if (idleConnectionsEvictor != null) {
|
||||
idleConnectionsEvictor.shutdown();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, e);
|
||||
} finally {
|
||||
|
@ -135,4 +150,85 @@ public class UpdateShardHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is adapted from org.apache.http.impl.client.IdleConnectionEvictor and changed to use
|
||||
* the deprecated ClientConnectionManager instead of the new HttpClientConnectionManager.
|
||||
* <p>
|
||||
* This class maintains a background thread to enforce an eviction policy for expired / idle
|
||||
* persistent connections kept alive in the connection pool.
|
||||
* <p>
|
||||
* See SOLR-9290 for related discussion.
|
||||
*/
|
||||
public static final class IdleConnectionsEvictor {
|
||||
|
||||
private final ClientConnectionManager connectionManager;
|
||||
private final ThreadFactory threadFactory;
|
||||
private final Thread thread;
|
||||
private final long sleepTimeMs;
|
||||
private final long maxIdleTimeMs;
|
||||
|
||||
private volatile Exception exception;
|
||||
|
||||
public IdleConnectionsEvictor(
|
||||
final ClientConnectionManager connectionManager,
|
||||
final ThreadFactory threadFactory,
|
||||
final long sleepTime, final TimeUnit sleepTimeUnit,
|
||||
final long maxIdleTime, final TimeUnit maxIdleTimeUnit) {
|
||||
this.connectionManager = Args.notNull(connectionManager, "Connection manager");
|
||||
this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory();
|
||||
this.sleepTimeMs = sleepTimeUnit != null ? sleepTimeUnit.toMillis(sleepTime) : sleepTime;
|
||||
this.maxIdleTimeMs = maxIdleTimeUnit != null ? maxIdleTimeUnit.toMillis(maxIdleTime) : maxIdleTime;
|
||||
this.thread = this.threadFactory.newThread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
Thread.sleep(sleepTimeMs);
|
||||
connectionManager.closeExpiredConnections();
|
||||
if (maxIdleTimeMs > 0) {
|
||||
connectionManager.closeIdleConnections(maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
exception = ex;
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public IdleConnectionsEvictor(ClientConnectionManager connectionManager,
|
||||
long sleepTime, TimeUnit sleepTimeUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
|
||||
this(connectionManager, null, sleepTime, sleepTimeUnit, maxIdleTime, maxIdleTimeUnit);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
thread.start();
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
thread.interrupt();
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return thread.isAlive();
|
||||
}
|
||||
|
||||
public void awaitTermination(final long time, final TimeUnit tunit) throws InterruptedException {
|
||||
thread.join((tunit != null ? tunit : TimeUnit.MILLISECONDS).toMillis(time));
|
||||
}
|
||||
|
||||
static class DefaultThreadFactory implements ThreadFactory {
|
||||
|
||||
@Override
|
||||
public Thread newThread(final Runnable r) {
|
||||
final Thread t = new Thread(r, "solr-idle-connections-evictor");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,10 +22,13 @@ public class UpdateShardHandlerConfig {
|
|||
public static final int DEFAULT_DISTRIBUPDATESOTIMEOUT = 600000;
|
||||
public static final int DEFAULT_MAXUPDATECONNECTIONS = 100000;
|
||||
public static final int DEFAULT_MAXUPDATECONNECTIONSPERHOST = 100000;
|
||||
public static final int DEFAULT_UPDATECONNECTIONSEVICTORSLEEPDELAY = 5000;
|
||||
public static final int DEFAULT_MAXUPDATECONNECTIONIDLETIME = 40000;
|
||||
|
||||
public static final UpdateShardHandlerConfig DEFAULT
|
||||
= new UpdateShardHandlerConfig(DEFAULT_MAXUPDATECONNECTIONS, DEFAULT_MAXUPDATECONNECTIONSPERHOST,
|
||||
DEFAULT_DISTRIBUPDATESOTIMEOUT, DEFAULT_DISTRIBUPDATECONNTIMEOUT);
|
||||
DEFAULT_DISTRIBUPDATESOTIMEOUT, DEFAULT_DISTRIBUPDATECONNTIMEOUT,
|
||||
DEFAULT_UPDATECONNECTIONSEVICTORSLEEPDELAY, DEFAULT_MAXUPDATECONNECTIONIDLETIME);
|
||||
|
||||
private final int maxUpdateConnections;
|
||||
|
||||
|
@ -35,11 +38,17 @@ public class UpdateShardHandlerConfig {
|
|||
|
||||
private final int distributedConnectionTimeout;
|
||||
|
||||
public UpdateShardHandlerConfig(int maxUpdateConnections, int maxUpdateConnectionsPerHost, int distributedSocketTimeout, int distributedConnectionTimeout) {
|
||||
private final int updateConnectionsEvictorSleepDelay;
|
||||
|
||||
private final int maxUpdateConnectionIdleTime;
|
||||
|
||||
/**
 * Creates an immutable configuration holder; every argument is stored as given, without validation.
 *
 * @param maxUpdateConnections value later returned for the overall update-connection limit
 * @param maxUpdateConnectionsPerHost value later returned for the per-host update-connection limit
 * @param distributedSocketTimeout socket timeout for distributed updates (presumably milliseconds — confirm)
 * @param distributedConnectionTimeout connection timeout for distributed updates (presumably milliseconds — confirm)
 * @param updateConnectionsEvictorSleepDelay pause between idle-connection eviction passes;
 *        UpdateShardHandler passes it to its evictor as milliseconds
 * @param maxUpdateConnectionIdleTime idle time after which a pooled update connection is closed;
 *        UpdateShardHandler passes it to its evictor as milliseconds
 */
public UpdateShardHandlerConfig(int maxUpdateConnections, int maxUpdateConnectionsPerHost, int distributedSocketTimeout, int distributedConnectionTimeout, int updateConnectionsEvictorSleepDelay, int maxUpdateConnectionIdleTime) {
|
||||
this.maxUpdateConnections = maxUpdateConnections;
|
||||
this.maxUpdateConnectionsPerHost = maxUpdateConnectionsPerHost;
|
||||
this.distributedSocketTimeout = distributedSocketTimeout;
|
||||
this.distributedConnectionTimeout = distributedConnectionTimeout;
|
||||
this.updateConnectionsEvictorSleepDelay = updateConnectionsEvictorSleepDelay;
|
||||
this.maxUpdateConnectionIdleTime = maxUpdateConnectionIdleTime;
|
||||
}
|
||||
|
||||
public int getMaxUpdateConnectionsPerHost() {
|
||||
|
@ -57,4 +66,12 @@ public class UpdateShardHandlerConfig {
|
|||
public int getDistributedConnectionTimeout() {
|
||||
return distributedConnectionTimeout;
|
||||
}
|
||||
|
||||
/**
 * Returns the pause between two passes of the idle-connection evictor.
 * UpdateShardHandler hands this value to its evictor as milliseconds.
 *
 * @return the configured evictor sleep delay
 */
public int getUpdateConnectionsEvictorSleepDelay() {
|
||||
return updateConnectionsEvictorSleepDelay;
|
||||
}
|
||||
|
||||
/**
 * Returns how long a pooled update connection may sit idle before the evictor closes it.
 * UpdateShardHandler hands this value to its evictor as milliseconds; the evictor skips
 * idle eviction when the value is not positive.
 *
 * @return the configured maximum idle time
 */
public int getMaxUpdateConnectionIdleTime() {
|
||||
return maxUpdateConnectionIdleTime;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,16 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.common.cloud.*;
|
||||
import org.apache.solr.common.cloud.ClusterProperties;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkConfigManager;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CloudConfig;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
|
@ -331,6 +340,8 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
|
|||
|
||||
private static class MockCoreContainer extends CoreContainer {
|
||||
|
||||
UpdateShardHandler updateShardHandler;
|
||||
|
||||
public MockCoreContainer() {
|
||||
super((Object)null);
|
||||
this.shardHandlerFactory = new HttpShardHandlerFactory();
|
||||
|
@ -342,8 +353,13 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
|
|||
|
||||
@Override
|
||||
public UpdateShardHandler getUpdateShardHandler() {
|
||||
return new UpdateShardHandler(UpdateShardHandlerConfig.DEFAULT);
|
||||
return this.updateShardHandler = new UpdateShardHandler(UpdateShardHandlerConfig.DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
super.shutdown();
|
||||
updateShardHandler.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -336,7 +336,6 @@ public class TestCoreContainer extends SolrTestCaseJ4 {
|
|||
|
||||
CoreContainer cc = init(CUSTOM_HANDLERS_SOLR_XML);
|
||||
try {
|
||||
cc.load();
|
||||
assertThat(cc.getCollectionsHandler(), is(instanceOf(CustomCollectionsHandler.class)));
|
||||
assertThat(cc.getInfoHandler(), is(instanceOf(CustomInfoHandler.class)));
|
||||
assertThat(cc.getMultiCoreHandler(), is(instanceOf(CustomCoreAdminHandler.class)));
|
||||
|
|
|
@ -177,7 +177,7 @@ public class TestHarness extends BaseTestHarness {
|
|||
UpdateShardHandlerConfig updateShardHandlerConfig
|
||||
= new UpdateShardHandlerConfig(UpdateShardHandlerConfig.DEFAULT_MAXUPDATECONNECTIONS,
|
||||
UpdateShardHandlerConfig.DEFAULT_MAXUPDATECONNECTIONSPERHOST,
|
||||
30000, 30000);
|
||||
30000, 30000, 5000, 50000);
|
||||
return new NodeConfig.NodeConfigBuilder("testNode", loader)
|
||||
.setUseSchemaCache(Boolean.getBoolean("shareSchema"))
|
||||
.setCloudConfig(cloudConfig)
|
||||
|
|
Loading…
Reference in New Issue