mirror of https://github.com/apache/lucene.git
Revert "SOLR-9512: CloudSolrClient tries other replicas if a cached leader is down"
This reverts commit f96017d9e1
.
This commit is contained in:
parent
74bf88f8fe
commit
a4293ce7c4
|
@ -104,9 +104,6 @@ Bug Fixes
|
||||||
|
|
||||||
* SOLR-9522: Improve error handling in ZKPropertiesWriter (Varun Thacker)
|
* SOLR-9522: Improve error handling in ZKPropertiesWriter (Varun Thacker)
|
||||||
|
|
||||||
* SOLR-9512: CloudSolrClient will try and keep up with leader changes if its
|
|
||||||
state cache points to a down server (Alan Woodward, noble)
|
|
||||||
|
|
||||||
* SOLR-8080: bin/solr start script now exits with informative message if using wrong Java version (janhoy)
|
* SOLR-8080: bin/solr start script now exits with informative message if using wrong Java version (janhoy)
|
||||||
|
|
||||||
* SOLR-9475: bin/install_solr_service.sh script got improved detection of Linux distro, especially within
|
* SOLR-9475: bin/install_solr_service.sh script got improved detection of Linux distro, especially within
|
||||||
|
|
|
@ -191,10 +191,6 @@ public class CloudSolrClient extends SolrClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void invalidateCollectionState(String collection) {
|
|
||||||
collectionStateCache.remove(collection);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new client object that connects to Zookeeper and is always aware
|
* Create a new client object that connects to Zookeeper and is always aware
|
||||||
* of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
|
* of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
|
||||||
|
@ -725,29 +721,23 @@ public class CloudSolrClient extends SolrClient {
|
||||||
long start = System.nanoTime();
|
long start = System.nanoTime();
|
||||||
|
|
||||||
if (parallelUpdates) {
|
if (parallelUpdates) {
|
||||||
final Map<String, Future<LBHttpSolrClient.Rsp>> responseFutures = new HashMap<>(routes.size());
|
final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
|
||||||
for (final Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
|
for (final Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
|
||||||
final String url = entry.getKey();
|
final String url = entry.getKey();
|
||||||
final LBHttpSolrClient.Req lbRequest = entry.getValue();
|
final LBHttpSolrClient.Req lbRequest = entry.getValue();
|
||||||
try {
|
try {
|
||||||
MDC.put("CloudSolrClient.url", url);
|
MDC.put("CloudSolrClient.url", url);
|
||||||
responseFutures.put(url, threadPool.submit(() -> lbClient.request(lbRequest)));
|
responseFutures.put(url, threadPool.submit(() -> lbClient.request(lbRequest).getResponse()));
|
||||||
} finally {
|
} finally {
|
||||||
MDC.remove("CloudSolrClient.url");
|
MDC.remove("CloudSolrClient.url");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final Map.Entry<String, Future<LBHttpSolrClient.Rsp>> entry: responseFutures.entrySet()) {
|
for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
|
||||||
final String url = entry.getKey();
|
final String url = entry.getKey();
|
||||||
final Future<LBHttpSolrClient.Rsp> responseFuture = entry.getValue();
|
final Future<NamedList<?>> responseFuture = entry.getValue();
|
||||||
try {
|
try {
|
||||||
LBHttpSolrClient.Rsp response = responseFuture.get();
|
shardResponses.add(url, responseFuture.get());
|
||||||
shardResponses.add(url, response.getResponse());
|
|
||||||
if (url.startsWith(response.getServer())) { // startsWith to deal with stray trailing slashes
|
|
||||||
// we didn't hit our first-preference server, which means that our cached
|
|
||||||
// collection state is no longer valid
|
|
||||||
invalidateCollectionState(collection);
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
@ -770,14 +760,8 @@ public class CloudSolrClient extends SolrClient {
|
||||||
String url = entry.getKey();
|
String url = entry.getKey();
|
||||||
LBHttpSolrClient.Req lbRequest = entry.getValue();
|
LBHttpSolrClient.Req lbRequest = entry.getValue();
|
||||||
try {
|
try {
|
||||||
LBHttpSolrClient.Rsp response = lbClient.request(lbRequest);
|
NamedList<Object> rsp = lbClient.request(lbRequest).getResponse();
|
||||||
NamedList<Object> rsp = response.getResponse();
|
|
||||||
shardResponses.add(url, rsp);
|
shardResponses.add(url, rsp);
|
||||||
if (response.getServer().equals(url) == false) {
|
|
||||||
// we didn't hit our first-preference server, which means that our cached
|
|
||||||
// collection state is no longer valid
|
|
||||||
invalidateCollectionState(collection);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if(e instanceof SolrException) {
|
if(e instanceof SolrException) {
|
||||||
throw (SolrException) e;
|
throw (SolrException) e;
|
||||||
|
@ -828,7 +812,10 @@ public class CloudSolrClient extends SolrClient {
|
||||||
|
|
||||||
private Map<String,List<String>> buildUrlMap(DocCollection col) {
|
private Map<String,List<String>> buildUrlMap(DocCollection col) {
|
||||||
Map<String, List<String>> urlMap = new HashMap<>();
|
Map<String, List<String>> urlMap = new HashMap<>();
|
||||||
for (Slice slice : col) {
|
Collection<Slice> slices = col.getActiveSlices();
|
||||||
|
Iterator<Slice> sliceIterator = slices.iterator();
|
||||||
|
while (sliceIterator.hasNext()) {
|
||||||
|
Slice slice = sliceIterator.next();
|
||||||
String name = slice.getName();
|
String name = slice.getName();
|
||||||
List<String> urls = new ArrayList<>();
|
List<String> urls = new ArrayList<>();
|
||||||
Replica leader = slice.getLeader();
|
Replica leader = slice.getLeader();
|
||||||
|
@ -839,15 +826,19 @@ public class CloudSolrClient extends SolrClient {
|
||||||
// take unoptimized general path - we cannot find a leader yet
|
// take unoptimized general path - we cannot find a leader yet
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
urls.add(leader.getCoreUrl());
|
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
|
||||||
|
String url = zkProps.getCoreUrl();
|
||||||
|
urls.add(url);
|
||||||
|
if (!directUpdatesToLeadersOnly) {
|
||||||
for (Replica replica : slice.getReplicas()) {
|
for (Replica replica : slice.getReplicas()) {
|
||||||
if (!replica.getNodeName().equals(leader.getNodeName()) &&
|
if (!replica.getNodeName().equals(leader.getNodeName()) &&
|
||||||
!replica.getName().equals(leader.getName())) {
|
!replica.getName().equals(leader.getName())) {
|
||||||
urls.add(replica.getCoreUrl());
|
ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
|
||||||
|
String url1 = zkProps1.getCoreUrl();
|
||||||
|
urls.add(url1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
urlMap.put(name, urls);
|
urlMap.put(name, urls);
|
||||||
}
|
}
|
||||||
return urlMap;
|
return urlMap;
|
||||||
|
|
|
@ -247,19 +247,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
|
||||||
return activeShards == expectedShards;
|
return activeShards == expectedShards;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Check that all shards in a collection have a leader
|
|
||||||
*/
|
|
||||||
public static boolean isUpdateable(Set<String> liveNodes, DocCollection collectionState, int expectedShards) {
|
|
||||||
int updateableShards = 0;
|
|
||||||
for (Slice slice : collectionState) {
|
|
||||||
Replica leader = slice.getLeader();
|
|
||||||
if (leader != null && leader.isActive(liveNodes))
|
|
||||||
updateableShards++;
|
|
||||||
}
|
|
||||||
return updateableShards == expectedShards;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<Slice> iterator() {
|
public Iterator<Slice> iterator() {
|
||||||
return slices.values().iterator();
|
return slices.values().iterator();
|
||||||
|
|
|
@ -156,6 +156,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
// Test single threaded routed updates for UpdateRequest
|
// Test single threaded routed updates for UpdateRequest
|
||||||
NamedList<Object> response = cluster.getSolrClient().request(request, COLLECTION);
|
NamedList<Object> response = cluster.getSolrClient().request(request, COLLECTION);
|
||||||
|
if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
|
||||||
|
checkSingleServer(response);
|
||||||
|
}
|
||||||
CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
|
CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
|
||||||
Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
|
Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
|
||||||
Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it = routes.entrySet()
|
Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it = routes.entrySet()
|
||||||
|
@ -184,6 +187,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
|
||||||
.deleteById("0")
|
.deleteById("0")
|
||||||
.deleteById("2")
|
.deleteById("2")
|
||||||
.commit(cluster.getSolrClient(), COLLECTION);
|
.commit(cluster.getSolrClient(), COLLECTION);
|
||||||
|
if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
|
||||||
|
checkSingleServer(uResponse.getResponse());
|
||||||
|
}
|
||||||
|
|
||||||
QueryResponse qResponse = cluster.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
|
QueryResponse qResponse = cluster.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
|
||||||
SolrDocumentList docs = qResponse.getResults();
|
SolrDocumentList docs = qResponse.getResults();
|
||||||
|
@ -194,6 +200,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
|
||||||
threadedClient.setParallelUpdates(true);
|
threadedClient.setParallelUpdates(true);
|
||||||
threadedClient.setDefaultCollection(COLLECTION);
|
threadedClient.setDefaultCollection(COLLECTION);
|
||||||
response = threadedClient.request(request);
|
response = threadedClient.request(request);
|
||||||
|
if (threadedClient.isDirectUpdatesToLeadersOnly()) {
|
||||||
|
checkSingleServer(response);
|
||||||
|
}
|
||||||
rr = (CloudSolrClient.RouteResponse) response;
|
rr = (CloudSolrClient.RouteResponse) response;
|
||||||
routes = rr.getRoutes();
|
routes = rr.getRoutes();
|
||||||
it = routes.entrySet()
|
it = routes.entrySet()
|
||||||
|
@ -599,4 +608,16 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void checkSingleServer(NamedList<Object> response) {
|
||||||
|
final CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
|
||||||
|
final Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
|
||||||
|
final Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it =
|
||||||
|
routes.entrySet().iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
Map.Entry<String,LBHttpSolrClient.Req> entry = it.next();
|
||||||
|
assertEquals("wrong number of servers: "+entry.getValue().getServers(),
|
||||||
|
1, entry.getValue().getServers().size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,81 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.solr.client.solrj.impl;
|
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
|
||||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
|
||||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
|
||||||
import org.apache.solr.common.cloud.DocCollection;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class TestCloudSolrClientStateCacheing extends SolrCloudTestCase {
|
|
||||||
|
|
||||||
private final String id = "id";
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setupCluster() throws Exception {
|
|
||||||
configureCluster(4)
|
|
||||||
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
|
|
||||||
.configure();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCacheInvalidationOnLeaderChange() throws Exception {
|
|
||||||
|
|
||||||
final String collectionName = "cacheInvalidation";
|
|
||||||
|
|
||||||
try (CloudSolrClient solrClient = new CloudSolrClient.Builder()
|
|
||||||
.withZkHost(cluster.getZkServer().getZkAddress())
|
|
||||||
.sendDirectUpdatesToShardLeadersOnly()
|
|
||||||
.build()) {
|
|
||||||
|
|
||||||
CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
|
|
||||||
.process(solrClient);
|
|
||||||
|
|
||||||
// send one update that will populate the client's cluster state cache
|
|
||||||
new UpdateRequest()
|
|
||||||
.add(id, "0", "a_t", "hello1")
|
|
||||||
.add(id, "2", "a_t", "hello2")
|
|
||||||
.add(id, "3", "a_t", "hello2")
|
|
||||||
.commit(solrClient, collectionName);
|
|
||||||
|
|
||||||
// take down a leader node
|
|
||||||
JettySolrRunner leaderJetty = cluster.getLeaderJetty(collectionName, "shard1");
|
|
||||||
leaderJetty.stop();
|
|
||||||
|
|
||||||
// wait for a new leader to be elected
|
|
||||||
solrClient.waitForState(collectionName, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
|
|
||||||
(n, c) -> DocCollection.isUpdateable(n, c, 2));
|
|
||||||
|
|
||||||
// send another update - this should still succeed, even though the client's
|
|
||||||
// cached leader will be incorrect
|
|
||||||
new UpdateRequest()
|
|
||||||
.add(id, "4", "a_t", "hello1")
|
|
||||||
.add(id, "5", "a_t", "hello2")
|
|
||||||
.add(id, "6", "a_t", "hello2")
|
|
||||||
.commit(solrClient, collectionName);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -46,8 +46,6 @@ import org.apache.solr.client.solrj.embedded.SSLConfig;
|
||||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
|
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
|
||||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||||
import org.apache.solr.common.cloud.DocCollection;
|
|
||||||
import org.apache.solr.common.cloud.Replica;
|
|
||||||
import org.apache.solr.common.cloud.SolrZkClient;
|
import org.apache.solr.common.cloud.SolrZkClient;
|
||||||
import org.apache.solr.common.cloud.ZkConfigManager;
|
import org.apache.solr.common.cloud.ZkConfigManager;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
@ -503,26 +501,4 @@ public class MiniSolrCloudCluster {
|
||||||
}
|
}
|
||||||
return ok ? null : parsed;
|
return ok ? null : parsed;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the Jetty that a particular Replica is located on
|
|
||||||
*/
|
|
||||||
public JettySolrRunner getReplicaJetty(Replica replica) {
|
|
||||||
for (JettySolrRunner jetty : jettys) {
|
|
||||||
if (replica.getCoreUrl().startsWith(jetty.getBaseUrl().toString()))
|
|
||||||
return jetty;
|
|
||||||
}
|
|
||||||
throw new IllegalStateException("No jetty found for replica with core url " + replica.getCoreUrl());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the Jetty that the leader of a particular collection shard is located on
|
|
||||||
*/
|
|
||||||
public JettySolrRunner getLeaderJetty(String collectionName, String shard) {
|
|
||||||
DocCollection collectionState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
|
|
||||||
Replica leader = collectionState.getLeader(shard);
|
|
||||||
if (leader == null)
|
|
||||||
throw new IllegalStateException("No leader for shard " + shard);
|
|
||||||
return getReplicaJetty(leader);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue