From 976079a8ee8a2cff1c8df01ae9f2856b3ddcdac3 Mon Sep 17 00:00:00 2001 From: Christine Poerschke Date: Tue, 5 Jul 2016 17:14:47 +0100 Subject: [PATCH] SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient. (Marvin Justice, Christine Poerschke) --- solr/CHANGES.txt | 3 + .../client/solrj/impl/CloudSolrClient.java | 129 +++++++++++++++--- .../client/solrj/request/UpdateRequest.java | 6 + .../impl/CloudSolrClientBuilderTest.java | 10 ++ .../solrj/impl/CloudSolrClientTest.java | 24 +++- .../java/org/apache/solr/SolrTestCaseJ4.java | 56 +++++++- 6 files changed, 203 insertions(+), 25 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 95fa7962629..40add1d4a77 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -88,6 +88,9 @@ New Features * SOLR-9243: Add terms.list parameter to the TermsComponent to fetch the docFreq for a list of terms (Joel Bernstein) +* SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient. + (Marvin Justice, Christine Poerschke) + Bug Fixes ---------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index f5e18b3027c..876f7f86cba 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -53,6 +53,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.ToleratedUpdateError; import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; @@ -117,6 +118,7 @@ public class CloudSolrClient extends SolrClient { Random rand = new Random(); private final boolean updatesToLeaders; + private final boolean directUpdatesToLeadersOnly; private boolean parallelUpdates = true; private ExecutorService threadPool = ExecutorUtil .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory( @@ -206,6 +208,7 @@ public class CloudSolrClient extends SolrClient { this.lbClient.setRequestWriter(new BinaryRequestWriter()); this.lbClient.setParser(new BinaryResponseParser()); this.updatesToLeaders = true; + this.directUpdatesToLeadersOnly = false; shutdownLBHttpSolrServer = true; lbClient.addQueryParams(STATE_VERSION); } @@ -242,6 +245,7 @@ public class CloudSolrClient extends SolrClient { this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient; this.lbClient = createLBHttpSolrClient(myClient); this.updatesToLeaders = true; + this.directUpdatesToLeadersOnly = false; shutdownLBHttpSolrServer = true; lbClient.addQueryParams(STATE_VERSION); } @@ -299,6 +303,7 @@ public class CloudSolrClient extends SolrClient { this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient; this.lbClient = createLBHttpSolrClient(myClient); this.updatesToLeaders = true; + this.directUpdatesToLeadersOnly = false; shutdownLBHttpSolrServer = true; } @@ -329,8 +334,38 @@ public class CloudSolrClient extends SolrClient { */ @Deprecated public CloudSolrClient(Collection zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient, boolean updatesToLeaders) { + this(zkHosts, chroot, httpClient, lbSolrClient, updatesToLeaders, false); + } + + /** + * Create a new client object that connects to Zookeeper and is always aware + * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and + * SolrCloud has enough replicas for every shard in a collection, there is no + * single point of failure. Updates will be sent to shard leaders by default. + * + * @param zkHosts + * A Java Collection (List, Set, etc) of HOST:PORT strings, one for + * each host in the zookeeper ensemble. Note that with certain + * Collection types like HashSet, the order of hosts in the final + * connect string may not be in the same order you added them. + * @param chroot + * A chroot value for zookeeper, starting with a forward slash. If no + * chroot is required, use null. + * @param httpClient + * the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a + * multi-threaded connection manager. If null, a default HttpClient will be used. + * @param lbSolrClient + * LBHttpSolrServer instance for requests. If null, a default HttpClient will be used. + * @param updatesToLeaders + * If true, sends updates to shard leaders. + * @param directUpdatesToLeadersOnly + * If true, sends direct updates to shard leaders only. + */ + private CloudSolrClient(Collection zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient, + boolean updatesToLeaders, boolean directUpdatesToLeadersOnly) { this.zkHost = buildZkHostString(zkHosts, chroot); this.updatesToLeaders = updatesToLeaders; + this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly; this.clientIsInternal = httpClient == null; this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient; @@ -374,6 +409,7 @@ public class CloudSolrClient extends SolrClient { this.lbClient.setRequestWriter(new BinaryRequestWriter()); this.lbClient.setParser(new BinaryResponseParser()); this.updatesToLeaders = updatesToLeaders; + this.directUpdatesToLeadersOnly = false; shutdownLBHttpSolrServer = true; lbClient.addQueryParams(STATE_VERSION); } @@ -414,6 +450,7 @@ public class CloudSolrClient extends SolrClient { this.zkHost = zkHost; this.lbClient = lbClient; this.updatesToLeaders = updatesToLeaders; + this.directUpdatesToLeadersOnly = false; shutdownLBHttpSolrServer = false; this.clientIsInternal = false; lbClient.addQueryParams(STATE_VERSION); @@ -648,15 +685,18 @@ public class CloudSolrClient extends SolrClient { //Create the URL map, which is keyed on slice name. //The value is a list of URLs for each replica in the slice. //The first value in the list is the leader for the slice. - Map> urlMap = buildUrlMap(col); - if (urlMap == null) { - // we could not find a leader yet - use unoptimized general path - return null; - } - - Map routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField); + final Map> urlMap = buildUrlMap(col); + final Map routes = (urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField)); if (routes == null) { - return null; + if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) { + // we have info (documents with ids and/or ids to delete) with + // which to find the leaders but we could not find (all of) them + throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, + "directUpdatesToLeadersOnly==true but could not find leader(s)"); + } else { + // we could not find a leader or routes yet - use unoptimized general path + return null; + } } final NamedList exceptions = new NamedList<>(); @@ -764,21 +804,23 @@ public class CloudSolrClient extends SolrClient { List urls = new ArrayList<>(); Replica leader = slice.getLeader(); if (leader == null) { + if (directUpdatesToLeadersOnly) { + continue; + } // take unoptimized general path - we cannot find a leader yet return null; } ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader); String url = zkProps.getCoreUrl(); urls.add(url); - Collection replicas = slice.getReplicas(); - Iterator replicaIterator = replicas.iterator(); - while (replicaIterator.hasNext()) { - Replica replica = replicaIterator.next(); - if (!replica.getNodeName().equals(leader.getNodeName()) && - !replica.getName().equals(leader.getName())) { - ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica); - String url1 = zkProps1.getCoreUrl(); - urls.add(url1); + if (!directUpdatesToLeadersOnly) { + for (Replica replica : slice.getReplicas()) { + if (!replica.getNodeName().equals(leader.getNodeName()) && + !replica.getName().equals(leader.getName())) { + ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica); + String url1 = zkProps1.getCoreUrl(); + urls.add(url1); + } } } urlMap.put(name, urls); @@ -1284,6 +1326,13 @@ public class CloudSolrClient extends SolrClient { return updatesToLeaders; } + /** + * @return true if direct updates are sent to shard leaders only + */ + public boolean isDirectUpdatesToLeadersOnly() { + return directUpdatesToLeadersOnly; + } + /**If caches are expired they are refreshed after acquiring a lock. * use this to set the number of locks */ @@ -1417,6 +1466,31 @@ public class CloudSolrClient extends SolrClient { this.lbClient.setSoTimeout(timeout); } + private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) { + final Map> documents = updateRequest.getDocumentsMap(); + final Map> deleteById = updateRequest.getDeleteByIdMap(); + + final boolean hasNoDocuments = (documents == null || documents.isEmpty()); + final boolean hasNoDeleteById = (deleteById == null || deleteById.isEmpty()); + if (hasNoDocuments && hasNoDeleteById) { + // no documents and no delete-by-id, so no info to find leader(s) + return false; + } + + if (documents != null) { + for (final Map.Entry> entry : documents.entrySet()) { + final SolrInputDocument doc = entry.getKey(); + final Object fieldValue = doc.getFieldValue(idField); + if (fieldValue == null) { + // a document with no id field value, so can't find leader for it + return false; + } + } + } + + return true; + } + private static LBHttpSolrClient createLBHttpSolrClient(HttpClient httpClient) { final LBHttpSolrClient lbClient = new LBHttpSolrClient.Builder() .withHttpClient(httpClient) @@ -1466,6 +1540,7 @@ public class CloudSolrClient extends SolrClient { private String zkChroot; private LBHttpSolrClient loadBalancedSolrClient; private boolean shardLeadersOnly; + private boolean directUpdatesToLeadersOnly; public Builder() { this.zkHosts = new ArrayList(); @@ -1542,11 +1617,29 @@ public class CloudSolrClient extends SolrClient { return this; } + /** + * Tells {@link Builder} that created clients should send direct updates to shard leaders only. + */ + public Builder sendDirectUpdatesToShardLeadersOnly() { + directUpdatesToLeadersOnly = true; + return this; + } + + /** + * Tells {@link Builder} that created clients can send updates + * to any shard replica (shard leaders and non-leaders). + */ + public Builder sendDirectUpdatesToAnyShardReplica() { + directUpdatesToLeadersOnly = false; + return this; + } + /** * Create a {@link CloudSolrClient} based on the provided configuration. */ public CloudSolrClient build() { - return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient, shardLeadersOnly); + return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient, + shardLeadersOnly, directUpdatesToLeadersOnly); } } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java index f93a197783b..aec6e22e41a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java @@ -257,6 +257,9 @@ public class UpdateRequest extends AbstractUpdateRequest { return null; } List urls = urlMap.get(slice.getName()); + if (urls == null) { + return null; + } String leaderUrl = urls.get(0); LBHttpSolrClient.Req request = (LBHttpSolrClient.Req) routes .get(leaderUrl); @@ -305,6 +308,9 @@ public class UpdateRequest extends AbstractUpdateRequest { return null; } List urls = urlMap.get(slice.getName()); + if (urls == null) { + return null; + } String leaderUrl = urls.get(0); LBHttpSolrClient.Req request = routes.get(leaderUrl); if (request != null) { diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java index 57692c75e3f..5f1c5c20118 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java @@ -87,4 +87,14 @@ public class CloudSolrClientBuilderTest extends LuceneTestCase { assertTrue(createdClient.isUpdatesToLeaders() == true); } } + + @Test + public void testIsDirectUpdatesToLeadersOnlyDefault() throws IOException { + try(CloudSolrClient createdClient = new Builder() + .withZkHost(ANY_ZK_HOST) + .withZkChroot(ANY_CHROOT) + .build()) { + assertFalse(createdClient.isDirectUpdatesToLeadersOnly()); + } + } } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java index 616ddc47237..cf12036b300 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java @@ -149,6 +149,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase { // Test single threaded routed updates for UpdateRequest NamedList response = cluster.getSolrClient().request(request, COLLECTION); + if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) { + checkSingleServer(response); + } CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response; Map routes = rr.getRoutes(); Iterator> it = routes.entrySet() @@ -173,10 +176,13 @@ public class CloudSolrClientTest extends SolrCloudTestCase { // Test the deleteById routing for UpdateRequest - new UpdateRequest() + final UpdateResponse uResponse = new UpdateRequest() .deleteById("0") .deleteById("2") .commit(cluster.getSolrClient(), COLLECTION); + if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) { + checkSingleServer(uResponse.getResponse()); + } QueryResponse qResponse = cluster.getSolrClient().query(COLLECTION, new SolrQuery("*:*")); SolrDocumentList docs = qResponse.getResults(); @@ -187,6 +193,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase { threadedClient.setParallelUpdates(true); threadedClient.setDefaultCollection(COLLECTION); response = threadedClient.request(request); + if (threadedClient.isDirectUpdatesToLeadersOnly()) { + checkSingleServer(response); + } rr = (CloudSolrClient.RouteResponse) response; routes = rr.getRoutes(); it = routes.entrySet() @@ -540,4 +549,17 @@ public class CloudSolrClientTest extends SolrCloudTestCase { HttpClientUtil.close(client); } } + + private static void checkSingleServer(NamedList response) { + final CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response; + final Map routes = rr.getRoutes(); + final Iterator> it = + routes.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + assertEquals("wrong number of servers: "+entry.getValue().getServers(), + 1, entry.getValue().getServers().size()); + } + } + } diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java index 7a414543150..ea70805ab2a 100644 --- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java +++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java @@ -2041,11 +2041,55 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase { return (0 == TestUtil.nextInt(random(), 0, 9)) ? unlikely : likely; } + public static class CloudSolrClientBuilder extends CloudSolrClient.Builder { + + private boolean configuredDUTflag = false; + + public CloudSolrClientBuilder() { + super(); + } + + @Override + public CloudSolrClient.Builder sendDirectUpdatesToShardLeadersOnly() { + configuredDUTflag = true; + return super.sendDirectUpdatesToShardLeadersOnly(); + } + + @Override + public CloudSolrClient.Builder sendDirectUpdatesToAnyShardReplica() { + configuredDUTflag = true; + return super.sendDirectUpdatesToAnyShardReplica(); + } + + private void randomlyChooseDirectUpdatesToLeadersOnly() { + if (random().nextBoolean()) { + sendDirectUpdatesToShardLeadersOnly(); + } else { + sendDirectUpdatesToAnyShardReplica(); + } + } + + @Override + public CloudSolrClient build() { + if (configuredDUTflag == false) { + // flag value not explicity configured + if (random().nextBoolean()) { + // so randomly choose a value + randomlyChooseDirectUpdatesToLeadersOnly(); + } else { + // or go with whatever the default value is + configuredDUTflag = true; + } + } + return super.build(); + } + } + public static CloudSolrClient getCloudSolrClient(String zkHost) { if (random().nextBoolean()) { return new CloudSolrClient(zkHost); } - return new CloudSolrClient.Builder() + return new CloudSolrClientBuilder() .withZkHost(zkHost) .build(); } @@ -2054,7 +2098,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase { if (random().nextBoolean()) { return new CloudSolrClient(zkHost, httpClient); } - return new CloudSolrClient.Builder() + return new CloudSolrClientBuilder() .withZkHost(zkHost) .withHttpClient(httpClient) .build(); @@ -2066,12 +2110,12 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase { } if (shardLeadersOnly) { - return new CloudSolrClient.Builder() + return new CloudSolrClientBuilder() .withZkHost(zkHost) .sendUpdatesOnlyToShardLeaders() .build(); } - return new CloudSolrClient.Builder() + return new CloudSolrClientBuilder() .withZkHost(zkHost) .sendUpdatesToAllReplicasInShard() .build(); @@ -2083,13 +2127,13 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase { } if (shardLeadersOnly) { - return new CloudSolrClient.Builder() + return new CloudSolrClientBuilder() .withZkHost(zkHost) .withHttpClient(httpClient) .sendUpdatesOnlyToShardLeaders() .build(); } - return new CloudSolrClient.Builder() + return new CloudSolrClientBuilder() .withZkHost(zkHost) .withHttpClient(httpClient) .sendUpdatesToAllReplicasInShard()