SOLR-9512: CloudSolrClient tries other replicas if a cached leader is down

This commit is contained in:
Alan Woodward 2016-09-19 15:29:14 +01:00
parent 1a3bacfc0f
commit f96017d9e1
6 changed files with 151 additions and 42 deletions

View File

@ -104,6 +104,9 @@ Bug Fixes
* SOLR-9522: Improve error handling in ZKPropertiesWriter (Varun Thacker)
* SOLR-9512: CloudSolrClient will try and keep up with leader changes if its
state cache points to a down server (Alan Woodward, noble)
Optimizations
----------------------

View File

@ -191,6 +191,10 @@ public class CloudSolrClient extends SolrClient {
}
}
private void invalidateCollectionState(String collection) {
collectionStateCache.remove(collection);
}
/**
* Create a new client object that connects to Zookeeper and is always aware
* of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
@ -721,23 +725,29 @@ public class CloudSolrClient extends SolrClient {
long start = System.nanoTime();
if (parallelUpdates) {
final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
final Map<String, Future<LBHttpSolrClient.Rsp>> responseFutures = new HashMap<>(routes.size());
for (final Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
final String url = entry.getKey();
final LBHttpSolrClient.Req lbRequest = entry.getValue();
try {
MDC.put("CloudSolrClient.url", url);
responseFutures.put(url, threadPool.submit(() -> lbClient.request(lbRequest).getResponse()));
responseFutures.put(url, threadPool.submit(() -> lbClient.request(lbRequest)));
} finally {
MDC.remove("CloudSolrClient.url");
}
}
for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
for (final Map.Entry<String, Future<LBHttpSolrClient.Rsp>> entry: responseFutures.entrySet()) {
final String url = entry.getKey();
final Future<NamedList<?>> responseFuture = entry.getValue();
final Future<LBHttpSolrClient.Rsp> responseFuture = entry.getValue();
try {
shardResponses.add(url, responseFuture.get());
LBHttpSolrClient.Rsp response = responseFuture.get();
shardResponses.add(url, response.getResponse());
if (url.startsWith(response.getServer())) { // startsWith to deal with stray trailing slashes
// we didn't hit our first-preference server, which means that our cached
// collection state is no longer valid
invalidateCollectionState(collection);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
@ -760,8 +770,14 @@ public class CloudSolrClient extends SolrClient {
String url = entry.getKey();
LBHttpSolrClient.Req lbRequest = entry.getValue();
try {
NamedList<Object> rsp = lbClient.request(lbRequest).getResponse();
LBHttpSolrClient.Rsp response = lbClient.request(lbRequest);
NamedList<Object> rsp = response.getResponse();
shardResponses.add(url, rsp);
if (response.getServer().equals(url) == false) {
// we didn't hit our first-preference server, which means that our cached
// collection state is no longer valid
invalidateCollectionState(collection);
}
} catch (Exception e) {
if(e instanceof SolrException) {
throw (SolrException) e;
@ -812,10 +828,7 @@ public class CloudSolrClient extends SolrClient {
private Map<String,List<String>> buildUrlMap(DocCollection col) {
Map<String, List<String>> urlMap = new HashMap<>();
Collection<Slice> slices = col.getActiveSlices();
Iterator<Slice> sliceIterator = slices.iterator();
while (sliceIterator.hasNext()) {
Slice slice = sliceIterator.next();
for (Slice slice : col) {
String name = slice.getName();
List<String> urls = new ArrayList<>();
Replica leader = slice.getLeader();
@ -826,19 +839,15 @@ public class CloudSolrClient extends SolrClient {
// take unoptimized general path - we cannot find a leader yet
return null;
}
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
String url = zkProps.getCoreUrl();
urls.add(url);
if (!directUpdatesToLeadersOnly) {
for (Replica replica : slice.getReplicas()) {
if (!replica.getNodeName().equals(leader.getNodeName()) &&
!replica.getName().equals(leader.getName())) {
ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
String url1 = zkProps1.getCoreUrl();
urls.add(url1);
}
urls.add(leader.getCoreUrl());
for (Replica replica : slice.getReplicas()) {
if (!replica.getNodeName().equals(leader.getNodeName()) &&
!replica.getName().equals(leader.getName())) {
urls.add(replica.getCoreUrl());
}
}
urlMap.put(name, urls);
}
return urlMap;

View File

@ -247,6 +247,19 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
return activeShards == expectedShards;
}
/**
* Check that all shards in a collection have a leader
*/
public static boolean isUpdateable(Set<String> liveNodes, DocCollection collectionState, int expectedShards) {
int updateableShards = 0;
for (Slice slice : collectionState) {
Replica leader = slice.getLeader();
if (leader != null && leader.isActive(liveNodes))
updateableShards++;
}
return updateableShards == expectedShards;
}
@Override
public Iterator<Slice> iterator() {
return slices.values().iterator();

View File

@ -156,9 +156,6 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
// Test single threaded routed updates for UpdateRequest
NamedList<Object> response = cluster.getSolrClient().request(request, COLLECTION);
if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
checkSingleServer(response);
}
CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it = routes.entrySet()
@ -187,9 +184,6 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
.deleteById("0")
.deleteById("2")
.commit(cluster.getSolrClient(), COLLECTION);
if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
checkSingleServer(uResponse.getResponse());
}
QueryResponse qResponse = cluster.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
SolrDocumentList docs = qResponse.getResults();
@ -200,9 +194,6 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
threadedClient.setParallelUpdates(true);
threadedClient.setDefaultCollection(COLLECTION);
response = threadedClient.request(request);
if (threadedClient.isDirectUpdatesToLeadersOnly()) {
checkSingleServer(response);
}
rr = (CloudSolrClient.RouteResponse) response;
routes = rr.getRoutes();
it = routes.entrySet()
@ -608,16 +599,4 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
}
}
private static void checkSingleServer(NamedList<Object> response) {
final CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
final Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
final Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it =
routes.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String,LBHttpSolrClient.Req> entry = it.next();
assertEquals("wrong number of servers: "+entry.getValue().getServers(),
1, entry.getValue().getServers().size());
}
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestCloudSolrClientStateCacheing extends SolrCloudTestCase {
private final String id = "id";
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(4)
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
}
@Test
public void testCacheInvalidationOnLeaderChange() throws Exception {
final String collectionName = "cacheInvalidation";
try (CloudSolrClient solrClient = new CloudSolrClient.Builder()
.withZkHost(cluster.getZkServer().getZkAddress())
.sendDirectUpdatesToShardLeadersOnly()
.build()) {
CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
.process(solrClient);
// send one update that will populate the client's cluster state cache
new UpdateRequest()
.add(id, "0", "a_t", "hello1")
.add(id, "2", "a_t", "hello2")
.add(id, "3", "a_t", "hello2")
.commit(solrClient, collectionName);
// take down a leader node
JettySolrRunner leaderJetty = cluster.getLeaderJetty(collectionName, "shard1");
leaderJetty.stop();
// wait for a new leader to be elected
solrClient.waitForState(collectionName, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isUpdateable(n, c, 2));
// send another update - this should still succeed, even though the client's
// cached leader will be incorrect
new UpdateRequest()
.add(id, "4", "a_t", "hello1")
.add(id, "5", "a_t", "hello2")
.add(id, "6", "a_t", "hello2")
.commit(solrClient, collectionName);
}
}
}

View File

@ -46,6 +46,8 @@ import org.apache.solr.client.solrj.embedded.SSLConfig;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.cloud.ZkStateReader;
@ -501,4 +503,26 @@ public class MiniSolrCloudCluster {
}
return ok ? null : parsed;
}
/**
* Get the Jetty that a particular Replica is located on
*/
public JettySolrRunner getReplicaJetty(Replica replica) {
for (JettySolrRunner jetty : jettys) {
if (replica.getCoreUrl().startsWith(jetty.getBaseUrl().toString()))
return jetty;
}
throw new IllegalStateException("No jetty found for replica with core url " + replica.getCoreUrl());
}
/**
* Get the Jetty that the leader of a particular collection shard is located on
*/
public JettySolrRunner getLeaderJetty(String collectionName, String shard) {
DocCollection collectionState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
Replica leader = collectionState.getLeader(shard);
if (leader == null)
throw new IllegalStateException("No leader for shard " + shard);
return getReplicaJetty(leader);
}
}