mirror of https://github.com/apache/lucene.git
SOLR-9446: Leader failure after creating a freshly replicated index can send nodes into recovery even if index was not changed
This commit is contained in:
parent
6c48549f8d
commit
15cee3141c
|
@ -122,6 +122,9 @@ Optimizations
|
||||||
|
|
||||||
* SOLR-9142: JSON Facet API: new method=dvhash can be chosen for fields with high cardinality. (David Smiley)
|
* SOLR-9142: JSON Facet API: new method=dvhash can be chosen for fields with high cardinality. (David Smiley)
|
||||||
|
|
||||||
|
* SOLR-9446: Leader failure after creating a freshly replicated index can send nodes into recovery even if
|
||||||
|
index was not changed (Pushkar Raste, noble)
|
||||||
|
|
||||||
Other Changes
|
Other Changes
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -112,6 +112,12 @@ public class RealTimeGetComponent extends SearchComponent
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val = params.get("getFingerprint");
|
||||||
|
if(val != null) {
|
||||||
|
processGetFingeprint(rb);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
val = params.get("getVersions");
|
val = params.get("getVersions");
|
||||||
if (val != null) {
|
if (val != null) {
|
||||||
processGetVersions(rb);
|
processGetVersions(rb);
|
||||||
|
@ -598,6 +604,17 @@ public class RealTimeGetComponent extends SearchComponent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public void processGetFingeprint(ResponseBuilder rb) throws IOException {
|
||||||
|
SolrQueryRequest req = rb.req;
|
||||||
|
SolrParams params = req.getParams();
|
||||||
|
|
||||||
|
long maxVersion = params.getLong("getFingerprint", Long.MAX_VALUE);
|
||||||
|
IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Math.abs(maxVersion));
|
||||||
|
rb.rsp.add("fingerprint", fingerprint);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////
|
||||||
// Returns last versions added to index
|
// Returns last versions added to index
|
||||||
///////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -188,6 +188,11 @@ public class PeerSync {
|
||||||
log.debug(msg() + "startingVersions=" + startingVersions.size() + " " + startingVersions);
|
log.debug(msg() + "startingVersions=" + startingVersions.size() + " " + startingVersions);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// check if we already in sync to begin with
|
||||||
|
if(doFingerprint && alreadyInSync()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// Fire off the requests before getting our own recent updates (for better concurrency)
|
// Fire off the requests before getting our own recent updates (for better concurrency)
|
||||||
// This also allows us to avoid getting updates we don't need... if we got our updates and then got their updates,
|
// This also allows us to avoid getting updates we don't need... if we got our updates and then got their updates,
|
||||||
|
@ -278,6 +283,51 @@ public class PeerSync {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if we are already in sync. Simple fingerprint comparison should do
|
||||||
|
*/
|
||||||
|
private boolean alreadyInSync() {
|
||||||
|
for (String replica : replicas) {
|
||||||
|
requestFingerprint(replica);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
ShardResponse srsp = shardHandler.takeCompletedOrError();
|
||||||
|
if (srsp == null) break;
|
||||||
|
|
||||||
|
try {
|
||||||
|
IndexFingerprint otherFingerprint = IndexFingerprint.fromObject(srsp.getSolrResponse().getResponse().get("fingerprint"));
|
||||||
|
IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
|
||||||
|
if(IndexFingerprint.compare(otherFingerprint, ourFingerprint) == 0) {
|
||||||
|
log.info("We are already in sync. No need to do a PeerSync ");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} catch(IOException e) {
|
||||||
|
log.warn("Could not cofirm if we are already in sync. Continue with PeerSync");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void requestFingerprint(String replica) {
|
||||||
|
SyncShardRequest sreq = new SyncShardRequest();
|
||||||
|
requests.add(sreq);
|
||||||
|
|
||||||
|
sreq.shards = new String[]{replica};
|
||||||
|
sreq.actualShards = sreq.shards;
|
||||||
|
sreq.params = new ModifiableSolrParams();
|
||||||
|
sreq.params = new ModifiableSolrParams();
|
||||||
|
sreq.params.set("qt","/get");
|
||||||
|
sreq.params.set("distrib",false);
|
||||||
|
sreq.params.set("getFingerprint", String.valueOf(Long.MAX_VALUE));
|
||||||
|
|
||||||
|
shardHandler.submit(sreq, replica, sreq.params);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private void requestVersions(String replica) {
|
private void requestVersions(String replica) {
|
||||||
SyncShardRequest sreq = new SyncShardRequest();
|
SyncShardRequest sreq = new SyncShardRequest();
|
||||||
requests.add(sreq);
|
requests.add(sreq);
|
||||||
|
|
|
@ -0,0 +1,295 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.RandomStringUtils;
|
||||||
|
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||||
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||||
|
import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
|
||||||
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
|
import org.apache.solr.common.cloud.ClusterState;
|
||||||
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
import org.apache.solr.common.cloud.Slice;
|
||||||
|
import org.apache.solr.common.cloud.Slice.State;
|
||||||
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
import org.apache.solr.core.Diagnostics;
|
||||||
|
import org.apache.solr.handler.ReplicationHandler;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static java.util.Collections.singletonList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Test for SOLR-9446
|
||||||
|
*
|
||||||
|
* This test is modeled after SyncSliceTest
|
||||||
|
*/
|
||||||
|
@Slow
|
||||||
|
public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestBase {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private boolean success = false;
|
||||||
|
int docId = 0;
|
||||||
|
|
||||||
|
List<CloudJettyRunner> nodesDown = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void distribTearDown() throws Exception {
|
||||||
|
if (!success) {
|
||||||
|
printLayoutOnTearDown = true;
|
||||||
|
}
|
||||||
|
System.clearProperty("solr.directoryFactory");
|
||||||
|
System.clearProperty("solr.ulog.numRecordsToKeep");
|
||||||
|
System.clearProperty("tests.zk.violationReportAction");
|
||||||
|
super.distribTearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public LeaderFailureAfterFreshStartTest() {
|
||||||
|
super();
|
||||||
|
sliceCount = 1;
|
||||||
|
fixShardCount(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getCloudSolrConfig() {
|
||||||
|
return "solrconfig-tlog.xml";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void distribSetUp() throws Exception {
|
||||||
|
// tlog gets deleted after node restarts if we use CachingDirectoryFactory.
|
||||||
|
// make sure that tlog stays intact after we restart a node
|
||||||
|
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
|
||||||
|
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
|
||||||
|
System.setProperty("tests.zk.violationReportAction", LimitViolationAction.IGNORE.toString());
|
||||||
|
super.distribSetUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws Exception {
|
||||||
|
handle.clear();
|
||||||
|
handle.put("timestamp", SKIPVAL);
|
||||||
|
|
||||||
|
try {
|
||||||
|
CloudJettyRunner initialLeaderJetty = shardToLeaderJetty.get("shard1");
|
||||||
|
List<CloudJettyRunner> otherJetties = getOtherAvailableJetties(initialLeaderJetty);
|
||||||
|
|
||||||
|
log.info("Leader node_name: {}, url: {}", initialLeaderJetty.coreNodeName, initialLeaderJetty.url);
|
||||||
|
for (CloudJettyRunner cloudJettyRunner : otherJetties) {
|
||||||
|
log.info("Nonleader node_name: {}, url: {}", cloudJettyRunner.coreNodeName, cloudJettyRunner.url);
|
||||||
|
}
|
||||||
|
|
||||||
|
CloudJettyRunner secondNode = otherJetties.get(0);
|
||||||
|
CloudJettyRunner freshNode = otherJetties.get(1);
|
||||||
|
|
||||||
|
// shutdown a node to simulate fresh start
|
||||||
|
otherJetties.remove(freshNode);
|
||||||
|
forceNodeFailures(singletonList(freshNode));
|
||||||
|
|
||||||
|
del("*:*");
|
||||||
|
waitForThingsToLevelOut(30);
|
||||||
|
|
||||||
|
checkShardConsistency(false, true);
|
||||||
|
|
||||||
|
// index a few docs and commit
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
indexDoc(id, docId, i1, 50, tlong, 50, t1,
|
||||||
|
"document number " + docId++);
|
||||||
|
}
|
||||||
|
commit();
|
||||||
|
waitForThingsToLevelOut(30);
|
||||||
|
|
||||||
|
checkShardConsistency(false, true);
|
||||||
|
|
||||||
|
// start the freshNode
|
||||||
|
ChaosMonkey.start(freshNode.jetty);
|
||||||
|
nodesDown.remove(freshNode);
|
||||||
|
|
||||||
|
waitTillNodesActive();
|
||||||
|
waitForThingsToLevelOut(30);
|
||||||
|
|
||||||
|
//TODO check how to see if fresh node went into recovery (may be check count for replication handler on new leader)
|
||||||
|
|
||||||
|
long numRequestsBefore = (Long) secondNode.jetty
|
||||||
|
.getCoreContainer()
|
||||||
|
.getCores()
|
||||||
|
.iterator()
|
||||||
|
.next()
|
||||||
|
.getRequestHandler(ReplicationHandler.PATH)
|
||||||
|
.getStatistics().get("requests");
|
||||||
|
|
||||||
|
// shutdown the original leader
|
||||||
|
log.info("Now shutting down initial leader");
|
||||||
|
forceNodeFailures(singletonList(initialLeaderJetty));
|
||||||
|
waitForNewLeader(cloudClient, "shard1", (Replica)initialLeaderJetty.client.info , 15);
|
||||||
|
log.info("Updating mappings from zk");
|
||||||
|
updateMappingsFromZk(jettys, clients, true);
|
||||||
|
|
||||||
|
long numRequestsAfter = (Long) secondNode.jetty
|
||||||
|
.getCoreContainer()
|
||||||
|
.getCores()
|
||||||
|
.iterator()
|
||||||
|
.next()
|
||||||
|
.getRequestHandler(ReplicationHandler.PATH)
|
||||||
|
.getStatistics().get("requests");
|
||||||
|
|
||||||
|
assertEquals("Node went into replication", numRequestsBefore, numRequestsAfter);
|
||||||
|
|
||||||
|
success = true;
|
||||||
|
} finally {
|
||||||
|
System.clearProperty("solr.disableFingerprint");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws Exception {
|
||||||
|
for (CloudJettyRunner replicaToShutDown : replicasToShutDown) {
|
||||||
|
chaosMonkey.killJetty(replicaToShutDown);
|
||||||
|
waitForNoShardInconsistency();
|
||||||
|
}
|
||||||
|
|
||||||
|
int totalDown = 0;
|
||||||
|
|
||||||
|
Set<CloudJettyRunner> jetties = new HashSet<>();
|
||||||
|
jetties.addAll(shardToJetty.get("shard1"));
|
||||||
|
|
||||||
|
if (replicasToShutDown != null) {
|
||||||
|
jetties.removeAll(replicasToShutDown);
|
||||||
|
totalDown += replicasToShutDown.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
jetties.removeAll(nodesDown);
|
||||||
|
totalDown += nodesDown.size();
|
||||||
|
|
||||||
|
assertEquals(getShardCount() - totalDown, jetties.size());
|
||||||
|
|
||||||
|
nodesDown.addAll(replicasToShutDown);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, int maxWaitInSecs)
|
||||||
|
throws Exception {
|
||||||
|
log.info("Will wait for a node to become leader for {} secs", maxWaitInSecs);
|
||||||
|
boolean waitForLeader = true;
|
||||||
|
int i = 0;
|
||||||
|
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
|
||||||
|
zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
|
||||||
|
|
||||||
|
while(waitForLeader) {
|
||||||
|
ClusterState clusterState = zkStateReader.getClusterState();
|
||||||
|
DocCollection coll = clusterState.getCollection("collection1");
|
||||||
|
Slice slice = coll.getSlice(shardName);
|
||||||
|
if(slice.getLeader() != oldLeader && slice.getState() == State.ACTIVE) {
|
||||||
|
log.info("New leader got elected in {} secs", i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(i == maxWaitInSecs) {
|
||||||
|
Diagnostics.logThreadDumps("Could not find new leader in specified timeout");
|
||||||
|
zkStateReader.getZkClient().printLayoutToStdOut();
|
||||||
|
fail("Could not find new leader even after waiting for " + maxWaitInSecs + "secs");
|
||||||
|
}
|
||||||
|
|
||||||
|
i++;
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private void waitTillNodesActive() throws Exception {
|
||||||
|
for (int i = 0; i < 60; i++) {
|
||||||
|
Thread.sleep(3000);
|
||||||
|
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
|
||||||
|
ClusterState clusterState = zkStateReader.getClusterState();
|
||||||
|
DocCollection collection1 = clusterState.getCollection("collection1");
|
||||||
|
Slice slice = collection1.getSlice("shard1");
|
||||||
|
Collection<Replica> replicas = slice.getReplicas();
|
||||||
|
boolean allActive = true;
|
||||||
|
|
||||||
|
Collection<Replica> replicasToCheck = null;
|
||||||
|
replicasToCheck = replicas.stream().filter(r -> nodesDown.contains(r.getName()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
for (Replica replica : replicasToCheck) {
|
||||||
|
if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
|
||||||
|
allActive = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (allActive) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
printLayout();
|
||||||
|
fail("timeout waiting to see all nodes active");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private List<CloudJettyRunner> getOtherAvailableJetties(CloudJettyRunner leader) {
|
||||||
|
List<CloudJettyRunner> candidates = new ArrayList<>();
|
||||||
|
candidates.addAll(shardToJetty.get("shard1"));
|
||||||
|
|
||||||
|
if (leader != null) {
|
||||||
|
candidates.remove(leader);
|
||||||
|
}
|
||||||
|
|
||||||
|
candidates.removeAll(nodesDown);
|
||||||
|
|
||||||
|
return candidates;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void indexDoc(Object... fields) throws IOException,
|
||||||
|
SolrServerException {
|
||||||
|
SolrInputDocument doc = new SolrInputDocument();
|
||||||
|
|
||||||
|
addFields(doc, fields);
|
||||||
|
addFields(doc, "rnd_s", RandomStringUtils.random(random().nextInt(100) + 100));
|
||||||
|
|
||||||
|
UpdateRequest ureq = new UpdateRequest();
|
||||||
|
ureq.add(doc);
|
||||||
|
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||||
|
ureq.setParams(params);
|
||||||
|
ureq.process(cloudClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
// skip the randoms - they can deadlock...
|
||||||
|
@Override
|
||||||
|
protected void indexr(Object... fields) throws Exception {
|
||||||
|
SolrInputDocument doc = new SolrInputDocument();
|
||||||
|
addFields(doc, fields);
|
||||||
|
addFields(doc, "rnd_b", true);
|
||||||
|
indexDoc(doc);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -45,6 +45,9 @@ import org.apache.solr.handler.ReplicationHandler;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static java.util.Collections.singletonList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test sync peer sync when a node restarts and documents are indexed when node was down.
|
* Test sync peer sync when a node restarts and documents are indexed when node was down.
|
||||||
*
|
*
|
||||||
|
@ -120,7 +123,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
|
||||||
otherJetties.remove(neverLeader) ;
|
otherJetties.remove(neverLeader) ;
|
||||||
|
|
||||||
// first shutdown a node that will never be a leader
|
// first shutdown a node that will never be a leader
|
||||||
forceNodeFailures(Arrays.asList(neverLeader));
|
forceNodeFailures(singletonList(neverLeader));
|
||||||
|
|
||||||
// node failure and recovery via PeerSync
|
// node failure and recovery via PeerSync
|
||||||
log.info("Forcing PeerSync");
|
log.info("Forcing PeerSync");
|
||||||
|
@ -144,9 +147,9 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
|
||||||
|
|
||||||
// now shutdown the original leader
|
// now shutdown the original leader
|
||||||
log.info("Now shutting down initial leader");
|
log.info("Now shutting down initial leader");
|
||||||
forceNodeFailures(Arrays.asList(initialLeaderJetty));
|
forceNodeFailures(singletonList(initialLeaderJetty));
|
||||||
log.info("Updating mappings from zk");
|
log.info("Updating mappings from zk");
|
||||||
Thread.sleep(15000); // sleep for a while for leader to change ...
|
LeaderFailureAfterFreshStartTest.waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
|
||||||
updateMappingsFromZk(jettys, clients, true);
|
updateMappingsFromZk(jettys, clients, true);
|
||||||
assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1"));
|
assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1"));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue