SOLR-6691: REBALANCELEADERS needs to change the leader election queue. Going to let this bake in trunk until 5.0 is cut

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1647857 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Erick Erickson 2014-12-24 22:55:36 +00:00
parent 0c6151e4bc
commit 1417ef1f60
9 changed files with 660 additions and 128 deletions

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
@ -86,7 +87,7 @@ public class LeaderElector {
*
* @param replacement has someone else been the leader already?
*/
private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement) throws KeeperException,
private void checkIfIamLeader(final ElectionContext context, boolean replacement) throws KeeperException,
InterruptedException, IOException {
context.checkIfIamLeaderFired();
// get all other numbers...
@ -99,9 +100,43 @@ public class LeaderElector {
log.warn("Our node is no longer in line to be leader");
return;
}
// We can't really rely on the sequence number stored in the old watcher, it may be stale, thus this check.
int seq = -1;
// See if we've already been re-added, and this is an old context. In which case, use our current sequence number.
String newLeaderSeq = "";
for (String elec : seqs) {
if (getNodeName(elec).equals(getNodeName(context.leaderSeqPath)) && seq < getSeq(elec)) {
seq = getSeq(elec); // so use the current sequence number.
newLeaderSeq = elec;
break;
}
}
// Now, if we've been re-added, presumably we've also set up watchers and all that kind of thing, so we're done
if (StringUtils.isNotBlank(newLeaderSeq) && seq > getSeq(context.leaderSeqPath)) {
log.info("Node " + context.leaderSeqPath + " already in queue as " + newLeaderSeq + " nothing to do.");
return;
}
// Fallback in case we're all coming in here fresh and there is no node for this core already in the election queue.
if (seq == -1) {
seq = getSeq(context.leaderSeqPath);
}
if (seq <= intSeqs.get(0)) {
if (seq == intSeqs.get(0) && !context.leaderSeqPath.equals(holdElectionPath + "/" + seqs.get(0))) {//somebody else already became the leader with the same sequence id , not me
log.info("was going be leader {} , seq(0) {}",context.leaderSeqPath,holdElectionPath+"/"+seqs.get(0));//but someone else jumped the line
log.info("was going to be leader {} , seq(0) {}", context.leaderSeqPath, holdElectionPath + "/" + seqs.get(0));//but someone else jumped the line
// The problem is that deleting the ZK node that's watched by others
// results in an unpredictable sequencing of the events and sometime the context that comes in for checking
// this happens to be after the node has already taken over leadership. So just leave out of here.
// This caused one of the tests to fail on having two nodes with the same name in the queue. I'm not sure
// the assumption that this is a bad state is valid.
if (getNodeName(context.leaderSeqPath).equals(getNodeName(seqs.get(0)))) {
return;
}
retryElection(context, false);//join at the tail again
return;
}
@ -129,21 +164,22 @@ public class LeaderElector {
}
} else {
// I am not the leader - watch the node below me
int i = 1;
for (; i < intSeqs.size(); i++) {
int s = intSeqs.get(i);
if (seq < s) {
// we found who we come before - watch the guy in front
int toWatch = -1;
for (int idx = 0; idx < intSeqs.size(); idx++) {
if (intSeqs.get(idx) < seq && ! getNodeName(context.leaderSeqPath).equals(getNodeName(seqs.get(idx)))) {
toWatch = idx;
}
if (intSeqs.get(idx) >= seq) {
break;
}
}
int index = i - 2;
if (index < 0) {
if (toWatch < 0) {
log.warn("Our node is no longer in line to be leader");
return;
}
try {
String watchedNode = holdElectionPath + "/" + seqs.get(index);
String watchedNode = holdElectionPath + "/" + seqs.get(toWatch);
zkClient.getData(watchedNode, watcher = new ElectionWatcher(context.leaderSeqPath , watchedNode,seq, context) , null, true);
} catch (KeeperException.SessionExpiredException e) {
throw e;
@ -151,7 +187,7 @@ public class LeaderElector {
log.warn("Failed setting watch", e);
// we couldn't set our watch - the node before us may already be down?
// we need to check if we are the leader again
checkIfIamLeader(seq, context, true);
checkIfIamLeader(context, true);
}
}
}
@ -309,15 +345,13 @@ public class LeaderElector {
}
}
}
int seq = getSeq(leaderSeqPath);
checkIfIamLeader(seq, context, replacement);
checkIfIamLeader(context, replacement);
return seq;
return getSeq(context.leaderSeqPath);
}
private class ElectionWatcher implements Watcher {
final String myNode,watchedNode;
final int seq;
final ElectionContext context;
private boolean canceled = false;
@ -325,11 +359,10 @@ public class LeaderElector {
private ElectionWatcher(String myNode, String watchedNode, int seq, ElectionContext context) {
this.myNode = myNode;
this.watchedNode = watchedNode;
this.seq = seq;
this.context = context;
}
void cancel(String leaderSeqPath){
void cancel() {
canceled = true;
}
@ -354,7 +387,7 @@ public class LeaderElector {
}
try {
// am I the next leader?
checkIfIamLeader(seq, context, true);
checkIfIamLeader(context, true);
} catch (Exception e) {
log.warn("", e);
}
@ -390,7 +423,7 @@ public class LeaderElector {
void retryElection(ElectionContext context, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
ElectionWatcher watcher = this.watcher;
ElectionContext ctx = context.copy();
if(watcher!= null) watcher.cancel(this.context.leaderSeqPath);
if (watcher != null) watcher.cancel();
this.context.cancelElection();
this.context = ctx;
joinElection(ctx, true, joinAtHead);

View File

@ -21,6 +21,9 @@ import static org.apache.solr.cloud.Assign.getNodesForNewShard;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
@ -659,7 +662,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
balanceProperty(message);
break;
case REBALANCELEADERS:
processAssignLeaders(message);
processRebalanceLeaders(message);
break;
default:
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
@ -687,42 +690,36 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
}
@SuppressWarnings("unchecked")
// re-purpose BALANCELEADERS to reassign a single leader over here
private void processAssignLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
String collectionName = message.getStr(COLLECTION_PROP);
String shardId = message.getStr(SHARD_ID_PROP);
String baseURL = message.getStr(BASE_URL_PROP);
String coreName = message.getStr(CORE_NAME_PROP);
private void processRebalanceLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
if (StringUtils.isBlank(collectionName) || StringUtils.isBlank(shardId) || StringUtils.isBlank(baseURL) ||
StringUtils.isBlank(coreName)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
String.format(Locale.ROOT, "The '%s', '%s', '%s' and '%s' parameters are required when assigning a leader",
COLLECTION_PROP, SHARD_ID_PROP, BASE_URL_PROP, CORE_NAME_PROP));
}
SolrZkClient zkClient = zkStateReader.getZkClient();
DistributedQueue inQueue = Overseer.getInQueue(zkClient);
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower());
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, shardId);
propMap.put(BASE_URL_PROP, baseURL);
propMap.put(CORE_NAME_PROP, coreName);
inQueue.offer(zkStateReader.toJSON(propMap));
}
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
params.set(SHARD_ID_PROP, message.getStr(SHARD_ID_PROP));
params.set(REJOIN_AT_HEAD_PROP, message.getStr(REJOIN_AT_HEAD_PROP));
params.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
params.set(NODE_NAME_PROP, message.getStr(NODE_NAME_PROP));
params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
params.set(BASE_URL_PROP, message.getStr(BASE_URL_PROP));
String baseUrl = message.getStr(BASE_URL_PROP);
ShardRequest sreq = new ShardRequest();
sreq.nodeName = message.getStr(ZkStateReader.CORE_NAME_PROP);
// yes, they must use same admin handler path everywhere...
params.set("qt", adminPath);
sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
sreq.shards = new String[] {baseUrl};
sreq.actualShards = sreq.shards;
sreq.params = params;
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
shardHandler.submit(sreq, baseUrl, sreq.params);
}
@SuppressWarnings("unchecked")
private void processReplicaAddPropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException {
if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) ||
StringUtils.isBlank(message.getStr(SHARD_ID_PROP)) ||
StringUtils.isBlank(message.getStr(REPLICA_PROP)) ||
StringUtils.isBlank(message.getStr(PROPERTY_PROP)) ||
StringUtils.isBlank(message.getStr(PROPERTY_VALUE_PROP))) {
throw new SolrException(ErrorCode.BAD_REQUEST,
String.format(Locale.ROOT, "The '%s', '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP));
}
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
SolrZkClient zkClient = zkStateReader.getZkClient();
DistributedQueue inQueue = Overseer.getInQueue(zkClient);
Map<String, Object> propMap = new HashMap<>();
@ -733,14 +730,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
}
private void processReplicaDeletePropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException {
if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) ||
StringUtils.isBlank(message.getStr(SHARD_ID_PROP)) ||
StringUtils.isBlank(message.getStr(REPLICA_PROP)) ||
StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
throw new SolrException(ErrorCode.BAD_REQUEST,
String.format(Locale.ROOT, "The '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP));
}
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
SolrZkClient zkClient = zkStateReader.getZkClient();
DistributedQueue inQueue = Overseer.getInQueue(zkClient);
Map<String, Object> propMap = new HashMap<>();

View File

@ -80,6 +80,15 @@ import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateShardHandler;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
@ -1860,6 +1869,31 @@ public final class ZkController {
}
/**
 * Makes a core rejoin the leader election queue of its shard.
 *
 * <p>Reads the collection, shard, node name, core name, election node and base URL from
 * {@code params}, builds a {@link ShardLeaderElectionContext} whose {@code leaderSeqPath}
 * points at the supplied election node, and asks a new {@link LeaderElector} to retry the
 * election for that context.
 *
 * @param params request parameters; {@code REJOIN_AT_HEAD_PROP} is optional and defaults to
 *               {@code false} (rejoin at the tail of the queue) when absent
 * @throws SolrException with {@code SERVER_ERROR} wrapping any failure raised while rejoining
 */
public void rejoinShardLeaderElection(SolrParams params) {
  try {
    String collectionName = params.get(COLLECTION_PROP);
    String shardId = params.get(SHARD_ID_PROP);
    String nodeName = params.get(NODE_NAME_PROP);
    String coreName = params.get(CORE_NAME_PROP);
    String electionNode = params.get(ELECTION_NODE_PROP);
    String baseUrl = params.get(BASE_URL_PROP);

    ZkNodeProps zkProps = new ZkNodeProps(CORE_NAME_PROP, coreName, NODE_NAME_PROP, nodeName, COLLECTION_PROP, collectionName,
        SHARD_ID_PROP, shardId, ELECTION_NODE_PROP, electionNode, BASE_URL_PROP, baseUrl);

    // NOTE(review): the context is built with this controller's leaderElector field, while a
    // separate LeaderElector instance (below) drives the retry - confirm this is intended.
    ShardLeaderElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId, collectionName,
        nodeName, zkProps, this, getCoreContainer());
    LeaderElector elect = new LeaderElector(this.zkClient);

    // Point the context at the election node the caller wants re-queued.
    context.leaderSeqPath = context.electionPath + LeaderElector.ELECTION_NODE + "/" + electionNode;
    elect.setup(context);

    // Use the defaulted overload: the single-argument getBool returns a boxed Boolean that is
    // null when the parameter is missing, and unboxing it here would throw a NullPointerException.
    elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP, false));
  } catch (Exception e) {
    throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
  }
}
public void checkOverseerDesignate() {
try {
byte[] data = zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true);

View File

@ -32,10 +32,11 @@ import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.ACTIVE;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
@ -44,6 +45,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.STATE_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
@ -69,6 +71,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@ -82,6 +85,7 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.LeaderElector;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.cloud.OverseerSolrResponse;
@ -295,21 +299,64 @@ public class CollectionsHandler extends RequestHandlerBase {
if (dc == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
}
Map<String, String> current = new HashMap<>();
Map<String, String> currentRequests = new HashMap<>();
int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
if (max <= 0) max = Integer.MAX_VALUE;
int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
NamedList<Object> results = new NamedList<>();
SolrQueryResponse rspIgnore = new SolrQueryResponse();
final String inactivePreferreds = "inactivePreferreds";
final String alreadyLeaders = "alreadyLeaders";
boolean keepGoing = true;
for (Slice slice : dc.getSlices()) {
insurePreferredIsLeader(req, results, slice, currentRequests);
if (currentRequests.size() == max) {
log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
if (keepGoing == false) {
break; // If we've waited longer than specified, don't continue to wait!
}
}
}
if (keepGoing == true) {
keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
}
if (keepGoing == true) {
log.info("All leader reassignments completed.");
} else {
log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
}
rsp.getValues().addAll(results);
}
private void insurePreferredIsLeader(SolrQueryRequest req, NamedList<Object> results,
Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
final String inactivePreferreds = "inactivePreferreds";
final String alreadyLeaders = "alreadyLeaders";
String collectionName = req.getParams().get(COLLECTION_PROP);
for (Replica replica : slice.getReplicas()) {
// Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
continue;
}
// OK, we are the preferred leader, are we the actual leader?
if (replica.getBool(LEADER_PROP, false)) {
//We're a preferred leader, but we're _also_ the leader, don't need to do anything.
NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
if (noops == null) {
noops = new NamedList<>();
results.add(alreadyLeaders, noops);
}
NamedList<Object> res = new NamedList<>();
res.add("status", "success");
res.add("msg", "Already leader");
res.add("shard", slice.getName());
res.add("nodeName", replica.getNodeName());
noops.add(replica.getName(), res);
return; // already the leader, do nothing.
}
// We're the preferred leader, but someone else is leader. Only become leader if we're active.
if (StringUtils.equalsIgnoreCase(replica.getStr(STATE_PROP), ACTIVE) == false) {
NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
if (inactives == null) {
@ -319,62 +366,126 @@ public class CollectionsHandler extends RequestHandlerBase {
NamedList<Object> res = new NamedList<>();
res.add("status", "skipped");
res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
res.add("shard", slice.getName());
res.add("nodeName", replica.getNodeName());
inactives.add(replica.getName(), res);
break; // Don't try to assign if we're not active!
} // OK, we're the one, get in the queue to become the leader.
if (replica.getBool(LEADER_PROP, false)) {
NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
if (noops == null) {
noops = new NamedList<>();
results.add(alreadyLeaders, noops);
return; // Don't try to become the leader if we're not active!
}
NamedList<Object> res = new NamedList<>();
res.add("status", "success");
res.add("msg", "Already leader");
res.add("nodeName", replica.getNodeName());
noops.add(replica.getName(), res);
break; // already the leader, do nothing.
// Replica is the preferred leader but not the actual leader, do something about that.
// "Something" is
// 1> if the preferred leader isn't first in line, tell it to re-queue itself.
// 2> tell the actual leader to re-queue itself.
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
log.warn("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
"election queue, but replica " + replica.getName() + " doesn't think it's the leader. Do nothing");
return;
}
// Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole
// string is used, but that sorts nodes with the same sequence number by their session IDs from ZK.
// While this is determinate, it's not quite what we need, so re-queue nodes that aren't us and are
// watching the leader node..
String firstWatcher = electionNodes.get(1);
if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) {
makeReplicaFirstWatcher(collectionName, slice, replica);
}
String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP);
rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false);
waitForNodeChange(collectionName, slice, electionNodes.get(0));
return; // Done with this slice, skip the rest of the replicas.
}
}
// Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list
/**
 * Re-queues {@code replica} at the head of the shard's leader election queue, then sends every
 * other election node that ended up with the same sequence number to the back of the queue.
 *
 * <p>Returns without touching the other nodes if the replica's election node is not found or
 * its sequence number is not seen to change (see {@link #waitForNodeChange}).
 *
 * @param collectionName collection that owns the slice
 * @param slice          shard whose election queue is adjusted
 * @param replica        replica that should become the first watcher of the leader
 */
void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica)
    throws KeeperException, InterruptedException {
  ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
  List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
      ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));

  // First, queue up the preferred leader at the head of the queue.
  int newSeq = -1;
  for (String electionNode : electionNodes) {
    if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
      String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
      rejoinElection(collectionName, slice, electionNode, coreName, true);
      newSeq = waitForNodeChange(collectionName, slice, electionNode);
      break;
    }
  }
  if (newSeq == -1) {
    return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
  }

  // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
  // The queue is re-read once because the rejoin above changed it.
  electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
      ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
  for (String thisNode : electionNodes) {
    // Presumably the list is sorted by ascending sequence, so nothing further can match - verify.
    if (LeaderElector.getSeq(thisNode) > newSeq) {
      break;
    }
    if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) {
      continue;
    }
    if (LeaderElector.getSeq(thisNode) == newSeq) {
      String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP);
      rejoinElection(collectionName, slice, thisNode, coreName, false);
      waitForNodeChange(collectionName, slice, thisNode);
    }
  }
}
/**
 * Polls the shard's election queue until the node named in {@code electionNode} shows up with a
 * sequence number different from the one it has now.
 *
 * <p>Makes at most 600 attempts, sleeping 100 ms after each unsuccessful one.
 *
 * @return the new sequence number, or {@code -1} if no change was observed within the attempts
 */
int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException {
  final String targetName = LeaderElector.getNodeName(electionNode);
  final int previousSeq = LeaderElector.getSeq(electionNode);

  int attempt = 0;
  while (attempt < 600) {
    ZkStateReader reader = coreContainer.getZkController().getZkStateReader();
    String electPath = ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName());
    List<String> queue = OverseerCollectionProcessor.getSortedElectionNodes(reader.getZkClient(), electPath);

    for (String candidate : queue) {
      if (!LeaderElector.getNodeName(candidate).equals(targetName)) {
        continue; // some other replica's election node
      }
      int candidateSeq = LeaderElector.getSeq(candidate);
      if (candidateSeq != previousSeq) {
        return candidateSeq;
      }
    }

    Thread.sleep(100);
    attempt++;
  }
  return -1;
}
private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
boolean rejoinAtHead) throws KeeperException, InterruptedException {
Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, REBALANCELEADERS.toLower());
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, slice.getName());
propMap.put(BASE_URL_PROP, replica.get(BASE_URL_PROP));
String coreName = (String) replica.get(CORE_NAME_PROP);
// Put it in the waiting list.
String asyncId = REBALANCELEADERS.toLower() + "_" + coreName;
current.put(asyncId, String.format(Locale.ROOT, "Collection: '%s', Shard: '%s', Core: '%s', BaseUrl: '%s'",
collectionName, slice.getName(), coreName, replica.get(BASE_URL_PROP)));
propMap.put(CORE_NAME_PROP, coreName);
propMap.put(Overseer.QUEUE_OPERATION, REBALANCELEADERS.toLower());
propMap.put(CORE_NAME_PROP, core);
propMap.put(NODE_NAME_PROP, replica.getName());
propMap.put(ZkStateReader.BASE_URL_PROP, replica.getProperties().get(ZkStateReader.BASE_URL_PROP));
propMap.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line.
propMap.put(ELECTION_NODE_PROP, electionNode);
String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime());
propMap.put(ASYNC, asyncId);
ZkNodeProps m = new ZkNodeProps(propMap);
log.info("Queueing collection '" + collectionName + "' slice '" + slice.getName() + "' replica '" +
coreName + "' to become leader.");
SolrQueryResponse rspIgnore = new SolrQueryResponse(); // I'm constructing my own response
handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
break; // Done with this slice, skip the rest of the replicas.
}
if (current.size() == max) {
log.info("Queued " + max + " leader reassgnments, waiting for some to complete.");
keepGoing = waitForLeaderChange(current, maxWaitSecs, false, results);
if (keepGoing == false) {
break; // If we've waited longer than specified, don't continue to wait!
}
}
}
if (keepGoing == true) {
keepGoing = waitForLeaderChange(current, maxWaitSecs, true, results);
}
if (keepGoing == true) {
log.info("All leader reassignments completed.");
} else {
log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
}
rsp.getValues().addAll(results);
}
// currentAsyncIds - map of request IDs and reporting data (value)

View File

@ -288,6 +288,16 @@ public class CoreAdminHandler extends RequestHandlerBase {
}
case LOAD:
break;
case REJOINLEADERELECTION:
ZkController zkController = coreContainer.getZkController();
if (zkController != null) {
zkController.rejoinShardLeaderElection(req.getParams());
} else {
log.warn("zkController is null in CoreAdminHandler.handleRequestInternal:REJOINLEADERELCTIONS. No action taken.");
}
break;
}
}
rsp.setHttpCaching(false);

View File

@ -230,7 +230,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
"http://127.0.0.1/solr/", ZkStateReader.CORE_NAME_PROP, "2");
ElectionContext context = new ShardLeaderElectionContextBase(second,
"slice1", "collection2", "dummynode1", props, zkStateReader);
"slice1", "collection2", "dummynode2", props, zkStateReader);
second.setup(context);
second.joinElection(context, false);
Thread.sleep(1000);

View File

@ -0,0 +1,340 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.zookeeper.KeeperException;
import org.junit.Before;
public class TestRebalanceLeaders extends AbstractFullDistribZkTestBase {
public static final String COLLECTION_NAME = "testcollection";
/** Creates the test and selects the {@code schema15.xml} schema. */
public TestRebalanceLeaders() {
schemaString = "schema15.xml"; // we need a string id
}
/** Fixes the cluster layout at four slices and four shards before the base class sets up. */
@Override
@Before
public void setUp() throws Exception {
  sliceCount = 4;
  shardCount = 4;
  fixShardCount = true;
  super.setUp();
}
// Number of issue-commands/check-consistency rounds; doTest() overwrites this with a random value.
int reps = 10;
// Upper bound, in milliseconds, on how long checkConsistency() keeps retrying.
int timeoutMs = 60000;
// Replicas of each shard keyed by shard name, captured by recordInitialState().
Map<String, List<Replica>> initial = new HashMap<>();
// NOTE(review): presumably the expected leader per shard - not used in the visible part of the file, confirm.
Map<String, Replica> expected = new HashMap<>();
/**
 * Creates a randomly sized test collection, waits for it to come up, verifies it is listed,
 * and then runs the rebalance-leaders checks.
 */
@Override
public void doTest() throws Exception {
  final CloudSolrServer creationClient = createCloudClient(null);
  // Number of rounds to run; the +1 guarantees at least one.
  reps = 1 + random().nextInt(9);
  try {
    // Vary the shard count and replication factor (each at least 2) to exercise boundary cases.
    final int numShards = Math.max(2, random().nextInt(7));
    final int replicationFactor = Math.max(2, random().nextInt(4));
    // Arguments after the name: shards, replication factor, max replicas per node.
    final int maxReplicasPerNode = numShards * replicationFactor + 1;
    createCollection(null, COLLECTION_NAME, numShards, replicationFactor, maxReplicasPerNode,
        creationClient, null, "conf1");
  } finally {
    // This client is only used to create the collection.
    creationClient.shutdown();
  }

  waitForCollection(cloudClient.getZkStateReader(), COLLECTION_NAME, 2);
  waitForRecoveriesToFinish(COLLECTION_NAME, false);

  listCollection();
  rebalanceLeaderTest();
}
/**
 * Issues a collections-API LIST request through {@code cloudClient} and asserts that the
 * control collection, the default collection and the test collection are all reported.
 */
private void listCollection() throws IOException, SolrServerException {
  ModifiableSolrParams listParams = new ModifiableSolrParams();
  listParams.set("action", CollectionParams.CollectionAction.LIST.toString());

  SolrRequest listRequest = new QueryRequest(listParams);
  listRequest.setPath("/admin/collections");

  NamedList<Object> response = cloudClient.request(listRequest);
  List<String> names = (List<String>) response.get("collections");

  assertTrue("control_collection was not found in list", names.contains("control_collection"));
  assertTrue(DEFAULT_COLLECTION + " was not found in list", names.contains(DEFAULT_COLLECTION));
  assertTrue(COLLECTION_NAME + " was not found in list", names.contains(COLLECTION_NAME));
}
/** Snapshots the replicas of every shard of the test collection into {@code initial}, keyed by shard name. */
void recordInitialState() throws InterruptedException {
  DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
  Map<String, Slice> shardsByName = collection.getSlicesMap();

  // Copy each shard's replicas into a list of our own so they are convenient to look at later.
  for (Map.Entry<String, Slice> shard : shardsByName.entrySet()) {
    List<Replica> replicas = new ArrayList<>(shard.getValue().getReplicas());
    initial.put(shard.getKey(), replicas);
  }
}
/** Records the starting replica layout, then runs {@code reps} rounds of issuing commands and checking consistency. */
void rebalanceLeaderTest() throws InterruptedException, IOException, SolrServerException, KeeperException {
  recordInitialState();

  int round = 0;
  while (round < reps) {
    issueCommands();
    checkConsistency();
    round++;
  }
}
// After the rebalance command has been issued, poll once a second (for at most timeoutMs) until:
// 1> all replicas appear once and only once in their respective leader election queue
// 2> all the replicas we _think_ are leaders are in the 0th position of their leader election queue
// 3> the node that ZooKeeper thinks is the leader is the one we think should be the leader
// The test is failed if these conditions do not all hold before the timeout expires.
void checkConsistency() throws InterruptedException, KeeperException {
  final long began = System.currentTimeMillis();
  while (System.currentTimeMillis() - began < timeoutMs) {
    // Short-circuits in the same order as the numbered conditions above.
    boolean consistent = checkAppearOnce() && checkElectionZero() && checkZkLeadersAgree();
    if (consistent) {
      return;
    }
    Thread.sleep(1000);
  }
  fail("Checking the rebalance leader command failed");
}
// For every shard recorded in "initial": does the leader election queue hold exactly the recorded replicas?
// That is, both have the same size, every election node names a replica and every replica has an election node.
Boolean checkAppearOnce() throws KeeperException, InterruptedException {
  for (Map.Entry<String, List<Replica>> shard : initial.entrySet()) {
    String electionPath = "/collections/" + COLLECTION_NAME + "/leader_elect/" + shard.getKey() + "/election";
    List<Replica> replicas = shard.getValue();
    List<String> queue = cloudClient.getZkStateReader().getZkClient().getChildren(electionPath, null, true);

    if (queue.size() != replicas.size()) {
      return false;
    }
    // Every election node must correspond to a replica ...
    for (String electionNode : queue) {
      if (!checkReplicaName(LeaderElector.getNodeName(electionNode), replicas)) {
        return false;
      }
    }
    // ... and every replica must have an election node.
    for (Replica replica : replicas) {
      if (!checkElectionNode(replica.getName(), queue)) {
        return false;
      }
    }
  }
  return true;
}
// Returns true when some entry of the leader election queue resolves (via LeaderElector.getNodeName) to repName.
Boolean checkElectionNode(String repName, List<String> leaderQueue) {
  boolean present = false;
  for (int pos = 0; pos < leaderQueue.size() && !present; pos++) {
    present = repName.equals(LeaderElector.getNodeName(leaderQueue.get(pos)));
  }
  return present;
}
// Returns true when one of the given replicas carries the name toCheck.
Boolean checkReplicaName(String toCheck, List<Replica> replicas) {
  for (Replica candidate : replicas) {
    String candidateName = candidate.getName();
    if (toCheck.equals(candidateName)) {
      return Boolean.TRUE;
    }
  }
  return Boolean.FALSE;
}
// Gets the sorted leader election queue of the given shard from ZK. The node may not actually be there, in
// which case null is returned; this method does not retry by itself, callers are expected to poll.
// On a ZooKeeper error the client is asked to connect again before null is returned. If the thread is
// interrupted, null is returned with the interrupt flag restored so that the caller's next blocking call notices.
List<String> getOverseerSort(String key) {
  try {
    return OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
        "/collections/" + COLLECTION_NAME + "/leader_elect/" + key + "/election");
  } catch (KeeperException e) {
    cloudClient.connect();
  } catch (InterruptedException e) {
    // Do not swallow the interruption, hand it on to whoever blocks next.
    Thread.currentThread().interrupt();
  }
  return null;
}
// Is every node we think is the leader in the zeroth position in the leader election queue?
// Returns false (rather than throwing) when a queue cannot be read or is empty, so that a polling caller
// simply tries again.
Boolean checkElectionZero() {
  for (Map.Entry<String, Replica> ent : expected.entrySet()) {
    List<String> leaderQueue = getOverseerSort(ent.getKey());
    // An empty queue has no zeroth entry; treat it like a missing queue instead of failing on get(0).
    if (leaderQueue == null || leaderQueue.isEmpty()) {
      return false;
    }
    String electName = LeaderElector.getNodeName(leaderQueue.get(0));
    String replicaName = ent.getValue().getName();
    if (!electName.equals(replicaName)) {
      return false;
    }
  }
  return true;
}
// Does the leader registered in ZooKeeper for each shard agree with the replica we _think_ should be leader?
// Compares the "core" entry of the shard's leader node with the "core" property of the expected replica.
// Returns false when the leader node cannot be read, is not a JSON map or has no "core" entry.
Boolean checkZkLeadersAgree() throws KeeperException, InterruptedException {
  for (Map.Entry<String, Replica> ent : expected.entrySet()) {
    String path = "/collections/" + COLLECTION_NAME + "/leaders/" + ent.getKey();
    byte[] data = getZkData(cloudClient, path);
    if (data == null) {
      return false;
    }
    Object parsed = ZkStateReader.fromJSON(data);
    if (!(parsed instanceof Map)) {
      return false;
    }
    Object zkCore = ((Map<?, ?>) parsed).get("core");
    String repCore = ent.getValue().getStr("core");
    // A leader node without a "core" entry is a mismatch, not a NullPointerException.
    if (zkCore == null || !zkCore.equals(repCore)) {
      return false;
    }
  }
  return true;
}
// Reads the data stored at the given ZooKeeper path through the given client.
// Returns null when the node does not exist (after pausing for a second, so that polling callers do not spin),
// when any other ZooKeeper error occurs or when the thread is interrupted. In the interrupted case the
// interrupt flag is restored for the caller.
byte[] getZkData(CloudSolrServer server, String path) {
  org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
  try {
    return server.getZkStateReader().getZkClient().getData(path, null, stat, true);
  } catch (KeeperException.NoNodeException e) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e1) {
      Thread.currentThread().interrupt();
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  } catch (KeeperException e) {
    // Any other ZooKeeper problem is reported to the caller as "no data".
  }
  return null;
}
// Picks, for every shard, a random replica to become the preferredLeader (NOTE: may be one that's _already_
// leader!), remembers the choice in "expected", waits until all the preferredLeader properties are visible in
// the cluster state and then issues the REBALANCELEADERS command.
// It's OK not to check the return of the rebalance request here since the subsequent tests will fail.
void issueCommands() throws IOException, SolrServerException, KeeperException, InterruptedException {
  expected.clear();
  for (Map.Entry<String, List<Replica>> ent : initial.entrySet()) {
    List<Replica> replicas = ent.getValue();
    // nextInt(bound) is always within [0, bound). Math.abs(nextInt()) % size is negative when nextInt()
    // returns Integer.MIN_VALUE, which would make the lookup below throw.
    Replica rep = replicas.get(random().nextInt(replicas.size()));
    expected.put(ent.getKey(), rep);
    issuePreferred(ent.getKey(), rep);
  }
  if (waitForAllPreferreds() == false) {
    fail("Waited for timeout for preferredLeader assignments to be made and they werent.");
  }
  // Now rebalance the leaders
  ModifiableSolrParams params = new ModifiableSolrParams();
  params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString());
  params.set("collection", COLLECTION_NAME);
  params.set("maxAtOnce", "10");
  SolrRequest request = new QueryRequest(params);
  request.setPath("/admin/collections");
  cloudClient.request(request);
}
// Sends an ADDREPLICAPROP request that sets the preferredLeader property to true on the given replica of the
// given shard of the test collection.
void issuePreferred(String slice, Replica rep) throws IOException, SolrServerException, InterruptedException {
  String[][] settings = {
      {"action", CollectionParams.CollectionAction.ADDREPLICAPROP.toString()},
      {"collection", COLLECTION_NAME},
      {"shard", slice},
      {"replica", rep.getName()},
      {"property", "preferredLeader"},
      {"property.value", "true"},
  };
  ModifiableSolrParams propParams = new ModifiableSolrParams();
  for (String[] setting : settings) {
    propParams.set(setting[0], setting[1]);
  }
  SolrRequest propRequest = new QueryRequest(propParams);
  propRequest.setPath("/admin/collections");
  cloudClient.request(propRequest);
}
// Polls the cluster state (every 250ms, for at most timeoutMs) until every replica in "expected" carries
// property.preferredleader=true. Returns true as soon as that is the case, false if the timeout expires first.
boolean waitForAllPreferreds() throws KeeperException, InterruptedException {
  final long began = System.currentTimeMillis();
  while (System.currentTimeMillis() - began < timeoutMs) {
    // Refresh the cluster state before looking at it.
    cloudClient.getZkStateReader().updateClusterState(true);
    Map<String, Slice> shardsByName =
        cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap();

    boolean allMarked = true;
    for (Map.Entry<String, Replica> wanted : expected.entrySet()) {
      Replica current = shardsByName.get(wanted.getKey()).getReplica(wanted.getValue().getName());
      if (!current.getBool("property.preferredleader", false)) {
        allMarked = false;
        break;
      }
    }
    if (allMarked) {
      return true;
    }
    Thread.sleep(250);
  }
  return false;
}
}

View File

@ -62,6 +62,7 @@ public class ZkStateReader implements Closeable {
public static final String STATE_PROP = "state";
public static final String CORE_NAME_PROP = "core";
public static final String COLLECTION_PROP = "collection";
public static final String ELECTION_NODE_PROP = "election_node";
public static final String SHARD_ID_PROP = "shard";
public static final String REPLICA_PROP = "replica";
public static final String SHARD_RANGE_PROP = "shard_range";
@ -78,6 +79,7 @@ public class ZkStateReader implements Closeable {
public static final String ALIASES = "/aliases.json";
public static final String CLUSTER_STATE = "/clusterstate.json";
public static final String CLUSTER_PROPS = "/clusterprops.json";
public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
public static final String REPLICATION_FACTOR = "replicationFactor";
public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
@ -102,9 +104,10 @@ public class ZkStateReader implements Closeable {
private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000"));
public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
public static final String LEADER_ELECT_ZKNODE = "leader_elect";
public static final String SHARD_LEADERS_ZKNODE = "leaders";
public static final String ELECTION_NODE = "election";
private final Set<String> watchedCollections = new HashSet<String>();
@ -658,6 +661,16 @@ public class ZkStateReader implements Closeable {
: "");
}
/**
 * Get path where shard leader elections ephemeral nodes are.
 *
 * @param collection name of the collection
 * @param shardId    name of the shard; when null, the path of the collection's leader election root is
 *                   returned instead of a shard specific election path
 * @return the path built from the collection name and, if given, the shard name
 */
public static String getShardLeadersElectPath(String collection, String shardId) {
  String electRoot = COLLECTIONS_ZKNODE + "/" + collection + "/" + LEADER_ELECT_ZKNODE;
  if (shardId == null) {
    return electRoot;
  }
  return electRoot + "/" + shardId + "/" + ELECTION_NODE;
}
public List<ZkCoreNodeProps> getReplicaProps(String collection,
String shardId, String thisCoreNodeName) {
return getReplicaProps(collection, shardId, thisCoreNodeName, null);

View File

@ -135,7 +135,8 @@ public abstract class CoreAdminParams
LOAD_ON_STARTUP,
TRANSIENT,
OVERSEEROP,
REQUESTSTATUS;
REQUESTSTATUS,
REJOINLEADERELECTION;
public static CoreAdminAction get( String p )
{