SOLR-12176: Improve FORCELEADER to handle the case when a replica win the election but does not present in clusterstate

This commit is contained in:
Cao Manh Dat 2018-04-04 03:41:57 +07:00
parent 56f80c0dc7
commit 34b83ed869
3 changed files with 124 additions and 9 deletions

View File

@ -163,6 +163,9 @@ Other Changes
* SOLR-12154: Disallow explicit usage of Log4j2 logger via forbidden APIs. (Varun Thacker, Tomás Fernández Löbbe) * SOLR-12154: Disallow explicit usage of Log4j2 logger via forbidden APIs. (Varun Thacker, Tomás Fernández Löbbe)
* SOLR-12176: Improve FORCELEADER to handle the case when a replica win the election but does not present
in clusterstate (Cao Manh Dat)
================== 7.3.0 ================== ================== 7.3.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -40,6 +40,7 @@ import org.apache.solr.api.Api;
import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder; import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard; import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator; import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
@ -1161,15 +1162,26 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
// Wait till we have an active leader // Wait till we have an active leader
boolean success = false; boolean success = false;
for (int i = 0; i < 9; i++) { for (int i = 0; i < 10; i++) {
Thread.sleep(5000); ZkCoreNodeProps zombieLeaderProps = getZombieLeader(zkController, collectionName, sliceId);
clusterState = handler.coreContainer.getZkController().getClusterState(); if (zombieLeaderProps != null) {
log.warn("A replica {} on node {} won the leader election, but not exist in clusterstate, " +
"remove it and waiting for another round of election",
zombieLeaderProps.getCoreName(), zombieLeaderProps.getNodeName());
try (HttpSolrClient solrClient = new HttpSolrClient.Builder(zombieLeaderProps.getBaseUrl()).build()) {
CoreAdminRequest.unloadCore(zombieLeaderProps.getCoreName(), solrClient);
}
// waiting for another election round
i = 0;
}
clusterState = zkController.getClusterState();
collection = clusterState.getCollection(collectionName); collection = clusterState.getCollection(collectionName);
slice = collection.getSlice(sliceId); slice = collection.getSlice(sliceId);
if (slice.getLeader() != null && slice.getLeader().getState() == State.ACTIVE) { if (slice.getLeader() != null && slice.getLeader().getState() == State.ACTIVE) {
success = true; success = true;
break; break;
} }
Thread.sleep(5000);
log.warn("Force leader attempt {}. Waiting 5 secs for an active leader. State of the slice: {}", (i + 1), slice); log.warn("Force leader attempt {}. Waiting 5 secs for an active leader. State of the slice: {}", (i + 1), slice);
} }
@ -1186,6 +1198,25 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
} }
} }
/**
* Zombie leader is a replica won the election but does not exist in clusterstate
* @return null if the zombie leader does not exist
*/
private static ZkCoreNodeProps getZombieLeader(ZkController zkController, String collection, String shardId) {
try {
ZkCoreNodeProps leaderProps = zkController.getLeaderProps(collection, shardId, 1000);
DocCollection docCollection = zkController.getClusterState().getCollection(collection);
Replica replica = docCollection.getReplica(leaderProps.getNodeProps().getStr(ZkStateReader.CORE_NODE_NAME_PROP));
if (replica == null) return leaderProps;
if (!replica.getNodeName().equals(leaderProps.getNodeName())) {
return leaderProps;
}
return null;
} catch (Exception e) {
return null;
}
}
public static void waitForActiveCollection(String collectionName, CoreContainer cc, SolrResponse createCollResponse) public static void waitForActiveCollection(String collectionName, CoreContainer cc, SolrResponse createCollResponse)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {

View File

@ -22,17 +22,23 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Replica.State; import org.apache.solr.common.cloud.Replica.State;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NoNodeException;
import org.junit.Ignore; import org.junit.Ignore;
@ -56,6 +62,81 @@ public class ForceLeaderTest extends HttpPartitionTest {
} }
/**
* Tests that FORCELEADER can get an active leader even in the case there are a replica won the election but not present in clusterstate
*/
@Test
@Slow
public void testZombieLeader() throws Exception {
String testCollectionName = "forceleader_zombie_leader_collection";
createCollection(testCollectionName, "conf1", 1, 3, 1);
cloudClient.setDefaultCollection(testCollectionName);
try {
List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
assertEquals("Expected 2 replicas for collection " + testCollectionName
+ " but found " + notLeaders.size() + "; clusterState: "
+ printClusterStateInfo(testCollectionName), 2, notLeaders.size());
List<JettySolrRunner> notLeaderJetties = notLeaders.stream().map(rep -> getJettyOnPort(getReplicaPort(rep)))
.collect(Collectors.toList());
Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
// remove leader from clusterstate
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
ZkStateReader.CORE_NAME_PROP, leader.getCoreName(),
ZkStateReader.NODE_NAME_PROP, leader.getNodeName(),
ZkStateReader.COLLECTION_PROP, testCollectionName,
ZkStateReader.CORE_NODE_NAME_PROP, leader.getName(),
ZkStateReader.BASE_URL_PROP, leader.getBaseUrl());
Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()).offer(Utils.toJSON(m));
boolean restartOtherReplicas = random().nextBoolean();
log.info("Starting test with restartOtherReplicas:{}", restartOtherReplicas);
if (restartOtherReplicas) {
for (JettySolrRunner notLeaderJetty : notLeaderJetties) {
notLeaderJetty.stop();
}
}
cloudClient.waitForState(testCollectionName, 30, TimeUnit.SECONDS,
(liveNodes, collectionState) -> collectionState.getReplicas().size() == 2);
if (restartOtherReplicas) {
for (JettySolrRunner notLeaderJetty : notLeaderJetties) {
notLeaderJetty.start();
}
}
log.info("Before forcing leader: " + cloudClient.getZkStateReader().getClusterState()
.getCollection(testCollectionName).getSlice(SHARD1));
doForceLeader(cloudClient, testCollectionName, SHARD1);
// By now we have an active leader. Wait for recoveries to begin
waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1));
assertNull("Expected zombie leader get deleted", leaderJetty.getCoreContainer().getCore(leader.getCoreName()));
Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
assertNotNull(newLeader);
assertEquals(State.ACTIVE, newLeader.getState());
int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
assertEquals(2, numActiveReplicas);
// Assert that indexing works again
sendDoc(1);
cloudClient.commit();
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
} finally {
log.info("Cleaning up after the test.");
// try to clean up
attemptCollectionDelete(cloudClient, testCollectionName);
}
}
/** /**
* Tests that FORCELEADER can get an active leader even only replicas with term lower than leader's term are live * Tests that FORCELEADER can get an active leader even only replicas with term lower than leader's term are live
*/ */
@ -149,7 +230,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
} }
} }
void putNonLeadersIntoLowerTerm(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception { private void putNonLeadersIntoLowerTerm(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception {
SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()]; SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()];
for (int i = 0; i < notLeaders.size(); i++) for (int i = 0; i < notLeaders.size(); i++)
nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i)); nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i));
@ -315,7 +396,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
} }
} }
void assertSendDocFails(int docId) throws Exception { private void assertSendDocFails(int docId) throws Exception {
// sending a doc in this state fails // sending a doc in this state fails
try { try {
sendDoc(docId); sendDoc(docId);
@ -326,7 +407,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
} }
} }
void putNonLeadersIntoLIR(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception { private void putNonLeadersIntoLIR(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception {
SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()]; SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()];
for (int i = 0; i < notLeaders.size(); i++) for (int i = 0; i < notLeaders.size(); i++)
nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i)); nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i));
@ -388,7 +469,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
Replica.State.DOWN == lirState || Replica.State.RECOVERING == lirState); Replica.State.DOWN == lirState || Replica.State.RECOVERING == lirState);
} }
protected void bringBackOldLeaderAndSendDoc(String collection, Replica leader, List<Replica> notLeaders, int docid) throws Exception { private void bringBackOldLeaderAndSendDoc(String collection, Replica leader, List<Replica> notLeaders, int docid) throws Exception {
// Bring back the leader which was stopped // Bring back the leader which was stopped
log.info("Bringing back originally killed leader..."); log.info("Bringing back originally killed leader...");
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader)); JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
@ -409,7 +490,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
assertDocsExistInAllReplicas(Collections.singletonList(leader), collection, docid, docid); assertDocsExistInAllReplicas(Collections.singletonList(leader), collection, docid, docid);
} }
protected String getLIRState(ZkController zkController, String collection, String shard) throws KeeperException, InterruptedException { private String getLIRState(ZkController zkController, String collection, String shard) throws KeeperException, InterruptedException {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
String path = zkController.getLeaderInitiatedRecoveryZnodePath(collection, shard); String path = zkController.getLeaderInitiatedRecoveryZnodePath(collection, shard);
if (path == null) if (path == null)
@ -436,7 +517,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
client.request(forceLeader); client.request(forceLeader);
} }
protected int getNumberOfActiveReplicas(ClusterState clusterState, String collection, String sliceId) { private int getNumberOfActiveReplicas(ClusterState clusterState, String collection, String sliceId) {
int numActiveReplicas = 0; int numActiveReplicas = 0;
// Assert all replicas are active // Assert all replicas are active
for (Replica rep : clusterState.getCollection(collection).getSlice(sliceId).getReplicas()) { for (Replica rep : clusterState.getCollection(collection).getSlice(sliceId).getReplicas()) {