mirror of https://github.com/apache/lucene.git
SOLR-6095 SolrCloud cluster can end up without an overseer with overseer roles
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1603382 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
67eef54ff2
commit
3ed3a46057
|
@ -88,7 +88,9 @@ Other Changes
|
|||
|
||||
================== 4.10.0 =================
|
||||
|
||||
(No Changes)
|
||||
Bug fixes
|
||||
|
||||
* SOLR-6095 : SolrCloud cluster can end up without an overseer with overseer roles (Noble Paul, Shalin Mangar)
|
||||
|
||||
================== 4.9.0 ==================
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ public abstract class ElectionContext {
|
|||
final String leaderPath;
|
||||
String leaderSeqPath;
|
||||
private SolrZkClient zkClient;
|
||||
|
||||
|
||||
public ElectionContext(final String coreNodeName,
|
||||
final String electionPath, final String leaderPath, final ZkNodeProps leaderProps, final SolrZkClient zkClient) {
|
||||
this.id = coreNodeName;
|
||||
|
@ -71,7 +71,7 @@ public abstract class ElectionContext {
|
|||
zkClient.delete(leaderSeqPath, -1, true);
|
||||
} catch (NoNodeException e) {
|
||||
// fine
|
||||
log.warn("cancelElection did not find election node to remove",e);
|
||||
log.warn("cancelElection did not find election node to remove {}" ,leaderSeqPath);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,6 +80,10 @@ public abstract class ElectionContext {
|
|||
public void checkIfIamLeaderFired() {}
|
||||
|
||||
public void joinedElectionFired() {}
|
||||
|
||||
public ElectionContext copy(){
|
||||
throw new UnsupportedOperationException("copy");
|
||||
}
|
||||
}
|
||||
|
||||
class ShardLeaderElectionContextBase extends ElectionContext {
|
||||
|
@ -529,6 +533,11 @@ final class OverseerElectionContext extends ElectionContext {
|
|||
super.cancelElection();
|
||||
overseer.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElectionContext copy() {
|
||||
return new OverseerElectionContext(zkClient, overseer ,id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void joinedElectionFired() {
|
||||
|
|
|
@ -57,7 +57,7 @@ public class LeaderElector {
|
|||
|
||||
static final String ELECTION_NODE = "/election";
|
||||
|
||||
private final static Pattern LEADER_SEQ = Pattern.compile(".*?/?.*?-n_(\\d+)");
|
||||
public final static Pattern LEADER_SEQ = Pattern.compile(".*?/?.*?-n_(\\d+)");
|
||||
private final static Pattern SESSION_ID = Pattern.compile(".*?/?(.*?-.*?)-n_\\d+");
|
||||
private final static Pattern NODE_NAME = Pattern.compile(".*?/?(.*?-)(.*?)-n_\\d+");
|
||||
|
||||
|
@ -77,7 +77,7 @@ public class LeaderElector {
|
|||
public ElectionContext getContext() {
|
||||
return context;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Check if the candidate with the given n_* sequence number is the leader.
|
||||
* If it is, set the leaderId on the leader zk node. If it is not, start
|
||||
|
@ -100,14 +100,34 @@ public class LeaderElector {
|
|||
return;
|
||||
}
|
||||
if (seq <= intSeqs.get(0)) {
|
||||
if(seq == intSeqs.get(0) && !context.leaderSeqPath.equals(holdElectionPath+"/"+seqs.get(0)) ) {//somebody else already became the leader with the same sequence id , not me
|
||||
log.info("was going be leader {} , seq(0) {}",context.leaderSeqPath,holdElectionPath+"/"+seqs.get(0));//but someone else jumped the line
|
||||
retryElection(context,false);//join at the tail again
|
||||
return;
|
||||
}
|
||||
// first we delete the node advertising the old leader in case the ephem is still there
|
||||
// first we delete the node advertising the old leader in case the ephem is still there
|
||||
try {
|
||||
zkClient.delete(context.leaderPath, -1, true);
|
||||
} catch(Exception e) {
|
||||
}catch (KeeperException.NoNodeException nne){
|
||||
//no problem
|
||||
}catch (InterruptedException e){
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
//failed to delete the leader node
|
||||
log.error("leader elect delete error",e);
|
||||
retryElection(context, false);
|
||||
return;
|
||||
// fine
|
||||
}
|
||||
|
||||
runIamLeaderProcess(context, replacement);
|
||||
try {
|
||||
runIamLeaderProcess(context, replacement);
|
||||
} catch (KeeperException.NodeExistsException e) {
|
||||
log.error("node exists",e);
|
||||
retryElection(context, false);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// I am not the leader - watch the node below me
|
||||
int i = 1;
|
||||
|
@ -124,7 +144,8 @@ public class LeaderElector {
|
|||
return;
|
||||
}
|
||||
try {
|
||||
zkClient.getData(holdElectionPath + "/" + seqs.get(index), watcher = new ElectionWatcher(context.leaderSeqPath , seq, context) , null, true);
|
||||
String watchedNode = holdElectionPath + "/" + seqs.get(index);
|
||||
zkClient.getData(watchedNode, watcher = new ElectionWatcher(context.leaderSeqPath , watchedNode,seq, context) , null, true);
|
||||
} catch (KeeperException.SessionExpiredException e) {
|
||||
throw e;
|
||||
} catch (KeeperException e) {
|
||||
|
@ -196,17 +217,20 @@ public class LeaderElector {
|
|||
}
|
||||
return intSeqs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Begin participating in the election process. Gets a new sequential number
|
||||
* and begins watching the node with the sequence number before it, unless it
|
||||
* is the lowest number, in which case, initiates the leader process. If the
|
||||
* node that is watched goes down, check if we are the new lowest node, else
|
||||
* watch the next lowest numbered node.
|
||||
*
|
||||
* @return sequential node number
|
||||
*/
|
||||
public int joinElection(ElectionContext context, boolean replacement) throws KeeperException, InterruptedException, IOException {
|
||||
return joinElection(context,replacement, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Begin participating in the election process. Gets a new sequential number
|
||||
* and begins watching the node with the sequence number before it, unless it
|
||||
* is the lowest number, in which case, initiates the leader process. If the
|
||||
* node that is watched goes down, check if we are the new lowest node, else
|
||||
* watch the next lowest numbered node.
|
||||
*
|
||||
* @return sequential node number
|
||||
*/
|
||||
public int joinElection(ElectionContext context, boolean replacement,boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
|
||||
context.joinedElectionFired();
|
||||
|
||||
final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
|
||||
|
@ -218,8 +242,30 @@ public class LeaderElector {
|
|||
int tries = 0;
|
||||
while (cont) {
|
||||
try {
|
||||
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
|
||||
CreateMode.EPHEMERAL_SEQUENTIAL, false);
|
||||
if(joinAtHead){
|
||||
log.info("node {} Trying to join election at the head ", id);
|
||||
List<String> nodes = OverseerCollectionProcessor.getSortedElectionNodes(zkClient);
|
||||
if(nodes.size() <2){
|
||||
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
|
||||
CreateMode.EPHEMERAL_SEQUENTIAL, false);
|
||||
} else {
|
||||
String firstInLine = nodes.get(1);
|
||||
log.info("The current head: {}", firstInLine);
|
||||
Matcher m = LEADER_SEQ.matcher(firstInLine);
|
||||
if (!m.matches()) {
|
||||
throw new IllegalStateException("Could not find regex match in:"
|
||||
+ firstInLine);
|
||||
}
|
||||
leaderSeqPath = shardsElectZkPath + "/" + id + "-n_"+ m.group(1);
|
||||
zkClient.create(leaderSeqPath, null, CreateMode.EPHEMERAL, false);
|
||||
log.info("Joined at the head {}", leaderSeqPath );
|
||||
|
||||
}
|
||||
} else {
|
||||
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
|
||||
CreateMode.EPHEMERAL_SEQUENTIAL, false);
|
||||
}
|
||||
|
||||
context.leaderSeqPath = leaderSeqPath;
|
||||
cont = false;
|
||||
} catch (ConnectionLossException e) {
|
||||
|
@ -270,14 +316,15 @@ public class LeaderElector {
|
|||
}
|
||||
|
||||
private class ElectionWatcher implements Watcher {
|
||||
final String leaderSeqPath;
|
||||
final String myNode,watchedNode;
|
||||
final int seq;
|
||||
final ElectionContext context;
|
||||
|
||||
private boolean canceled = false;
|
||||
|
||||
private ElectionWatcher(String leaderSeqPath, int seq, ElectionContext context) {
|
||||
this.leaderSeqPath = leaderSeqPath;
|
||||
private ElectionWatcher(String myNode, String watchedNode, int seq, ElectionContext context) {
|
||||
this.myNode = myNode;
|
||||
this.watchedNode = watchedNode;
|
||||
this.seq = seq;
|
||||
this.context = context;
|
||||
}
|
||||
|
@ -295,7 +342,14 @@ public class LeaderElector {
|
|||
return;
|
||||
}
|
||||
if(canceled) {
|
||||
log.info("This watcher is not active anymore {}", leaderSeqPath);
|
||||
log.info("This watcher is not active anymore {}", myNode);
|
||||
try {
|
||||
zkClient.delete(myNode,-1,true);
|
||||
}catch (KeeperException.NoNodeException nne) {
|
||||
//expected . don't do anything
|
||||
} catch (Exception e) {
|
||||
log.warn("My watched node still exists and can't remove "+myNode, e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
try {
|
||||
|
@ -332,16 +386,19 @@ public class LeaderElector {
|
|||
|
||||
@Override
|
||||
public int compare(String o1, String o2) {
|
||||
return Integer.valueOf(getSeq(o1)).compareTo(
|
||||
int i = Integer.valueOf(getSeq(o1)).compareTo(
|
||||
Integer.valueOf(getSeq(o2)));
|
||||
return i == 0 ? o1.compareTo(o2) : i ;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void retryElection() throws KeeperException, InterruptedException, IOException {
|
||||
context.cancelElection();
|
||||
|
||||
void retryElection(ElectionContext context, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
|
||||
ElectionWatcher watcher = this.watcher;
|
||||
if(watcher!= null) watcher.cancel(context.leaderSeqPath);
|
||||
joinElection(context, true);
|
||||
ElectionContext ctx = context.copy();
|
||||
if(watcher!= null) watcher.cancel(this.context.leaderSeqPath);
|
||||
this.context.cancelElection();
|
||||
this.context = ctx;
|
||||
joinElection(ctx, true, joinAtHead);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -308,29 +308,27 @@ public class Overseer {
|
|||
log.error("could not read the data" ,e);
|
||||
return;
|
||||
}
|
||||
Map m = (Map) ZkStateReader.fromJSON(data);
|
||||
String id = (String) m.get("id");
|
||||
if(overseerCollectionProcessor.getId().equals(id)){
|
||||
try {
|
||||
log.info("I'm exiting , but I'm still the leader");
|
||||
zkClient.delete(path,stat.getVersion(),true);
|
||||
} catch (KeeperException.BadVersionException e) {
|
||||
//no problem ignore it some other Overseer has already taken over
|
||||
} catch (Exception e) {
|
||||
log.error("Could not delete my leader node ", e);
|
||||
} finally {
|
||||
try {
|
||||
Map m = (Map) ZkStateReader.fromJSON(data);
|
||||
String id = (String) m.get("id");
|
||||
if(overseerCollectionProcessor.getId().equals(id)){
|
||||
try {
|
||||
if(zkController !=null && !zkController.getCoreContainer().isShutDown()){
|
||||
zkController.rejoinOverseerElection();
|
||||
}
|
||||
|
||||
log.info("I'm exiting , but I'm still the leader");
|
||||
zkClient.delete(path,stat.getVersion(),true);
|
||||
} catch (KeeperException.BadVersionException e) {
|
||||
//no problem ignore it some other Overseer has already taken over
|
||||
} catch (Exception e) {
|
||||
log.error("error canceling overseer election election ",e);
|
||||
log.error("Could not delete my leader node ", e);
|
||||
}
|
||||
}
|
||||
|
||||
} else{
|
||||
log.info("somebody else has already taken up the overseer position");
|
||||
} else{
|
||||
log.info("somebody else has already taken up the overseer position");
|
||||
}
|
||||
} finally {
|
||||
//if I am not shutting down, Then I need to rejoin election
|
||||
if (zkController != null && !zkController.getCoreContainer().isShutDown()) {
|
||||
zkController.rejoinOverseerElection(null, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -377,9 +375,13 @@ public class Overseer {
|
|||
} else if(CLUSTERPROP.isEqual(operation)){
|
||||
handleProp(message);
|
||||
} else if( QUIT.equals(operation)){
|
||||
log.info("Quit command received {}", LeaderElector.getNodeName(myId));
|
||||
overseerCollectionProcessor.close();
|
||||
close();
|
||||
if(myId.equals( message.get("id"))){
|
||||
log.info("Quit command received {}", LeaderElector.getNodeName(myId));
|
||||
overseerCollectionProcessor.close();
|
||||
close();
|
||||
} else {
|
||||
log.warn("Overseer received wrong QUIT message {}", message);
|
||||
}
|
||||
} else{
|
||||
throw new RuntimeException("unknown operation:" + operation
|
||||
+ " contents:" + message.getProperties());
|
||||
|
|
|
@ -407,95 +407,42 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
}
|
||||
}
|
||||
|
||||
private void prioritizeOverseerNodes() throws KeeperException, InterruptedException {
|
||||
private synchronized void prioritizeOverseerNodes() throws KeeperException, InterruptedException {
|
||||
SolrZkClient zk = zkStateReader.getZkClient();
|
||||
if(!zk.exists(ZkStateReader.ROLES,true))return;
|
||||
Map m = (Map) ZkStateReader.fromJSON(zk.getData(ZkStateReader.ROLES, null, new Stat(), true));
|
||||
|
||||
List overseerDesignates = (List) m.get("overseer");
|
||||
if(overseerDesignates==null || overseerDesignates.isEmpty()) return;
|
||||
if(overseerDesignates.size() == 1 && overseerDesignates.contains(getLeaderNode(zk))) return;
|
||||
log.info("prioritizing overseer nodes at {}", LeaderElector.getNodeName(myId));
|
||||
log.info("overseer designates {}", overseerDesignates);
|
||||
|
||||
List<String> nodeNames = getSortedOverseerNodeNames(zk);
|
||||
if(nodeNames.size()<2) return;
|
||||
boolean designateIsInFront = overseerDesignates.contains( nodeNames.get(0));
|
||||
|
||||
ArrayList<String> nodesTobePushedBack = new ArrayList<>();
|
||||
//ensure that the node right behind the leader , i.e at position 1 is a Overseer
|
||||
List<String> availableDesignates = new ArrayList<>();
|
||||
|
||||
log.info("sorted nodes {}", nodeNames);//TODO to be removed
|
||||
for (int i = 1; i < nodeNames.size(); i++) {
|
||||
String s = nodeNames.get(i);
|
||||
|
||||
if (overseerDesignates.contains(s)) {
|
||||
availableDesignates.add(s);
|
||||
|
||||
for(int j=1;j<i;j++){
|
||||
String n = nodeNames.get(j);
|
||||
if(!overseerDesignates.contains(n)) {
|
||||
if(!nodesTobePushedBack.contains(n)) nodesTobePushedBack.add(n);
|
||||
}
|
||||
}
|
||||
String ldr = getLeaderNode(zk);
|
||||
if(overseerDesignates.contains(ldr)) return;
|
||||
log.info("prioritizing overseer nodes at {} overseer designates are {}", myId, overseerDesignates);
|
||||
List<String> electionNodes = getSortedElectionNodes(zk);
|
||||
if(electionNodes.size()<2) return;
|
||||
log.info("sorted nodes {}", electionNodes);
|
||||
|
||||
String designateNodeId = null;
|
||||
for (String electionNode : electionNodes) {
|
||||
if(overseerDesignates.contains( LeaderElector.getNodeName(electionNode))){
|
||||
designateNodeId = electionNode;
|
||||
break;
|
||||
}
|
||||
if(availableDesignates.size()>1) break;//we don't need to line up more than 2 designates
|
||||
}
|
||||
|
||||
if(!availableDesignates.isEmpty()){
|
||||
for (String s : nodesTobePushedBack) {
|
||||
log.info("pushing back {} ", s);
|
||||
invokeOverseerOp(s, "rejoin");
|
||||
}
|
||||
|
||||
//wait for a while to ensure the designate has indeed come in front
|
||||
boolean prioritizationComplete = false;
|
||||
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(2500, TimeUnit.MILLISECONDS);
|
||||
|
||||
while (System.nanoTime() < timeout) {
|
||||
List<String> currentNodeNames = getSortedOverseerNodeNames(zk);
|
||||
|
||||
int totalLeaders = 0;
|
||||
|
||||
for(int i=0;i<availableDesignates.size();i++) {
|
||||
if(overseerDesignates.contains(currentNodeNames.get(i))) totalLeaders++;
|
||||
}
|
||||
if(totalLeaders == availableDesignates.size()){
|
||||
prioritizationComplete = true;
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("Thread interrupted",e);
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if(!prioritizationComplete) {
|
||||
log.warn("available designates and current state {} {} ", availableDesignates, getSortedOverseerNodeNames(zk));
|
||||
}
|
||||
|
||||
} else if(!designateIsInFront) {
|
||||
log.warn("No overseer designates are available, overseerDesignates: {}, live nodes : {}",overseerDesignates,nodeNames);
|
||||
if(designateNodeId == null){
|
||||
log.warn("No live overseer designate ");
|
||||
return;
|
||||
}
|
||||
|
||||
String leaderNode = getLeaderNode(zkStateReader.getZkClient());
|
||||
if(leaderNode ==null) return;
|
||||
if(!overseerDesignates.contains(leaderNode) ){
|
||||
List<String> sortedNodes = getSortedOverseerNodeNames(zk);
|
||||
|
||||
if(leaderNode.equals(sortedNodes.get(0)) || // I am leader and I am in front of the queue
|
||||
overseerDesignates.contains(sortedNodes.get(0))) {// I am leader but somebody else is in the front , Screwed up leader election
|
||||
//this means there are I am not a designate and the next guy is lined up to become the leader, kill myself
|
||||
log.info("I am not an overseer designate , forcing myself out {} ", leaderNode);
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.QUIT)));
|
||||
}
|
||||
if(!designateNodeId.equals( electionNodes.get(1))) { //checking if it is already at no:1
|
||||
log.info("asking node {} to come join election at head", designateNodeId);
|
||||
invokeOverseerOp(designateNodeId, "rejoinAtHead"); //ask designate to come first
|
||||
log.info("asking the old first in line {} to rejoin election ",electionNodes.get(1) );
|
||||
invokeOverseerOp(electionNodes.get(1), "rejoin");//ask second inline to go behind
|
||||
}
|
||||
//now ask the current leader to QUIT , so that the designate can takeover
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(
|
||||
ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.QUIT,
|
||||
"id",getLeaderId(zkStateReader.getZkClient()))));
|
||||
|
||||
}
|
||||
|
||||
|
@ -513,28 +460,46 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
return nodeNames;
|
||||
}
|
||||
|
||||
public static List<String> getSortedElectionNodes(SolrZkClient zk) throws KeeperException, InterruptedException {
|
||||
List<String> children = null;
|
||||
try {
|
||||
children = zk.getChildren(OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE, null, true);
|
||||
LeaderElector.sortSeqs(children);
|
||||
return children;
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static String getLeaderNode(SolrZkClient zkClient) throws KeeperException, InterruptedException {
|
||||
byte[] data = new byte[0];
|
||||
String id = getLeaderId(zkClient);
|
||||
return id==null ?
|
||||
null:
|
||||
LeaderElector.getNodeName( id);
|
||||
}
|
||||
|
||||
public static String getLeaderId(SolrZkClient zkClient) throws KeeperException,InterruptedException{
|
||||
byte[] data = null;
|
||||
try {
|
||||
data = zkClient.getData("/overseer_elect/leader", null, new Stat(), true);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
return null;
|
||||
}
|
||||
Map m = (Map) ZkStateReader.fromJSON(data);
|
||||
String s = (String) m.get("id");
|
||||
String nodeName = LeaderElector.getNodeName(s);
|
||||
return nodeName;
|
||||
return (String) m.get("id");
|
||||
}
|
||||
|
||||
private void invokeOverseerOp(String nodeName, String op) {
|
||||
private void invokeOverseerOp(String electionNode, String op) {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.OVERSEEROP.toString());
|
||||
params.set("op", op);
|
||||
params.set("qt", adminPath);
|
||||
params.set("electionNode", electionNode);
|
||||
ShardRequest sreq = new ShardRequest();
|
||||
sreq.purpose = 1;
|
||||
String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
|
||||
String replica = zkStateReader.getBaseUrlForNodeName(LeaderElector.getNodeName(electionNode));
|
||||
sreq.shards = new String[]{replica};
|
||||
sreq.actualShards = sreq.shards;
|
||||
sreq.params = params;
|
||||
|
|
|
@ -1691,9 +1691,33 @@ public final class ZkController {
|
|||
return out;
|
||||
}
|
||||
|
||||
public void rejoinOverseerElection() {
|
||||
public void rejoinOverseerElection(String electionNode, boolean joinAtHead) {
|
||||
try {
|
||||
overseerElector.retryElection();
|
||||
if(electionNode !=null){
|
||||
//this call is from inside the JVM . not from CoreAdminHandler
|
||||
if(overseerElector.getContext() == null || overseerElector.getContext().leaderSeqPath == null){
|
||||
overseerElector.retryElection(new OverseerElectionContext(zkClient,
|
||||
overseer, getNodeName()), joinAtHead);
|
||||
return;
|
||||
}
|
||||
if(!overseerElector.getContext().leaderSeqPath.endsWith(electionNode)){
|
||||
log.warn("Asked to rejoin with wrong election node : {}, current node is {}",electionNode, overseerElector.getContext().leaderSeqPath);
|
||||
//however delete it . This is possible when the last attempt at deleting the election node failed.
|
||||
if(electionNode.startsWith(getNodeName())){
|
||||
try {
|
||||
zkClient.delete(OverseerElectionContext.PATH+LeaderElector.ELECTION_NODE+"/"+electionNode,-1,true);
|
||||
} catch (NoNodeException e) {
|
||||
//no problem
|
||||
} catch (InterruptedException e){
|
||||
Thread.currentThread().interrupt();
|
||||
} catch(Exception e) {
|
||||
log.warn("Old election node exists , could not be removed ",e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}else {
|
||||
overseerElector.retryElection(overseerElector.getContext(), joinAtHead);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
|
||||
}
|
||||
|
|
|
@ -272,7 +272,12 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
ZkController zkController = coreContainer.getZkController();
|
||||
if(zkController != null){
|
||||
String op = req.getParams().get("op");
|
||||
if ("rejoin".equals(op)) zkController.rejoinOverseerElection();
|
||||
String electionNode = req.getParams().get("electionNode");
|
||||
if(electionNode != null) {
|
||||
zkController.rejoinOverseerElection(electionNode, "rejoinAtHead".equals(op));
|
||||
} else {
|
||||
log.info("electionNode is required param");
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ package org.apache.solr.cloud;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
|
||||
|
@ -54,7 +55,7 @@ import org.junit.BeforeClass;
|
|||
@SuppressSSL // Currently unknown why SSL does not work
|
||||
public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
|
||||
private CloudSolrServer client;
|
||||
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeThisClass2() throws Exception {
|
||||
|
||||
|
@ -173,80 +174,45 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
|
|||
log.info("Adding another overseer designate {}", anotherOverseer);
|
||||
setOverseerRole(CollectionAction.ADDROLE, anotherOverseer);
|
||||
|
||||
timeout = System.currentTimeMillis()+10000;
|
||||
String currentOverseer = getLeaderNode(client.getZkStateReader().getZkClient());
|
||||
|
||||
log.info("Current Overseer {}", currentOverseer);
|
||||
|
||||
String hostPort = currentOverseer.substring(0,currentOverseer.indexOf('_'));
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
//
|
||||
//
|
||||
log.info("hostPort : {}", hostPort);
|
||||
|
||||
JettySolrRunner leaderJetty = null;
|
||||
|
||||
for (JettySolrRunner jetty : jettys) {
|
||||
String s = jetty.getBaseUrl().toString();
|
||||
log.info("jetTy {}",s);
|
||||
sb.append(s).append(" , ");
|
||||
if (s.contains(hostPort)) {
|
||||
leaderJetty = jetty;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assertNotNull("Could not find a jetty2 kill", leaderJetty);
|
||||
|
||||
log.info("leader node {}", leaderJetty.getBaseUrl());
|
||||
log.info ("current election Queue", OverseerCollectionProcessor.getSortedElectionNodes(client.getZkStateReader().getZkClient()));
|
||||
ChaosMonkey.stop(leaderJetty);
|
||||
timeout = System.currentTimeMillis() + 10000;
|
||||
leaderchanged = false;
|
||||
for(;System.currentTimeMillis() < timeout;){
|
||||
List<String> sortedNodeNames = getSortedOverseerNodeNames(client.getZkStateReader().getZkClient());
|
||||
if(sortedNodeNames.get(1) .equals(anotherOverseer) || sortedNodeNames.get(0).equals(anotherOverseer)){
|
||||
leaderchanged =true;
|
||||
for (; System.currentTimeMillis() < timeout; ) {
|
||||
currentOverseer = getLeaderNode(client.getZkStateReader().getZkClient());
|
||||
if (anotherOverseer.equals(currentOverseer)) {
|
||||
leaderchanged = true;
|
||||
break;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
assertTrue("New overseer not the frontrunner : "+ getSortedOverseerNodeNames(client.getZkStateReader().getZkClient()) + " expected : "+ anotherOverseer, leaderchanged);
|
||||
|
||||
|
||||
String currentOverseer = getLeaderNode(client.getZkStateReader().getZkClient());
|
||||
|
||||
String killedOverseer = currentOverseer;
|
||||
|
||||
log.info("Current Overseer {}", currentOverseer);
|
||||
Pattern pattern = Pattern.compile("(.*):(\\d*)(.*)");
|
||||
Matcher m = pattern.matcher(currentOverseer);
|
||||
JettySolrRunner stoppedJetty =null;
|
||||
|
||||
String hostPort = null;
|
||||
StringBuilder sb = new StringBuilder();
|
||||
if(m.matches()){
|
||||
hostPort = m.group(1)+":"+m.group(2);
|
||||
|
||||
log.info("hostPort : {}", hostPort);
|
||||
|
||||
for (JettySolrRunner jetty : jettys) {
|
||||
String s = jetty.getBaseUrl().toString();
|
||||
sb.append(s).append(" , ");
|
||||
if(s.contains(hostPort)){
|
||||
log.info("leader node {}",s);
|
||||
ChaosMonkey.stop(jetty);
|
||||
stoppedJetty = jetty;
|
||||
timeout = System.currentTimeMillis()+10000;
|
||||
leaderchanged = false;
|
||||
for(;System.currentTimeMillis() < timeout;){
|
||||
currentOverseer = getLeaderNode(client.getZkStateReader().getZkClient());
|
||||
if(anotherOverseer.equals(currentOverseer)){
|
||||
leaderchanged =true;
|
||||
break;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
assertTrue("New overseer designate has not become the overseer, expected : "+ anotherOverseer + "actual : "+ currentOverseer, leaderchanged);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} else{
|
||||
fail("pattern didn't match for"+currentOverseer );
|
||||
}
|
||||
|
||||
if(stoppedJetty !=null) {
|
||||
ChaosMonkey.start(stoppedJetty);
|
||||
|
||||
timeout = System.currentTimeMillis() + 10000;
|
||||
leaderchanged = false;
|
||||
for (; System.currentTimeMillis() < timeout; ) {
|
||||
List<String> sortedNodeNames = getSortedOverseerNodeNames(client.getZkStateReader().getZkClient());
|
||||
if (sortedNodeNames.get(1).equals(killedOverseer) || sortedNodeNames.get(0).equals(killedOverseer)) {
|
||||
leaderchanged = true;
|
||||
break;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
assertTrue("New overseer not the frontrunner : " + getSortedOverseerNodeNames(client.getZkStateReader().getZkClient()) + " expected : " + killedOverseer, leaderchanged);
|
||||
} else {
|
||||
log.warn("The jetty where the overseer {} is running could not be located in {}",hostPort,sb);
|
||||
}
|
||||
assertTrue("New overseer designate has not become the overseer, expected : " + anotherOverseer + "actual : " + getLeaderNode(client.getZkStateReader().getZkClient()), leaderchanged);
|
||||
}
|
||||
|
||||
private void setOverseerRole(CollectionAction action, String overseerDesignate) throws Exception, IOException {
|
||||
|
@ -261,6 +227,7 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
|
|||
client.request(request);
|
||||
}
|
||||
|
||||
|
||||
protected void createCollection(String COLL_NAME, CloudSolrServer client) throws Exception {
|
||||
int replicationFactor = 2;
|
||||
int numShards = 4;
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RollingRestartTest extends AbstractFullDistribZkTestBase {
|
||||
public static Logger log = LoggerFactory.getLogger(ChaosMonkeyNothingIsSafeTest.class);
|
||||
|
||||
public RollingRestartTest() {
|
||||
fixShardCount = true;
|
||||
sliceCount = 2;
|
||||
shardCount = 16;
|
||||
}
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
System.setProperty("numShards", Integer.toString(sliceCount));
|
||||
useFactory("solr.StandardDirectoryFactory");
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
System.clearProperty("numShards");
|
||||
super.tearDown();
|
||||
resetExceptionIgnores();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doTest() throws Exception {
|
||||
waitForRecoveriesToFinish(false);
|
||||
|
||||
restartWithRolesTest();
|
||||
|
||||
waitForRecoveriesToFinish(false);
|
||||
}
|
||||
|
||||
|
||||
public void restartWithRolesTest() throws Exception {
|
||||
String leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
|
||||
assertNotNull(leader);
|
||||
log.info("Current overseer leader = {}", leader);
|
||||
|
||||
cloudClient.getZkStateReader().getZkClient().printLayoutToStdOut();
|
||||
|
||||
int numOverseers = 3;
|
||||
List<String> designates = new ArrayList<>();
|
||||
List<CloudJettyRunner> overseerDesignates = new ArrayList<>();
|
||||
for (int i = 0; i < numOverseers; i++) {
|
||||
int n = random().nextInt(shardCount);
|
||||
String nodeName = cloudJettys.get(n).nodeName;
|
||||
log.info("Chose {} as overseer designate", nodeName);
|
||||
invokeCollectionApi(CollectionParams.ACTION, CollectionParams.CollectionAction.ADDROLE.toLower(), "role", "overseer", "node", nodeName);
|
||||
designates.add(nodeName);
|
||||
overseerDesignates.add(cloudJettys.get(n));
|
||||
}
|
||||
|
||||
waitUntilOverseerDesignateIsLeader(cloudClient.getZkStateReader().getZkClient(), designates, 60);
|
||||
|
||||
cloudClient.getZkStateReader().getZkClient().printLayoutToStdOut();
|
||||
|
||||
int numRestarts = 4; // 1 + random().nextInt(5);
|
||||
for (int i = 0; i < numRestarts; i++) {
|
||||
log.info("Rolling restart #{}", i + 1);
|
||||
for (CloudJettyRunner cloudJetty : overseerDesignates) {
|
||||
log.info("Restarting {}", cloudJetty);
|
||||
chaosMonkey.stopJetty(cloudJetty);
|
||||
boolean success = waitUntilOverseerDesignateIsLeader(cloudClient.getZkStateReader().getZkClient(), designates, 60);
|
||||
if (!success) {
|
||||
leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
|
||||
if(leader == null) log.error("NOOVERSEER election queue is :"+ OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient()));
|
||||
fail("No overseer designate as leader found after restart #" + (i + 1) + ": " + leader);
|
||||
}
|
||||
cloudJetty.jetty.start();
|
||||
success = waitUntilOverseerDesignateIsLeader(cloudClient.getZkStateReader().getZkClient(), designates, 60);
|
||||
if (!success) {
|
||||
leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
|
||||
if(leader == null) log.error("NOOVERSEER election queue is :"+ OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient()));
|
||||
fail("No overseer leader found after restart #" + (i + 1) + ": " + leader);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
|
||||
assertNotNull(leader);
|
||||
log.info("Current overseer leader (after restart) = {}", leader);
|
||||
|
||||
cloudClient.getZkStateReader().getZkClient().printLayoutToStdOut();
|
||||
}
|
||||
|
||||
static boolean waitUntilOverseerDesignateIsLeader(SolrZkClient testZkClient, List<String> overseerDesignates, int timeoutInSeconds) throws KeeperException, InterruptedException {
|
||||
long now = System.nanoTime();
|
||||
long timeout = now + TimeUnit.NANOSECONDS.convert(timeoutInSeconds, TimeUnit.SECONDS);
|
||||
boolean firstTime = true;
|
||||
int stableCheckTimeout = 2000;
|
||||
while (System.nanoTime() < timeout) {
|
||||
String newLeader = OverseerCollectionProcessor.getLeaderNode(testZkClient);
|
||||
if (!overseerDesignates.contains(newLeader)) {
|
||||
Thread.sleep(500);
|
||||
} else {
|
||||
if (firstTime) {
|
||||
firstTime = false;
|
||||
Thread.sleep(stableCheckTimeout);
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue