mirror of https://github.com/apache/lucene.git
SOLR-12051: Election timeout when no replicas are qualified to become leader
This commit is contained in:
parent
7dfb04ee5e
commit
423a8cf69c
|
@ -78,8 +78,11 @@ Upgrade Notes
|
||||||
* LUCENE-8161: If you are using the spatial JTS library with Solr, you must upgrade to 1.15.0. This new version
|
* LUCENE-8161: If you are using the spatial JTS library with Solr, you must upgrade to 1.15.0. This new version
|
||||||
of JTS is now dual-licensed to include a BSD style license.
|
of JTS is now dual-licensed to include a BSD style license.
|
||||||
|
|
||||||
* SOLR-12011: Replicas which are not up-to-date are not allowed to become leader. Use FORCELEADER API to
|
* SOLR-12051: By using term value introduced in SOLR-11702, a replica will know that it is in-sync with the leader or not.
|
||||||
allow these replicas become leader.
|
If all the replicas who participate in the election are out-of-sync with previous leader, the election will hang for
|
||||||
|
leaderVoteWait before allowing out-of-sync with previous leader replicas become leader. Note that the new leader
|
||||||
|
still needs to contains more updates than any other active replicas in the same shard. Therefore by increasing
|
||||||
|
leaderVoteWait will increase the consistency (over availability) of the system.
|
||||||
|
|
||||||
* SOLR-11957: The default Solr log file size and number of backups is raised to 32MB and 10 respectively
|
* SOLR-11957: The default Solr log file size and number of backups is raised to 32MB and 10 respectively
|
||||||
|
|
||||||
|
@ -373,6 +376,8 @@ Other Changes
|
||||||
|
|
||||||
* SOLR-12047: Increase checkStateInZk timeout (Cao Manh Dat, Varun Thacker)
|
* SOLR-12047: Increase checkStateInZk timeout (Cao Manh Dat, Varun Thacker)
|
||||||
|
|
||||||
|
* SOLR-12051: Election timeout when no replicas are qualified to become leader (Cao Manh Dat)
|
||||||
|
|
||||||
================== 7.2.1 ==================
|
================== 7.2.1 ==================
|
||||||
|
|
||||||
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
|
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
|
||||||
|
|
|
@ -346,7 +346,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
Replica.Type replicaType;
|
Replica.Type replicaType;
|
||||||
|
String coreNodeName;
|
||||||
try (SolrCore core = cc.getCore(coreName)) {
|
try (SolrCore core = cc.getCore(coreName)) {
|
||||||
|
|
||||||
if (core == null) {
|
if (core == null) {
|
||||||
|
@ -360,13 +360,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
|
replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
|
||||||
String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
|
coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
|
||||||
// should I be leader?
|
// should I be leader?
|
||||||
if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
|
ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
|
||||||
&& !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
|
if (zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
|
||||||
log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
|
if (!waitForEligibleBecomeLeaderAfterTimeout(zkShardTerms, coreNodeName, leaderVoteWait)) {
|
||||||
rejoinLeaderElection(core);
|
log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
|
||||||
return;
|
rejoinLeaderElection(core);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isClosed) {
|
if (isClosed) {
|
||||||
|
@ -467,7 +469,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// in case of leaderVoteWait timeout, a replica with lower term can win the election
|
||||||
|
if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
|
||||||
|
zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName);
|
||||||
|
}
|
||||||
super.runLeaderProcess(weAreReplacement, 0);
|
super.runLeaderProcess(weAreReplacement, 0);
|
||||||
try (SolrCore core = cc.getCore(coreName)) {
|
try (SolrCore core = cc.getCore(coreName)) {
|
||||||
if (core != null) {
|
if (core != null) {
|
||||||
|
@ -517,6 +522,53 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for other replicas with higher terms participate in the electioon
|
||||||
|
* @return true if after {@code timeout} there are no other replicas with higher term participate in the election,
|
||||||
|
* false if otherwise
|
||||||
|
*/
|
||||||
|
private boolean waitForEligibleBecomeLeaderAfterTimeout(ZkShardTerms zkShardTerms, String coreNodeName, int timeout) throws InterruptedException {
|
||||||
|
long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
|
||||||
|
while (!isClosed && !cc.isShutDown()) {
|
||||||
|
if (System.nanoTime() > timeoutAt) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (replicasWithHigherTermParticipated(zkShardTerms, coreNodeName)) {
|
||||||
|
log.info("Can't become leader, other replicas with higher term participated in leader election");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Thread.sleep(500L);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do other replicas with higher term participated in the election
|
||||||
|
* @return true if other replicas with higher term participated in the election, false if otherwise
|
||||||
|
*/
|
||||||
|
private boolean replicasWithHigherTermParticipated(ZkShardTerms zkShardTerms, String coreNodeName) {
|
||||||
|
ClusterState clusterState = zkController.getClusterState();
|
||||||
|
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
|
||||||
|
Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
|
||||||
|
if (slices == null) return false;
|
||||||
|
|
||||||
|
long replicaTerm = zkShardTerms.getTerm(coreNodeName);
|
||||||
|
boolean isRecovering = zkShardTerms.isRecovering(coreNodeName);
|
||||||
|
|
||||||
|
for (Replica replica : slices.getReplicas()) {
|
||||||
|
if (replica.getName().equals(coreNodeName)) continue;
|
||||||
|
|
||||||
|
if (clusterState.getLiveNodes().contains(replica.getNodeName())) {
|
||||||
|
long otherTerm = zkShardTerms.getTerm(replica.getName());
|
||||||
|
boolean isOtherReplicaRecovering = zkShardTerms.isRecovering(replica.getName());
|
||||||
|
|
||||||
|
if (isRecovering && !isOtherReplicaRecovering) return true;
|
||||||
|
if (otherTerm > replicaTerm) return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception {
|
public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception {
|
||||||
if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) {
|
if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) {
|
||||||
ZkStateReader zkStateReader = zkController.getZkStateReader();
|
ZkStateReader zkStateReader = zkController.getZkStateReader();
|
||||||
|
|
|
@ -755,13 +755,20 @@ public class RecoveryStrategy implements Runnable, Closeable {
|
||||||
zkController.publish(coreDesc, Replica.State.DOWN);
|
zkController.publish(coreDesc, Replica.State.DOWN);
|
||||||
}
|
}
|
||||||
numTried++;
|
numTried++;
|
||||||
final Replica leaderReplica = zkStateReader.getLeaderRetry(
|
Replica leaderReplica = null;
|
||||||
cloudDesc.getCollectionName(), cloudDesc.getShardId());
|
|
||||||
|
|
||||||
if (isClosed()) {
|
if (isClosed()) {
|
||||||
return leaderReplica;
|
return leaderReplica;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
leaderReplica = zkStateReader.getLeaderRetry(
|
||||||
|
cloudDesc.getCollectionName(), cloudDesc.getShardId());
|
||||||
|
} catch (SolrException e) {
|
||||||
|
Thread.sleep(500);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (leaderReplica.getCoreUrl().equals(ourUrl)) {
|
if (leaderReplica.getCoreUrl().equals(ourUrl)) {
|
||||||
return leaderReplica;
|
return leaderReplica;
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,10 +170,16 @@ public class ZkShardTerms implements AutoCloseable{
|
||||||
listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms));
|
listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms));
|
||||||
numListeners = listeners.size();
|
numListeners = listeners.size();
|
||||||
}
|
}
|
||||||
|
return removeTerm(cd.getCloudDescriptor().getCoreNodeName()) || numListeners == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// package private for testing, only used by tests
|
||||||
|
// return true if this object should not be reused
|
||||||
|
boolean removeTerm(String coreNodeName) {
|
||||||
Terms newTerms;
|
Terms newTerms;
|
||||||
while ( (newTerms = terms.removeTerm(cd.getCloudDescriptor().getCoreNodeName())) != null) {
|
while ( (newTerms = terms.removeTerm(coreNodeName)) != null) {
|
||||||
try {
|
try {
|
||||||
if (saveTerms(newTerms)) return numListeners == 0;
|
if (saveTerms(newTerms)) return false;
|
||||||
} catch (KeeperException.NoNodeException e) {
|
} catch (KeeperException.NoNodeException e) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -232,6 +238,11 @@ public class ZkShardTerms implements AutoCloseable{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isRecovering(String name) {
|
||||||
|
return terms.values.containsKey(name + "_recovering");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* When first updates come in, all replicas have some data now,
|
* When first updates come in, all replicas have some data now,
|
||||||
* so we must switch from term 0 (registered) to 1 (have some data)
|
* so we must switch from term 0 (registered) to 1 (have some data)
|
||||||
|
|
|
@ -0,0 +1,247 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
|
import org.apache.solr.client.solrj.SolrQuery;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||||
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
|
import org.apache.solr.common.cloud.ClusterStateUtil;
|
||||||
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
import org.apache.solr.common.cloud.Slice;
|
||||||
|
import org.apache.solr.common.cloud.SolrZkClient;
|
||||||
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
import org.apache.solr.common.util.TimeSource;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
|
import org.apache.solr.core.CoreContainer;
|
||||||
|
import org.apache.solr.update.processor.DistributedUpdateProcessor;
|
||||||
|
import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
|
||||||
|
import org.apache.solr.util.TimeOut;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
@LuceneTestCase.Nightly
|
||||||
|
@LuceneTestCase.Slow
|
||||||
|
@Deprecated
|
||||||
|
public class LIROnShardRestartTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
|
||||||
|
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
|
||||||
|
|
||||||
|
configureCluster(3)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownCluster() throws Exception {
|
||||||
|
System.clearProperty("solr.directoryFactory");
|
||||||
|
System.clearProperty("solr.ulog.numRecordsToKeep");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAllReplicasInLIR() throws Exception {
|
||||||
|
String collection = "allReplicasInLIR";
|
||||||
|
CollectionAdminRequest.createCollection(collection, 1, 3)
|
||||||
|
.process(cluster.getSolrClient());
|
||||||
|
cluster.getSolrClient().add(collection, new SolrInputDocument("id", "1"));
|
||||||
|
cluster.getSolrClient().add(collection, new SolrInputDocument("id", "2"));
|
||||||
|
cluster.getSolrClient().commit(collection);
|
||||||
|
|
||||||
|
DocCollection docCollection = getCollectionState(collection);
|
||||||
|
Slice shard1 = docCollection.getSlice("shard1");
|
||||||
|
Replica newLeader = shard1.getReplicas(rep -> !rep.getName().equals(shard1.getLeader().getName())).get(random().nextInt(2));
|
||||||
|
JettySolrRunner jettyOfNewLeader = cluster.getJettySolrRunners().stream()
|
||||||
|
.filter(jetty -> jetty.getNodeName().equals(newLeader.getNodeName()))
|
||||||
|
.findAny().get();
|
||||||
|
assertNotNull(jettyOfNewLeader);
|
||||||
|
|
||||||
|
// randomly add too many docs to peer sync to one replica so that only one random replica is the valid leader
|
||||||
|
// the versions don't matter, they just have to be higher than what the last 2 docs got
|
||||||
|
try (HttpSolrClient client = getHttpSolrClient(jettyOfNewLeader.getBaseUrl().toString())) {
|
||||||
|
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||||
|
params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString());
|
||||||
|
|
||||||
|
for (int i = 0; i < 101; i++) {
|
||||||
|
UpdateRequest ureq = new UpdateRequest();
|
||||||
|
ureq.setParams(new ModifiableSolrParams(params));
|
||||||
|
ureq.add(sdoc("id", 3 + i, "_version_", Long.MAX_VALUE - 1 - i));
|
||||||
|
ureq.process(client, collection);
|
||||||
|
}
|
||||||
|
client.commit(collection);
|
||||||
|
}
|
||||||
|
|
||||||
|
ChaosMonkey.stop(cluster.getJettySolrRunners());
|
||||||
|
assertTrue("Timeout waiting for all not live",
|
||||||
|
ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 45000));
|
||||||
|
|
||||||
|
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
|
||||||
|
for (Replica replica : docCollection.getReplicas()) {
|
||||||
|
zkShardTerms.removeTerm(replica.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String,Object> stateObj = Utils.makeMap();
|
||||||
|
stateObj.put(ZkStateReader.STATE_PROP, "down");
|
||||||
|
stateObj.put("createdByNodeName", "test");
|
||||||
|
stateObj.put("createdByCoreNodeName", "test");
|
||||||
|
byte[] znodeData = Utils.toJSON(stateObj);
|
||||||
|
|
||||||
|
for (Replica replica : docCollection.getReplicas()) {
|
||||||
|
try {
|
||||||
|
cluster.getZkClient().makePath("/collections/" + collection + "/leader_initiated_recovery/shard1/" + replica.getName(),
|
||||||
|
znodeData, true);
|
||||||
|
} catch (KeeperException.NodeExistsException e) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ChaosMonkey.start(cluster.getJettySolrRunners());
|
||||||
|
waitForState("Timeout waiting for active replicas", collection, clusterShape(1, 3));
|
||||||
|
|
||||||
|
assertEquals(103, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound());
|
||||||
|
|
||||||
|
|
||||||
|
// now expire each node
|
||||||
|
for (Replica replica : docCollection.getReplicas()) {
|
||||||
|
try {
|
||||||
|
cluster.getZkClient().makePath("/collections/" + collection + "/leader_initiated_recovery/shard1/" + replica.getName(),
|
||||||
|
znodeData, true);
|
||||||
|
} catch (KeeperException.NodeExistsException e) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// only 2 replicas join the election and all of them are in LIR state, no one should win the election
|
||||||
|
List<String> oldElectionNodes = getElectionNodes(collection, "shard1", cluster.getZkClient());
|
||||||
|
|
||||||
|
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
|
||||||
|
expire(jetty);
|
||||||
|
}
|
||||||
|
|
||||||
|
TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.CURRENT_TIME);
|
||||||
|
while (!timeOut.hasTimedOut()) {
|
||||||
|
List<String> electionNodes = getElectionNodes(collection, "shard1", cluster.getZkClient());
|
||||||
|
electionNodes.retainAll(oldElectionNodes);
|
||||||
|
if (electionNodes.isEmpty()) break;
|
||||||
|
}
|
||||||
|
assertFalse("Timeout waiting for replicas rejoin election", timeOut.hasTimedOut());
|
||||||
|
waitForState("Timeout waiting for active replicas", collection, clusterShape(1, 3));
|
||||||
|
|
||||||
|
assertEquals(103, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound());
|
||||||
|
|
||||||
|
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void expire(JettySolrRunner jetty) {
|
||||||
|
CoreContainer cores = jetty.getCoreContainer();
|
||||||
|
ChaosMonkey.causeConnectionLoss(jetty);
|
||||||
|
long sessionId = cores.getZkController().getZkClient()
|
||||||
|
.getSolrZooKeeper().getSessionId();
|
||||||
|
cluster.getZkServer().expire(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testSeveralReplicasInLIR() throws Exception {
|
||||||
|
String collection = "severalReplicasInLIR";
|
||||||
|
CollectionAdminRequest.createCollection(collection, 1, 3)
|
||||||
|
.process(cluster.getSolrClient());
|
||||||
|
cluster.getSolrClient().add(collection, new SolrInputDocument("id", "1"));
|
||||||
|
cluster.getSolrClient().add(collection, new SolrInputDocument("id", "2"));
|
||||||
|
cluster.getSolrClient().commit(collection);
|
||||||
|
|
||||||
|
DocCollection docCollection = getCollectionState(collection);
|
||||||
|
Map<JettySolrRunner, String> nodeNameToJetty = cluster.getJettySolrRunners().stream()
|
||||||
|
.collect(Collectors.toMap(jetty -> jetty, JettySolrRunner::getNodeName));
|
||||||
|
ChaosMonkey.stop(cluster.getJettySolrRunners());
|
||||||
|
assertTrue("Timeout waiting for all not live",
|
||||||
|
ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 45000));
|
||||||
|
|
||||||
|
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
|
||||||
|
for (Replica replica : docCollection.getReplicas()) {
|
||||||
|
zkShardTerms.removeTerm(replica.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String,Object> stateObj = Utils.makeMap();
|
||||||
|
stateObj.put(ZkStateReader.STATE_PROP, "down");
|
||||||
|
stateObj.put("createdByNodeName", "test");
|
||||||
|
stateObj.put("createdByCoreNodeName", "test");
|
||||||
|
byte[] znodeData = Utils.toJSON(stateObj);
|
||||||
|
|
||||||
|
Replica replicaNotInLIR = docCollection.getReplicas().get(random().nextInt(3));
|
||||||
|
for (Replica replica : docCollection.getReplicas()) {
|
||||||
|
if (replica.getName().equals(replicaNotInLIR.getName())) continue;
|
||||||
|
try {
|
||||||
|
cluster.getZkClient().makePath("/collections/" + collection + "/leader_initiated_recovery/shard1/" + replica.getName(),
|
||||||
|
znodeData, true);
|
||||||
|
} catch (KeeperException.NodeExistsException e) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
|
||||||
|
if (nodeNameToJetty.get(jetty).equals(replicaNotInLIR.getNodeName())) continue;
|
||||||
|
jetty.start();
|
||||||
|
}
|
||||||
|
waitForState("Timeout waiting for no leader", collection, (liveNodes, collectionState) -> {
|
||||||
|
Replica leader = collectionState.getSlice("shard1").getLeader();
|
||||||
|
return leader == null;
|
||||||
|
});
|
||||||
|
|
||||||
|
// only 2 replicas join the election and all of them are in LIR state, no one should win the election
|
||||||
|
List<String> oldElectionNodes = getElectionNodes(collection, "shard1", cluster.getZkClient());
|
||||||
|
TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.CURRENT_TIME);
|
||||||
|
while (!timeOut.hasTimedOut()) {
|
||||||
|
List<String> electionNodes = getElectionNodes(collection, "shard1", cluster.getZkClient());
|
||||||
|
electionNodes.retainAll(oldElectionNodes);
|
||||||
|
if (electionNodes.isEmpty()) break;
|
||||||
|
}
|
||||||
|
assertFalse("Timeout waiting for replicas rejoin election", timeOut.hasTimedOut());
|
||||||
|
|
||||||
|
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
|
||||||
|
if (nodeNameToJetty.get(jetty).equals(replicaNotInLIR.getNodeName())) {
|
||||||
|
jetty.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
waitForState("Timeout waiting for new leader", collection, (liveNodes, collectionState) -> {
|
||||||
|
Replica leader = collectionState.getSlice("shard1").getLeader();
|
||||||
|
return leader != null;
|
||||||
|
});
|
||||||
|
waitForState("Timeout waiting for new leader", collection, clusterShape(1, 3));
|
||||||
|
|
||||||
|
assertEquals(2L, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound());
|
||||||
|
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> getElectionNodes(String collection, String shard, SolrZkClient client) throws KeeperException, InterruptedException {
|
||||||
|
return client.getChildren("/collections/"+collection+"/leader_elect/"+shard+LeaderElector.ELECTION_NODE, null, true);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,196 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.solr.cloud;
|
|
||||||
|
|
||||||
import java.lang.invoke.MethodHandles;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.Nightly;
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
|
||||||
import org.apache.solr.client.solrj.SolrQuery;
|
|
||||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
|
||||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
|
||||||
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
|
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
|
||||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
|
||||||
import org.apache.solr.common.SolrInputDocument;
|
|
||||||
import org.apache.solr.common.cloud.SolrZkClient;
|
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
|
||||||
import org.apache.solr.common.params.CollectionParams.CollectionAction;
|
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
|
||||||
import org.apache.solr.common.util.Utils;
|
|
||||||
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
|
|
||||||
import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
|
|
||||||
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
@Slow
|
|
||||||
@Nightly
|
|
||||||
@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-10071")
|
|
||||||
@Deprecated
|
|
||||||
public class LeaderInitiatedRecoveryOnShardRestartTest extends AbstractFullDistribZkTestBase {
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
|
||||||
|
|
||||||
public LeaderInitiatedRecoveryOnShardRestartTest() throws Exception {
|
|
||||||
super();
|
|
||||||
sliceCount = 1;
|
|
||||||
// we want 3 jetties, but we are using the control jetty as one
|
|
||||||
fixShardCount(2);
|
|
||||||
useFactory("solr.StandardDirectoryFactory");
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void before() {
|
|
||||||
// we want more realistic leaderVoteWait so raise from
|
|
||||||
// test default of 10s to 30s.
|
|
||||||
System.setProperty("leaderVoteWait", "300000");
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void after() {
|
|
||||||
System.clearProperty("leaderVoteWait");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRestartWithAllInLIR() throws Exception {
|
|
||||||
|
|
||||||
// still waiting to be able to properly start with no default collection1,
|
|
||||||
// delete to remove confusion
|
|
||||||
waitForRecoveriesToFinish(false);
|
|
||||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
|
||||||
params.set("action", CollectionAction.DELETE.toString());
|
|
||||||
params.set("name", DEFAULT_COLLECTION);
|
|
||||||
QueryRequest request = new QueryRequest(params);
|
|
||||||
request.setPath("/admin/collections");
|
|
||||||
String baseUrl = ((HttpSolrClient) clients.get(0)).getBaseURL();
|
|
||||||
HttpSolrClient delClient = getHttpSolrClient(baseUrl.substring(0, baseUrl.lastIndexOf("/")));
|
|
||||||
delClient.request(request);
|
|
||||||
delClient.close();
|
|
||||||
|
|
||||||
String testCollectionName = "all_in_lir";
|
|
||||||
String shardId = "shard1";
|
|
||||||
CollectionAdminRequest.createCollection(testCollectionName, "conf1", 1, 3)
|
|
||||||
.setCreateNodeSet("")
|
|
||||||
.process(cloudClient);
|
|
||||||
Properties oldLir = new Properties();
|
|
||||||
oldLir.setProperty("lirVersion", "old");
|
|
||||||
for (int i = 0; i < 3; i++) {
|
|
||||||
CollectionAdminRequest.addReplicaToShard(testCollectionName, "shard1").setProperties(oldLir).process(cloudClient);
|
|
||||||
}
|
|
||||||
|
|
||||||
waitForRecoveriesToFinish(testCollectionName, false);
|
|
||||||
|
|
||||||
cloudClient.setDefaultCollection(testCollectionName);
|
|
||||||
|
|
||||||
Map<String,Object> stateObj = Utils.makeMap();
|
|
||||||
stateObj.put(ZkStateReader.STATE_PROP, "down");
|
|
||||||
stateObj.put("createdByNodeName", "test");
|
|
||||||
stateObj.put("createdByCoreNodeName", "test");
|
|
||||||
|
|
||||||
byte[] znodeData = Utils.toJSON(stateObj);
|
|
||||||
|
|
||||||
SolrZkClient zkClient = cloudClient.getZkStateReader().getZkClient();
|
|
||||||
zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node1", znodeData, true);
|
|
||||||
zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node2", znodeData, true);
|
|
||||||
zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node3", znodeData, true);
|
|
||||||
|
|
||||||
// everyone gets a couple docs so that everyone has tlog entries
|
|
||||||
// and won't become leader simply because they have no tlog versions
|
|
||||||
SolrInputDocument doc = new SolrInputDocument();
|
|
||||||
addFields(doc, "id", "1");
|
|
||||||
SolrInputDocument doc2 = new SolrInputDocument();
|
|
||||||
addFields(doc2, "id", "2");
|
|
||||||
cloudClient.add(doc);
|
|
||||||
cloudClient.add(doc2);
|
|
||||||
|
|
||||||
cloudClient.commit();
|
|
||||||
|
|
||||||
assertEquals("We just added 2 docs, we should be able to find them", 2, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound());
|
|
||||||
|
|
||||||
// randomly add too many docs to peer sync to one replica so that only one random replica is the valid leader
|
|
||||||
// the versions don't matter, they just have to be higher than what the last 2 docs got
|
|
||||||
HttpSolrClient client = (HttpSolrClient) clients.get(random().nextInt(clients.size()));
|
|
||||||
client.setBaseURL(client.getBaseURL().substring(0, client.getBaseURL().lastIndexOf("/")) + "/" + testCollectionName);
|
|
||||||
params = new ModifiableSolrParams();
|
|
||||||
params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
|
|
||||||
|
|
||||||
try {
|
|
||||||
for (int i = 0; i < 101; i++) {
|
|
||||||
add(client, params, sdoc("id", 3 + i, "_version_", Long.MAX_VALUE - 1 - i));
|
|
||||||
}
|
|
||||||
} catch (RemoteSolrException e) {
|
|
||||||
// if we got a conflict it's because we tried to send a versioned doc to the leader,
|
|
||||||
// resend without version
|
|
||||||
if (e.getMessage().contains("conflict")) {
|
|
||||||
for (int i = 0; i < 101; i++) {
|
|
||||||
add(client, params, sdoc("id", 3 + i));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
client.commit();
|
|
||||||
|
|
||||||
for (JettySolrRunner jetty : jettys) {
|
|
||||||
ChaosMonkey.stop(jetty);
|
|
||||||
}
|
|
||||||
ChaosMonkey.stop(controlJetty);
|
|
||||||
|
|
||||||
Thread.sleep(10000);
|
|
||||||
|
|
||||||
log.info("Start back up");
|
|
||||||
|
|
||||||
for (JettySolrRunner jetty : jettys) {
|
|
||||||
ChaosMonkey.start(jetty);
|
|
||||||
}
|
|
||||||
ChaosMonkey.start(controlJetty);
|
|
||||||
|
|
||||||
// recoveries will not finish without SOLR-8075 and SOLR-8367
|
|
||||||
waitForRecoveriesToFinish(testCollectionName, true);
|
|
||||||
|
|
||||||
// now expire each node
|
|
||||||
try {
|
|
||||||
zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node1", znodeData, true);
|
|
||||||
} catch (NodeExistsException e) {
|
|
||||||
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node2", znodeData, true);
|
|
||||||
} catch (NodeExistsException e) {
|
|
||||||
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node3", znodeData, true);
|
|
||||||
} catch (NodeExistsException e) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
for (JettySolrRunner jetty : jettys) {
|
|
||||||
chaosMonkey.expireSession(jetty);
|
|
||||||
}
|
|
||||||
|
|
||||||
Thread.sleep(2000);
|
|
||||||
|
|
||||||
// recoveries will not finish without SOLR-8075 and SOLR-8367
|
|
||||||
waitForRecoveriesToFinish(testCollectionName, true);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,261 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.solr.JSONTestUtil;
|
||||||
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||||
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
|
import org.apache.solr.common.cloud.CollectionStatePredicate;
|
||||||
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class LeaderVoteWaitTimeoutTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private static Map<JettySolrRunner, SocketProxy> proxies;
|
||||||
|
private static Map<URI, JettySolrRunner> jettys;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
|
||||||
|
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
|
||||||
|
System.setProperty("leaderVoteWait", "2000");
|
||||||
|
|
||||||
|
configureCluster(4)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
|
||||||
|
// Add proxies
|
||||||
|
proxies = new HashMap<>(cluster.getJettySolrRunners().size());
|
||||||
|
jettys = new HashMap<>();
|
||||||
|
for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
|
||||||
|
SocketProxy proxy = new SocketProxy();
|
||||||
|
jetty.setProxyPort(proxy.getListenPort());
|
||||||
|
cluster.stopJettySolrRunner(jetty);//TODO: Can we avoid this restart
|
||||||
|
cluster.startJettySolrRunner(jetty);
|
||||||
|
proxy.open(jetty.getBaseUrl().toURI());
|
||||||
|
LOG.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". Proxy: " + proxy.getUrl());
|
||||||
|
proxies.put(jetty, proxy);
|
||||||
|
jettys.put(proxy.getUrl(), jetty);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownCluster() throws Exception {
|
||||||
|
for (SocketProxy proxy:proxies.values()) {
|
||||||
|
proxy.close();
|
||||||
|
}
|
||||||
|
proxies = null;
|
||||||
|
jettys = null;
|
||||||
|
System.clearProperty("solr.directoryFactory");
|
||||||
|
System.clearProperty("solr.ulog.numRecordsToKeep");
|
||||||
|
System.clearProperty("leaderVoteWait");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void basicTest() throws Exception {
|
||||||
|
final String collectionName = "basicTest";
|
||||||
|
CollectionAdminRequest.createCollection(collectionName, 1, 1)
|
||||||
|
.setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName())
|
||||||
|
.process(cluster.getSolrClient());
|
||||||
|
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1"));
|
||||||
|
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2"));
|
||||||
|
cluster.getSolrClient().commit(collectionName);
|
||||||
|
|
||||||
|
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", cluster.getZkClient())) {
|
||||||
|
assertEquals(1, zkShardTerms.getTerms().size());
|
||||||
|
assertEquals(1L, zkShardTerms.getHighestTerm());
|
||||||
|
}
|
||||||
|
|
||||||
|
cluster.getJettySolrRunner(0).stop();
|
||||||
|
|
||||||
|
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
|
||||||
|
.setNode(cluster.getJettySolrRunner(1).getNodeName())
|
||||||
|
.process(cluster.getSolrClient());
|
||||||
|
|
||||||
|
waitForState("Timeout waiting for replica win the election", collectionName, (liveNodes, collectionState) -> {
|
||||||
|
Replica newLeader = collectionState.getSlice("shard1").getLeader();
|
||||||
|
return newLeader.getNodeName().equals(cluster.getJettySolrRunner(1).getNodeName());
|
||||||
|
});
|
||||||
|
|
||||||
|
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", cluster.getZkClient())) {
|
||||||
|
Replica newLeader = getCollectionState(collectionName).getSlice("shard1").getLeader();
|
||||||
|
assertEquals(2, zkShardTerms.getTerms().size());
|
||||||
|
assertEquals(1L, zkShardTerms.getTerm(newLeader.getName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
cluster.getJettySolrRunner(0).start();
|
||||||
|
CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMostInSyncReplicasCanWinElection() throws Exception {
|
||||||
|
final String collectionName = "collection1";
|
||||||
|
CollectionAdminRequest.createCollection(collectionName, 1, 3)
|
||||||
|
.setCreateNodeSet("")
|
||||||
|
.process(cluster.getSolrClient());
|
||||||
|
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
|
||||||
|
.setNode(cluster.getJettySolrRunner(0).getNodeName())
|
||||||
|
.process(cluster.getSolrClient());
|
||||||
|
waitForState("Timeout waiting for shard leader", collectionName, clusterShape(1, 1));
|
||||||
|
Replica leader = getCollectionState(collectionName).getSlice("shard1").getLeader();
|
||||||
|
|
||||||
|
// this replica will ahead of election queue
|
||||||
|
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
|
||||||
|
.setNode(cluster.getJettySolrRunner(1).getNodeName())
|
||||||
|
.process(cluster.getSolrClient());
|
||||||
|
waitForState("Timeout waiting for 1x2 collection", collectionName, clusterShape(1, 2));
|
||||||
|
Replica replica1 = getCollectionState(collectionName).getSlice("shard1")
|
||||||
|
.getReplicas(replica -> replica.getNodeName().equals(cluster.getJettySolrRunner(1).getNodeName())).get(0);
|
||||||
|
|
||||||
|
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
|
||||||
|
.setNode(cluster.getJettySolrRunner(2).getNodeName())
|
||||||
|
.process(cluster.getSolrClient());
|
||||||
|
waitForState("Timeout waiting for 1x3 collection", collectionName, clusterShape(1, 3));
|
||||||
|
Replica replica2 = getCollectionState(collectionName).getSlice("shard1")
|
||||||
|
.getReplicas(replica -> replica.getNodeName().equals(cluster.getJettySolrRunner(2).getNodeName())).get(0);
|
||||||
|
|
||||||
|
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1"));
|
||||||
|
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2"));
|
||||||
|
cluster.getSolrClient().commit(collectionName);
|
||||||
|
|
||||||
|
// replica in node1 won't be able to do recovery
|
||||||
|
proxies.get(cluster.getJettySolrRunner(0)).close();
|
||||||
|
// leader won't be able to send request to replica in node1
|
||||||
|
proxies.get(cluster.getJettySolrRunner(1)).close();
|
||||||
|
|
||||||
|
addDoc(collectionName, 3, cluster.getJettySolrRunner(0));
|
||||||
|
|
||||||
|
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", cluster.getZkClient())) {
|
||||||
|
assertEquals(3, zkShardTerms.getTerms().size());
|
||||||
|
assertEquals(zkShardTerms.getHighestTerm(), zkShardTerms.getTerm(leader.getName()));
|
||||||
|
assertEquals(zkShardTerms.getHighestTerm(), zkShardTerms.getTerm(replica2.getName()));
|
||||||
|
assertTrue(zkShardTerms.getHighestTerm() > zkShardTerms.getTerm(replica1.getName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
proxies.get(cluster.getJettySolrRunner(2)).close();
|
||||||
|
addDoc(collectionName, 4, cluster.getJettySolrRunner(0));
|
||||||
|
|
||||||
|
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", cluster.getZkClient())) {
|
||||||
|
assertEquals(3, zkShardTerms.getTerms().size());
|
||||||
|
assertEquals(zkShardTerms.getHighestTerm(), zkShardTerms.getTerm(leader.getName()));
|
||||||
|
assertTrue(zkShardTerms.getHighestTerm() > zkShardTerms.getTerm(replica2.getName()));
|
||||||
|
assertTrue(zkShardTerms.getHighestTerm() > zkShardTerms.getTerm(replica1.getName()));
|
||||||
|
assertTrue(zkShardTerms.getTerm(replica2.getName()) > zkShardTerms.getTerm(replica1.getName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
proxies.get(cluster.getJettySolrRunner(1)).reopen();
|
||||||
|
proxies.get(cluster.getJettySolrRunner(2)).reopen();
|
||||||
|
cluster.getJettySolrRunner(0).stop();
|
||||||
|
|
||||||
|
// even replica2 joined election at the end of the queue, but it is the one with highest term
|
||||||
|
waitForState("Timeout waiting for new leader", collectionName, new CollectionStatePredicate() {
|
||||||
|
@Override
|
||||||
|
public boolean matches(Set<String> liveNodes, DocCollection collectionState) {
|
||||||
|
Replica newLeader = collectionState.getSlice("shard1").getLeader();
|
||||||
|
return newLeader.getName().equals(replica2.getName());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
cluster.getJettySolrRunner(0).start();
|
||||||
|
proxies.get(cluster.getJettySolrRunner(0)).reopen();
|
||||||
|
|
||||||
|
waitForState("Timeout waiting for 1x3 collection", collectionName, clusterShape(1, 3));
|
||||||
|
assertDocsExistInAllReplicas(Arrays.asList(leader, replica1), collectionName, 1, 3);
|
||||||
|
CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException {
|
||||||
|
try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) {
|
||||||
|
solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId)));
|
||||||
|
solrClient.commit(collection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertDocsExistInAllReplicas(List<Replica> notLeaders,
|
||||||
|
String testCollectionName, int firstDocId, int lastDocId) throws Exception {
|
||||||
|
Replica leader =
|
||||||
|
cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
|
||||||
|
HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName);
|
||||||
|
List<HttpSolrClient> replicas =
|
||||||
|
new ArrayList<HttpSolrClient>(notLeaders.size());
|
||||||
|
|
||||||
|
for (Replica r : notLeaders) {
|
||||||
|
replicas.add(getHttpSolrClient(r, testCollectionName));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
for (int d = firstDocId; d <= lastDocId; d++) {
|
||||||
|
String docId = String.valueOf(d);
|
||||||
|
assertDocExists(leaderSolr, testCollectionName, docId);
|
||||||
|
for (HttpSolrClient replicaSolr : replicas) {
|
||||||
|
assertDocExists(replicaSolr, testCollectionName, docId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (leaderSolr != null) {
|
||||||
|
leaderSolr.close();
|
||||||
|
}
|
||||||
|
for (HttpSolrClient replicaSolr : replicas) {
|
||||||
|
replicaSolr.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception {
|
||||||
|
NamedList rsp = realTimeGetDocId(solr, docId);
|
||||||
|
String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), docId);
|
||||||
|
assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
|
||||||
|
+ " due to: " + match + "; rsp="+rsp, match == null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException {
|
||||||
|
QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
|
||||||
|
return solr.request(qr);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception {
|
||||||
|
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
|
||||||
|
String url = zkProps.getBaseUrl() + "/" + coll;
|
||||||
|
return getHttpSolrClient(url);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -57,6 +57,7 @@ public class TestCloudConsistency extends SolrCloudTestCase {
|
||||||
public static void setupCluster() throws Exception {
|
public static void setupCluster() throws Exception {
|
||||||
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
|
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
|
||||||
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
|
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
|
||||||
|
System.setProperty("leaderVoteWait", "60000");
|
||||||
|
|
||||||
configureCluster(4)
|
configureCluster(4)
|
||||||
.addConfig("conf", configset("cloud-minimal"))
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
@ -83,6 +84,9 @@ public class TestCloudConsistency extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
proxies = null;
|
proxies = null;
|
||||||
jettys = null;
|
jettys = null;
|
||||||
|
System.clearProperty("solr.directoryFactory");
|
||||||
|
System.clearProperty("solr.ulog.numRecordsToKeep");
|
||||||
|
System.clearProperty("leaderVoteWait");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -262,15 +266,4 @@ public class TestCloudConsistency extends SolrCloudTestCase {
|
||||||
return getHttpSolrClient(url);
|
return getHttpSolrClient(url);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception {
|
|
||||||
String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
|
|
||||||
assertNotNull(replicaBaseUrl);
|
|
||||||
URL baseUrl = new URL(replicaBaseUrl);
|
|
||||||
|
|
||||||
JettySolrRunner proxy = jettys.get(baseUrl.toURI());
|
|
||||||
assertNotNull("No proxy found for " + baseUrl + "!", proxy);
|
|
||||||
return proxy;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,7 +83,7 @@ public class MiniSolrCloudCluster {
|
||||||
" <str name=\"hostContext\">${hostContext:solr}</str>\n" +
|
" <str name=\"hostContext\">${hostContext:solr}</str>\n" +
|
||||||
" <int name=\"zkClientTimeout\">${solr.zkclienttimeout:30000}</int>\n" +
|
" <int name=\"zkClientTimeout\">${solr.zkclienttimeout:30000}</int>\n" +
|
||||||
" <bool name=\"genericCoreNodeNames\">${genericCoreNodeNames:true}</bool>\n" +
|
" <bool name=\"genericCoreNodeNames\">${genericCoreNodeNames:true}</bool>\n" +
|
||||||
" <int name=\"leaderVoteWait\">10000</int>\n" +
|
" <int name=\"leaderVoteWait\">${leaderVoteWait:10000}</int>\n" +
|
||||||
" <int name=\"distribUpdateConnTimeout\">${distribUpdateConnTimeout:45000}</int>\n" +
|
" <int name=\"distribUpdateConnTimeout\">${distribUpdateConnTimeout:45000}</int>\n" +
|
||||||
" <int name=\"distribUpdateSoTimeout\">${distribUpdateSoTimeout:340000}</int>\n" +
|
" <int name=\"distribUpdateSoTimeout\">${distribUpdateSoTimeout:340000}</int>\n" +
|
||||||
" <str name=\"zkCredentialsProvider\">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str> \n" +
|
" <str name=\"zkCredentialsProvider\">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str> \n" +
|
||||||
|
|
Loading…
Reference in New Issue