SOLR-10915: Make builder based SolrClient constructors to be the only valid way to construct client objects and increase the visibility of builder elements to be protected so extending the builder, and the clients is possible.

This commit is contained in:
Anshum Gupta 2017-06-22 15:27:34 -07:00
parent 05433eb709
commit e46d39bd5a
11 changed files with 309 additions and 202 deletions

View File

@ -318,6 +318,10 @@ Other Changes
resources, they are only resolved against Solr's own or "core" class loader resources, they are only resolved against Solr's own or "core" class loader
by default. (Uwe Schindler) by default. (Uwe Schindler)
* SOLR-10915: Make builder based SolrClient constructors to be the only valid way to construct client objects and
increase the visibility of builder elements to be protected so extending the builder, and the clients is possible.
(Jason Gerlowski, Anshum Gupta)
================== 6.7.0 ================== ================== 6.7.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -72,7 +72,13 @@ public class StreamingSolrClients {
// NOTE: increasing to more than 1 threadCount for the client could cause updates to be reordered // NOTE: increasing to more than 1 threadCount for the client could cause updates to be reordered
// on a greater scale since the current behavior is to only increase the number of connections/Runners when // on a greater scale since the current behavior is to only increase the number of connections/Runners when
// the queue is more than half full. // the queue is more than half full.
client = new ErrorReportingConcurrentUpdateSolrClient(url, httpClient, 100, runnerCount, updateExecutor, true, req); client = new ErrorReportingConcurrentUpdateSolrClient.Builder(url, req, errors)
.withHttpClient(httpClient)
.withQueueSize(100)
.withThreadCount(runnerCount)
.withExecutorService(updateExecutor)
.alwaysStreamDeletes()
.build();
client.setPollQueueTime(Integer.MAX_VALUE); // minimize connections created client.setPollQueueTime(Integer.MAX_VALUE); // minimize connections created
client.setParser(new BinaryResponseParser()); client.setParser(new BinaryResponseParser());
client.setRequestWriter(new BinaryRequestWriter()); client.setRequestWriter(new BinaryRequestWriter());
@ -115,14 +121,17 @@ public class StreamingSolrClients {
public ExecutorService getUpdateExecutor() { public ExecutorService getUpdateExecutor() {
return updateExecutor; return updateExecutor;
} }
}
class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient { class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SolrCmdDistributor.Req req; private final SolrCmdDistributor.Req req;
private final List<Error> errors;
public ErrorReportingConcurrentUpdateSolrClient(String solrServerUrl, HttpClient client, int queueSize, public ErrorReportingConcurrentUpdateSolrClient(Builder builder) {
int threadCount, ExecutorService es, boolean streamDeletes, SolrCmdDistributor.Req req) { super(builder);
super(solrServerUrl, client, queueSize, threadCount, es, streamDeletes); this.req = builder.req;
this.req = req; this.errors = builder.errors;
} }
@Override @Override
@ -141,5 +150,19 @@ public class StreamingSolrClients {
public void onSuccess(HttpResponse resp) { public void onSuccess(HttpResponse resp) {
req.trackRequestResult(resp, true); req.trackRequestResult(resp, true);
} }
static class Builder extends ConcurrentUpdateSolrClient.Builder {
protected SolrCmdDistributor.Req req;
protected List<Error> errors;
public Builder(String baseSolrUrl, SolrCmdDistributor.Req req, List<Error> errors) {
super(baseSolrUrl);
this.req = req;
this.errors = errors;
}
public ErrorReportingConcurrentUpdateSolrClient build() {
return new ErrorReportingConcurrentUpdateSolrClient(this);
}
} }
} }

View File

@ -22,7 +22,6 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
@ -55,7 +54,11 @@ class FullThrottleStoppableIndexingThread extends StoppableIndexingThread {
setDaemon(true); setDaemon(true);
this.clients = clients; this.clients = clients;
cusc = new ErrorLoggingConcurrentUpdateSolrClient(((HttpSolrClient) clients.get(0)).getBaseURL(), httpClient, 8, 2); cusc = new ErrorLoggingConcurrentUpdateSolrClient.Builder(((HttpSolrClient) clients.get(0)).getBaseURL())
.withHttpClient(httpClient)
.withQueueSize(8)
.withThreadCount(2)
.build();
cusc.setConnectionTimeout(10000); cusc.setConnectionTimeout(10000);
cusc.setSoTimeout(clientSoTimeout); cusc.setSoTimeout(clientSoTimeout);
} }
@ -114,8 +117,11 @@ class FullThrottleStoppableIndexingThread extends StoppableIndexingThread {
clientIndex = 0; clientIndex = 0;
} }
cusc.shutdownNow(); cusc.shutdownNow();
cusc = new ErrorLoggingConcurrentUpdateSolrClient(((HttpSolrClient) clients.get(clientIndex)).getBaseURL(), cusc = new ErrorLoggingConcurrentUpdateSolrClient.Builder(((HttpSolrClient) clients.get(clientIndex)).getBaseURL())
httpClient, 30, 3); .withHttpClient(httpClient)
.withQueueSize(30)
.withThreadCount(3)
.build();
} }
} }
@ -143,14 +149,25 @@ class FullThrottleStoppableIndexingThread extends StoppableIndexingThread {
} }
static class ErrorLoggingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient { static class ErrorLoggingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient {
@SuppressWarnings("deprecation") public ErrorLoggingConcurrentUpdateSolrClient(Builder builder) {
public ErrorLoggingConcurrentUpdateSolrClient(String serverUrl, HttpClient httpClient, int queueSize, int threadCount) { super(builder);
super(serverUrl, httpClient, queueSize, threadCount, null, false);
} }
@Override @Override
public void handleError(Throwable ex) { public void handleError(Throwable ex) {
log.warn("cusc error", ex); log.warn("cusc error", ex);
} }
static class Builder extends ConcurrentUpdateSolrClient.Builder {
public Builder(String baseSolrUrl) {
super(baseSolrUrl);
}
public ErrorLoggingConcurrentUpdateSolrClient build() {
return new ErrorLoggingConcurrentUpdateSolrClient(this);
}
}
} }
} }

View File

@ -240,70 +240,37 @@ public class CloudSolrClient extends SolrClient {
* SolrCloud has enough replicas for every shard in a collection, there is no * SolrCloud has enough replicas for every shard in a collection, there is no
* single point of failure. Updates will be sent to shard leaders by default. * single point of failure. Updates will be sent to shard leaders by default.
* *
* @param zkHosts * @param builder a {@link CloudSolrClient.Builder} with the options used to create the client.
* A Java Collection (List, Set, etc) of HOST:PORT strings, one for
* each host in the zookeeper ensemble. Note that with certain
* Collection types like HashSet, the order of hosts in the final
* connect string may not be in the same order you added them.
* Provide only one of solrUrls or zkHosts.
* @param chroot
* A chroot value for zookeeper, starting with a forward slash. If no
* chroot is required, use null.
* @param solrUrls
* A list of Solr URLs to configure the underlying {@link HttpClusterStateProvider}, which will
* use of the these URLs to fetch the list of live nodes for this Solr cluster. URL's must point to the
* root Solr path ("/solr"). Provide only one of solrUrls or zkHosts.
* @param httpClient
* the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a
* multi-threaded connection manager. If null, a default HttpClient will be used.
* @param lbSolrClient
* LBHttpSolrClient instance for requests. If null, a default LBHttpSolrClient will be used.
* @param lbHttpSolrClientBuilder
* LBHttpSolrClient builder to construct the LBHttpSolrClient. If null, a default builder will be used.
* @param updatesToLeaders
* If true, sends updates to shard leaders.
* @param directUpdatesToLeadersOnly
* If true, sends direct updates to shard leaders only.
*/ */
private CloudSolrClient(Collection<String> zkHosts, protected CloudSolrClient(Builder builder) {
String chroot, if (builder.stateProvider == null) {
List<String> solrUrls, if (builder.zkHosts != null && builder.solrUrls != null) {
HttpClient httpClient,
LBHttpSolrClient lbSolrClient,
LBHttpSolrClient.Builder lbHttpSolrClientBuilder,
boolean updatesToLeaders,
boolean directUpdatesToLeadersOnly,
ClusterStateProvider stateProvider
) {
if (stateProvider == null) {
if (zkHosts != null && solrUrls != null) {
throw new IllegalArgumentException("Both zkHost(s) & solrUrl(s) have been specified. Only specify one."); throw new IllegalArgumentException("Both zkHost(s) & solrUrl(s) have been specified. Only specify one.");
} }
if (zkHosts != null) { if (builder.zkHosts != null) {
this.stateProvider = new ZkClientClusterStateProvider(zkHosts, chroot); this.stateProvider = new ZkClientClusterStateProvider(builder.zkHosts, builder.zkChroot);
} else if (solrUrls != null && !solrUrls.isEmpty()) { } else if (builder.solrUrls != null && !builder.solrUrls.isEmpty()) {
try { try {
this.stateProvider = new HttpClusterStateProvider(solrUrls, httpClient); this.stateProvider = new HttpClusterStateProvider(builder.solrUrls, builder.httpClient);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the " throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the "
+ "Solr server(s), " + solrUrls + ", down?)", e); + "Solr server(s), " + builder.solrUrls + ", down?)", e);
} }
} else { } else {
throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null."); throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
} }
} else { } else {
this.stateProvider = stateProvider; this.stateProvider = builder.stateProvider;
} }
this.clientIsInternal = httpClient == null; this.clientIsInternal = builder.httpClient == null;
this.shutdownLBHttpSolrServer = lbSolrClient == null; this.shutdownLBHttpSolrServer = builder.loadBalancedSolrClient == null;
if(lbHttpSolrClientBuilder != null) lbSolrClient = lbHttpSolrClientBuilder.build(); if(builder.lbClientBuilder != null) builder.loadBalancedSolrClient = builder.lbClientBuilder.build();
if(lbSolrClient != null) httpClient = lbSolrClient.getHttpClient(); if(builder.loadBalancedSolrClient != null) builder.httpClient = builder.loadBalancedSolrClient.getHttpClient();
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient; this.myClient = (builder.httpClient == null) ? HttpClientUtil.createClient(null) : builder.httpClient;
if (lbSolrClient == null) lbSolrClient = createLBHttpSolrClient(myClient); if (builder.loadBalancedSolrClient == null) builder.loadBalancedSolrClient = createLBHttpSolrClient(myClient);
this.lbClient = lbSolrClient; this.lbClient = builder.loadBalancedSolrClient;
this.updatesToLeaders = updatesToLeaders; this.updatesToLeaders = builder.shardLeadersOnly;
this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly; this.directUpdatesToLeadersOnly = builder.directUpdatesToLeadersOnly;
} }
/**Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json /**Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json
@ -1372,15 +1339,15 @@ public class CloudSolrClient extends SolrClient {
* Constructs {@link CloudSolrClient} instances from provided configuration. * Constructs {@link CloudSolrClient} instances from provided configuration.
*/ */
public static class Builder { public static class Builder {
private Collection<String> zkHosts; protected Collection<String> zkHosts;
private List<String> solrUrls; protected List<String> solrUrls;
private HttpClient httpClient; protected HttpClient httpClient;
private String zkChroot; protected String zkChroot;
private LBHttpSolrClient loadBalancedSolrClient; protected LBHttpSolrClient loadBalancedSolrClient;
private LBHttpSolrClient.Builder lbClientBuilder; protected LBHttpSolrClient.Builder lbClientBuilder;
private boolean shardLeadersOnly; protected boolean shardLeadersOnly;
private boolean directUpdatesToLeadersOnly; protected boolean directUpdatesToLeadersOnly;
private ClusterStateProvider stateProvider; protected ClusterStateProvider stateProvider;
public Builder() { public Builder() {
@ -1536,8 +1503,7 @@ public class CloudSolrClient extends SolrClient {
throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null."); throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
} }
} }
return new CloudSolrClient(zkHosts, zkChroot, solrUrls, httpClient, loadBalancedSolrClient, lbClientBuilder, return new CloudSolrClient(this);
shardLeadersOnly, directUpdatesToLeadersOnly, stateProvider);
} }
} }
} }

View File

@ -97,33 +97,52 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
/** /**
* Uses the supplied HttpClient to send documents to the Solr server. * Uses the supplied HttpClient to send documents to the Solr server.
*
* @deprecated use {@link ConcurrentUpdateSolrClient#ConcurrentUpdateSolrClient(Builder)} instead, as it is a more extension/subclassing-friendly alternative
*/ */
@Deprecated
protected ConcurrentUpdateSolrClient(String solrServerUrl, protected ConcurrentUpdateSolrClient(String solrServerUrl,
HttpClient client, int queueSize, int threadCount, HttpClient client, int queueSize, int threadCount,
ExecutorService es, boolean streamDeletes) { ExecutorService es, boolean streamDeletes) {
this.internalHttpClient = (client == null); this((streamDeletes) ?
this.client = new HttpSolrClient.Builder(solrServerUrl) new Builder(solrServerUrl)
.withHttpClient(client) .withHttpClient(client)
.withQueueSize(queueSize)
.withThreadCount(threadCount)
.withExecutorService(es)
.alwaysStreamDeletes() :
new Builder(solrServerUrl)
.withHttpClient(client)
.withQueueSize(queueSize)
.withThreadCount(threadCount)
.withExecutorService(es)
.neverStreamDeletes());
}
protected ConcurrentUpdateSolrClient(Builder builder) {
this.internalHttpClient = (builder.httpClient == null);
this.client = new HttpSolrClient.Builder(builder.baseSolrUrl)
.withHttpClient(builder.httpClient)
.build(); .build();
this.client.setFollowRedirects(false); this.client.setFollowRedirects(false);
queue = new LinkedBlockingQueue<>(queueSize); this.queue = new LinkedBlockingQueue<>(builder.queueSize);
this.threadCount = threadCount; this.threadCount = builder.threadCount;
runners = new LinkedList<>(); this.runners = new LinkedList<>();
this.streamDeletes = streamDeletes; this.streamDeletes = builder.streamDeletes;
if (es != null) { if (builder.executorService != null) {
scheduler = es; this.scheduler = builder.executorService;
shutdownExecutor = false; this.shutdownExecutor = false;
} else { } else {
scheduler = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("concurrentUpdateScheduler")); this.scheduler = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("concurrentUpdateScheduler"));
shutdownExecutor = true; this.shutdownExecutor = true;
} }
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
pollInterrupts = new AtomicInteger(); this.pollInterrupts = new AtomicInteger();
pollExits = new AtomicInteger(); this.pollExits = new AtomicInteger();
blockLoops = new AtomicInteger(); this.blockLoops = new AtomicInteger();
emptyQueueLoops = new AtomicInteger(); this.emptyQueueLoops = new AtomicInteger();
} }
} }
@ -743,12 +762,12 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
* Constructs {@link ConcurrentUpdateSolrClient} instances from provided configuration. * Constructs {@link ConcurrentUpdateSolrClient} instances from provided configuration.
*/ */
public static class Builder { public static class Builder {
private String baseSolrUrl; protected String baseSolrUrl;
private HttpClient httpClient; protected HttpClient httpClient;
private int queueSize; protected int queueSize;
private int threadCount; protected int threadCount;
private ExecutorService executorService; protected ExecutorService executorService;
private boolean streamDeletes; protected boolean streamDeletes;
/** /**
* Create a Builder object, based on the provided Solr URL. * Create a Builder object, based on the provided Solr URL.
@ -838,7 +857,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
throw new IllegalArgumentException("Cannot create HttpSolrClient without a valid baseSolrUrl!"); throw new IllegalArgumentException("Cannot create HttpSolrClient without a valid baseSolrUrl!");
} }
return new ConcurrentUpdateSolrClient(baseSolrUrl, httpClient, queueSize, threadCount, executorService, streamDeletes); return new ConcurrentUpdateSolrClient(this);
} }
} }
} }

View File

@ -38,7 +38,11 @@ public class DelegationTokenHttpSolrClient extends HttpSolrClient {
* Package protected constructor for use by * Package protected constructor for use by
* {@linkplain org.apache.solr.client.solrj.impl.HttpSolrClient.Builder}. * {@linkplain org.apache.solr.client.solrj.impl.HttpSolrClient.Builder}.
* @lucene.internal * @lucene.internal
*
* @deprecated use {@link DelegationTokenHttpSolrClient#DelegationTokenHttpSolrClient(HttpSolrClient.Builder)} instead, as it is a more
* extension/subclassing-friendly alternative
*/ */
@Deprecated
DelegationTokenHttpSolrClient(String baseURL, DelegationTokenHttpSolrClient(String baseURL,
HttpClient client, HttpClient client,
ResponseParser parser, ResponseParser parser,
@ -53,6 +57,11 @@ public class DelegationTokenHttpSolrClient extends HttpSolrClient {
invariantParams.set(DELEGATION_TOKEN_PARAM, delegationToken); invariantParams.set(DELEGATION_TOKEN_PARAM, delegationToken);
} }
protected DelegationTokenHttpSolrClient(Builder builder) {
super(builder);
setQueryParams(new TreeSet<>(Arrays.asList(DELEGATION_TOKEN_PARAM)));
}
/** /**
* This constructor is defined at "protected" scope. Ideally applications should * This constructor is defined at "protected" scope. Ideally applications should
* use {@linkplain org.apache.solr.client.solrj.impl.HttpSolrClient.Builder} instance * use {@linkplain org.apache.solr.client.solrj.impl.HttpSolrClient.Builder} instance
@ -63,7 +72,11 @@ public class DelegationTokenHttpSolrClient extends HttpSolrClient {
* @param parser Response parser instance to use to decode response from Solr server * @param parser Response parser instance to use to decode response from Solr server
* @param allowCompression Should compression be allowed ? * @param allowCompression Should compression be allowed ?
* @param invariantParams The parameters which should be passed with every request. * @param invariantParams The parameters which should be passed with every request.
*
* @deprecated use {@link DelegationTokenHttpSolrClient#DelegationTokenHttpSolrClient(HttpSolrClient.Builder)} instead, as it is a more
* extension/subclassing-friendly alternative
*/ */
@Deprecated
protected DelegationTokenHttpSolrClient(String baseURL, protected DelegationTokenHttpSolrClient(String baseURL,
HttpClient client, HttpClient client,
ResponseParser parser, ResponseParser parser,

View File

@ -143,8 +143,40 @@ public class HttpSolrClient extends SolrClient {
private volatile Integer connectionTimeout; private volatile Integer connectionTimeout;
private volatile Integer soTimeout; private volatile Integer soTimeout;
/**
* @deprecated use {@link HttpSolrClient#HttpSolrClient(Builder)} instead, as it is a more extension/subclassing-friendly alternative
*/
@Deprecated
protected HttpSolrClient(String baseURL, HttpClient client, ResponseParser parser, boolean allowCompression) { protected HttpSolrClient(String baseURL, HttpClient client, ResponseParser parser, boolean allowCompression) {
this.baseUrl = baseURL; this(new Builder(baseURL)
.withHttpClient(client)
.withResponseParser(parser)
.allowCompression(allowCompression));
}
/**
* The constructor.
*
* @param baseURL The base url to communicate with the Solr server
* @param client Http client instance to use for communication
* @param parser Response parser instance to use to decode response from Solr server
* @param allowCompression Should compression be allowed ?
* @param invariantParams The parameters which should be included with every request.
*
* @deprecated use {@link HttpSolrClient#HttpSolrClient(Builder)} instead, as it is a more extension/subclassing-friendly alternative
*/
@Deprecated
protected HttpSolrClient(String baseURL, HttpClient client, ResponseParser parser, boolean allowCompression,
ModifiableSolrParams invariantParams) {
this(new Builder(baseURL)
.withHttpClient(client)
.withResponseParser(parser)
.allowCompression(allowCompression)
.withInvariantParams(invariantParams));
}
protected HttpSolrClient(Builder builder) {
this.baseUrl = builder.baseSolrUrl;
if (baseUrl.endsWith("/")) { if (baseUrl.endsWith("/")) {
baseUrl = baseUrl.substring(0, baseUrl.length() - 1); baseUrl = baseUrl.substring(0, baseUrl.length() - 1);
} }
@ -154,34 +186,19 @@ public class HttpSolrClient extends SolrClient {
+ baseUrl); + baseUrl);
} }
if (client != null) { if (builder.httpClient != null) {
httpClient = client; this.httpClient = builder.httpClient;
internalClient = false; this.internalClient = false;
} else { } else {
internalClient = true; this.internalClient = true;
ModifiableSolrParams params = new ModifiableSolrParams(); ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects); params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);
params.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, allowCompression); params.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, builder.compression);
httpClient = HttpClientUtil.createClient(params); httpClient = HttpClientUtil.createClient(params);
} }
this.parser = parser; this.parser = builder.responseParser;
} this.invariantParams = builder.invariantParams;
/**
* The consturctor.
*
* @param baseURL The base url to communicate with the Solr server
* @param client Http client instance to use for communication
* @param parser Response parser instance to use to decode response from Solr server
* @param allowCompression Should compression be allowed ?
* @param invariantParams The parameters which should be included with every request.
*/
protected HttpSolrClient(String baseURL, HttpClient client, ResponseParser parser, boolean allowCompression,
ModifiableSolrParams invariantParams) {
this(baseURL, client, parser, allowCompression);
this.invariantParams = invariantParams;
} }
public Set<String> getQueryParams() { public Set<String> getQueryParams() {
@ -803,11 +820,11 @@ public class HttpSolrClient extends SolrClient {
* Constructs {@link HttpSolrClient} instances from provided configuration. * Constructs {@link HttpSolrClient} instances from provided configuration.
*/ */
public static class Builder { public static class Builder {
private String baseSolrUrl; protected String baseSolrUrl;
private HttpClient httpClient; protected HttpClient httpClient;
private ResponseParser responseParser; protected ResponseParser responseParser;
private boolean compression; protected boolean compression;
private ModifiableSolrParams invariantParams = new ModifiableSolrParams(); protected ModifiableSolrParams invariantParams = new ModifiableSolrParams();
public Builder() { public Builder() {
this.responseParser = new BinaryResponseParser(); this.responseParser = new BinaryResponseParser();
@ -924,9 +941,9 @@ public class HttpSolrClient extends SolrClient {
} }
if (this.invariantParams.get(DelegationTokenHttpSolrClient.DELEGATION_TOKEN_PARAM) == null) { if (this.invariantParams.get(DelegationTokenHttpSolrClient.DELEGATION_TOKEN_PARAM) == null) {
return new HttpSolrClient(baseSolrUrl, httpClient, responseParser, compression, invariantParams); return new HttpSolrClient(this);
} else { } else {
return new DelegationTokenHttpSolrClient(baseSolrUrl, httpClient, responseParser, compression, invariantParams); return new DelegationTokenHttpSolrClient(this);
} }
} }
} }

View File

@ -239,34 +239,44 @@ public class LBHttpSolrClient extends SolrClient {
/** /**
* The provided httpClient should use a multi-threaded connection manager * The provided httpClient should use a multi-threaded connection manager
*
* @deprecated use {@link LBHttpSolrClient#LBHttpSolrClient(Builder)} instead, as it is a more extension/subclassing-friendly alternative
*/ */
@Deprecated
protected LBHttpSolrClient(HttpSolrClient.Builder httpSolrClientBuilder, protected LBHttpSolrClient(HttpSolrClient.Builder httpSolrClientBuilder,
HttpClient httpClient, String... solrServerUrl) { HttpClient httpClient, String... solrServerUrl) {
clientIsInternal = httpClient == null; this(new Builder()
this.httpSolrClientBuilder = httpSolrClientBuilder; .withHttpSolrClientBuilder(httpSolrClientBuilder)
httpClient = constructClient(null); .withHttpClient(httpClient)
this.httpClient = httpClient; .withBaseSolrUrls(solrServerUrl));
if (solrServerUrl != null) {
for (String s : solrServerUrl) {
ServerWrapper wrapper = new ServerWrapper(makeSolrClient(s));
aliveServers.put(wrapper.getKey(), wrapper);
}
}
updateAliveList();
} }
/** /**
* The provided httpClient should use a multi-threaded connection manager * The provided httpClient should use a multi-threaded connection manager
*
* @deprecated use {@link LBHttpSolrClient#LBHttpSolrClient(Builder)} instead, as it is a more extension/subclassing-friendly alternative
*/ */
@Deprecated
protected LBHttpSolrClient(HttpClient httpClient, ResponseParser parser, String... solrServerUrl) { protected LBHttpSolrClient(HttpClient httpClient, ResponseParser parser, String... solrServerUrl) {
clientIsInternal = (httpClient == null); this(new Builder()
this.httpClient = httpClient == null ? constructClient(solrServerUrl) : httpClient; .withBaseSolrUrls(solrServerUrl)
this.parser = parser; .withResponseParser(parser)
.withHttpClient(httpClient));
}
for (String s : solrServerUrl) { protected LBHttpSolrClient(Builder builder) {
this.clientIsInternal = builder.httpClient == null;
this.httpSolrClientBuilder = builder.httpSolrClientBuilder;
this.httpClient = builder.httpClient == null ? constructClient(builder.baseSolrUrls.toArray(new String[builder.baseSolrUrls.size()])) : builder.httpClient;
this.parser = builder.responseParser;
if (! builder.baseSolrUrls.isEmpty()) {
for (String s : builder.baseSolrUrls) {
ServerWrapper wrapper = new ServerWrapper(makeSolrClient(s)); ServerWrapper wrapper = new ServerWrapper(makeSolrClient(s));
aliveServers.put(wrapper.getKey(), wrapper); aliveServers.put(wrapper.getKey(), wrapper);
} }
}
updateAliveList(); updateAliveList();
} }
@ -852,10 +862,10 @@ public class LBHttpSolrClient extends SolrClient {
* Constructs {@link LBHttpSolrClient} instances from provided configuration. * Constructs {@link LBHttpSolrClient} instances from provided configuration.
*/ */
public static class Builder { public static class Builder {
private final List<String> baseSolrUrls; protected final List<String> baseSolrUrls;
private HttpClient httpClient; protected HttpClient httpClient;
private ResponseParser responseParser; protected ResponseParser responseParser;
private HttpSolrClient.Builder httpSolrClientBuilder; protected HttpSolrClient.Builder httpSolrClientBuilder;
public Builder() { public Builder() {
this.baseSolrUrls = new ArrayList<>(); this.baseSolrUrls = new ArrayList<>();
@ -953,13 +963,7 @@ public class LBHttpSolrClient extends SolrClient {
* Create a {@link HttpSolrClient} based on provided configuration. * Create a {@link HttpSolrClient} based on provided configuration.
*/ */
public LBHttpSolrClient build() { public LBHttpSolrClient build() {
final String[] baseUrlArray = new String[baseSolrUrls.size()]; return new LBHttpSolrClient(this);
String[] solrServerUrls = baseSolrUrls.toArray(baseUrlArray);
if (httpSolrClientBuilder != null) {
return new LBHttpSolrClient(httpSolrClientBuilder, httpClient, solrServerUrls);
} else {
return new LBHttpSolrClient(httpClient, responseParser, solrServerUrls);
}
} }
} }
} }

View File

@ -39,26 +39,11 @@ import java.util.List;
@Slow @Slow
public class SolrExampleStreamingTest extends SolrExampleTests { public class SolrExampleStreamingTest extends SolrExampleTests {
protected Throwable handledException = null;
@BeforeClass @BeforeClass
public static void beforeTest() throws Exception { public static void beforeTest() throws Exception {
createJetty(legacyExampleCollection1SolrHome()); createJetty(legacyExampleCollection1SolrHome());
} }
public class ErrorTrackingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient {
public Throwable lastError = null;
public ErrorTrackingConcurrentUpdateSolrClient(String solrServerUrl, int queueSize, int threadCount) {
super(solrServerUrl, null, queueSize, threadCount, null, false);
}
@Override
public void handleError(Throwable ex) {
handledException = lastError = ex;
}
}
@Override @Override
public SolrClient createNewSolrClient() public SolrClient createNewSolrClient()
{ {
@ -66,7 +51,10 @@ public class SolrExampleStreamingTest extends SolrExampleTests {
// setup the server... // setup the server...
String url = jetty.getBaseUrl().toString() + "/collection1"; String url = jetty.getBaseUrl().toString() + "/collection1";
// smaller queue size hits locks more often // smaller queue size hits locks more often
ConcurrentUpdateSolrClient concurrentClient = new ErrorTrackingConcurrentUpdateSolrClient( url, 2, 5 ); ConcurrentUpdateSolrClient concurrentClient = new ErrorTrackingConcurrentUpdateSolrClient.Builder(url)
.withQueueSize(2)
.withThreadCount(5)
.build();
concurrentClient.setParser(new XMLResponseParser()); concurrentClient.setParser(new XMLResponseParser());
concurrentClient.setRequestWriter(new RequestWriter()); concurrentClient.setRequestWriter(new RequestWriter());
return concurrentClient; return concurrentClient;
@ -81,7 +69,10 @@ public class SolrExampleStreamingTest extends SolrExampleTests {
// SOLR-3903 // SOLR-3903
final List<Throwable> failures = new ArrayList<>(); final List<Throwable> failures = new ArrayList<>();
final String serverUrl = jetty.getBaseUrl().toString() + "/collection1"; final String serverUrl = jetty.getBaseUrl().toString() + "/collection1";
try (ConcurrentUpdateSolrClient concurrentClient = new FailureRecordingConcurrentUpdateSolrClient(serverUrl, 2, 2)) { try (ConcurrentUpdateSolrClient concurrentClient = new FailureRecordingConcurrentUpdateSolrClient.Builder(serverUrl)
.withQueueSize(2)
.withThreadCount(2)
.build()) {
int docId = 42; int docId = 42;
for (UpdateRequest.ACTION action : EnumSet.allOf(UpdateRequest.ACTION.class)) { for (UpdateRequest.ACTION action : EnumSet.allOf(UpdateRequest.ACTION.class)) {
for (boolean waitSearch : Arrays.asList(true, false)) { for (boolean waitSearch : Arrays.asList(true, false)) {
@ -108,14 +99,49 @@ public class SolrExampleStreamingTest extends SolrExampleTests {
static class FailureRecordingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient { static class FailureRecordingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient {
private final List<Throwable> failures = new ArrayList<>(); private final List<Throwable> failures = new ArrayList<>();
public FailureRecordingConcurrentUpdateSolrClient(String serverUrl, int queueSize, int numThreads) { public FailureRecordingConcurrentUpdateSolrClient(Builder builder) {
super(serverUrl, null, queueSize, numThreads, null, false); super(builder);
} }
@Override @Override
public void handleError(Throwable ex) { public void handleError(Throwable ex) {
failures.add(ex); failures.add(ex);
} }
static class Builder extends ConcurrentUpdateSolrClient.Builder {
public Builder(String baseSolrUrl) {
super(baseSolrUrl);
} }
@Override
public FailureRecordingConcurrentUpdateSolrClient build() {
return new FailureRecordingConcurrentUpdateSolrClient(this);
}
}
}
public static class ErrorTrackingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient {
public Throwable lastError = null;
public ErrorTrackingConcurrentUpdateSolrClient(Builder builder) {
super(builder);
}
@Override
public void handleError(Throwable ex) {
lastError = ex;
}
static class Builder extends ConcurrentUpdateSolrClient.Builder {
public Builder(String baseSolrUrl) {
super(baseSolrUrl);
}
@Override
public ErrorTrackingConcurrentUpdateSolrClient build() {
return new ErrorTrackingConcurrentUpdateSolrClient(this);
}
}
}
} }

View File

@ -148,8 +148,10 @@ public class ConcurrentUpdateSolrClientTest extends SolrJettyTestBase {
final StringBuilder errors = new StringBuilder(); final StringBuilder errors = new StringBuilder();
@SuppressWarnings("serial") @SuppressWarnings("serial")
ConcurrentUpdateSolrClient concurrentClient = new OutcomeCountingConcurrentUpdateSolrClient(serverUrl, cussQueueSize, ConcurrentUpdateSolrClient concurrentClient = new OutcomeCountingConcurrentUpdateSolrClient.Builder(serverUrl, successCounter, errorCounter, errors)
cussThreadCount, successCounter, errorCounter, errors); .withQueueSize(cussQueueSize)
.withThreadCount(cussThreadCount)
.build();
concurrentClient.setPollQueueTime(0); concurrentClient.setPollQueueTime(0);
@ -309,13 +311,12 @@ public class ConcurrentUpdateSolrClientTest extends SolrJettyTestBase {
private final AtomicInteger successCounter; private final AtomicInteger successCounter;
private final AtomicInteger failureCounter; private final AtomicInteger failureCounter;
private final StringBuilder errors; private final StringBuilder errors;
public OutcomeCountingConcurrentUpdateSolrClient(String serverUrl, int queueSize, int threadCount,
AtomicInteger successCounter, AtomicInteger failureCounter, StringBuilder errors) {
super(serverUrl, null, queueSize, threadCount, null, false);
this.successCounter = successCounter; public OutcomeCountingConcurrentUpdateSolrClient(Builder builder) {
this.failureCounter = failureCounter; super(builder);
this.errors = errors; this.successCounter = builder.successCounter;
this.failureCounter = builder.failureCounter;
this.errors = builder.errors;
} }
@Override @Override
@ -327,5 +328,22 @@ public class ConcurrentUpdateSolrClientTest extends SolrJettyTestBase {
public void onSuccess(HttpResponse resp) { public void onSuccess(HttpResponse resp) {
successCounter.incrementAndGet(); successCounter.incrementAndGet();
} }
static class Builder extends ConcurrentUpdateSolrClient.Builder {
protected final AtomicInteger successCounter;
protected final AtomicInteger failureCounter;
protected final StringBuilder errors;
public Builder(String baseSolrUrl, AtomicInteger successCounter, AtomicInteger failureCounter, StringBuilder errors) {
super(baseSolrUrl);
this.successCounter = successCounter;
this.failureCounter = failureCounter;
this.errors = errors;
}
public OutcomeCountingConcurrentUpdateSolrClient build() {
return new OutcomeCountingConcurrentUpdateSolrClient(this);
}
}
} }
} }

View File

@ -40,7 +40,7 @@ public class LBHttpSolrClientTest {
public void testLBHttpSolrClientHttpClientResponseParserStringArray() throws IOException { public void testLBHttpSolrClientHttpClientResponseParserStringArray() throws IOException {
CloseableHttpClient httpClient = HttpClientUtil.createClient(new ModifiableSolrParams()); CloseableHttpClient httpClient = HttpClientUtil.createClient(new ModifiableSolrParams());
try ( try (
LBHttpSolrClient testClient = new LBHttpSolrClient(httpClient, (ResponseParser) null); LBHttpSolrClient testClient = new LBHttpSolrClient.Builder().withHttpClient(httpClient).withResponseParser(null).build();
HttpSolrClient httpSolrClient = testClient.makeSolrClient("http://127.0.0.1:8080")) { HttpSolrClient httpSolrClient = testClient.makeSolrClient("http://127.0.0.1:8080")) {
assertNull("Generated server should have null parser.", httpSolrClient.getParser()); assertNull("Generated server should have null parser.", httpSolrClient.getParser());
} finally { } finally {