mirror of https://github.com/apache/lucene.git
SOLR-8697: Scope ZK election nodes by session to prevent elections from interfering with each other and other small LeaderElector improvements.
This commit is contained in:
parent
771f14cb6e
commit
9418369b46
|
@ -193,6 +193,9 @@ Bug Fixes
|
|||
|
||||
* SOLR-8656: PeerSync should use same nUpdates everywhere. (Ramsey Haddad via Mark Miller)
|
||||
|
||||
* SOLR-8697: Scope ZK election nodes by session to prevent elections from interfering with each other
|
||||
and other small LeaderElector improvements. (Scott Blum via Mark Miller)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
* SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been
|
||||
|
|
|
@ -137,6 +137,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
|
|||
|
||||
@Override
|
||||
public void cancelElection() throws InterruptedException, KeeperException {
|
||||
super.cancelElection();
|
||||
if (leaderZkNodeParentVersion != null) {
|
||||
try {
|
||||
// We need to be careful and make sure we *only* delete our own leader registration node.
|
||||
|
@ -163,7 +164,6 @@ class ShardLeaderElectionContextBase extends ElectionContext {
|
|||
} else {
|
||||
log.info("No version found for ephemeral leader parent node, won't remove previous leader registration.");
|
||||
}
|
||||
super.cancelElection();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -179,7 +179,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
|
|||
|
||||
@Override
|
||||
public void execute() throws InterruptedException, KeeperException {
|
||||
log.info("Creating leader registration node", leaderPath);
|
||||
log.info("Creating leader registration node {} after winning as {}", leaderPath, leaderSeqPath);
|
||||
List<Op> ops = new ArrayList<>(2);
|
||||
|
||||
// We use a multi operation to get the parent nodes version, which will
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
|
@ -105,54 +106,34 @@ public class LeaderElector {
|
|||
// get all other numbers...
|
||||
final String holdElectionPath = context.electionPath + ELECTION_NODE;
|
||||
List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);
|
||||
|
||||
sortSeqs(seqs);
|
||||
List<Integer> intSeqs = getSeqs(seqs);
|
||||
if (intSeqs.size() == 0) {
|
||||
|
||||
String leaderSeqNodeName = context.leaderSeqPath.substring(context.leaderSeqPath.lastIndexOf('/') + 1);
|
||||
if (!seqs.contains(leaderSeqNodeName)) {
|
||||
log.warn("Our node is no longer in line to be leader");
|
||||
return;
|
||||
}
|
||||
// We can't really rely on the sequence number stored in the old watcher, it may be stale, thus this check.
|
||||
|
||||
int seq = -1;
|
||||
|
||||
// See if we've already been re-added, and this is an old context. In which case, use our current sequence number.
|
||||
String newLeaderSeq = "";
|
||||
for (String elec : seqs) {
|
||||
if (getNodeName(elec).equals(getNodeName(context.leaderSeqPath)) && seq < getSeq(elec)) {
|
||||
seq = getSeq(elec); // so use the current sequence number.
|
||||
newLeaderSeq = elec;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Now, if we've been re-added, presumably we've also set up watchers and all that kind of thing, so we're done
|
||||
if (StringUtils.isNotBlank(newLeaderSeq) && seq > getSeq(context.leaderSeqPath)) {
|
||||
log.info("Node " + context.leaderSeqPath + " already in queue as " + newLeaderSeq + " nothing to do.");
|
||||
return;
|
||||
}
|
||||
|
||||
// Fallback in case we're all coming in here fresh and there is no node for this core already in the election queue.
|
||||
if (seq == -1) {
|
||||
seq = getSeq(context.leaderSeqPath);
|
||||
}
|
||||
|
||||
if (seq <= intSeqs.get(0)) {
|
||||
if (seq == intSeqs.get(0) && !context.leaderSeqPath.equals(holdElectionPath + "/" + seqs.get(0))) {//somebody else already became the leader with the same sequence id , not me
|
||||
log.info("was going to be leader {} , seq(0) {}", context.leaderSeqPath, holdElectionPath + "/" + seqs.get(0));//but someone else jumped the line
|
||||
|
||||
// The problem is that deleting the ZK node that's watched by others
|
||||
// results in an unpredictable sequencing of the events and sometime the context that comes in for checking
|
||||
// this happens to be after the node has already taken over leadership. So just leave out of here.
|
||||
// This caused one of the tests to fail on having two nodes with the same name in the queue. I'm not sure
|
||||
// the assumption that this is a bad state is valid.
|
||||
if (getNodeName(context.leaderSeqPath).equals(getNodeName(seqs.get(0)))) {
|
||||
return;
|
||||
// If any double-registrations exist for me, remove all but this latest one!
|
||||
// TODO: can we even get into this state?
|
||||
String prefix = zkClient.getSolrZooKeeper().getSessionId() + "-" + context.id;
|
||||
Iterator<String> it = seqs.iterator();
|
||||
while (it.hasNext()) {
|
||||
String elec = it.next();
|
||||
if (!elec.equals(leaderSeqNodeName) && elec.startsWith(prefix)) {
|
||||
try {
|
||||
String toDelete = holdElectionPath + "/" + elec;
|
||||
log.warn("Deleting duplicate registration: {}", toDelete);
|
||||
zkClient.delete(toDelete, -1, true);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
// ignore
|
||||
}
|
||||
retryElection(context, false);//join at the tail again
|
||||
return;
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
|
||||
if (leaderSeqNodeName.equals(seqs.get(0))) {
|
||||
// I am the leader
|
||||
try {
|
||||
runIamLeaderProcess(context, replacement);
|
||||
} catch (KeeperException.NodeExistsException e) {
|
||||
|
@ -162,30 +143,25 @@ public class LeaderElector {
|
|||
}
|
||||
} else {
|
||||
// I am not the leader - watch the node below me
|
||||
int toWatch = -1;
|
||||
for (int idx = 0; idx < intSeqs.size(); idx++) {
|
||||
if (intSeqs.get(idx) < seq && ! getNodeName(context.leaderSeqPath).equals(getNodeName(seqs.get(idx)))) {
|
||||
toWatch = idx;
|
||||
}
|
||||
if (intSeqs.get(idx) >= seq) {
|
||||
String toWatch = seqs.get(0);
|
||||
for (String node : seqs) {
|
||||
if (leaderSeqNodeName.equals(node)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (toWatch < 0) {
|
||||
log.warn("Our node is no longer in line to be leader");
|
||||
return;
|
||||
toWatch = node;
|
||||
}
|
||||
try {
|
||||
String watchedNode = holdElectionPath + "/" + seqs.get(toWatch);
|
||||
|
||||
zkClient.getData(watchedNode, watcher = new ElectionWatcher(context.leaderSeqPath , watchedNode,seq, context) , null, true);
|
||||
String watchedNode = holdElectionPath + "/" + toWatch;
|
||||
zkClient.getData(watchedNode, watcher = new ElectionWatcher(context.leaderSeqPath, watchedNode, getSeq(context.leaderSeqPath), context), null, true);
|
||||
log.info("Watching path {} to know if I could be the leader", watchedNode);
|
||||
} catch (KeeperException.SessionExpiredException e) {
|
||||
throw e;
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
// the previous node disappeared, check if we are the leader again
|
||||
checkIfIamLeader(context, true);
|
||||
} catch (KeeperException e) {
|
||||
// we couldn't set our watch for some other reason, retry
|
||||
log.warn("Failed setting watch", e);
|
||||
// we couldn't set our watch - the node before us may already be down?
|
||||
// we need to check if we are the leader again
|
||||
checkIfIamLeader(context, true);
|
||||
}
|
||||
}
|
||||
|
@ -239,18 +215,6 @@ public class LeaderElector {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns int list given list of form n_0000000001, n_0000000003, etc.
|
||||
*
|
||||
* @return int seqs
|
||||
*/
|
||||
private List<Integer> getSeqs(List<String> seqs) {
|
||||
List<Integer> intSeqs = new ArrayList<>(seqs.size());
|
||||
for (String seq : seqs) {
|
||||
intSeqs.add(getSeq(seq));
|
||||
}
|
||||
return intSeqs;
|
||||
}
|
||||
public int joinElection(ElectionContext context, boolean replacement) throws KeeperException, InterruptedException, IOException {
|
||||
return joinElection(context,replacement, false);
|
||||
}
|
||||
|
@ -410,8 +374,7 @@ public class LeaderElector {
|
|||
|
||||
@Override
|
||||
public int compare(String o1, String o2) {
|
||||
int i = Integer.valueOf(getSeq(o1)).compareTo(
|
||||
Integer.valueOf(getSeq(o2)));
|
||||
int i = getSeq(o1) - getSeq(o2);
|
||||
return i == 0 ? o1.compareTo(o2) : i ;
|
||||
}
|
||||
});
|
||||
|
|
|
@ -157,6 +157,11 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
for (int i = 0; i < 120; i++) {
|
||||
String shardId = getShardId(collection, coreNodeName);
|
||||
if (shardId != null) {
|
||||
ElectionContext prevContext = electionContext.get(coreName);
|
||||
if (prevContext != null) {
|
||||
prevContext.cancelElection();
|
||||
}
|
||||
|
||||
try {
|
||||
zkClient.makePath("/collections/" + collection + "/leader_elect/"
|
||||
+ shardId + "/election", true);
|
||||
|
@ -172,6 +177,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
elector, shardId, collection, nodeName + "_" + coreName, props,
|
||||
zkStateReader);
|
||||
elector.setup(ctx);
|
||||
electionContext.put(coreName, ctx);
|
||||
elector.joinElection(ctx, false);
|
||||
return shardId;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue