mirror of https://github.com/apache/lucene.git
SOLR-9504: A replica with an empty index becomes the leader even when other more qualified replicas are in line
(cherry picked from commit ce24de5
)
This commit is contained in:
parent
64ab29a796
commit
effd224576
|
@ -101,6 +101,9 @@ Bug Fixes
|
||||||
|
|
||||||
* SOLR-9411: Better validation for Schema API add-field and add-dynamic-field (janhoy, Steve Rowe)
|
* SOLR-9411: Better validation for Schema API add-field and add-dynamic-field (janhoy, Steve Rowe)
|
||||||
|
|
||||||
|
* SOLR-9504: A replica with an empty index becomes the leader even when other more qualified replicas
|
||||||
|
are in line. (shalin)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.solr.core.CoreContainer;
|
||||||
import org.apache.solr.core.SolrCore;
|
import org.apache.solr.core.SolrCore;
|
||||||
import org.apache.solr.logging.MDCLoggingContext;
|
import org.apache.solr.logging.MDCLoggingContext;
|
||||||
import org.apache.solr.search.SolrIndexSearcher;
|
import org.apache.solr.search.SolrIndexSearcher;
|
||||||
|
import org.apache.solr.update.PeerSync;
|
||||||
import org.apache.solr.update.UpdateLog;
|
import org.apache.solr.update.UpdateLog;
|
||||||
import org.apache.solr.util.RefCounted;
|
import org.apache.solr.util.RefCounted;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
|
@ -359,13 +360,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
||||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, e);
|
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PeerSync.PeerSyncResult result = null;
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
success = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
|
result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
|
||||||
|
success = result.isSuccess();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SolrException.log(log, "Exception while trying to sync", e);
|
SolrException.log(log, "Exception while trying to sync", e);
|
||||||
success = false;
|
result = PeerSync.PeerSyncResult.failure();
|
||||||
}
|
}
|
||||||
|
|
||||||
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
|
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
|
||||||
|
@ -382,10 +385,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
||||||
if (!hasRecentUpdates) {
|
if (!hasRecentUpdates) {
|
||||||
// we failed sync, but we have no versions - we can't sync in that case
|
// we failed sync, but we have no versions - we can't sync in that case
|
||||||
// - we were active
|
// - we were active
|
||||||
// before, so become leader anyway
|
// before, so become leader anyway if no one else has any versions either
|
||||||
log.info(
|
if (result.getOtherHasVersions().orElse(false)) {
|
||||||
"We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
|
log.info("We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader");
|
||||||
success = true;
|
success = false;
|
||||||
|
} else {
|
||||||
|
log.info(
|
||||||
|
"We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
|
||||||
|
success = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -373,7 +373,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
|
||||||
PeerSync peerSync = new PeerSync(core,
|
PeerSync peerSync = new PeerSync(core,
|
||||||
Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
|
Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
|
||||||
peerSync.setStartingVersions(recentVersions);
|
peerSync.setStartingVersions(recentVersions);
|
||||||
boolean syncSuccess = peerSync.sync();
|
boolean syncSuccess = peerSync.sync().isSuccess();
|
||||||
if (syncSuccess) {
|
if (syncSuccess) {
|
||||||
SolrQueryRequest req = new LocalSolrQueryRequest(core,
|
SolrQueryRequest req = new LocalSolrQueryRequest(core,
|
||||||
new ModifiableSolrParams());
|
new ModifiableSolrParams());
|
||||||
|
|
|
@ -77,23 +77,21 @@ public class SyncStrategy {
|
||||||
public String baseUrl;
|
public String baseUrl;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean sync(ZkController zkController, SolrCore core, ZkNodeProps leaderProps) {
|
public PeerSync.PeerSyncResult sync(ZkController zkController, SolrCore core, ZkNodeProps leaderProps) {
|
||||||
return sync(zkController, core, leaderProps, false);
|
return sync(zkController, core, leaderProps, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean sync(ZkController zkController, SolrCore core, ZkNodeProps leaderProps,
|
public PeerSync.PeerSyncResult sync(ZkController zkController, SolrCore core, ZkNodeProps leaderProps,
|
||||||
boolean peerSyncOnlyWithActive) {
|
boolean peerSyncOnlyWithActive) {
|
||||||
if (SKIP_AUTO_RECOVERY) {
|
if (SKIP_AUTO_RECOVERY) {
|
||||||
return true;
|
return PeerSync.PeerSyncResult.success();
|
||||||
}
|
}
|
||||||
|
|
||||||
MDCLoggingContext.setCore(core);
|
MDCLoggingContext.setCore(core);
|
||||||
try {
|
try {
|
||||||
boolean success;
|
|
||||||
|
|
||||||
if (isClosed) {
|
if (isClosed) {
|
||||||
log.warn("Closed, skipping sync up.");
|
log.warn("Closed, skipping sync up.");
|
||||||
return false;
|
return PeerSync.PeerSyncResult.failure();
|
||||||
}
|
}
|
||||||
|
|
||||||
recoveryRequests.clear();
|
recoveryRequests.clear();
|
||||||
|
@ -102,40 +100,40 @@ public class SyncStrategy {
|
||||||
|
|
||||||
if (core.getUpdateHandler().getUpdateLog() == null) {
|
if (core.getUpdateHandler().getUpdateLog() == null) {
|
||||||
log.error("No UpdateLog found - cannot sync");
|
log.error("No UpdateLog found - cannot sync");
|
||||||
return false;
|
return PeerSync.PeerSyncResult.failure();
|
||||||
}
|
}
|
||||||
|
|
||||||
success = syncReplicas(zkController, core, leaderProps, peerSyncOnlyWithActive);
|
return syncReplicas(zkController, core, leaderProps, peerSyncOnlyWithActive);
|
||||||
|
|
||||||
return success;
|
|
||||||
} finally {
|
} finally {
|
||||||
MDCLoggingContext.clear();
|
MDCLoggingContext.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean syncReplicas(ZkController zkController, SolrCore core,
|
private PeerSync.PeerSyncResult syncReplicas(ZkController zkController, SolrCore core,
|
||||||
ZkNodeProps leaderProps, boolean peerSyncOnlyWithActive) {
|
ZkNodeProps leaderProps, boolean peerSyncOnlyWithActive) {
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
|
PeerSync.PeerSyncResult result = null;
|
||||||
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
|
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
|
||||||
String collection = cloudDesc.getCollectionName();
|
String collection = cloudDesc.getCollectionName();
|
||||||
String shardId = cloudDesc.getShardId();
|
String shardId = cloudDesc.getShardId();
|
||||||
|
|
||||||
if (isClosed) {
|
if (isClosed) {
|
||||||
log.info("We have been closed, won't sync with replicas");
|
log.info("We have been closed, won't sync with replicas");
|
||||||
return false;
|
return PeerSync.PeerSyncResult.failure();
|
||||||
}
|
}
|
||||||
|
|
||||||
// first sync ourselves - we are the potential leader after all
|
// first sync ourselves - we are the potential leader after all
|
||||||
try {
|
try {
|
||||||
success = syncWithReplicas(zkController, core, leaderProps, collection,
|
result = syncWithReplicas(zkController, core, leaderProps, collection,
|
||||||
shardId, peerSyncOnlyWithActive);
|
shardId, peerSyncOnlyWithActive);
|
||||||
|
success = result.isSuccess();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SolrException.log(log, "Sync Failed", e);
|
SolrException.log(log, "Sync Failed", e);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (isClosed) {
|
if (isClosed) {
|
||||||
log.info("We have been closed, won't attempt to sync replicas back to leader");
|
log.info("We have been closed, won't attempt to sync replicas back to leader");
|
||||||
return false;
|
return PeerSync.PeerSyncResult.failure();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (success) {
|
if (success) {
|
||||||
|
@ -152,17 +150,17 @@ public class SyncStrategy {
|
||||||
SolrException.log(log, "Sync Failed", e);
|
SolrException.log(log, "Sync Failed", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
return success;
|
return result == null ? PeerSync.PeerSyncResult.failure() : result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean syncWithReplicas(ZkController zkController, SolrCore core,
|
private PeerSync.PeerSyncResult syncWithReplicas(ZkController zkController, SolrCore core,
|
||||||
ZkNodeProps props, String collection, String shardId, boolean peerSyncOnlyWithActive) {
|
ZkNodeProps props, String collection, String shardId, boolean peerSyncOnlyWithActive) {
|
||||||
List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
|
List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
|
||||||
.getReplicaProps(collection, shardId,core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
|
.getReplicaProps(collection, shardId,core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
|
||||||
|
|
||||||
if (nodes == null) {
|
if (nodes == null) {
|
||||||
// I have no replicas
|
// I have no replicas
|
||||||
return true;
|
return PeerSync.PeerSyncResult.success();
|
||||||
}
|
}
|
||||||
|
|
||||||
List<String> syncWith = new ArrayList<>(nodes.size());
|
List<String> syncWith = new ArrayList<>(nodes.size());
|
||||||
|
|
|
@ -65,7 +65,7 @@ class RequestSyncShardOp implements CoreAdminHandler.CoreAdminOp {
|
||||||
props.put(ZkStateReader.CORE_NAME_PROP, cname);
|
props.put(ZkStateReader.CORE_NAME_PROP, cname);
|
||||||
props.put(ZkStateReader.NODE_NAME_PROP, zkController.getNodeName());
|
props.put(ZkStateReader.NODE_NAME_PROP, zkController.getNodeName());
|
||||||
|
|
||||||
boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props), true);
|
boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props), true).isSuccess();
|
||||||
// solrcloud_debug
|
// solrcloud_debug
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -675,7 +675,7 @@ public class RealTimeGetComponent extends SearchComponent
|
||||||
boolean cantReachIsSuccess = rb.req.getParams().getBool("cantReachIsSuccess", false);
|
boolean cantReachIsSuccess = rb.req.getParams().getBool("cantReachIsSuccess", false);
|
||||||
|
|
||||||
PeerSync peerSync = new PeerSync(rb.req.getCore(), replicas, nVersions, cantReachIsSuccess, true);
|
PeerSync peerSync = new PeerSync(rb.req.getCore(), replicas, nVersions, cantReachIsSuccess, true);
|
||||||
boolean success = peerSync.sync();
|
boolean success = peerSync.sync().isSuccess();
|
||||||
|
|
||||||
// TODO: more complex response?
|
// TODO: more complex response?
|
||||||
rb.rsp.add("sync", success);
|
rb.rsp.add("sync", success);
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -172,12 +173,42 @@ public class PeerSync {
|
||||||
return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
|
return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class PeerSyncResult {
|
||||||
|
private final boolean success;
|
||||||
|
private final Boolean otherHasVersions;
|
||||||
|
|
||||||
|
public PeerSyncResult(boolean success, Boolean otherHasVersions) {
|
||||||
|
this.success = success;
|
||||||
|
this.otherHasVersions = otherHasVersions;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isSuccess() {
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Optional<Boolean> getOtherHasVersions() {
|
||||||
|
return Optional.ofNullable(otherHasVersions);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static PeerSyncResult success() {
|
||||||
|
return new PeerSyncResult(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static PeerSyncResult failure() {
|
||||||
|
return new PeerSyncResult(false, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static PeerSyncResult failure(boolean otherHasVersions) {
|
||||||
|
return new PeerSyncResult(false, otherHasVersions);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Returns true if peer sync was successful, meaning that this core may be considered to have the latest updates.
|
/** Returns true if peer sync was successful, meaning that this core may be considered to have the latest updates.
|
||||||
* It does not mean that the remote replica is in sync with us.
|
* It does not mean that the remote replica is in sync with us.
|
||||||
*/
|
*/
|
||||||
public boolean sync() {
|
public PeerSyncResult sync() {
|
||||||
if (ulog == null) {
|
if (ulog == null) {
|
||||||
return false;
|
return PeerSyncResult.failure();
|
||||||
}
|
}
|
||||||
MDCLoggingContext.setCore(core);
|
MDCLoggingContext.setCore(core);
|
||||||
try {
|
try {
|
||||||
|
@ -190,7 +221,7 @@ public class PeerSync {
|
||||||
}
|
}
|
||||||
// check if we already in sync to begin with
|
// check if we already in sync to begin with
|
||||||
if(doFingerprint && alreadyInSync()) {
|
if(doFingerprint && alreadyInSync()) {
|
||||||
return true;
|
return PeerSyncResult.success();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -211,7 +242,7 @@ public class PeerSync {
|
||||||
if (startingVersions != null) {
|
if (startingVersions != null) {
|
||||||
if (startingVersions.size() == 0) {
|
if (startingVersions.size() == 0) {
|
||||||
log.warn("no frame of reference to tell if we've missed updates");
|
log.warn("no frame of reference to tell if we've missed updates");
|
||||||
return false;
|
return PeerSyncResult.failure();
|
||||||
}
|
}
|
||||||
Collections.sort(startingVersions, absComparator);
|
Collections.sort(startingVersions, absComparator);
|
||||||
|
|
||||||
|
@ -226,7 +257,7 @@ public class PeerSync {
|
||||||
if (Math.abs(startingVersions.get(0)) < smallestNewUpdate) {
|
if (Math.abs(startingVersions.get(0)) < smallestNewUpdate) {
|
||||||
log.warn(msg()
|
log.warn(msg()
|
||||||
+ "too many updates received since start - startingUpdates no longer overlaps with our currentUpdates");
|
+ "too many updates received since start - startingUpdates no longer overlaps with our currentUpdates");
|
||||||
return false;
|
return PeerSyncResult.failure();
|
||||||
}
|
}
|
||||||
|
|
||||||
// let's merge the lists
|
// let's merge the lists
|
||||||
|
@ -248,7 +279,17 @@ public class PeerSync {
|
||||||
// we have no versions and hence no frame of reference to tell if we can use a peers
|
// we have no versions and hence no frame of reference to tell if we can use a peers
|
||||||
// updates to bring us into sync
|
// updates to bring us into sync
|
||||||
log.info(msg() + "DONE. We have no versions. sync failed.");
|
log.info(msg() + "DONE. We have no versions. sync failed.");
|
||||||
return false;
|
for (;;) {
|
||||||
|
ShardResponse srsp = shardHandler.takeCompletedOrError();
|
||||||
|
if (srsp == null) break;
|
||||||
|
if (srsp.getException() == null) {
|
||||||
|
List<Long> otherVersions = (List<Long>)srsp.getSolrResponse().getResponse().get("versions");
|
||||||
|
if (otherVersions != null && !otherVersions.isEmpty()) {
|
||||||
|
return PeerSyncResult.failure(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return PeerSyncResult.failure(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,7 +304,7 @@ public class PeerSync {
|
||||||
if (!success) {
|
if (!success) {
|
||||||
log.info(msg() + "DONE. sync failed");
|
log.info(msg() + "DONE. sync failed");
|
||||||
shardHandler.cancelAll();
|
shardHandler.cancelAll();
|
||||||
return false;
|
return PeerSyncResult.failure();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -277,7 +318,7 @@ public class PeerSync {
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info(msg() + "DONE. sync " + (success ? "succeeded" : "failed"));
|
log.info(msg() + "DONE. sync " + (success ? "succeeded" : "failed"));
|
||||||
return success;
|
return success ? PeerSyncResult.success() : PeerSyncResult.failure();
|
||||||
} finally {
|
} finally {
|
||||||
MDCLoggingContext.clear();
|
MDCLoggingContext.clear();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,125 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.SolrQuery;
|
||||||
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
import org.apache.solr.common.cloud.Slice;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* See SOLR-9504
|
||||||
|
*/
|
||||||
|
public class TestLeaderElectionWithEmptyReplica extends SolrCloudTestCase {
|
||||||
|
private static final String COLLECTION_NAME = "solr_9504";
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void beforeClass() throws Exception {
|
||||||
|
useFactory(null);
|
||||||
|
configureCluster(2)
|
||||||
|
.addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
|
||||||
|
.configure();
|
||||||
|
|
||||||
|
CollectionAdminRequest.createCollection(COLLECTION_NAME, "config", 1, 1)
|
||||||
|
.processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
|
||||||
|
|
||||||
|
cluster.getSolrClient().waitForState(COLLECTION_NAME, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
|
||||||
|
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws Exception {
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
solrClient.setDefaultCollection(COLLECTION_NAME);
|
||||||
|
for (int i=0; i<10; i++) {
|
||||||
|
SolrInputDocument doc = new SolrInputDocument();
|
||||||
|
doc.addField("id", String.valueOf(i));
|
||||||
|
solrClient.add(doc);
|
||||||
|
}
|
||||||
|
solrClient.commit();
|
||||||
|
|
||||||
|
// find the leader node
|
||||||
|
Replica replica = solrClient.getZkStateReader().getLeaderRetry(COLLECTION_NAME, "shard1");
|
||||||
|
JettySolrRunner replicaJetty = null;
|
||||||
|
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
|
||||||
|
for (JettySolrRunner jettySolrRunner : jettySolrRunners) {
|
||||||
|
int port = jettySolrRunner.getBaseUrl().getPort();
|
||||||
|
if (replica.getStr(BASE_URL_PROP).contains(":" + port)) {
|
||||||
|
replicaJetty = jettySolrRunner;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// kill the leader
|
||||||
|
ChaosMonkey.kill(replicaJetty);
|
||||||
|
|
||||||
|
// add a replica (asynchronously)
|
||||||
|
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(COLLECTION_NAME, "shard1");
|
||||||
|
String asyncId = addReplica.processAsync(solrClient);
|
||||||
|
|
||||||
|
// wait a bit
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// bring the old leader node back up
|
||||||
|
ChaosMonkey.start(replicaJetty);
|
||||||
|
|
||||||
|
// wait until everyone is active
|
||||||
|
solrClient.waitForState(COLLECTION_NAME, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
|
||||||
|
(n, c) -> DocCollection.isFullyActive(n, c, 1, 2));
|
||||||
|
|
||||||
|
// now query each replica and check for consistency
|
||||||
|
assertConsistentReplicas(solrClient, solrClient.getZkStateReader().getClusterState().getSlice(COLLECTION_NAME, "shard1"));
|
||||||
|
|
||||||
|
// sanity check that documents still exist
|
||||||
|
QueryResponse response = solrClient.query(new SolrQuery("*:*"));
|
||||||
|
assertEquals("Indexed documents not found", 10, response.getResults().getNumFound());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int assertConsistentReplicas(CloudSolrClient cloudClient, Slice shard) throws SolrServerException, IOException {
|
||||||
|
long numFound = Long.MIN_VALUE;
|
||||||
|
int count = 0;
|
||||||
|
for (Replica replica : shard.getReplicas()) {
|
||||||
|
HttpSolrClient client = new HttpSolrClient.Builder(replica.getCoreUrl())
|
||||||
|
.withHttpClient(cloudClient.getLbClient().getHttpClient()).build();
|
||||||
|
QueryResponse response = client.query(new SolrQuery("q", "*:*", "distrib", "false"));
|
||||||
|
// log.info("Found numFound={} on replica: {}", response.getResults().getNumFound(), replica.getCoreUrl());
|
||||||
|
if (numFound == Long.MIN_VALUE) {
|
||||||
|
numFound = response.getResults().getNumFound();
|
||||||
|
} else {
|
||||||
|
assertEquals("Shard " + shard.getName() + " replicas do not have same number of documents", numFound, response.getResults().getNumFound());
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue