mirror of https://github.com/apache/lucene.git
SOLR-5859 Harden Overseer restart
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1584069 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e081587dea
commit
3fd2922341
|
@ -71,7 +71,7 @@ public abstract class ElectionContext {
|
||||||
zkClient.delete(leaderSeqPath, -1, true);
|
zkClient.delete(leaderSeqPath, -1, true);
|
||||||
} catch (NoNodeException e) {
|
} catch (NoNodeException e) {
|
||||||
// fine
|
// fine
|
||||||
log.warn("cancelElection did not find election node to remove");
|
log.warn("cancelElection did not find election node to remove",e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -67,6 +67,8 @@ public class LeaderElector {
|
||||||
|
|
||||||
private volatile ElectionContext context;
|
private volatile ElectionContext context;
|
||||||
|
|
||||||
|
private ElectionWatcher watcher;
|
||||||
|
|
||||||
public LeaderElector(SolrZkClient zkClient) {
|
public LeaderElector(SolrZkClient zkClient) {
|
||||||
this.zkClient = zkClient;
|
this.zkClient = zkClient;
|
||||||
zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
|
zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
|
||||||
|
@ -90,7 +92,7 @@ public class LeaderElector {
|
||||||
// get all other numbers...
|
// get all other numbers...
|
||||||
final String holdElectionPath = context.electionPath + ELECTION_NODE;
|
final String holdElectionPath = context.electionPath + ELECTION_NODE;
|
||||||
List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);
|
List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);
|
||||||
|
|
||||||
sortSeqs(seqs);
|
sortSeqs(seqs);
|
||||||
List<Integer> intSeqs = getSeqs(seqs);
|
List<Integer> intSeqs = getSeqs(seqs);
|
||||||
if (intSeqs.size() == 0) {
|
if (intSeqs.size() == 0) {
|
||||||
|
@ -122,31 +124,7 @@ public class LeaderElector {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
zkClient.getData(holdElectionPath + "/" + seqs.get(index),
|
zkClient.getData(holdElectionPath + "/" + seqs.get(index), watcher = new ElectionWatcher(context.leaderSeqPath , seq, context) , null, true);
|
||||||
new Watcher() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void process(WatchedEvent event) {
|
|
||||||
// session events are not change events,
|
|
||||||
// and do not remove the watcher
|
|
||||||
if (EventType.None.equals(event.getType())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// am I the next leader?
|
|
||||||
try {
|
|
||||||
checkIfIamLeader(seq, context, true);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// Restore the interrupted status
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
log.warn("", e);
|
|
||||||
} catch (IOException e) {
|
|
||||||
log.warn("", e);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}, null, true);
|
|
||||||
} catch (KeeperException.SessionExpiredException e) {
|
} catch (KeeperException.SessionExpiredException e) {
|
||||||
throw e;
|
throw e;
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
|
@ -290,6 +268,50 @@ public class LeaderElector {
|
||||||
|
|
||||||
return seq;
|
return seq;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class ElectionWatcher implements Watcher {
|
||||||
|
final String leaderSeqPath;
|
||||||
|
final int seq;
|
||||||
|
final ElectionContext context;
|
||||||
|
|
||||||
|
private boolean canceled = false;
|
||||||
|
|
||||||
|
private ElectionWatcher(String leaderSeqPath, int seq, ElectionContext context) {
|
||||||
|
this.leaderSeqPath = leaderSeqPath;
|
||||||
|
this.seq = seq;
|
||||||
|
this.context = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
void cancel(String leaderSeqPath){
|
||||||
|
canceled = true;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(WatchedEvent event) {
|
||||||
|
// session events are not change events,
|
||||||
|
// and do not remove the watcher
|
||||||
|
if (EventType.None.equals(event.getType())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if(canceled) {
|
||||||
|
log.info("This watcher is not active anymore {}", leaderSeqPath);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
// am I the next leader?
|
||||||
|
checkIfIamLeader(seq, context, true);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Restore the interrupted status
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
log.warn("", e);
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.warn("", e);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up any ZooKeeper nodes needed for leader election.
|
* Set up any ZooKeeper nodes needed for leader election.
|
||||||
|
@ -317,6 +339,8 @@ public class LeaderElector {
|
||||||
}
|
}
|
||||||
void retryElection() throws KeeperException, InterruptedException, IOException {
|
void retryElection() throws KeeperException, InterruptedException, IOException {
|
||||||
context.cancelElection();
|
context.cancelElection();
|
||||||
|
ElectionWatcher watcher = this.watcher;
|
||||||
|
if(watcher!= null) watcher.cancel(context.leaderSeqPath);
|
||||||
joinElection(context, true);
|
joinElection(context, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,7 @@ public class Overseer {
|
||||||
public static final String ADD_ROUTING_RULE = "addroutingrule";
|
public static final String ADD_ROUTING_RULE = "addroutingrule";
|
||||||
public static final String REMOVE_ROUTING_RULE = "removeroutingrule";
|
public static final String REMOVE_ROUTING_RULE = "removeroutingrule";
|
||||||
public static final String STATE = "state";
|
public static final String STATE = "state";
|
||||||
|
public static final String QUIT = "quit";
|
||||||
|
|
||||||
public static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates
|
public static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates
|
||||||
public static final String CREATESHARD = "createshard";
|
public static final String CREATESHARD = "createshard";
|
||||||
|
@ -200,85 +201,131 @@ public class Overseer {
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("Starting to work on the main queue");
|
log.info("Starting to work on the main queue");
|
||||||
while (!this.isClosed) {
|
try {
|
||||||
isLeader = amILeader();
|
while (!this.isClosed) {
|
||||||
if (LeaderStatus.NO == isLeader) {
|
isLeader = amILeader();
|
||||||
break;
|
if (LeaderStatus.NO == isLeader) {
|
||||||
}
|
break;
|
||||||
else if (LeaderStatus.YES != isLeader) {
|
|
||||||
log.debug("am_i_leader unclear {}", isLeader);
|
|
||||||
continue; // not a no, not a yes, try ask again
|
|
||||||
}
|
|
||||||
DistributedQueue.QueueEvent head = null;
|
|
||||||
try {
|
|
||||||
head = stateUpdateQueue.peek(true);
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
|
|
||||||
log.warn(
|
|
||||||
"Solr cannot talk to ZK, exiting Overseer main queue loop", e);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
log.error("Exception in Overseer main queue loop", e);
|
else if (LeaderStatus.YES != isLeader) {
|
||||||
} catch (InterruptedException e) {
|
log.debug("am_i_leader unclear {}", isLeader);
|
||||||
Thread.currentThread().interrupt();
|
continue; // not a no, not a yes, try ask again
|
||||||
return;
|
}
|
||||||
|
DistributedQueue.QueueEvent head = null;
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Exception in Overseer main queue loop", e);
|
|
||||||
}
|
|
||||||
synchronized (reader.getUpdateLock()) {
|
|
||||||
try {
|
try {
|
||||||
reader.updateClusterState(true);
|
head = stateUpdateQueue.peek(true);
|
||||||
ClusterState clusterState = reader.getClusterState();
|
|
||||||
|
|
||||||
while (head != null) {
|
|
||||||
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
|
|
||||||
final String operation = message.getStr(QUEUE_OPERATION);
|
|
||||||
final TimerContext timerContext = stats.time(operation);
|
|
||||||
try {
|
|
||||||
clusterState = processMessage(clusterState, message, operation);
|
|
||||||
stats.success(operation);
|
|
||||||
} catch (Exception e) {
|
|
||||||
// generally there is nothing we can do - in most cases, we have
|
|
||||||
// an issue that will fail again on retry or we cannot communicate with
|
|
||||||
// ZooKeeper in which case another Overseer should take over
|
|
||||||
// TODO: if ordering for the message is not important, we could
|
|
||||||
// track retries and put it back on the end of the queue
|
|
||||||
log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
|
|
||||||
stats.error(operation);
|
|
||||||
} finally {
|
|
||||||
timerContext.stop();
|
|
||||||
}
|
|
||||||
workQueue.offer(head.getBytes());
|
|
||||||
|
|
||||||
stateUpdateQueue.poll();
|
|
||||||
|
|
||||||
if (System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break;
|
|
||||||
|
|
||||||
// if an event comes in the next 100ms batch it together
|
|
||||||
head = stateUpdateQueue.peek(100);
|
|
||||||
}
|
|
||||||
lastUpdatedTime = System.nanoTime();
|
|
||||||
zkClient.setData(ZkStateReader.CLUSTER_STATE,
|
|
||||||
ZkStateReader.toJSON(clusterState), true);
|
|
||||||
// clean work queue
|
|
||||||
while (workQueue.poll() != null) ;
|
|
||||||
|
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
|
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
|
||||||
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
|
log.warn(
|
||||||
|
"Solr cannot talk to ZK, exiting Overseer main queue loop", e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
log.error("Exception in Overseer main queue loop", e);
|
log.error("Exception in Overseer main queue loop", e);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
return;
|
return;
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Exception in Overseer main queue loop", e);
|
log.error("Exception in Overseer main queue loop", e);
|
||||||
}
|
}
|
||||||
|
synchronized (reader.getUpdateLock()) {
|
||||||
|
try {
|
||||||
|
reader.updateClusterState(true);
|
||||||
|
ClusterState clusterState = reader.getClusterState();
|
||||||
|
|
||||||
|
while (head != null) {
|
||||||
|
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
|
||||||
|
final String operation = message.getStr(QUEUE_OPERATION);
|
||||||
|
final TimerContext timerContext = stats.time(operation);
|
||||||
|
try {
|
||||||
|
clusterState = processMessage(clusterState, message, operation);
|
||||||
|
stats.success(operation);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// generally there is nothing we can do - in most cases, we have
|
||||||
|
// an issue that will fail again on retry or we cannot communicate with
|
||||||
|
// ZooKeeper in which case another Overseer should take over
|
||||||
|
// TODO: if ordering for the message is not important, we could
|
||||||
|
// track retries and put it back on the end of the queue
|
||||||
|
log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
|
||||||
|
stats.error(operation);
|
||||||
|
} finally {
|
||||||
|
timerContext.stop();
|
||||||
|
}
|
||||||
|
workQueue.offer(head.getBytes());
|
||||||
|
|
||||||
|
stateUpdateQueue.poll();
|
||||||
|
|
||||||
|
if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break;
|
||||||
|
|
||||||
|
// if an event comes in the next 100ms batch it together
|
||||||
|
head = stateUpdateQueue.peek(100);
|
||||||
|
}
|
||||||
|
lastUpdatedTime = System.nanoTime();
|
||||||
|
zkClient.setData(ZkStateReader.CLUSTER_STATE,
|
||||||
|
ZkStateReader.toJSON(clusterState), true);
|
||||||
|
// clean work queue
|
||||||
|
while (workQueue.poll() != null) ;
|
||||||
|
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
|
||||||
|
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.error("Exception in Overseer main queue loop", e);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
return;
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Exception in Overseer main queue loop", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
new Thread("OverseerExitThread"){
|
||||||
|
//do this in a separate thread because any wait is interrupted in this main thread
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
checkIfIamStillLeader();
|
||||||
|
}
|
||||||
|
}.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkIfIamStillLeader() {
|
||||||
|
org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
|
||||||
|
String path = "/overseer_elect/leader";
|
||||||
|
byte[] data = null;
|
||||||
|
try {
|
||||||
|
data = zkClient.getData(path, null, stat, true);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("could not read the data" ,e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Map m = (Map) ZkStateReader.fromJSON(data);
|
||||||
|
String id = (String) m.get("id");
|
||||||
|
if(overseerCollectionProcessor.getId().equals(id)){
|
||||||
|
try {
|
||||||
|
log.info("I'm exiting , but I'm still the leader");
|
||||||
|
zkClient.delete(path,stat.getVersion(),true);
|
||||||
|
} catch (KeeperException.BadVersionException e) {
|
||||||
|
//no problem ignore it some other Overseer has already taken over
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Could not delete my leader node ", e);
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
if(zkController !=null && !zkController.getCoreContainer().isShutDown()){
|
||||||
|
zkController.rejoinOverseerElection();
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("error canceling overseer election election ",e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else{
|
||||||
|
log.info("somebody else has already taken up the overseer position");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,7 +371,11 @@ public class Overseer {
|
||||||
clusterState = removeRoutingRule(clusterState, message);
|
clusterState = removeRoutingRule(clusterState, message);
|
||||||
} else if(CLUSTERPROP.isEqual(operation)){
|
} else if(CLUSTERPROP.isEqual(operation)){
|
||||||
handleProp(message);
|
handleProp(message);
|
||||||
} else {
|
} else if( QUIT.equals(operation)){
|
||||||
|
log.info("################Quit command receive");
|
||||||
|
overseerCollectionProcessor.close();
|
||||||
|
close();
|
||||||
|
} else{
|
||||||
throw new RuntimeException("unknown operation:" + operation
|
throw new RuntimeException("unknown operation:" + operation
|
||||||
+ " contents:" + message.getProperties());
|
+ " contents:" + message.getProperties());
|
||||||
}
|
}
|
||||||
|
@ -1107,15 +1158,18 @@ public class Overseer {
|
||||||
|
|
||||||
private String adminPath;
|
private String adminPath;
|
||||||
|
|
||||||
private OverseerCollectionProcessor ocp;
|
private OverseerCollectionProcessor overseerCollectionProcessor;
|
||||||
|
|
||||||
|
private ZkController zkController;
|
||||||
|
|
||||||
private Stats stats;
|
private Stats stats;
|
||||||
|
|
||||||
// overseer not responsible for closing reader
|
// overseer not responsible for closing reader
|
||||||
public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException {
|
public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader, ZkController zkController) throws KeeperException, InterruptedException {
|
||||||
this.reader = reader;
|
this.reader = reader;
|
||||||
this.shardHandler = shardHandler;
|
this.shardHandler = shardHandler;
|
||||||
this.adminPath = adminPath;
|
this.adminPath = adminPath;
|
||||||
|
this.zkController = zkController;
|
||||||
this.stats = new Stats();
|
this.stats = new Stats();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1130,8 +1184,8 @@ public class Overseer {
|
||||||
|
|
||||||
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
|
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
|
||||||
|
|
||||||
ocp = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats);
|
overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats);
|
||||||
ccThread = new OverseerThread(ccTg, ocp, "Overseer-" + id);
|
ccThread = new OverseerThread(ccTg, overseerCollectionProcessor, "Overseer-" + id);
|
||||||
ccThread.setDaemon(true);
|
ccThread.setDaemon(true);
|
||||||
|
|
||||||
updaterThread.start();
|
updaterThread.start();
|
||||||
|
|
|
@ -221,6 +221,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
}
|
}
|
||||||
|
|
||||||
QueueEvent head = workQueue.peek(true);
|
QueueEvent head = workQueue.peek(true);
|
||||||
|
if(isClosed) break;
|
||||||
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
|
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
|
||||||
|
|
||||||
final String asyncId = (message.containsKey(ASYNC) && message.get(ASYNC) != null) ? (String) message.get(ASYNC) : null;
|
final String asyncId = (message.containsKey(ASYNC) && message.get(ASYNC) != null) ? (String) message.get(ASYNC) : null;
|
||||||
|
@ -307,15 +308,16 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
List<String> availableDesignates = new ArrayList<>();
|
List<String> availableDesignates = new ArrayList<>();
|
||||||
|
|
||||||
log.info("sorted nodes {}", nodeNames);//TODO to be removed
|
log.info("sorted nodes {}", nodeNames);//TODO to be removed
|
||||||
for (int i = 0; i < nodeNames.size(); i++) {
|
for (int i = 1; i < nodeNames.size(); i++) {
|
||||||
String s = nodeNames.get(i);
|
String s = nodeNames.get(i);
|
||||||
|
|
||||||
if (overseerDesignates.contains(s)) {
|
if (overseerDesignates.contains(s)) {
|
||||||
availableDesignates.add(s);
|
availableDesignates.add(s);
|
||||||
|
|
||||||
for(int j=0;j<i;j++){
|
for(int j=1;j<i;j++){
|
||||||
if(!overseerDesignates.contains(nodeNames.get(j))) {
|
String n = nodeNames.get(j);
|
||||||
if(!nodesTobePushedBack.contains(nodeNames.get(j))) nodesTobePushedBack.add(nodeNames.get(j));
|
if(!overseerDesignates.contains(n)) {
|
||||||
|
if(!nodesTobePushedBack.contains(n)) nodesTobePushedBack.add(n);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,8 +326,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!availableDesignates.isEmpty()){
|
if(!availableDesignates.isEmpty()){
|
||||||
for (int i = nodesTobePushedBack.size() - 1; i >= 0; i--) {
|
for (String s : nodesTobePushedBack) {
|
||||||
String s = nodesTobePushedBack.get(i);
|
|
||||||
log.info("pushing back {} ", s);
|
log.info("pushing back {} ", s);
|
||||||
invokeOverseerOp(s, "rejoin");
|
invokeOverseerOp(s, "rejoin");
|
||||||
}
|
}
|
||||||
|
@ -368,9 +369,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
if(leaderNode ==null) return;
|
if(leaderNode ==null) return;
|
||||||
if(!overseerDesignates.contains(leaderNode) && !availableDesignates.isEmpty()){
|
if(!overseerDesignates.contains(leaderNode) && !availableDesignates.isEmpty()){
|
||||||
//this means there are designated Overseer nodes and I am not one of them , kill myself
|
//this means there are designated Overseer nodes and I am not one of them , kill myself
|
||||||
String newLeader = availableDesignates.get(0);
|
log.info("I am not an overseer designate , forcing myself out {} ", leaderNode);
|
||||||
log.info("I am not an overseerdesignate , forcing a new leader {} ", newLeader);
|
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(new ZkNodeProps( Overseer.QUEUE_OPERATION, Overseer.QUIT)));
|
||||||
invokeOverseerOp(newLeader, "leader");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -471,6 +471,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
processRoleCommand(message, operation);
|
processRoleCommand(message, operation);
|
||||||
} else if (ADDREPLICA.isEqual(operation)) {
|
} else if (ADDREPLICA.isEqual(operation)) {
|
||||||
addReplica(zkStateReader.getClusterState(), message, results);
|
addReplica(zkStateReader.getClusterState(), message, results);
|
||||||
|
} else if (REQUESTSTATUS.equals(operation)) {
|
||||||
|
requestStatus(message, results);
|
||||||
} else if (OVERSEERSTATUS.isEqual(operation)) {
|
} else if (OVERSEERSTATUS.isEqual(operation)) {
|
||||||
getOverseerStatus(message, results);
|
getOverseerStatus(message, results);
|
||||||
} else if(LIST.isEqual(operation)) {
|
} else if(LIST.isEqual(operation)) {
|
||||||
|
@ -655,6 +657,12 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
/**
|
/**
|
||||||
* Get collection status from cluster state.
|
* Get collection status from cluster state.
|
||||||
* Can return collection status by given shard name.
|
* Can return collection status by given shard name.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* @param clusterState
|
||||||
|
* @param name collection name
|
||||||
|
* @param shardStr comma separated shard names
|
||||||
|
* @return map of collection properties
|
||||||
*/
|
*/
|
||||||
private Map<String, Object> getCollectionStatus(Map<String, Object> clusterState, String name, String shardStr) {
|
private Map<String, Object> getCollectionStatus(Map<String, Object> clusterState, String name, String shardStr) {
|
||||||
Map<String, Object> docCollection = (Map<String, Object>) clusterState.get(name);
|
Map<String, Object> docCollection = (Map<String, Object>) clusterState.get(name);
|
||||||
|
@ -1492,6 +1500,40 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
} while (srsp != null);
|
} while (srsp != null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void requestStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
|
||||||
|
log.info("Request status invoked");
|
||||||
|
String requestId = message.getStr(REQUESTID);
|
||||||
|
|
||||||
|
// Special taskId (-1), clears up the request state maps.
|
||||||
|
if(requestId.equals("-1")) {
|
||||||
|
completedMap.clear();
|
||||||
|
failureMap.clear();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(completedMap.contains(requestId)) {
|
||||||
|
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||||
|
success.add("state", "completed");
|
||||||
|
success.add("msg", "found " + requestId + " in completed tasks");
|
||||||
|
results.add("status", success);
|
||||||
|
} else if (runningMap.contains(requestId)) {
|
||||||
|
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||||
|
success.add("state", "running");
|
||||||
|
success.add("msg", "found " + requestId + " in submitted tasks");
|
||||||
|
results.add("status", success);
|
||||||
|
} else if (failureMap.contains(requestId)) {
|
||||||
|
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||||
|
success.add("state", "failed");
|
||||||
|
success.add("msg", "found " + requestId + " in failed tasks");
|
||||||
|
results.add("status", success);
|
||||||
|
} else {
|
||||||
|
SimpleOrderedMap failure = new SimpleOrderedMap();
|
||||||
|
failure.add("state", "notfound");
|
||||||
|
failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue");
|
||||||
|
results.add("status", failure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
|
private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
|
||||||
log.info("Delete shard invoked");
|
log.info("Delete shard invoked");
|
||||||
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
|
@ -2363,5 +2405,9 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
} while (srsp != null);
|
} while (srsp != null);
|
||||||
} while(true);
|
} while(true);
|
||||||
}
|
}
|
||||||
|
String getId(){
|
||||||
|
return myId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -304,18 +304,6 @@ public final class ZkController {
|
||||||
return leaderConflictResolveWait;
|
return leaderConflictResolveWait;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void forceOverSeer(){
|
|
||||||
try {
|
|
||||||
zkClient.delete("/overseer_elect/leader",-1, true);
|
|
||||||
log.info("Forcing me to be leader {} ", getBaseUrl());
|
|
||||||
overseerElector.getContext().runLeaderProcess(true, Overseer.STATE_UPDATE_DELAY + 100);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new SolrException(ErrorCode.SERVER_ERROR, " Error becoming overseer ",e);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private void registerAllCoresAsDown(
|
private void registerAllCoresAsDown(
|
||||||
final CurrentCoreDescriptorProvider registerOnReconnect, boolean updateLastPublished) {
|
final CurrentCoreDescriptorProvider registerOnReconnect, boolean updateLastPublished) {
|
||||||
List<CoreDescriptor> descriptors = registerOnReconnect
|
List<CoreDescriptor> descriptors = registerOnReconnect
|
||||||
|
@ -558,7 +546,7 @@ public final class ZkController {
|
||||||
adminPath = cc.getAdminPath();
|
adminPath = cc.getAdminPath();
|
||||||
|
|
||||||
overseerElector = new LeaderElector(zkClient);
|
overseerElector = new LeaderElector(zkClient);
|
||||||
this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
|
this.overseer = new Overseer(shardHandler, adminPath, zkStateReader,this);
|
||||||
ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
|
ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
|
||||||
overseerElector.setup(context);
|
overseerElector.setup(context);
|
||||||
overseerElector.joinElection(context, false);
|
overseerElector.joinElection(context, false);
|
||||||
|
@ -1679,4 +1667,8 @@ public final class ZkController {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CoreContainer getCoreContainer(){
|
||||||
|
return cc;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -273,10 +273,8 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
||||||
case OVERSEEROP:{
|
case OVERSEEROP:{
|
||||||
ZkController zkController = coreContainer.getZkController();
|
ZkController zkController = coreContainer.getZkController();
|
||||||
if(zkController != null){
|
if(zkController != null){
|
||||||
String op = req.getParams().get("op");
|
String op = req.getParams().get("op");
|
||||||
if("leader".equals(op)){
|
if ("rejoin".equals(op)) zkController.rejoinOverseerElection();
|
||||||
zkController.forceOverSeer();
|
|
||||||
} else if ("rejoin".equals(op)) zkController.rejoinOverseerElection();
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -256,7 +256,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
|
||||||
|
|
||||||
// TODO: close Overseer
|
// TODO: close Overseer
|
||||||
Overseer overseer = new Overseer(
|
Overseer overseer = new Overseer(
|
||||||
new HttpShardHandlerFactory().getShardHandler(), "/admin/cores", reader);
|
new HttpShardHandlerFactory().getShardHandler(), "/admin/cores", reader,null);
|
||||||
overseer.close();
|
overseer.close();
|
||||||
ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
|
ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
|
||||||
address.replaceAll("/", "_"));
|
address.replaceAll("/", "_"));
|
||||||
|
|
|
@ -39,12 +39,17 @@ import org.apache.solr.client.solrj.SolrRequest;
|
||||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
||||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||||
|
import org.apache.solr.common.cloud.SolrZkClient;
|
||||||
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.params.CollectionParams.CollectionAction;
|
import org.apache.solr.common.params.CollectionParams.CollectionAction;
|
||||||
import org.apache.solr.common.params.MapSolrParams;
|
import org.apache.solr.common.params.MapSolrParams;
|
||||||
import org.apache.solr.common.params.SolrParams;
|
import org.apache.solr.common.params.SolrParams;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
@LuceneTestCase.Slow
|
@LuceneTestCase.Slow
|
||||||
@SuppressSSL // Currently unknown why SSL does not work
|
@SuppressSSL // Currently unknown why SSL does not work
|
||||||
public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
|
public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
|
||||||
|
@ -85,11 +90,43 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void doTest() throws Exception {
|
public void doTest() throws Exception {
|
||||||
addOverseerRole2ExistingNodes();
|
testOverseerRole();
|
||||||
|
testQuitCommand();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addOverseerRole2ExistingNodes() throws Exception {
|
private void testQuitCommand() throws Exception{
|
||||||
|
String collectionName = "testOverseerQuit";
|
||||||
|
|
||||||
|
createCollection(collectionName, client);
|
||||||
|
|
||||||
|
waitForRecoveriesToFinish(collectionName, false);
|
||||||
|
|
||||||
|
SolrZkClient zk = client.getZkStateReader().getZkClient();
|
||||||
|
byte[] data = new byte[0];
|
||||||
|
data = zk.getData("/overseer_elect/leader", null, new Stat(), true);
|
||||||
|
Map m = (Map) ZkStateReader.fromJSON(data);
|
||||||
|
String s = (String) m.get("id");
|
||||||
|
String leader = LeaderElector.getNodeName(s);
|
||||||
|
Overseer.getInQueue(zk).offer(ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.QUIT)));
|
||||||
|
long timeout = System.currentTimeMillis()+5000;
|
||||||
|
String newLeader=null;
|
||||||
|
for(;System.currentTimeMillis() < timeout;){
|
||||||
|
newLeader = OverseerCollectionProcessor.getLeaderNode(zk);
|
||||||
|
if(!newLeader.equals(leader)) break;
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
assertNotSame( "Leader not changed yet",newLeader,leader);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
assertTrue("The old leader should have rejoined election ", OverseerCollectionProcessor.getSortedOverseerNodeNames(zk).contains(leader));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private void testOverseerRole() throws Exception {
|
||||||
String collectionName = "testOverseerCol";
|
String collectionName = "testOverseerCol";
|
||||||
|
|
||||||
createCollection(collectionName, client);
|
createCollection(collectionName, client);
|
||||||
|
@ -202,13 +239,6 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
|
||||||
|
|
||||||
assertTrue("New overseer not the frontrunner : "+ getSortedOverseerNodeNames(client.getZkStateReader().getZkClient()) + " expected : "+ killedOverseer, leaderchanged);
|
assertTrue("New overseer not the frontrunner : "+ getSortedOverseerNodeNames(client.getZkStateReader().getZkClient()) + " expected : "+ killedOverseer, leaderchanged);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
client.shutdown();
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setOverseerRole(CollectionAction action, String overseerDesignate) throws Exception, IOException {
|
private void setOverseerRole(CollectionAction action, String overseerDesignate) throws Exception, IOException {
|
||||||
|
|
|
@ -992,7 +992,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||||
overseers.get(overseers.size() -1).getZkStateReader().getZkClient().close();
|
overseers.get(overseers.size() -1).getZkStateReader().getZkClient().close();
|
||||||
}
|
}
|
||||||
Overseer overseer = new Overseer(
|
Overseer overseer = new Overseer(
|
||||||
new HttpShardHandlerFactory().getShardHandler(), "/admin/cores", reader);
|
new HttpShardHandlerFactory().getShardHandler(), "/admin/cores", reader,null);
|
||||||
overseers.add(overseer);
|
overseers.add(overseer);
|
||||||
ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
|
ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
|
||||||
address.replaceAll("/", "_"));
|
address.replaceAll("/", "_"));
|
||||||
|
|
Loading…
Reference in New Issue