mirror of https://github.com/apache/lucene.git
SOLR-9906: Use better check to validate if node recovered via PeerSync or Replication
This commit is contained in:
parent
832d02bf49
commit
3988532d26
@@ -18,12 +18,15 @@ package org.apache.solr.util;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
 public class TimeOut {
 
-  private final long timeoutAt;
+  private final long timeoutAt, startTime;
 
   public TimeOut(long interval, TimeUnit unit) {
-    this.timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(interval, unit);
+    startTime = System.nanoTime();
+    this.timeoutAt = startTime + NANOSECONDS.convert(interval, unit);
   }
 
   public boolean hasTimedOut() {
@@ -31,6 +34,10 @@ public class TimeOut {
   }
 
   public long timeLeft(TimeUnit unit) {
-    return unit.convert(timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS);
+    return unit.convert(timeoutAt - System.nanoTime(), NANOSECONDS);
   }
 
+  public long timeElapsed(TimeUnit unit) {
+    return unit.convert(System.nanoTime() - startTime, NANOSECONDS);
+  }
 }
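As context for the change above, here is a minimal, illustrative sketch (not part of the commit) of how the extended TimeOut accounting behaves: elapsed time plus remaining time roughly adds up to the configured interval.

    import static java.util.concurrent.TimeUnit.MILLISECONDS;
    import static java.util.concurrent.TimeUnit.SECONDS;

    import org.apache.solr.util.TimeOut;

    final class TimeOutSketch {
      public static void main(String[] args) throws InterruptedException {
        TimeOut timeOut = new TimeOut(10, SECONDS);
        Thread.sleep(250);
        // startTime-based accounting added by this commit:
        System.out.println("elapsed ms: " + timeOut.timeElapsed(MILLISECONDS)); // roughly 250
        // deadline-based accounting that already existed:
        System.out.println("left ms:    " + timeOut.timeLeft(MILLISECONDS));    // roughly 9750
        System.out.println("timed out:  " + timeOut.hasTimedOut());             // false
      }
    }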
@@ -19,6 +19,8 @@ package org.apache.solr.cloud;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -26,6 +28,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -38,12 +41,13 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.util.TimeOut;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.util.Collections.singletonList;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
  *
@@ -127,52 +131,53 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestBase {
       waitForThingsToLevelOut(30);
 
       checkShardConsistency(false, true);
 
       // bring down the other node and index a few docs; so the leader and other node segments diverge
       forceNodeFailures(singletonList(secondNode));
       for (int i = 0; i < 10; i++) {
         indexDoc(id, docId, i1, 50, tlong, 50, t1,
             "document number " + docId++);
         if (i % 2 == 0) {
           commit();
         }
       }
       commit();
       restartNodes(singletonList(secondNode));
 
       // start the freshNode
       ChaosMonkey.start(freshNode.jetty);
       nodesDown.remove(freshNode);
 
       waitTillNodesActive();
       waitForThingsToLevelOut(30);
 
       //TODO check how to see if fresh node went into recovery (may be check count for replication handler on new leader)
 
       long numRequestsBefore = (Long) secondNode.jetty
           .getCoreContainer()
           .getCores()
           .iterator()
           .next()
           .getRequestHandler(ReplicationHandler.PATH)
           .getStatistics().get("requests");
       restartNodes(singletonList(freshNode));
 
       String replicationProperties = (String) freshNode.jetty.getSolrHome() + "/cores/" + DEFAULT_TEST_COLLECTION_NAME + "/data/replication.properties";
       String md5 = DigestUtils.md5Hex(Files.readAllBytes(Paths.get(replicationProperties)));
 
       // shutdown the original leader
       log.info("Now shutting down initial leader");
       forceNodeFailures(singletonList(initialLeaderJetty));
-      waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
+      waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, new TimeOut(15, SECONDS));
       waitTillNodesActive();
       log.info("Updating mappings from zk");
       updateMappingsFromZk(jettys, clients, true);
 
       long numRequestsAfter = (Long) secondNode.jetty
           .getCoreContainer()
           .getCores()
           .iterator()
           .next()
           .getRequestHandler(ReplicationHandler.PATH)
           .getStatistics().get("requests");
 
       assertEquals("Node went into replication", numRequestsBefore, numRequestsAfter);
       assertEquals("Node went into replication", md5, DigestUtils.md5Hex(Files.readAllBytes(Paths.get(replicationProperties))));
 
       success = true;
     } finally {
       System.clearProperty("solr.disableFingerprint");
     }
   }
 
   private void restartNodes(List<CloudJettyRunner> nodesToRestart) throws Exception {
     for (CloudJettyRunner node : nodesToRestart) {
       chaosMonkey.start(node.jetty);
       nodesDown.remove(node);
     }
     waitTillNodesActive();
     checkShardConsistency(false, true);
   }
 
 
   private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws Exception {
     for (CloudJettyRunner replicaToShutDown : replicasToShutDown) {
       chaosMonkey.killJetty(replicaToShutDown);
       waitForNoShardInconsistency();
     }
 
     int totalDown = 0;
@@ -205,8 +210,13 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestBase {
       Collection<Replica> replicas = slice.getReplicas();
       boolean allActive = true;
 
+      Collection<String> nodesDownNames = nodesDown.stream()
+          .map(n -> n.coreNodeName)
+          .collect(Collectors.toList());
+
       Collection<Replica> replicasToCheck = null;
-      replicasToCheck = replicas.stream().filter(r -> nodesDown.contains(r.getName()))
+      replicasToCheck = replicas.stream()
+          .filter(r -> !nodesDownNames.contains(r.getName()))
           .collect(Collectors.toList());
 
       for (Replica replica : replicasToCheck) {
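The check in this test works by taking a fingerprint of replication activity before the fresh node is restarted (the ReplicationHandler request counter on the other node, plus an md5 of replication.properties) and asserting both are unchanged afterwards. A small, illustrative helper in that spirit might look like the following; it assumes, as the test does, that CloudJettyRunner.jetty is a JettySolrRunner and that the handler statistics expose a "requests" entry.

    import org.apache.solr.client.solrj.embedded.JettySolrRunner;
    import org.apache.solr.handler.ReplicationHandler;

    final class ReplicationCheckSketch {
      // Read the ReplicationHandler request counter for the first core hosted by this Jetty.
      // If the value is unchanged across a node restart, recovery did not fall back to replication.
      static long replicationRequestCount(JettySolrRunner jetty) {
        return (Long) jetty.getCoreContainer()
            .getCores()
            .iterator()
            .next()
            .getRequestHandler(ReplicationHandler.PATH)
            .getStatistics()
            .get("requests");
      }
    }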
@@ -20,6 +20,8 @@ package org.apache.solr.cloud;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -41,15 +43,16 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.util.TimeOut;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.util.Collections.singletonList;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
- * Test sync peer sync when a node restarts and documents are indexed when node was down.
+ * Test PeerSync when a node restarts and documents are indexed when node was down.
  *
  * This test is modeled after SyncSliceTest
  */
@@ -149,12 +152,12 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
     log.info("Now shutting down initial leader");
     forceNodeFailures(singletonList(initialLeaderJetty));
     log.info("Updating mappings from zk");
-    waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
+    waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, new TimeOut(15, SECONDS));
     updateMappingsFromZk(jettys, clients, true);
     assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1"));
 
     // bring up node that was down all along, and let it PeerSync from the node that was forced to PeerSync
-    bringUpDeadNodeAndEnsureNoReplication(shardToLeaderJetty.get("shard1"), neverLeader, false);
+    bringUpDeadNodeAndEnsureNoReplication(neverLeader, false);
     waitTillNodesActive();
 
     checkShardConsistency(false, true);
@@ -199,7 +202,6 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
   private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws Exception {
     for (CloudJettyRunner replicaToShutDown : replicasToShutDown) {
       chaosMonkey.killJetty(replicaToShutDown);
       waitForNoShardInconsistency();
     }
 
     int totalDown = 0;
@@ -218,8 +220,6 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
     assertEquals(getShardCount() - totalDown, jetties.size());
 
     nodesDown.addAll(replicasToShutDown);
 
     Thread.sleep(3000);
   }
 
 
@@ -241,26 +241,17 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
         "document number " + docId++);
     commit();
 
-    bringUpDeadNodeAndEnsureNoReplication(leaderJetty, replicaToShutDown, disableFingerprint);
+    bringUpDeadNodeAndEnsureNoReplication(replicaToShutDown, disableFingerprint);
 
     return replicaToShutDown;
   }
 
 
-  private void bringUpDeadNodeAndEnsureNoReplication(CloudJettyRunner leaderJetty, CloudJettyRunner nodeToBringUp,
-      boolean disableFingerprint) throws Exception {
+  private void bringUpDeadNodeAndEnsureNoReplication(CloudJettyRunner nodeToBringUp, boolean disableFingerprint)
+      throws Exception {
     // disable fingerprint check if needed
     System.setProperty("solr.disableFingerprint", String.valueOf(disableFingerprint));
 
-    long numRequestsBefore = (Long) leaderJetty.jetty
-        .getCoreContainer()
-        .getCores()
-        .iterator()
-        .next()
-        .getRequestHandler(ReplicationHandler.PATH)
-        .getStatistics().get("requests");
-
     indexInBackground(50);
 
     // bring back dead node and ensure it recovers
@@ -279,15 +270,9 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
     long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
     assertEquals(docId, cloudClientDocs);
 
-    long numRequestsAfter = (Long) leaderJetty.jetty
-        .getCoreContainer()
-        .getCores()
-        .iterator()
-        .next()
-        .getRequestHandler(ReplicationHandler.PATH)
-        .getStatistics().get("requests");
-
-    assertEquals("PeerSync failed. Had to fail back to replication", numRequestsBefore, numRequestsAfter);
+    // if there was no replication, we should not have replication.properties file
+    String replicationProperties = nodeToBringUp.jetty.getSolrHome() + "/cores/" + DEFAULT_TEST_COLLECTION_NAME + "/data/replication.properties";
+    assertTrue("PeerSync failed. Had to fail back to replication", Files.notExists(Paths.get(replicationProperties)));
   }
@@ -302,9 +287,15 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
     Collection<Replica> replicas = slice.getReplicas();
     boolean allActive = true;
 
-    Collection<Replica> replicasToCheck = null;
-    replicasToCheck = replicas.stream().filter(r -> nodesDown.contains(r.getName()))
-        .collect(Collectors.toList());
+    Collection<String> nodesDownNames =
+        nodesDown.stream()
+            .map(n -> n.coreNodeName)
+            .collect(Collectors.toList());
+
+    Collection<Replica> replicasToCheck =
+        replicas.stream()
+            .filter(r -> !nodesDownNames.contains(r.getName()))
+            .collect(Collectors.toList());
 
     for (Replica replica : replicasToCheck) {
       if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
 import java.io.File;
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
@@ -29,16 +30,20 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.Slice.State;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.Slice.State;
 import org.apache.solr.core.Diagnostics;
 import org.apache.solr.core.MockDirectoryFactory;
+import org.apache.solr.util.TimeOut;
 import org.apache.zookeeper.KeeperException;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTestCase {
 
   private static final String REMOVE_VERSION_FIELD = "remove.version.field";
@@ -226,31 +231,28 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTestCase {
     log.info("Collection has disappeared - collection: " + collection);
   }
 
-  static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, int maxWaitInSecs)
+  static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, TimeOut timeOut)
       throws Exception {
-    log.info("Will wait for a node to become leader for {} secs", maxWaitInSecs);
-    boolean waitForLeader = true;
-    int i = 0;
+    log.info("Will wait for a node to become leader for {} secs", timeOut.timeLeft(SECONDS));
     ZkStateReader zkStateReader = cloudClient.getZkStateReader();
     zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
 
-    while (waitForLeader) {
+    for (; ; ) {
       ClusterState clusterState = zkStateReader.getClusterState();
       DocCollection coll = clusterState.getCollection("collection1");
       Slice slice = coll.getSlice(shardName);
-      if (slice.getLeader() != oldLeader && slice.getState() == State.ACTIVE) {
-        log.info("New leader got elected in {} secs", i);
+      if (slice.getLeader() != null && !slice.getLeader().equals(oldLeader) && slice.getState() == State.ACTIVE) {
+        log.info("Old leader {}, new leader {}. New leader got elected in {} ms", oldLeader, slice.getLeader(), timeOut.timeElapsed(MILLISECONDS));
         break;
       }
 
-      if (i == maxWaitInSecs) {
+      if (timeOut.hasTimedOut()) {
         Diagnostics.logThreadDumps("Could not find new leader in specified timeout");
         zkStateReader.getZkClient().printLayoutToStdOut();
-        fail("Could not find new leader even after waiting for " + maxWaitInSecs + "secs");
+        fail("Could not find new leader even after waiting for " + timeOut.timeElapsed(MILLISECONDS) + "ms");
       }
 
-      i++;
-      Thread.sleep(1000);
+      Thread.sleep(100);
     }
   }
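Because waitForNewLeader now takes a caller-constructed TimeOut instead of an int, the caller owns the time budget, and the same budget could in principle be threaded through several consecutive waits. An illustrative sketch of that calling pattern (the waitForCondition helper and its stub condition are assumptions for the example, not code from this commit):

    import static java.util.concurrent.TimeUnit.MILLISECONDS;
    import static java.util.concurrent.TimeUnit.SECONDS;

    import org.apache.solr.util.TimeOut;

    final class WaitBudgetSketch {
      static void waitForCondition(java.util.function.BooleanSupplier condition, TimeOut timeOut)
          throws InterruptedException {
        while (!condition.getAsBoolean()) {
          if (timeOut.hasTimedOut()) {
            throw new AssertionError("Condition not met after " + timeOut.timeElapsed(MILLISECONDS) + " ms");
          }
          Thread.sleep(100);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        TimeOut budget = new TimeOut(15, SECONDS);
        waitForCondition(() -> true, budget); // e.g. "new leader elected"
        waitForCondition(() -> true, budget); // a second wait draws on the same 15-second budget
      }
    }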