mirror of https://github.com/apache/lucene.git
SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient. (Marvin Justice, Christine Poerschke)
This commit is contained in:
parent
f61a5f27d2
commit
976079a8ee
|
@ -88,6 +88,9 @@ New Features
|
|||
* SOLR-9243: Add terms.list parameter to the TermsComponent to fetch the docFreq for a list of terms
|
||||
(Joel Bernstein)
|
||||
|
||||
* SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient.
|
||||
(Marvin Justice, Christine Poerschke)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
|
|||
import org.apache.solr.client.solrj.util.ClientUtils;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.ToleratedUpdateError;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
|
@ -117,6 +118,7 @@ public class CloudSolrClient extends SolrClient {
|
|||
Random rand = new Random();
|
||||
|
||||
private final boolean updatesToLeaders;
|
||||
private final boolean directUpdatesToLeadersOnly;
|
||||
private boolean parallelUpdates = true;
|
||||
private ExecutorService threadPool = ExecutorUtil
|
||||
.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
|
||||
|
@ -206,6 +208,7 @@ public class CloudSolrClient extends SolrClient {
|
|||
this.lbClient.setRequestWriter(new BinaryRequestWriter());
|
||||
this.lbClient.setParser(new BinaryResponseParser());
|
||||
this.updatesToLeaders = true;
|
||||
this.directUpdatesToLeadersOnly = false;
|
||||
shutdownLBHttpSolrServer = true;
|
||||
lbClient.addQueryParams(STATE_VERSION);
|
||||
}
|
||||
|
@ -242,6 +245,7 @@ public class CloudSolrClient extends SolrClient {
|
|||
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
|
||||
this.lbClient = createLBHttpSolrClient(myClient);
|
||||
this.updatesToLeaders = true;
|
||||
this.directUpdatesToLeadersOnly = false;
|
||||
shutdownLBHttpSolrServer = true;
|
||||
lbClient.addQueryParams(STATE_VERSION);
|
||||
}
|
||||
|
@ -299,6 +303,7 @@ public class CloudSolrClient extends SolrClient {
|
|||
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
|
||||
this.lbClient = createLBHttpSolrClient(myClient);
|
||||
this.updatesToLeaders = true;
|
||||
this.directUpdatesToLeadersOnly = false;
|
||||
shutdownLBHttpSolrServer = true;
|
||||
}
|
||||
|
||||
|
@ -329,8 +334,38 @@ public class CloudSolrClient extends SolrClient {
|
|||
*/
|
||||
@Deprecated
|
||||
public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient, boolean updatesToLeaders) {
|
||||
this(zkHosts, chroot, httpClient, lbSolrClient, updatesToLeaders, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new client object that connects to Zookeeper and is always aware
|
||||
* of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
|
||||
* SolrCloud has enough replicas for every shard in a collection, there is no
|
||||
* single point of failure. Updates will be sent to shard leaders by default.
|
||||
*
|
||||
* @param zkHosts
|
||||
* A Java Collection (List, Set, etc) of HOST:PORT strings, one for
|
||||
* each host in the zookeeper ensemble. Note that with certain
|
||||
* Collection types like HashSet, the order of hosts in the final
|
||||
* connect string may not be in the same order you added them.
|
||||
* @param chroot
|
||||
* A chroot value for zookeeper, starting with a forward slash. If no
|
||||
* chroot is required, use null.
|
||||
* @param httpClient
|
||||
* the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a
|
||||
* multi-threaded connection manager. If null, a default HttpClient will be used.
|
||||
* @param lbSolrClient
|
||||
* LBHttpSolrServer instance for requests. If null, a default HttpClient will be used.
|
||||
* @param updatesToLeaders
|
||||
* If true, sends updates to shard leaders.
|
||||
* @param directUpdatesToLeadersOnly
|
||||
* If true, sends direct updates to shard leaders only.
|
||||
*/
|
||||
private CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient,
|
||||
boolean updatesToLeaders, boolean directUpdatesToLeadersOnly) {
|
||||
this.zkHost = buildZkHostString(zkHosts, chroot);
|
||||
this.updatesToLeaders = updatesToLeaders;
|
||||
this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
|
||||
|
||||
this.clientIsInternal = httpClient == null;
|
||||
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
|
||||
|
@ -374,6 +409,7 @@ public class CloudSolrClient extends SolrClient {
|
|||
this.lbClient.setRequestWriter(new BinaryRequestWriter());
|
||||
this.lbClient.setParser(new BinaryResponseParser());
|
||||
this.updatesToLeaders = updatesToLeaders;
|
||||
this.directUpdatesToLeadersOnly = false;
|
||||
shutdownLBHttpSolrServer = true;
|
||||
lbClient.addQueryParams(STATE_VERSION);
|
||||
}
|
||||
|
@ -414,6 +450,7 @@ public class CloudSolrClient extends SolrClient {
|
|||
this.zkHost = zkHost;
|
||||
this.lbClient = lbClient;
|
||||
this.updatesToLeaders = updatesToLeaders;
|
||||
this.directUpdatesToLeadersOnly = false;
|
||||
shutdownLBHttpSolrServer = false;
|
||||
this.clientIsInternal = false;
|
||||
lbClient.addQueryParams(STATE_VERSION);
|
||||
|
@ -648,15 +685,18 @@ public class CloudSolrClient extends SolrClient {
|
|||
//Create the URL map, which is keyed on slice name.
|
||||
//The value is a list of URLs for each replica in the slice.
|
||||
//The first value in the list is the leader for the slice.
|
||||
Map<String,List<String>> urlMap = buildUrlMap(col);
|
||||
if (urlMap == null) {
|
||||
// we could not find a leader yet - use unoptimized general path
|
||||
return null;
|
||||
}
|
||||
|
||||
Map<String, LBHttpSolrClient.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
|
||||
final Map<String,List<String>> urlMap = buildUrlMap(col);
|
||||
final Map<String, LBHttpSolrClient.Req> routes = (urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField));
|
||||
if (routes == null) {
|
||||
return null;
|
||||
if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
|
||||
// we have info (documents with ids and/or ids to delete) with
|
||||
// which to find the leaders but we could not find (all of) them
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
|
||||
"directUpdatesToLeadersOnly==true but could not find leader(s)");
|
||||
} else {
|
||||
// we could not find a leader or routes yet - use unoptimized general path
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
final NamedList<Throwable> exceptions = new NamedList<>();
|
||||
|
@ -764,21 +804,23 @@ public class CloudSolrClient extends SolrClient {
|
|||
List<String> urls = new ArrayList<>();
|
||||
Replica leader = slice.getLeader();
|
||||
if (leader == null) {
|
||||
if (directUpdatesToLeadersOnly) {
|
||||
continue;
|
||||
}
|
||||
// take unoptimized general path - we cannot find a leader yet
|
||||
return null;
|
||||
}
|
||||
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
|
||||
String url = zkProps.getCoreUrl();
|
||||
urls.add(url);
|
||||
Collection<Replica> replicas = slice.getReplicas();
|
||||
Iterator<Replica> replicaIterator = replicas.iterator();
|
||||
while (replicaIterator.hasNext()) {
|
||||
Replica replica = replicaIterator.next();
|
||||
if (!replica.getNodeName().equals(leader.getNodeName()) &&
|
||||
!replica.getName().equals(leader.getName())) {
|
||||
ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
|
||||
String url1 = zkProps1.getCoreUrl();
|
||||
urls.add(url1);
|
||||
if (!directUpdatesToLeadersOnly) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
if (!replica.getNodeName().equals(leader.getNodeName()) &&
|
||||
!replica.getName().equals(leader.getName())) {
|
||||
ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
|
||||
String url1 = zkProps1.getCoreUrl();
|
||||
urls.add(url1);
|
||||
}
|
||||
}
|
||||
}
|
||||
urlMap.put(name, urls);
|
||||
|
@ -1284,6 +1326,13 @@ public class CloudSolrClient extends SolrClient {
|
|||
return updatesToLeaders;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if direct updates are sent to shard leaders only
|
||||
*/
|
||||
public boolean isDirectUpdatesToLeadersOnly() {
|
||||
return directUpdatesToLeadersOnly;
|
||||
}
|
||||
|
||||
/**If caches are expired they are refreshed after acquiring a lock.
|
||||
* use this to set the number of locks
|
||||
*/
|
||||
|
@ -1417,6 +1466,31 @@ public class CloudSolrClient extends SolrClient {
|
|||
this.lbClient.setSoTimeout(timeout);
|
||||
}
|
||||
|
||||
private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) {
|
||||
final Map<SolrInputDocument,Map<String,Object>> documents = updateRequest.getDocumentsMap();
|
||||
final Map<String,Map<String,Object>> deleteById = updateRequest.getDeleteByIdMap();
|
||||
|
||||
final boolean hasNoDocuments = (documents == null || documents.isEmpty());
|
||||
final boolean hasNoDeleteById = (deleteById == null || deleteById.isEmpty());
|
||||
if (hasNoDocuments && hasNoDeleteById) {
|
||||
// no documents and no delete-by-id, so no info to find leader(s)
|
||||
return false;
|
||||
}
|
||||
|
||||
if (documents != null) {
|
||||
for (final Map.Entry<SolrInputDocument,Map<String,Object>> entry : documents.entrySet()) {
|
||||
final SolrInputDocument doc = entry.getKey();
|
||||
final Object fieldValue = doc.getFieldValue(idField);
|
||||
if (fieldValue == null) {
|
||||
// a document with no id field value, so can't find leader for it
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private static LBHttpSolrClient createLBHttpSolrClient(HttpClient httpClient) {
|
||||
final LBHttpSolrClient lbClient = new LBHttpSolrClient.Builder()
|
||||
.withHttpClient(httpClient)
|
||||
|
@ -1466,6 +1540,7 @@ public class CloudSolrClient extends SolrClient {
|
|||
private String zkChroot;
|
||||
private LBHttpSolrClient loadBalancedSolrClient;
|
||||
private boolean shardLeadersOnly;
|
||||
private boolean directUpdatesToLeadersOnly;
|
||||
|
||||
public Builder() {
|
||||
this.zkHosts = new ArrayList();
|
||||
|
@ -1542,11 +1617,29 @@ public class CloudSolrClient extends SolrClient {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tells {@link Builder} that created clients should send direct updates to shard leaders only.
|
||||
*/
|
||||
public Builder sendDirectUpdatesToShardLeadersOnly() {
|
||||
directUpdatesToLeadersOnly = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tells {@link Builder} that created clients can send updates
|
||||
* to any shard replica (shard leaders and non-leaders).
|
||||
*/
|
||||
public Builder sendDirectUpdatesToAnyShardReplica() {
|
||||
directUpdatesToLeadersOnly = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link CloudSolrClient} based on the provided configuration.
|
||||
*/
|
||||
public CloudSolrClient build() {
|
||||
return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient, shardLeadersOnly);
|
||||
return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient,
|
||||
shardLeadersOnly, directUpdatesToLeadersOnly);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -257,6 +257,9 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
return null;
|
||||
}
|
||||
List<String> urls = urlMap.get(slice.getName());
|
||||
if (urls == null) {
|
||||
return null;
|
||||
}
|
||||
String leaderUrl = urls.get(0);
|
||||
LBHttpSolrClient.Req request = (LBHttpSolrClient.Req) routes
|
||||
.get(leaderUrl);
|
||||
|
@ -305,6 +308,9 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
return null;
|
||||
}
|
||||
List<String> urls = urlMap.get(slice.getName());
|
||||
if (urls == null) {
|
||||
return null;
|
||||
}
|
||||
String leaderUrl = urls.get(0);
|
||||
LBHttpSolrClient.Req request = routes.get(leaderUrl);
|
||||
if (request != null) {
|
||||
|
|
|
@ -87,4 +87,14 @@ public class CloudSolrClientBuilderTest extends LuceneTestCase {
|
|||
assertTrue(createdClient.isUpdatesToLeaders() == true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsDirectUpdatesToLeadersOnlyDefault() throws IOException {
|
||||
try(CloudSolrClient createdClient = new Builder()
|
||||
.withZkHost(ANY_ZK_HOST)
|
||||
.withZkChroot(ANY_CHROOT)
|
||||
.build()) {
|
||||
assertFalse(createdClient.isDirectUpdatesToLeadersOnly());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -149,6 +149,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
|
|||
|
||||
// Test single threaded routed updates for UpdateRequest
|
||||
NamedList<Object> response = cluster.getSolrClient().request(request, COLLECTION);
|
||||
if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
|
||||
checkSingleServer(response);
|
||||
}
|
||||
CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
|
||||
Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
|
||||
Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it = routes.entrySet()
|
||||
|
@ -173,10 +176,13 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
|
|||
|
||||
// Test the deleteById routing for UpdateRequest
|
||||
|
||||
new UpdateRequest()
|
||||
final UpdateResponse uResponse = new UpdateRequest()
|
||||
.deleteById("0")
|
||||
.deleteById("2")
|
||||
.commit(cluster.getSolrClient(), COLLECTION);
|
||||
if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
|
||||
checkSingleServer(uResponse.getResponse());
|
||||
}
|
||||
|
||||
QueryResponse qResponse = cluster.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
|
||||
SolrDocumentList docs = qResponse.getResults();
|
||||
|
@ -187,6 +193,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
|
|||
threadedClient.setParallelUpdates(true);
|
||||
threadedClient.setDefaultCollection(COLLECTION);
|
||||
response = threadedClient.request(request);
|
||||
if (threadedClient.isDirectUpdatesToLeadersOnly()) {
|
||||
checkSingleServer(response);
|
||||
}
|
||||
rr = (CloudSolrClient.RouteResponse) response;
|
||||
routes = rr.getRoutes();
|
||||
it = routes.entrySet()
|
||||
|
@ -540,4 +549,17 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
|
|||
HttpClientUtil.close(client);
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkSingleServer(NamedList<Object> response) {
|
||||
final CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
|
||||
final Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
|
||||
final Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it =
|
||||
routes.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<String,LBHttpSolrClient.Req> entry = it.next();
|
||||
assertEquals("wrong number of servers: "+entry.getValue().getServers(),
|
||||
1, entry.getValue().getServers().size());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2041,11 +2041,55 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
|
|||
return (0 == TestUtil.nextInt(random(), 0, 9)) ? unlikely : likely;
|
||||
}
|
||||
|
||||
public static class CloudSolrClientBuilder extends CloudSolrClient.Builder {
|
||||
|
||||
private boolean configuredDUTflag = false;
|
||||
|
||||
public CloudSolrClientBuilder() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloudSolrClient.Builder sendDirectUpdatesToShardLeadersOnly() {
|
||||
configuredDUTflag = true;
|
||||
return super.sendDirectUpdatesToShardLeadersOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloudSolrClient.Builder sendDirectUpdatesToAnyShardReplica() {
|
||||
configuredDUTflag = true;
|
||||
return super.sendDirectUpdatesToAnyShardReplica();
|
||||
}
|
||||
|
||||
private void randomlyChooseDirectUpdatesToLeadersOnly() {
|
||||
if (random().nextBoolean()) {
|
||||
sendDirectUpdatesToShardLeadersOnly();
|
||||
} else {
|
||||
sendDirectUpdatesToAnyShardReplica();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloudSolrClient build() {
|
||||
if (configuredDUTflag == false) {
|
||||
// flag value not explicity configured
|
||||
if (random().nextBoolean()) {
|
||||
// so randomly choose a value
|
||||
randomlyChooseDirectUpdatesToLeadersOnly();
|
||||
} else {
|
||||
// or go with whatever the default value is
|
||||
configuredDUTflag = true;
|
||||
}
|
||||
}
|
||||
return super.build();
|
||||
}
|
||||
}
|
||||
|
||||
public static CloudSolrClient getCloudSolrClient(String zkHost) {
|
||||
if (random().nextBoolean()) {
|
||||
return new CloudSolrClient(zkHost);
|
||||
}
|
||||
return new CloudSolrClient.Builder()
|
||||
return new CloudSolrClientBuilder()
|
||||
.withZkHost(zkHost)
|
||||
.build();
|
||||
}
|
||||
|
@ -2054,7 +2098,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
|
|||
if (random().nextBoolean()) {
|
||||
return new CloudSolrClient(zkHost, httpClient);
|
||||
}
|
||||
return new CloudSolrClient.Builder()
|
||||
return new CloudSolrClientBuilder()
|
||||
.withZkHost(zkHost)
|
||||
.withHttpClient(httpClient)
|
||||
.build();
|
||||
|
@ -2066,12 +2110,12 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
|
|||
}
|
||||
|
||||
if (shardLeadersOnly) {
|
||||
return new CloudSolrClient.Builder()
|
||||
return new CloudSolrClientBuilder()
|
||||
.withZkHost(zkHost)
|
||||
.sendUpdatesOnlyToShardLeaders()
|
||||
.build();
|
||||
}
|
||||
return new CloudSolrClient.Builder()
|
||||
return new CloudSolrClientBuilder()
|
||||
.withZkHost(zkHost)
|
||||
.sendUpdatesToAllReplicasInShard()
|
||||
.build();
|
||||
|
@ -2083,13 +2127,13 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
|
|||
}
|
||||
|
||||
if (shardLeadersOnly) {
|
||||
return new CloudSolrClient.Builder()
|
||||
return new CloudSolrClientBuilder()
|
||||
.withZkHost(zkHost)
|
||||
.withHttpClient(httpClient)
|
||||
.sendUpdatesOnlyToShardLeaders()
|
||||
.build();
|
||||
}
|
||||
return new CloudSolrClient.Builder()
|
||||
return new CloudSolrClientBuilder()
|
||||
.withZkHost(zkHost)
|
||||
.withHttpClient(httpClient)
|
||||
.sendUpdatesToAllReplicasInShard()
|
||||
|
|
Loading…
Reference in New Issue