SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient. (Marvin Justice, Christine Poerschke)

This commit is contained in:
Christine Poerschke 2016-07-05 17:14:47 +01:00
parent 7ee15727c9
commit 85c8f22ca9
6 changed files with 203 additions and 25 deletions

View File

@ -58,6 +58,9 @@ New Features
Spatial4j package "com.spatial4j.core" it is rewritten to "org.locationtech.spatial4j" with a warning.
(David Smiley)
* SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient.
(Marvin Justice, Christine Poerschke)
Bug Fixes
----------------------

View File

@ -53,6 +53,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
@ -117,6 +118,7 @@ public class CloudSolrClient extends SolrClient {
Random rand = new Random();
private final boolean updatesToLeaders;
private final boolean directUpdatesToLeadersOnly;
private boolean parallelUpdates = true;
private ExecutorService threadPool = ExecutorUtil
.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
@ -206,6 +208,7 @@ public class CloudSolrClient extends SolrClient {
this.lbClient.setRequestWriter(new BinaryRequestWriter());
this.lbClient.setParser(new BinaryResponseParser());
this.updatesToLeaders = true;
this.directUpdatesToLeadersOnly = false;
shutdownLBHttpSolrServer = true;
lbClient.addQueryParams(STATE_VERSION);
}
@ -242,6 +245,7 @@ public class CloudSolrClient extends SolrClient {
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
this.lbClient = createLBHttpSolrClient(myClient);
this.updatesToLeaders = true;
this.directUpdatesToLeadersOnly = false;
shutdownLBHttpSolrServer = true;
lbClient.addQueryParams(STATE_VERSION);
}
@ -299,6 +303,7 @@ public class CloudSolrClient extends SolrClient {
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
this.lbClient = createLBHttpSolrClient(myClient);
this.updatesToLeaders = true;
this.directUpdatesToLeadersOnly = false;
shutdownLBHttpSolrServer = true;
}
@ -329,8 +334,38 @@ public class CloudSolrClient extends SolrClient {
*/
@Deprecated
public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient, boolean updatesToLeaders) {
this(zkHosts, chroot, httpClient, lbSolrClient, updatesToLeaders, false);
}
/**
* Create a new client object that connects to Zookeeper and is always aware
* of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
* SolrCloud has enough replicas for every shard in a collection, there is no
* single point of failure. Updates will be sent to shard leaders by default.
*
* @param zkHosts
* A Java Collection (List, Set, etc) of HOST:PORT strings, one for
* each host in the zookeeper ensemble. Note that with certain
* Collection types like HashSet, the order of hosts in the final
* connect string may not be in the same order you added them.
* @param chroot
* A chroot value for zookeeper, starting with a forward slash. If no
* chroot is required, use null.
* @param httpClient
* the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a
* multi-threaded connection manager. If null, a default HttpClient will be used.
* @param lbSolrClient
* LBHttpSolrServer instance for requests. If null, a default HttpClient will be used.
* @param updatesToLeaders
* If true, sends updates to shard leaders.
* @param directUpdatesToLeadersOnly
* If true, sends direct updates to shard leaders only.
*/
private CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient,
boolean updatesToLeaders, boolean directUpdatesToLeadersOnly) {
this.zkHost = buildZkHostString(zkHosts, chroot);
this.updatesToLeaders = updatesToLeaders;
this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
this.clientIsInternal = httpClient == null;
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
@ -374,6 +409,7 @@ public class CloudSolrClient extends SolrClient {
this.lbClient.setRequestWriter(new BinaryRequestWriter());
this.lbClient.setParser(new BinaryResponseParser());
this.updatesToLeaders = updatesToLeaders;
this.directUpdatesToLeadersOnly = false;
shutdownLBHttpSolrServer = true;
lbClient.addQueryParams(STATE_VERSION);
}
@ -414,6 +450,7 @@ public class CloudSolrClient extends SolrClient {
this.zkHost = zkHost;
this.lbClient = lbClient;
this.updatesToLeaders = updatesToLeaders;
this.directUpdatesToLeadersOnly = false;
shutdownLBHttpSolrServer = false;
this.clientIsInternal = false;
lbClient.addQueryParams(STATE_VERSION);
@ -648,15 +685,18 @@ public class CloudSolrClient extends SolrClient {
//Create the URL map, which is keyed on slice name.
//The value is a list of URLs for each replica in the slice.
//The first value in the list is the leader for the slice.
Map<String,List<String>> urlMap = buildUrlMap(col);
if (urlMap == null) {
// we could not find a leader yet - use unoptimized general path
return null;
}
Map<String, LBHttpSolrClient.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
final Map<String,List<String>> urlMap = buildUrlMap(col);
final Map<String, LBHttpSolrClient.Req> routes = (urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField));
if (routes == null) {
return null;
if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
// we have info (documents with ids and/or ids to delete) with
// which to find the leaders but we could not find (all of) them
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"directUpdatesToLeadersOnly==true but could not find leader(s)");
} else {
// we could not find a leader or routes yet - use unoptimized general path
return null;
}
}
final NamedList<Throwable> exceptions = new NamedList<>();
@ -764,21 +804,23 @@ public class CloudSolrClient extends SolrClient {
List<String> urls = new ArrayList<>();
Replica leader = slice.getLeader();
if (leader == null) {
if (directUpdatesToLeadersOnly) {
continue;
}
// take unoptimized general path - we cannot find a leader yet
return null;
}
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
String url = zkProps.getCoreUrl();
urls.add(url);
Collection<Replica> replicas = slice.getReplicas();
Iterator<Replica> replicaIterator = replicas.iterator();
while (replicaIterator.hasNext()) {
Replica replica = replicaIterator.next();
if (!replica.getNodeName().equals(leader.getNodeName()) &&
!replica.getName().equals(leader.getName())) {
ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
String url1 = zkProps1.getCoreUrl();
urls.add(url1);
if (!directUpdatesToLeadersOnly) {
for (Replica replica : slice.getReplicas()) {
if (!replica.getNodeName().equals(leader.getNodeName()) &&
!replica.getName().equals(leader.getName())) {
ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
String url1 = zkProps1.getCoreUrl();
urls.add(url1);
}
}
}
urlMap.put(name, urls);
@ -1284,6 +1326,13 @@ public class CloudSolrClient extends SolrClient {
return updatesToLeaders;
}
/**
* @return true if direct updates are sent to shard leaders only
*/
public boolean isDirectUpdatesToLeadersOnly() {
return directUpdatesToLeadersOnly;
}
/**If caches are expired they are refreshed after acquiring a lock.
* use this to set the number of locks
*/
@ -1409,6 +1458,31 @@ public class CloudSolrClient extends SolrClient {
return results;
}
private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) {
final Map<SolrInputDocument,Map<String,Object>> documents = updateRequest.getDocumentsMap();
final Map<String,Map<String,Object>> deleteById = updateRequest.getDeleteByIdMap();
final boolean hasNoDocuments = (documents == null || documents.isEmpty());
final boolean hasNoDeleteById = (deleteById == null || deleteById.isEmpty());
if (hasNoDocuments && hasNoDeleteById) {
// no documents and no delete-by-id, so no info to find leader(s)
return false;
}
if (documents != null) {
for (final Map.Entry<SolrInputDocument,Map<String,Object>> entry : documents.entrySet()) {
final SolrInputDocument doc = entry.getKey();
final Object fieldValue = doc.getFieldValue(idField);
if (fieldValue == null) {
// a document with no id field value, so can't find leader for it
return false;
}
}
}
return true;
}
private static LBHttpSolrClient createLBHttpSolrClient(HttpClient httpClient) {
final LBHttpSolrClient lbClient = new LBHttpSolrClient.Builder()
.withHttpClient(httpClient)
@ -1458,6 +1532,7 @@ public class CloudSolrClient extends SolrClient {
private String zkChroot;
private LBHttpSolrClient loadBalancedSolrClient;
private boolean shardLeadersOnly;
private boolean directUpdatesToLeadersOnly;
public Builder() {
this.zkHosts = new ArrayList();
@ -1534,11 +1609,29 @@ public class CloudSolrClient extends SolrClient {
return this;
}
/**
* Tells {@link Builder} that created clients should send direct updates to shard leaders only.
*/
public Builder sendDirectUpdatesToShardLeadersOnly() {
directUpdatesToLeadersOnly = true;
return this;
}
/**
* Tells {@link Builder} that created clients can send updates
* to any shard replica (shard leaders and non-leaders).
*/
public Builder sendDirectUpdatesToAnyShardReplica() {
directUpdatesToLeadersOnly = false;
return this;
}
/**
* Create a {@link CloudSolrClient} based on the provided configuration.
*/
public CloudSolrClient build() {
return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient, shardLeadersOnly);
return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient,
shardLeadersOnly, directUpdatesToLeadersOnly);
}
}
}

View File

@ -257,6 +257,9 @@ public class UpdateRequest extends AbstractUpdateRequest {
return null;
}
List<String> urls = urlMap.get(slice.getName());
if (urls == null) {
return null;
}
String leaderUrl = urls.get(0);
LBHttpSolrClient.Req request = (LBHttpSolrClient.Req) routes
.get(leaderUrl);
@ -305,6 +308,9 @@ public class UpdateRequest extends AbstractUpdateRequest {
return null;
}
List<String> urls = urlMap.get(slice.getName());
if (urls == null) {
return null;
}
String leaderUrl = urls.get(0);
LBHttpSolrClient.Req request = routes.get(leaderUrl);
if (request != null) {

View File

@ -87,4 +87,14 @@ public class CloudSolrClientBuilderTest extends LuceneTestCase {
assertTrue(createdClient.isUpdatesToLeaders() == true);
}
}
@Test
public void testIsDirectUpdatesToLeadersOnlyDefault() throws IOException {
try(CloudSolrClient createdClient = new Builder()
.withZkHost(ANY_ZK_HOST)
.withZkChroot(ANY_CHROOT)
.build()) {
assertFalse(createdClient.isDirectUpdatesToLeadersOnly());
}
}
}

View File

@ -149,6 +149,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
// Test single threaded routed updates for UpdateRequest
NamedList<Object> response = cluster.getSolrClient().request(request, COLLECTION);
if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
checkSingleServer(response);
}
CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it = routes.entrySet()
@ -173,10 +176,13 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
// Test the deleteById routing for UpdateRequest
new UpdateRequest()
final UpdateResponse uResponse = new UpdateRequest()
.deleteById("0")
.deleteById("2")
.commit(cluster.getSolrClient(), COLLECTION);
if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
checkSingleServer(uResponse.getResponse());
}
QueryResponse qResponse = cluster.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
SolrDocumentList docs = qResponse.getResults();
@ -187,6 +193,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
threadedClient.setParallelUpdates(true);
threadedClient.setDefaultCollection(COLLECTION);
response = threadedClient.request(request);
if (threadedClient.isDirectUpdatesToLeadersOnly()) {
checkSingleServer(response);
}
rr = (CloudSolrClient.RouteResponse) response;
routes = rr.getRoutes();
it = routes.entrySet()
@ -540,4 +549,17 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
assertTrue(solrClient.getLbClient().getHttpClient() == client);
}
}
private static void checkSingleServer(NamedList<Object> response) {
final CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
final Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
final Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it =
routes.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String,LBHttpSolrClient.Req> entry = it.next();
assertEquals("wrong number of servers: "+entry.getValue().getServers(),
1, entry.getValue().getServers().size());
}
}
}

View File

@ -2042,11 +2042,55 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
return (0 == TestUtil.nextInt(random(), 0, 9)) ? unlikely : likely;
}
public static class CloudSolrClientBuilder extends CloudSolrClient.Builder {
private boolean configuredDUTflag = false;
public CloudSolrClientBuilder() {
super();
}
@Override
public CloudSolrClient.Builder sendDirectUpdatesToShardLeadersOnly() {
configuredDUTflag = true;
return super.sendDirectUpdatesToShardLeadersOnly();
}
@Override
public CloudSolrClient.Builder sendDirectUpdatesToAnyShardReplica() {
configuredDUTflag = true;
return super.sendDirectUpdatesToAnyShardReplica();
}
private void randomlyChooseDirectUpdatesToLeadersOnly() {
if (random().nextBoolean()) {
sendDirectUpdatesToShardLeadersOnly();
} else {
sendDirectUpdatesToAnyShardReplica();
}
}
@Override
public CloudSolrClient build() {
if (configuredDUTflag == false) {
// flag value not explicity configured
if (random().nextBoolean()) {
// so randomly choose a value
randomlyChooseDirectUpdatesToLeadersOnly();
} else {
// or go with whatever the default value is
configuredDUTflag = true;
}
}
return super.build();
}
}
public static CloudSolrClient getCloudSolrClient(String zkHost) {
if (random().nextBoolean()) {
return new CloudSolrClient(zkHost);
}
return new CloudSolrClient.Builder()
return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.build();
}
@ -2055,7 +2099,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
if (random().nextBoolean()) {
return new CloudSolrClient(zkHost, httpClient);
}
return new CloudSolrClient.Builder()
return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.withHttpClient(httpClient)
.build();
@ -2067,12 +2111,12 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
}
if (shardLeadersOnly) {
return new CloudSolrClient.Builder()
return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.sendUpdatesOnlyToShardLeaders()
.build();
}
return new CloudSolrClient.Builder()
return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.sendUpdatesToAllReplicasInShard()
.build();
@ -2084,13 +2128,13 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
}
if (shardLeadersOnly) {
return new CloudSolrClient.Builder()
return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.withHttpClient(httpClient)
.sendUpdatesOnlyToShardLeaders()
.build();
}
return new CloudSolrClient.Builder()
return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.withHttpClient(httpClient)
.sendUpdatesToAllReplicasInShard()