mirror of https://github.com/apache/lucene.git
SOLR-8367: Fix the LeaderInitiatedRecovery 'all replicas participate' fail-safe.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1718987 13f79535-47bb-0310-9956-ffa450edef68
parent e04981edc7
commit 0f0b3ff8e5
@@ -228,6 +228,8 @@ Bug Fixes
 * SOLR-8373: KerberosPlugin: Using multiple nodes on same machine leads clients to
   fetch TGT for every request (Ishan Chattopadhyaya via noble)
 
+* SOLR-8367: Fix the LeaderInitiatedRecovery 'all replicas participate' fail-safe.
+  (Mark Miller, Mike Drob)
 
 Other Changes
 ----------------------
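
For orientation before the code hunks: the 'all replicas participate' fail-safe runs when a core tries to take over shard leadership. A condensed sketch of the decision flow, simplified from the ShardLeaderElectionContext diff below (not the complete method):

    // Condensed sketch of the flow this commit fixes, simplified from
    // ShardLeaderElectionContext.runLeaderProcess() in the diff below.
    int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
    boolean allReplicasInLine;
    if (!weAreReplacement) {
      // fresh election: wait up to leaderVoteWait ms for all replicas to show up
      allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait);
    } else {
      // taking over from a fallen leader: the fail-safe now checks that every
      // replica is present in the shard's election line before proceeding
      allReplicasInLine = areAllReplicasParticipating();
    }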

@@ -293,16 +293,20 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       lt.minimumWaitBetweenActions();
       lt.markAttemptingAction();
 
-      log.info("Running the leader process for shard " + shardId);
+      int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
+
+      log.info("Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}", shardId, weAreReplacement, leaderVoteWait);
+
       // clear the leader in clusterstate
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
           ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
       Overseer.getInQueue(zkClient).offer(Utils.toJSON(m));
 
-      int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
       boolean allReplicasInLine = false;
       if (!weAreReplacement) {
         allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait);
+      } else {
+        allReplicasInLine = areAllReplicasParticipating();
       }
 
       if (isClosed) {

@@ -406,6 +410,9 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         }
         log.info("I am the new leader: " + ZkCoreNodeProps.getCoreUrl(leaderProps) + " " + shardId);
 
+        // we made it as leader - send any recovery requests we need to
+        syncStrategy.requestRecoveries();
+
       } catch (Exception e) {
         isLeader = false;
         SolrException.log(log, "There was a problem trying to register as the leader", e);

@@ -600,6 +607,39 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
     return false;
   }
 
+  // returns true if all replicas are found to be up, false if not
+  private boolean areAllReplicasParticipating() throws InterruptedException {
+    final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
+    Slice slices = zkController.getClusterState().getSlice(collection, shardId);
+
+    if (slices != null) {
+      int found = 0;
+      try {
+        found = zkClient.getChildren(shardsElectZkPath, null, true).size();
+      } catch (KeeperException e) {
+        if (e instanceof KeeperException.SessionExpiredException) {
+          // if the session has expired, then another election will be launched, so
+          // quit here
+          throw new SolrException(ErrorCode.SERVER_ERROR,
+              "ZK session expired - cancelling election for " + collection + " " + shardId);
+        }
+        SolrException.log(log, "Error checking for the number of election participants", e);
+      }
+
+      if (found >= slices.getReplicasMap().size()) {
+        log.info("All replicas are ready to participate in election.");
+        return true;
+      }
+
+    } else {
+      log.warn("Shard not found: " + shardId + " for collection " + collection);
+
+      return false;
+    }
+
+    return false;
+  }
+
   private void rejoinLeaderElection(SolrCore core)
       throws InterruptedException, KeeperException, IOException {
     // remove our ephemeral and re join the election
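
The new areAllReplicasParticipating() compares the number of children under the shard's election znode with the size of the replica map. A standalone sketch of the same inspection using the raw ZooKeeper client; the connect string, collection, and shard names here are illustrative assumptions:

    import java.util.List;
    import org.apache.zookeeper.ZooKeeper;

    public class ElectionParticipants {
      public static void main(String[] args) throws Exception {
        // Assumed local ZK and the standard SolrCloud layout: one ephemeral,
        // sequenced child per live election participant under this path.
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 15000, event -> {});
        String electionPath = "/collections/all_in_lir/leader_elect/shard1/election";
        List<String> participants = zk.getChildren(electionPath, false);
        System.out.println(participants.size() + " replicas in the election line: " + participants);
        zk.close();
      }
    }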

@@ -40,15 +40,10 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
 import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestInfo;
-import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.update.PeerSync;
 import org.apache.solr.update.UpdateShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
 public class SyncStrategy {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@@ -63,6 +58,14 @@ public class SyncStrategy {
 
   private final ExecutorService updateExecutor;
 
+  private final List<RecoveryRequest> recoveryRequests = new ArrayList<>();
+
+  private static class RecoveryRequest {
+    ZkNodeProps leaderProps;
+    String baseUrl;
+    String coreName;
+  }
+
   public SyncStrategy(CoreContainer cc) {
     UpdateShardHandler updateShardHandler = cc.getUpdateShardHandler();
     client = updateShardHandler.getHttpClient();
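
The new recoveryRequests list and RecoveryRequest holder change SyncStrategy from firing recovery requests during sync to merely recording them; ShardLeaderElectionContext replays them via requestRecoveries() only after it has actually become leader (see the ElectionContext hunk above). A minimal, self-contained sketch of that record-then-replay pattern; the names are illustrative, not Solr's API:

    import java.util.ArrayList;
    import java.util.List;

    // Record-then-replay: note work while the outcome is undecided, run it
    // only once the outcome is final.
    class DeferredRecoveries {
      private final List<Runnable> pending = new ArrayList<>();

      void noteSyncFailure(String replicaUrl) {
        // during sync: remember the replica, don't contact it yet
        pending.add(() -> System.out.println("asking " + replicaUrl + " to recover"));
      }

      void onLeadershipWon() {
        // only a confirmed leader may tell replicas to recover
        pending.forEach(Runnable::run);
        pending.clear();
      }
    }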

@@ -93,6 +96,9 @@ public class SyncStrategy {
       log.warn("Closed, skipping sync up.");
       return false;
     }
+
+    recoveryRequests.clear();
+
     log.info("Sync replicas to " + ZkCoreNodeProps.getCoreUrl(leaderProps));
 
     if (core.getUpdateHandler().getUpdateLog() == null) {

@@ -208,15 +214,16 @@ public class SyncStrategy {
         }
 
         if (!success) {
-          try {
-            log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Sync failed - asking replica (" + srsp.getShardAddress() + ") to recover.");
+          log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Sync failed - we will ask replica (" + srsp.getShardAddress()
+              + ") to recover.");
           if (isClosed) {
             log.info("We have been closed, don't request that a replica recover");
           } else {
-            requestRecovery(leaderProps, ((ShardCoreRequest)srsp.getShardRequest()).baseUrl, ((ShardCoreRequest)srsp.getShardRequest()).coreName);
+            RecoveryRequest rr = new RecoveryRequest();
+            rr.leaderProps = leaderProps;
+            rr.baseUrl = ((ShardCoreRequest) srsp.getShardRequest()).baseUrl;
+            rr.coreName = ((ShardCoreRequest) srsp.getShardRequest()).coreName;
+            recoveryRequests.add(rr);
           }
-          } catch (Exception e) {
-            SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", e);
-          }
         } else {
           log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": " + " sync completed with " + srsp.getShardAddress());

@@ -261,6 +268,16 @@ public class SyncStrategy {
     this.isClosed = true;
   }
 
+  public void requestRecoveries() {
+    for (RecoveryRequest rr : recoveryRequests) {
+      try {
+        requestRecovery(rr.leaderProps, rr.baseUrl, rr.coreName);
+      } catch (SolrServerException | IOException e) {
+        log.error("Problem requesting that a replica recover", e);
+      }
+    }
+  }
+
   private void requestRecovery(final ZkNodeProps leaderProps, final String baseUrl, final String coreName) throws SolrServerException, IOException {
     Thread thread = new Thread() {
       {

@@ -272,7 +289,6 @@ public class SyncStrategy {
         recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
         recoverRequestCmd.setCoreName(coreName);
 
-        ;
         try (HttpSolrClient client = new HttpSolrClient(baseUrl, SyncStrategy.this.client)) {
           client.setConnectionTimeout(30000);
           client.setSoTimeout(120000);
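
For reference, requestRecovery() boils down to a CoreAdmin REQUESTRECOVERY call against the replica's base URL. A minimal SolrJ sketch of the same call as it would look in this 5.x era; the host and core name are hypothetical:

    import org.apache.solr.client.solrj.impl.HttpSolrClient;
    import org.apache.solr.client.solrj.request.CoreAdminRequest;
    import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;

    public class RequestRecoveryExample {
      public static void main(String[] args) throws Exception {
        CoreAdminRequest.RequestRecovery recoverRequestCmd = new CoreAdminRequest.RequestRecovery();
        recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
        recoverRequestCmd.setCoreName("all_in_lir_shard1_replica1"); // hypothetical core name
        try (HttpSolrClient client = new HttpSolrClient("http://127.0.0.1:8983/solr")) {
          client.setConnectionTimeout(30000); // same timeouts the patch uses
          client.setSoTimeout(120000);
          recoverRequestCmd.process(client);  // /admin/cores?action=REQUESTRECOVERY&core=...
        }
      }
    }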

@@ -447,11 +447,6 @@ public final class ZkController {
             if (isClosed) {
               return;
             }
-            try {
-              Thread.sleep(5000);
-            } catch (InterruptedException e1) {
-              Thread.currentThread().interrupt();
-            }
           }
         }
       }

@@ -1045,7 +1040,7 @@ public final class ZkController {
         Thread.sleep(1000);
       }
       if (cc.isShutDown()) {
-        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is close");
+        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is closed");
       }
     }
     throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Could not get leader props", exp);

@@ -1581,12 +1576,15 @@ public final class ZkController {
 
   private ZkCoreNodeProps waitForLeaderToSeeDownState(
       CoreDescriptor descriptor, final String coreZkNodeName) {
+    // try not to wait too long here - if we are waiting too long, we should probably
+    // move along and join the election
+
     CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
     String collection = cloudDesc.getCollectionName();
     String shard = cloudDesc.getShardId();
     ZkCoreNodeProps leaderProps = null;
 
-    int retries = 6;
+    int retries = 2;
     for (int i = 0; i < retries; i++) {
       try {
         if (isClosed) {

@@ -1594,8 +1592,8 @@ public final class ZkController {
             "We have been closed");
       }
 
-      // go straight to zk, not the cloud state - we must have current info
-      leaderProps = getLeaderProps(collection, shard, 30000);
+      // go straight to zk, not the cloud state - we want current info
+      leaderProps = getLeaderProps(collection, shard, 5000);
       break;
     } catch (Exception e) {
       SolrException.log(log, "There was a problem finding the leader in zk", e);

@@ -1649,7 +1647,7 @@ public final class ZkController {
 
     // let's retry a couple times - perhaps the leader just went down,
     // or perhaps he is just not quite ready for us yet
-    retries = 6;
+    retries = 2;
     for (int i = 0; i < retries; i++) {
       if (isClosed) {
         throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
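
A note on the timing changes above: waitForLeaderToSeeDownState previously allowed up to 6 attempts with a 30-second ZK leader lookup each, roughly three minutes in the worst case; with 2 attempts at 5 seconds it is closer to ten seconds (ignoring any sleeps between attempts). The second retry loop is shortened the same way, which matches the comment that a candidate should move along and join the election rather than stall.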

@@ -736,7 +736,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     this.dataDir = initDataDir(dataDir, config, coreDescriptor);
     this.ulogDir = initUpdateLogDir(coreDescriptor);
 
-    log.info("[{}] Opening new SolrCore at [{}], dataDir=[{}]", logid, resourceLoader.getInstancePath(), dataDir);
+    log.info("[{}] Opening new SolrCore at [{}], dataDir=[{}]", logid, resourceLoader.getInstancePath(), this.dataDir);
 
     checkVersionFieldExistsInSchema(schema, coreDescriptor);

@@ -37,7 +37,7 @@
     <str name="hostContext">${hostContext:solr}</str>
     <int name="zkClientTimeout">${solr.zkclienttimeout:30000}</int>
     <bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
-    <int name="leaderVoteWait">10000</int>
+    <int name="leaderVoteWait">${leaderVoteWait:10000}</int>
     <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:45000}</int>
     <int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:340000}</int>
 </solrcloud>
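
With ${leaderVoteWait:10000}, the test config now reads leaderVoteWait from a system property and falls back to 10000 ms. A minimal sketch of overriding it per JVM, mirroring what the test below does (passing -DleaderVoteWait=300000 on the command line is equivalent):

    public class LeaderVoteWaitOverride {
      public static void main(String[] args) {
        // solr.xml's ${leaderVoteWait:10000} resolves to this property,
        // falling back to 10000 ms when it is unset.
        System.setProperty("leaderVoteWait", "300000");
        System.out.println(System.getProperty("leaderVoteWait", "10000") + " ms");
        System.clearProperty("leaderVoteWait");
      }
    }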

@@ -17,37 +17,80 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
 
-import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.LuceneTestCase.Nightly;
+import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Slow
 @SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+@Nightly
 public class LeaderInitiatedRecoveryOnShardRestartTest extends AbstractFullDistribZkTestBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  public LeaderInitiatedRecoveryOnShardRestartTest() {
+  public LeaderInitiatedRecoveryOnShardRestartTest() throws Exception {
     super();
     sliceCount = 1;
-    fixShardCount(3);
+    // we want 3 jetties, but we are using the control jetty as one
+    fixShardCount(2);
+    useFactory("solr.StandardDirectoryFactory");
   }
 
+  @BeforeClass
+  public static void before() {
+    // we want a more realistic leaderVoteWait, so raise it from the
+    // test default of 10s to 5 minutes.
+    System.setProperty("leaderVoteWait", "300000");
+  }
+
+  @AfterClass
+  public static void after() {
+    System.clearProperty("leaderVoteWait");
+  }
+
   @Test
   public void testRestartWithAllInLIR() throws Exception {
     waitForThingsToLevelOut(30000);
 
     // still waiting to be able to properly start with no default collection1,
     // delete to remove confusion
+    waitForRecoveriesToFinish(false);
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set("action", CollectionAction.DELETE.toString());
     params.set("name", DEFAULT_COLLECTION);
     QueryRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
     String baseUrl = ((HttpSolrClient) clients.get(0)).getBaseURL();
     HttpSolrClient delClient = new HttpSolrClient(baseUrl.substring(0, baseUrl.lastIndexOf("/")));
     delClient.request(request);
     delClient.close();
 
     String testCollectionName = "all_in_lir";
     String shardId = "shard1";
     createCollection(testCollectionName, 1, 3, 1);
 
+    waitForRecoveriesToFinish(testCollectionName, false);
+
+    cloudClient.setDefaultCollection(testCollectionName);
+
     Map<String,Object> stateObj = Utils.makeMap();

@@ -62,19 +105,57 @@ public class LeaderInitiatedRecoveryOnShardRestartTest extends AbstractFullDistr
     zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node2", znodeData, true);
     zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node3", znodeData, true);
 
     // printLayout();
+    // everyone gets a couple docs so that everyone has tlog entries
+    // and won't become leader simply because they have no tlog versions
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, "id", "1");
+    SolrInputDocument doc2 = new SolrInputDocument();
+    addFields(doc2, "id", "2");
+    cloudClient.add(doc);
+    cloudClient.add(doc2);
+
+    cloudClient.commit();
+
+    assertEquals("We just added 2 docs, we should be able to find them", 2, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+
+    // randomly add too many docs to peer sync to one replica so that only one random replica is the valid leader
+    // the versions don't matter, they just have to be higher than what the last 2 docs got
+    HttpSolrClient client = (HttpSolrClient) clients.get(random().nextInt(clients.size()));
+    client.setBaseURL(client.getBaseURL().substring(0, client.getBaseURL().lastIndexOf("/")) + "/" + testCollectionName);
+    params = new ModifiableSolrParams();
+    params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+
+    try {
+      for (int i = 0; i < 101; i++) {
+        add(client, params, sdoc("id", 3 + i, "_version_", Long.MAX_VALUE - 1 - i));
+      }
+    } catch (RemoteSolrException e) {
+      // if we got a conflict it's because we tried to send a versioned doc to the leader,
+      // resend without version
+      if (e.getMessage().contains("conflict")) {
+        for (int i = 0; i < 101; i++) {
+          add(client, params, sdoc("id", 3 + i));
+        }
+      }
+    }
+
+    client.commit();
+
     for (JettySolrRunner jetty : jettys) {
       ChaosMonkey.stop(jetty);
     }
+    ChaosMonkey.stop(controlJetty);
 
-    Thread.sleep(2000);
+    Thread.sleep(10000);
 
     log.info("Start back up");
 
     for (JettySolrRunner jetty : jettys) {
       ChaosMonkey.start(jetty);
     }
+    ChaosMonkey.start(controlJetty);
 
-    // recoveries will not finish without SOLR-8075
+    // recoveries will not finish without SOLR-8075 and SOLR-8367
     waitForRecoveriesToFinish(testCollectionName, true);
 
     // now expire each node
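
A note on the forced-version trick above: peer sync can only bridge a bounded window of recent updates from the update log (on the order of 100 by default), so feeding one replica 101 updates the others never saw makes it the only valid leader, as the test's own comment says, and forces the others into full recovery. The version arithmetic, pulled out for clarity:

    public class ForcedVersions {
      public static void main(String[] args) {
        // Values from the test: descending versions just under Long.MAX_VALUE,
        // all far above whatever versions docs 1 and 2 were assigned.
        for (int i = 0; i < 101; i++) {
          long version = Long.MAX_VALUE - 1 - i;
          if (i == 0 || i == 100) {
            System.out.println("id=" + (3 + i) + " _version_=" + version);
          }
        }
      }
    }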

@@ -100,7 +181,7 @@ public class LeaderInitiatedRecoveryOnShardRestartTest extends AbstractFullDistr
 
     Thread.sleep(2000);
 
-    // recoveries will not finish without SOLR-8075
+    // recoveries will not finish without SOLR-8075 and SOLR-8367
     waitForRecoveriesToFinish(testCollectionName, true);
   }
 }