SOLR-12293: Updates need to use their own connection pool to maintain connection reuse and prevent spurious recoveries.

This commit is contained in:
Mark Miller 2018-05-04 20:02:56 -05:00
parent 296201055f
commit 3a2572db79
15 changed files with 73 additions and 54 deletions

View File

@ -209,6 +209,9 @@ Bug Fixes
* SOLR-12290: Do not close any servlet streams and improve our servlet stream closing prevention code for users
and devs. (Mark Miller)
* SOLR-12293: Updates need to use their own connection pool to maintain connection reuse and prevent spurious
recoveries. (Mark Miller)
Optimizations
----------------------

View File

@ -69,7 +69,7 @@ public class SyncStrategy {
public SyncStrategy(CoreContainer cc) {
UpdateShardHandler updateShardHandler = cc.getUpdateShardHandler();
client = updateShardHandler.getHttpClient();
client = updateShardHandler.getDefaultHttpClient();
shardHandler = cc.getShardHandlerFactory().getShardHandler();
updateExecutor = updateShardHandler.getUpdateExecutor();
}

View File

@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
@ -43,6 +44,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.SimplePostTool;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.slf4j.Logger;
@ -155,16 +157,20 @@ public class BlobRepository {
Replica replica = getSystemCollReplica();
String url = replica.getStr(BASE_URL_PROP) + "/" + CollectionAdminParams.SYSTEM_COLL + "/blob/" + key + "?wt=filestream";
HttpClient httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
HttpClient httpClient = coreContainer.getUpdateShardHandler().getDefaultHttpClient();
HttpGet httpGet = new HttpGet(url);
ByteBuffer b;
HttpResponse response = null;
HttpEntity entity = null;
try {
HttpResponse entity = httpClient.execute(httpGet);
int statusCode = entity.getStatusLine().getStatusCode();
response = httpClient.execute(httpGet);
entity = response.getEntity();
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "no such blob or version available: " + key);
}
try (InputStream is = entity.getEntity().getContent()) {
try (InputStream is = entity.getContent()) {
b = SimplePostTool.inputStreamToByteArray(is);
}
} catch (Exception e) {
@ -174,7 +180,7 @@ public class BlobRepository {
throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "could not load : " + key, e);
}
} finally {
httpGet.releaseConnection();
Utils.consumeFully(entity);
}
return b;
}

View File

@ -245,7 +245,7 @@ public class IndexFetcher {
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
return HttpClientUtil.createClient(httpClientParams, core.getCoreContainer().getUpdateShardHandler().getConnectionManager(), true);
return HttpClientUtil.createClient(httpClientParams, core.getCoreContainer().getUpdateShardHandler().getDefaultConnectionManager(), true);
}
public IndexFetcher(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {

View File

@ -126,7 +126,7 @@ public class AutoscalingHistoryHandler extends RequestHandlerBase implements Per
}
}
try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(coreContainer.getZkController().getZkServerAddress()), Optional.empty())
.withHttpClient(coreContainer.getUpdateShardHandler().getHttpClient())
.withHttpClient(coreContainer.getUpdateShardHandler().getDefaultHttpClient())
.build()) {
QueryResponse qr = cloudSolrClient.query(collection, params);
rsp.setAllValues(qr.getResponse());

View File

@ -207,7 +207,7 @@ public class SolrClusterReporter extends SolrCoreContainerReporter {
log.info("Turning off node reporter, period=" + period);
return;
}
HttpClient httpClient = cc.getUpdateShardHandler().getHttpClient();
HttpClient httpClient = cc.getUpdateShardHandler().getDefaultHttpClient();
ZkController zk = cc.getZkController();
String reporterId = zk.getNodeName();
reporter = SolrReporter.Builder.forReports(metricManager, reports)

View File

@ -154,7 +154,7 @@ public class SolrShardReporter extends SolrCoreReporter {
.cloudClient(false) // we want to send reports specifically to a selected leader instance
.skipAggregateValues(true) // we don't want to transport details of aggregates
.skipHistograms(true) // we don't want to transport histograms
.build(core.getCoreContainer().getUpdateShardHandler().getHttpClient(), new LeaderUrlSupplier(core));
.build(core.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient(), new LeaderUrlSupplier(core));
reporter.start(period, TimeUnit.SECONDS);
}

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
@ -195,12 +196,14 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
PublicKey getRemotePublicKey(String nodename) {
if (!cores.getZkController().getZkStateReader().getClusterState().getLiveNodes().contains(nodename)) return null;
String url = cores.getZkController().getZkStateReader().getBaseUrlForNodeName(nodename);
HttpEntity entity = null;
try {
String uri = url + PATH + "?wt=json&omitHeader=true";
log.debug("Fetching fresh public key from : {}",uri);
HttpResponse rsp = cores.getUpdateShardHandler().getHttpClient()
HttpResponse rsp = cores.getUpdateShardHandler().getDefaultHttpClient()
.execute(new HttpGet(uri), HttpClientUtil.createNewHttpClientRequestContext());
byte[] bytes = EntityUtils.toByteArray(rsp.getEntity());
entity = rsp.getEntity();
byte[] bytes = EntityUtils.toByteArray(entity);
Map m = (Map) Utils.fromJSON(bytes);
String key = (String) m.get("key");
if (key == null) {
@ -215,6 +218,8 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
} catch (Exception e) {
log.error("Exception trying to get public key from : " + url, e);
return null;
} finally {
Utils.consumeFully(entity);
}
}

View File

@ -172,7 +172,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
coresInit = createCoreContainer(solrHome == null ? SolrResourceLoader.locateSolrHome() : Paths.get(solrHome),
extraProperties);
this.httpClient = coresInit.getUpdateShardHandler().getHttpClient();
this.httpClient = coresInit.getUpdateShardHandler().getDefaultHttpClient();
setupJvmMetrics(coresInit);
log.debug("user.dir=" + System.getProperty("user.dir"));
}

View File

@ -360,11 +360,8 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
// have to 'wait in line' a bit or bail if a recovery is
// already queued up - the recovery execution itself is run
// in another thread on another 'recovery' executor.
// The update executor is interrupted on shutdown and should
// not do disk IO.
// The recovery executor is not interrupted on shutdown.
//
// avoid deadlock: we can't use the recovery executor here
// avoid deadlock: we can't use the recovery executor here!
cc.getUpdateShardHandler().getUpdateExecutor().submit(recoveryTask);
} catch (RejectedExecutionException e) {
// fine, we are shutting down

View File

@ -156,7 +156,7 @@ public class PeerSync implements SolrMetricProducer {
this.cantReachIsSuccess = cantReachIsSuccess;
this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
this.doFingerprint = doFingerprint && !("true".equals(System.getProperty("solr.disableFingerprint")));
this.client = core.getCoreContainer().getUpdateShardHandler().getHttpClient();
this.client = core.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
this.onlyIfActive = onlyIfActive;
uhandler = core.getUpdateHandler();

View File

@ -49,7 +49,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
@ -67,7 +66,6 @@ public class SolrCmdDistributor implements Closeable {
private final List<Error> allErrors = new ArrayList<>();
private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
private final ExecutorService updateExecutor;
private final CompletionService<Object> completionService;
private final Set<Future<Object>> pending = new HashSet<>();
@ -78,16 +76,14 @@ public class SolrCmdDistributor implements Closeable {
public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
this.clients = new StreamingSolrClients(updateShardHandler);
this.updateExecutor = updateShardHandler.getUpdateExecutor();
this.completionService = new ExecutorCompletionService<>(updateExecutor);
this.completionService = new ExecutorCompletionService<>(updateShardHandler.getUpdateExecutor());
}
public SolrCmdDistributor(StreamingSolrClients clients, int maxRetriesOnForward, int retryPause) {
this.clients = clients;
this.maxRetriesOnForward = maxRetriesOnForward;
this.retryPause = retryPause;
this.updateExecutor = clients.getUpdateExecutor();
completionService = new ExecutorCompletionService<>(updateExecutor);
completionService = new ExecutorCompletionService<>(clients.getUpdateExecutor());
}
public void finish() {

View File

@ -54,7 +54,7 @@ public class StreamingSolrClients {
public StreamingSolrClients(UpdateShardHandler updateShardHandler) {
this.updateExecutor = updateShardHandler.getUpdateExecutor();
httpClient = updateShardHandler.getHttpClient();
httpClient = updateShardHandler.getUpdateOnlyHttpClient();
}
public List<Error> getErrors() {

View File

@ -27,7 +27,6 @@ import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.cloud.RecoveryStrategy;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
@ -51,8 +50,6 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
/*
* A downside to configuring an upper bound will be big update reorders (when that upper bound is hit)
* and then undetected shard inconsistency as a result.
* This update executor is used for different things too... both update streams (which may be very long lived)
* and control messages (peersync? LIR?) and could lead to starvation if limited.
* Therefore this thread pool is left unbounded. See SOLR-8205
*/
private ExecutorService updateExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
@ -60,20 +57,28 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
private ExecutorService recoveryExecutor;
private final CloseableHttpClient client;
private final CloseableHttpClient updateOnlyClient;
private final CloseableHttpClient defaultClient;
private final InstrumentedPoolingHttpClientConnectionManager clientConnectionManager;
private final InstrumentedPoolingHttpClientConnectionManager updateOnlyConnectionManager;
private final InstrumentedPoolingHttpClientConnectionManager defaultConnectionManager;
private final InstrumentedHttpRequestExecutor httpRequestExecutor;
private final Set<String> metricNames = ConcurrentHashMap.newKeySet();
private MetricRegistry registry;
public UpdateShardHandler(UpdateShardHandlerConfig cfg) {
clientConnectionManager = new InstrumentedPoolingHttpClientConnectionManager(HttpClientUtil.getSchemaRegisteryProvider().getSchemaRegistry());
updateOnlyConnectionManager = new InstrumentedPoolingHttpClientConnectionManager(HttpClientUtil.getSchemaRegisteryProvider().getSchemaRegistry());
defaultConnectionManager = new InstrumentedPoolingHttpClientConnectionManager(HttpClientUtil.getSchemaRegisteryProvider().getSchemaRegistry());
if (cfg != null ) {
clientConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections());
clientConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost());
updateOnlyConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections());
updateOnlyConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost());
defaultConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections());
defaultConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost());
}
ModifiableSolrParams clientParams = new ModifiableSolrParams();
@ -90,8 +95,10 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
}
}
httpRequestExecutor = new InstrumentedHttpRequestExecutor(metricNameStrategy);
client = HttpClientUtil.createClient(clientParams, clientConnectionManager, false, httpRequestExecutor);
updateOnlyClient = HttpClientUtil.createClient(clientParams, updateOnlyConnectionManager, false, httpRequestExecutor);
defaultClient = HttpClientUtil.createClient(clientParams, defaultConnectionManager, false, httpRequestExecutor);
// following is done only for logging complete configuration.
// The maxConnections and maxConnectionsPerHost have already been specified on the connection manager
@ -99,7 +106,8 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
clientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS, cfg.getMaxUpdateConnections());
clientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, cfg.getMaxUpdateConnectionsPerHost());
}
log.debug("Created UpdateShardHandler HTTP client with params: {}", clientParams);
log.debug("Created default UpdateShardHandler HTTP client with params: {}", clientParams);
log.debug("Created update only UpdateShardHandler HTTP client with params: {}", clientParams);
ThreadFactory recoveryThreadFactory = new SolrjNamedThreadFactory("recoveryExecutor");
if (cfg != null && cfg.getMaxRecoveryThreads() > 0) {
@ -120,10 +128,10 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
public void initializeMetrics(SolrMetricManager manager, String registryName, String tag, String scope) {
registry = manager.registry(registryName);
String expandedScope = SolrMetricManager.mkName(scope, getCategory().name());
clientConnectionManager.initializeMetrics(manager, registryName, tag, expandedScope);
httpRequestExecutor.initializeMetrics(manager, registryName, tag, expandedScope);
updateOnlyConnectionManager.initializeMetrics(manager, registryName, tag, expandedScope);
defaultConnectionManager.initializeMetrics(manager, registryName, tag, expandedScope);
updateExecutor = MetricUtils.instrumentedExecutorService(updateExecutor, this, registry,
SolrMetricManager.mkName("updateExecutor", expandedScope, "threadPool"));
SolrMetricManager.mkName("updateOnlyExecutor", expandedScope, "threadPool"));
recoveryExecutor = MetricUtils.instrumentedExecutorService(recoveryExecutor, this, registry,
SolrMetricManager.mkName("recoveryExecutor", expandedScope, "threadPool"));
}
@ -148,31 +156,33 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
return registry;
}
public HttpClient getHttpClient() {
return client;
// if you are looking for a client to use, it's probably this one.
public HttpClient getDefaultHttpClient() {
return defaultClient;
}
/**
* This method returns an executor that is not meant for disk IO and that will
* be interrupted on shutdown.
// don't introduce a bug, this client is for sending updates only!
public HttpClient getUpdateOnlyHttpClient() {
return updateOnlyClient;
}
/**
* This method returns an executor that is meant for non search related tasks.
*
* @return an executor for update related activities that do not do disk IO.
* @return an executor for update side related activities.
*/
public ExecutorService getUpdateExecutor() {
return updateExecutor;
}
public PoolingHttpClientConnectionManager getConnectionManager() {
return clientConnectionManager;
public PoolingHttpClientConnectionManager getDefaultConnectionManager() {
return defaultConnectionManager;
}
/**
* In general, RecoveryStrategy threads do not do disk IO, but they open and close SolrCores
* in async threads, among other things, and can trigger disk IO, so we use this alternate
* executor rather than the 'updateExecutor', which is interrupted on shutdown.
*
* @return executor for {@link RecoveryStrategy} thread which will not be interrupted on close.
* @return executor for recovery operations
*/
public ExecutorService getRecoveryExecutor() {
return recoveryExecutor;
@ -186,8 +196,10 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
} catch (Exception e) {
SolrException.log(log, e);
} finally {
HttpClientUtil.close(client);
clientConnectionManager.close();
HttpClientUtil.close(updateOnlyClient);
HttpClientUtil.close(defaultClient);
updateOnlyConnectionManager.close();
defaultConnectionManager.close();
}
}

View File

@ -1285,7 +1285,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
NamedList<Object> rsp = null;
try (HttpSolrClient hsc = new HttpSolrClient.Builder(leaderUrl).
withHttpClient(updateShardHandler.getHttpClient()).build()) {
withHttpClient(updateShardHandler.getUpdateOnlyHttpClient()).build()) {
rsp = hsc.request(ur);
} catch (SolrServerException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error during fetching [" + id +