SOLR-13091: REBALANCELEADERS is broken

Erick Erickson 2019-01-19 19:20:39 -08:00
parent c51838479a
commit a692d05a90
5 changed files with 843 additions and 353 deletions


@@ -284,6 +284,8 @@ Bug Fixes
* SOLR-13137: NPE when /admin/zookeeper/status endpoint hit in standalone mode (janhoy)
* SOLR-13091: REBALANCELEADERS is broken (Erick Erickson)
Improvements
----------------------


@@ -17,6 +17,7 @@
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,6 +40,8 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SHARD_UNIQUE;
@@ -46,6 +49,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.BA
// Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.
class ExclusiveSliceProperty {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private ClusterState clusterState;
private final boolean onlyActiveNodes;
private final String property;
@@ -235,6 +239,15 @@ class ExclusiveSliceProperty {
adjustLimits(nodesHostingProp.get(nodeName));
removeSliceAlreadyHostedFromPossibles(srToChange.slice.getName());
addProp(srToChange.slice, srToChange.replica.getName());
// When you set the property, you must ensure that it is _removed_ from any other replicas.
for (Replica rep : srToChange.slice.getReplicas()) {
if (rep.getName().equals(srToChange.replica.getName())) {
continue;
}
if (rep.getProperty(property) != null) {
removeProp(srToChange.slice, rep.getName());
}
}
}
}
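The loop added above is the heart of the fix in this file: once the property lands on the chosen replica, every other replica in the slice must lose it. A minimal standalone sketch of that slice-unique invariant (illustrative only, not Solr's implementation; the types and names are invented):

import java.util.Map;

class SliceUniquePropertySketch {
  // replicaProps maps replica name -> that replica's properties, for a single slice.
  static void assignUnique(Map<String, Map<String, String>> replicaProps,
                           String chosenReplica, String property) {
    for (Map.Entry<String, Map<String, String>> ent : replicaProps.entrySet()) {
      if (ent.getKey().equals(chosenReplica)) {
        ent.getValue().put(property, "true"); // set on the chosen replica...
      } else {
        ent.getValue().remove(property); // ...and strip it from every other replica
      }
    }
  }
}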
@@ -266,10 +279,12 @@ class ExclusiveSliceProperty {
}
private void removeProp(Slice origSlice, String replicaName) {
log.debug("Removing property {} from slice {}, replica {}", property, origSlice.getName(), replicaName);
getReplicaFromChanged(origSlice, replicaName).getProperties().remove(property);
}
private void addProp(Slice origSlice, String replicaName) {
log.debug("Adding property {} to slice {}, replica {}", property, origSlice.getName(), replicaName);
getReplicaFromChanged(origSlice, replicaName).getProperties().put(property, "true");
}
@@ -342,5 +357,10 @@ class ExclusiveSliceProperty {
this.slice = slice;
this.replica = replica;
}
public String toString() {
StringBuilder sb = new StringBuilder(System.lineSeparator()).append(System.lineSeparator()).append("******EOE20 starting toString of SliceReplica");
sb.append(" :").append(System.lineSeparator()).append("slice: ").append(slice.toString()).append(System.lineSeparator()).append(" replica: ").append(replica.toString()).append(System.lineSeparator());
return sb.toString();
}
}
}
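For context, ExclusiveSliceProperty is the server-side worker behind the BALANCESHARDUNIQUE collections API command. A hedged sketch of driving it from a client via SolrJ, mirroring the calls the test file below makes; the ZooKeeper address, collection name, and property name are placeholders:

import java.util.Collections;
import java.util.Optional;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;

public class BalanceShardUniqueSketch {
  public static void main(String[] args) throws Exception {
    try (CloudSolrClient client = new CloudSolrClient.Builder(
        Collections.singletonList("localhost:2181"), Optional.empty()).build()) {
      // Ask Solr to spread the slice-unique property so at most one replica
      // per shard carries it.
      CollectionAdminResponse resp = CollectionAdminRequest
          .balanceReplicaProperty("TestColl", "preferredLeader")
          .process(client);
      if (resp.getStatus() != 0) {
        throw new IllegalStateException("BALANCESHARDUNIQUE failed: " + resp);
      }
    }
  }
}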


@@ -18,10 +18,13 @@ package org.apache.solr.handler.admin;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.cloud.LeaderElector;
@@ -55,13 +58,62 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
/**
* The end point for the collections API REBALANCELEADERS call that actually does the work.
* <p>
* Overview:
* <p>
* The leader election process is that each replica of a shard watches one, and only one other replica via
* ephemeral nodes in ZooKeeper. When the node being watched goes down, the node watching it is sent a notification
* and, if the node being watched is the leader, the node getting the notification assumes leadership.
* <p>
ZooKeeper's ephemeral nodes get a monotonically increasing "sequence number" that defines its position in the queue.
* <p>
* So to force a particular node to become a leader it must have a watch on the leader. This can lead to two nodes
having the same sequence number. Say the process is this:
replica1 is the leader (seq 1)
replica3 is on a Solr node that happens to be started next; it watches the leader (seq 2)
replica2 is on the next Solr node started. It will _also_ watch the leader; its sequence number is 2, exactly
like replica3's
* <p>
* This is true on startup, but can also be a consequence of, say, a replica going into recovery. It's no longer
* eligible to become leader, so will be put at the end of the queue by default. So there's code to put it in the
* queue with the same sequence number as the current second replica.
* <p>
To complicate matters further, when the nodes are sorted (see OverseerTaskProcessor.getSortedElectionNodes)
* the primary sort is on the sequence number, secondary sort on the session ID. So the preferredLeader may
* or may not be second in that list.
* <p>
What all this means is that when the REBALANCELEADERS command is issued, this class examines the election queue and
* performs just three things for each shard in the collection:
* <p>
1> ensures that the preferredLeader is watching the leader (rejoins the election queue at the head)
* <p>
* 2> if there are two ephemeral nodes with the same sequence number watching the leader, and if one of them is the
* preferredLeader it will send the _other_ node to the end of the queue (rejoins it)
* <p>
* 3> rejoins the zeroth entry in the list at the end of the queue, which triggers the watch on the preferredLeader
* replica which then takes over leadership
* <p>
All this, of course, assumes the preferredLeader is alive and well and is assigned for any given shard.
*/
class RebalanceLeaders {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final SolrQueryRequest req;
final SolrQueryResponse rsp;
final CollectionsHandler collectionsHandler;
final CoreContainer coreContainer;
private final Set<String> asyncRequests = new HashSet<>();
final static String INACTIVE_PREFERREDS = "inactivePreferreds";
final static String ALREADY_LEADERS = "alreadyLeaders";
final static String SUMMARY = "Summary";
final NamedList<Object> results = new NamedList<>();
final Map<String, String> pendingOps = new HashMap<>();
private String collectionName;
RebalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler collectionsHandler) {
this.req = req;
@@ -71,38 +123,29 @@ class RebalanceLeaders {
}
void execute() throws KeeperException, InterruptedException {
req.getParams().required().check(COLLECTION_PROP);
DocCollection dc = checkParams();
String collectionName = req.getParams().get(COLLECTION_PROP);
if (StringUtils.isBlank(collectionName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the Rebalance Leaders command."));
}
coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
ClusterState clusterState = coreContainer.getZkController().getClusterState();
DocCollection dc = clusterState.getCollection(collectionName);
if (dc == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
}
Map<String, String> currentRequests = new HashMap<>();
int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
if (max <= 0) max = Integer.MAX_VALUE;
int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
NamedList<Object> results = new NamedList<>();
// If there are a maximum number of simultaneous requests specified, we have to pause when we have that many
// outstanding requests and wait for at least one to finish before going on to the next rebalance.
boolean keepGoing = true;
for (Slice slice : dc.getSlices()) {
ensurePreferredIsLeader(results, slice, currentRequests);
if (currentRequests.size() == max) {
ensurePreferredIsLeader(slice);
if (asyncRequests.size() == max) {
log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
keepGoing = waitAsyncRequests(maxWaitSecs, false);
if (keepGoing == false) {
break; // If we've waited longer than specified, don't continue to wait!
}
}
}
if (keepGoing == true) {
keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
keepGoing = waitAsyncRequests(maxWaitSecs, true);
}
if (keepGoing == true) {
log.info("All leader reassignments completed.");
@@ -110,15 +153,72 @@ class RebalanceLeaders {
log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
}
checkLeaderStatus();
NamedList<Object> summary = new NamedList<>();
if (pendingOps.size() == 0) {
summary.add("Success", "All active replicas with the preferredLeader property set are leaders");
} else {
summary.add("Failure", "Not all active replicas with preferredLeader property are leaders");
}
rsp.getValues().add(SUMMARY, summary); // we want this first.
rsp.getValues().addAll(results);
}
private void ensurePreferredIsLeader(NamedList<Object> results,
Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
final String inactivePreferreds = "inactivePreferreds";
final String alreadyLeaders = "alreadyLeaders";
String collectionName = req.getParams().get(COLLECTION_PROP);
// Ensure that all required parameters are there and the doc collection exists.
private DocCollection checkParams() throws KeeperException, InterruptedException {
req.getParams().required().check(COLLECTION_PROP);
collectionName = req.getParams().get(COLLECTION_PROP);
if (StringUtils.isBlank(collectionName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the Rebalance Leaders command."));
}
coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
ClusterState clusterState = coreContainer.getZkController().getClusterState();
DocCollection dc = clusterState.getCollection(collectionName);
if (dc == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
}
return dc;
}
// Once we've done all the fiddling with the queues, check on the way out to see if all the active preferred
// leaders that we intended to change are in fact the leaders.
private void checkLeaderStatus() throws InterruptedException, KeeperException {
for (int idx = 0; pendingOps.size() > 0 && idx < 600; ++idx) {
ClusterState clusterState = coreContainer.getZkController().getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
DocCollection dc = clusterState.getCollection(collectionName);
for (Slice slice : dc.getSlices()) {
for (Replica replica : slice.getReplicas()) {
if (replica.isActive(liveNodes) && replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false)) {
if (replica.getBool(LEADER_PROP, false)) {
if (pendingOps.containsKey(slice.getName())) {
// Record for return that the leader changed successfully
pendingOps.remove(slice.getName());
addToSuccesses(slice, replica);
break;
}
}
}
}
}
TimeUnit.MILLISECONDS.sleep(100);
coreContainer.getZkController().getZkStateReader().forciblyRefreshAllClusterStateSlow();
}
addAnyFailures();
}
// The process is:
// if the replica with preferredLeader is already the leader, do nothing
// Otherwise:
// > if two nodes have the same sequence number and both point to the current leader, we presume that we've just
// moved it; move the one that does _not_ have the preferredLeader to the end of the list.
// > move the current leader to the end of the list. This _should_ mean that the current ephemeral node in the
// leader election queue is removed and the only remaining node watching it is triggered to become leader.
private void ensurePreferredIsLeader(Slice slice) throws KeeperException, InterruptedException {
for (Replica replica : slice.getReplicas()) {
// Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
@ -127,87 +227,126 @@ class RebalanceLeaders {
// OK, we are the preferred leader, are we the actual leader?
if (replica.getBool(LEADER_PROP, false)) {
//We're a preferred leader, but we're _also_ the leader, don't need to do anything.
NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
if (noops == null) {
noops = new NamedList<>();
results.add(alreadyLeaders, noops);
}
NamedList<Object> res = new NamedList<>();
res.add("status", "success");
res.add("msg", "Already leader");
res.add("shard", slice.getName());
res.add("nodeName", replica.getNodeName());
noops.add(replica.getName(), res);
addAlreadyLeaderToResults(slice, replica);
return; // already the leader, do nothing.
}
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
// We're the preferred leader, but someone else is leader. Only become leader if we're active.
if (replica.getState() != Replica.State.ACTIVE) {
NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
if (inactives == null) {
inactives = new NamedList<>();
results.add(inactivePreferreds, inactives);
}
NamedList<Object> res = new NamedList<>();
res.add("status", "skipped");
res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
res.add("shard", slice.getName());
res.add("nodeName", replica.getNodeName());
inactives.add(replica.getName(), res);
if (replica.isActive(zkStateReader.getClusterState().getLiveNodes()) == false) {
addInactiveToResults(slice, replica);
return; // Don't try to become the leader if we're not active!
}
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
if (electionQueueInBadState(electionNodes, slice, replica)) {
return;
}
// Replica is the preferred leader but not the actual leader, do something about that.
// "Something" is
// 1> if the preferred leader isn't first in line, tell it to re-queue itself.
// 2> tell the actual leader to re-queue itself.
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
log.info("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
"election queue, but replica " + replica.getName() + " doesn't think it's the leader.");
return;
}
// OK, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole
// string is used, but that sorts nodes with the same sequence number by their session IDs from ZK.
// While this is deterministic, it's not quite what we need, so re-queue nodes that aren't us and are
// watching the leader node.
String firstWatcher = electionNodes.get(1);
if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) {
makeReplicaFirstWatcher(collectionName, slice, replica);
makeReplicaFirstWatcher(slice, replica);
}
String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP);
rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false);
waitForNodeChange(collectionName, slice, electionNodes.get(0));
// This replica should be the leader at the end of the day, so let's record that information to check at the end
pendingOps.put(slice.getName(), replica.getName());
String leaderElectionNode = electionNodes.get(0);
String coreName = slice.getReplica(LeaderElector.getNodeName(leaderElectionNode)).getStr(CORE_NAME_PROP);
rejoinElectionQueue(slice, leaderElectionNode, coreName, false);
waitForNodeChange(slice, leaderElectionNode);
return; // Done with this slice, skip the rest of the replicas.
}
}
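To make the tie-breaking discussed above concrete, here is a toy sort, not Solr's election-node parsing: the Node type, seq values, and session IDs are invented. It shows why a preferredLeader that joined the queue earlier can still sort behind a later arrival with the same sequence number, which is exactly what makeReplicaFirstWatcher has to compensate for:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ElectionSortSketch {
  static final class Node {
    final int seq;
    final String sessionId;
    final String replica;
    Node(int seq, String sessionId, String replica) {
      this.seq = seq;
      this.sessionId = sessionId;
      this.replica = replica;
    }
  }

  public static void main(String[] args) {
    // replica1 leads; replica3 and replica2 both ended up watching it with seq 2.
    List<Node> queue = new ArrayList<>(Arrays.asList(
        new Node(1, "sessionA", "replica1"),
        new Node(2, "sessionC", "replica3"),
        new Node(2, "sessionB", "replica2")));
    // Primary sort on sequence number; ties fall back to the session ID embedded
    // in the node string. replica2's session sorts before replica3's, so replica2
    // lands second even though replica3 joined the queue first.
    queue.sort((a, b) -> a.seq != b.seq
        ? Integer.compare(a.seq, b.seq)
        : a.sessionId.compareTo(b.sessionId));
    queue.forEach(n -> System.out.println(n.replica + " seq=" + n.seq));
  }
}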
// Check that the election queue has some members! There really should be two or more for this to make any sense;
// if there's only one we can't change anything.
private boolean electionQueueInBadState(List<String> electionNodes, Slice slice, Replica replica) {
if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
log.warn("Rebalancing leaders and slice {} has less than two elements in the leader " +
"election queue, but replica {} doesn't think it's the leader.", slice.getName(), replica.getName());
return true;
}
return false;
}
// Provide some feedback to the user about what actually happened, or in this case where no action was
// possible
private void addInactiveToResults(Slice slice, Replica replica) {
NamedList<Object> inactives = (NamedList<Object>) results.get(INACTIVE_PREFERREDS);
if (inactives == null) {
inactives = new NamedList<>();
results.add(INACTIVE_PREFERREDS, inactives);
}
NamedList<Object> res = new NamedList<>();
res.add("status", "skipped");
res.add("msg", "Replica " + replica.getName() + " is a referredLeader for shard " + slice.getName() + ", but is inactive. No change necessary");
inactives.add(replica.getName(), res);
}
// Provide some feedback to the user about what actually happened, or in this case where no action was
// necessary since this preferred replica was already the leader
private void addAlreadyLeaderToResults(Slice slice, Replica replica) {
NamedList<Object> alreadyLeaders = (NamedList<Object>) results.get(ALREADY_LEADERS);
if (alreadyLeaders == null) {
alreadyLeaders = new NamedList<>();
results.add(ALREADY_LEADERS, alreadyLeaders);
}
NamedList<Object> res = new NamedList<>();
res.add("status", "skipped");
res.add("msg", "Replica " + replica.getName() + " is already the leader for shard " + slice.getName() + ". No change necessary");
alreadyLeaders.add(replica.getName(), res);
}
// Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list
void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica)
// There can be "ties", i.e. replicas in the queue with the same sequence number. Sorting doesn't necessarily sort
// the one we most care about first. So put the node we _don't_ care about at the end of the election queue.
void makeReplicaFirstWatcher(Slice slice, Replica replica)
throws KeeperException, InterruptedException {
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
// First, queue up the preferred leader at the head of the queue.
// First, queue up the preferred leader watching the leader if it isn't already
int secondSeq = Integer.MAX_VALUE;
int candidateSeq = -1;
for (int idx = 1; idx < electionNodes.size(); ++idx) {
String candidate = electionNodes.get(idx);
secondSeq = Math.min(secondSeq, LeaderElector.getSeq(candidate));
if (LeaderElector.getNodeName(candidate).equals(replica.getName())) {
candidateSeq = LeaderElector.getSeq(candidate);
}
}
int newSeq = -1;
for (String electionNode : electionNodes) {
if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
rejoinElection(collectionName, slice, electionNode, coreName, true);
newSeq = waitForNodeChange(collectionName, slice, electionNode);
break;
if (candidateSeq == secondSeq) {
// the preferredLeader is already watching the leader, no need to move it around.
newSeq = secondSeq;
} else {
for (String electionNode : electionNodes) {
if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
// Make the preferred leader watch the leader.
String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
rejoinElectionQueue(slice, electionNode, coreName, true);
newSeq = waitForNodeChange(slice, electionNode);
break;
}
}
}
if (newSeq == -1) {
@@ -225,18 +364,22 @@
if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) {
continue;
}
// We won't get here for the preferredLeader node
if (LeaderElector.getSeq(thisNode) == newSeq) {
String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP);
rejoinElection(collectionName, slice, thisNode, coreName, false);
waitForNodeChange(collectionName, slice, thisNode);
rejoinElectionQueue(slice, thisNode, coreName, false);
waitForNodeChange(slice, thisNode);
}
}
}
int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException {
// We're just waiting for the electionNode to rejoin the queue as a _different_ election node, indicating that any
// requeueing we've done has happened.
int waitForNodeChange(Slice slice, String electionNode) throws InterruptedException, KeeperException {
String nodeName = LeaderElector.getNodeName(electionNode);
int oldSeq = LeaderElector.getSeq(electionNode);
for (int idx = 0; idx < 600; ++idx) {
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
@@ -245,14 +388,16 @@
return LeaderElector.getSeq(testNode);
}
}
Thread.sleep(100);
TimeUnit.MILLISECONDS.sleep(100);
zkStateReader.forciblyRefreshAllClusterStateSlow();
}
return -1;
}
private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
boolean rejoinAtHead) throws KeeperException, InterruptedException {
// Move an election node to some other place in the queue. If rejoinAtHead==false, then at the end, otherwise
// the new node should point at the leader.
private void rejoinElectionQueue(Slice slice, String electionNode, String core, boolean rejoinAtHead)
throws KeeperException, InterruptedException {
Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
Map<String, Object> propMap = new HashMap<>();
propMap.put(COLLECTION_PROP, collectionName);
@@ -265,64 +410,84 @@
propMap.put(ELECTION_NODE_PROP, electionNode);
String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime());
propMap.put(ASYNC, asyncId);
asyncRequests.add(asyncId);
collectionsHandler.sendToOCPQueue(new ZkNodeProps(propMap)); // ignore response; we construct our own
}
// currentAsyncIds - map of request IDs and reporting data (value)
// maxWaitSecs - How long are we going to wait? Defaults to 60 seconds.
// waitForAll - if true, do not return until all assignments have been made.
// results - a place to stash results for reporting back to the user.
// waitForAll - if true, do not return until all requests have been processed. "Processed" could mean failure!
//
private boolean waitForLeaderChange(Map<String, String> currentAsyncIds, final int maxWaitSecs,
Boolean waitForAll, NamedList<Object> results)
private boolean waitAsyncRequests(final int maxWaitSecs, Boolean waitForAll)
throws KeeperException, InterruptedException {
if (currentAsyncIds.size() == 0) return true;
if (asyncRequests.size() == 0) {
return true;
}
for (int idx = 0; idx < maxWaitSecs * 10; ++idx) {
Iterator<Map.Entry<String, String>> iter = currentAsyncIds.entrySet().iterator();
Iterator<String> iter = asyncRequests.iterator();
boolean foundChange = false;
while (iter.hasNext()) {
Map.Entry<String, String> pair = iter.next();
String asyncId = pair.getKey();
String asyncId = iter.next();
if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
coreContainer.getZkController().clearAsyncId(asyncId);
NamedList<Object> fails = (NamedList<Object>) results.get("failures");
if (fails == null) {
fails = new NamedList<>();
results.add("failures", fails);
}
NamedList<Object> res = new NamedList<>();
res.add("status", "failed");
res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader");
fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
iter.remove();
foundChange = true;
} else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
coreContainer.getZkController().clearAsyncId(asyncId);
NamedList<Object> successes = (NamedList<Object>) results.get("successes");
if (successes == null) {
successes = new NamedList<>();
results.add("successes", successes);
}
NamedList<Object> res = new NamedList<>();
res.add("status", "success");
res.add("msg", "Assigned '" + pair.getValue() + "' to be leader");
successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
iter.remove();
foundChange = true;
}
}
// We're done if we're processing a few at a time or all requests are processed.
if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) {
// We don't want to change, say, 100s of leaders simultaneously. So if the request specifies some limit,
// and we're at that limit, we want to return to the caller so it can immediately add another request.
// That's the purpose of the first clause here. Otherwise, of course, just return if all requests are
// processed.
if ((foundChange && waitForAll == false) || asyncRequests.size() == 0) {
return true;
}
Thread.sleep(100); //TODO: Is there a better thing to do than sleep here?
TimeUnit.MILLISECONDS.sleep(100);
}
// If we get here, we've timed out waiting.
return false;
}
// If we actually changed the leader, we should send that fact back in the response.
private void addToSuccesses(Slice slice, Replica replica) {
NamedList<Object> successes = (NamedList<Object>) results.get("successes");
if (successes == null) {
successes = new NamedList<>();
results.add("successes", successes);
}
log.info("Successfully changed leader of shard {} to replica {}", slice.getName(), replica.getName());
NamedList<Object> res = new NamedList<>();
res.add("status", "success");
res.add("msg", "Successfully changed leader of slice " + slice.getName() + " to " + replica.getName());
successes.add(slice.getName(), res);
}
// If for any reason we were supposed to change leadership, that should be recorded in pendingOps. Any
// time we verified that the change actually occurred, that entry should have been removed. So report anything
// left over as a failure.
private void addAnyFailures() {
if (pendingOps.size() == 0) {
return;
}
NamedList<Object> fails = (NamedList<Object>) new NamedList<>();
results.add("failures", fails);
for (Map.Entry<String, String> ent : pendingOps.entrySet()) {
log.info("Failed to change leader of shard {} to replica {}", ent.getKey(), ent.getValue());
NamedList<Object> res = new NamedList<>();
res.add("status", "failed");
res.add("msg", String.format(Locale.ROOT, "Could not change leder for slice %s to %s", ent.getKey(), ent.getValue()));
fails.add(ent.getKey(), res);
}
}
}
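For completeness, a minimal client-side sketch of issuing REBALANCELEADERS through SolrJ, matching the request the test file below sends; the collection name and maxAtOnce value are placeholders:

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;

class RebalanceLeadersClientSketch {
  static void rebalance(CloudSolrClient client) throws Exception {
    CollectionAdminResponse resp = CollectionAdminRequest
        .rebalanceLeaders("TestColl") // placeholder collection name
        .setMaxAtOnce(10)             // cap simultaneous leader reassignments
        .process(client);
    // The handler puts the "Summary" entry first in the response; it reports
    // "Success" when every active preferredLeader ended up leading its shard.
    System.out.println(resp.getResponse().get("Summary"));
  }
}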


@@ -15,335 +15,629 @@
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestRebalanceLeaders extends AbstractFullDistribZkTestBase {
@LuceneTestCase.Slow
public class TestRebalanceLeaders extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String COLLECTION_NAME = "testcollection";
private static final String COLLECTION_NAME = "TestColl";
private static int numNodes;
private static int numShards;
private static int numReplicas;
private static boolean useAdminToSetProps = false;
@BeforeClass
public static void setupCluster() throws Exception {
numNodes = random().nextInt(4) + 4;
numShards = random().nextInt(3) + 3;
numReplicas = random().nextInt(2) + 2;
useAdminToSetProps = random().nextBoolean();
configureCluster(numNodes)
.addConfig(COLLECTION_NAME, configset("cloud-minimal"))
.configure();
CollectionAdminResponse resp = CollectionAdminRequest.createCollection(COLLECTION_NAME, COLLECTION_NAME,
numShards, numReplicas, 0, 0)
.setMaxShardsPerNode((numShards * numReplicas) / numNodes + 1)
.process(cluster.getSolrClient());
assertEquals("Admin request failed; ", 0, resp.getStatus());
cluster.waitForActiveCollection(COLLECTION_NAME, numShards, numShards * numReplicas);
public TestRebalanceLeaders() {
schemaString = "schema15.xml"; // we need a string id
sliceCount = 4;
}
int reps = 10;
@Before
public void removeAllProperties() throws KeeperException, InterruptedException {
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
for (Slice slice : docCollection.getSlices()) {
for (Replica rep : slice.getReplicas()) {
rep.getProperties().forEach((key, value) -> {
if (key.startsWith("property.")) {
try {
delProp(slice, rep, key);
} catch (IOException | SolrServerException e) {
fail("Caught unexpected exception in @Before " + e.getMessage());
}
}
});
}
}
}
int timeoutMs = 60000;
Map<String, List<Replica>> initial = new HashMap<>();
Map<String, Replica> expected = new HashMap<>();
// Test that setting an arbitrary "slice unique" property un-sets the property if it's on another replica in the
// slice. This tests setting the property on an _individual_ replica, whereas testBalancePropertySliceUnique
// relies on the BALANCESHARDUNIQUE command to choose which replica in each slice gets the property.
//
// NOTE: There were significant problems because at one point the code implicitly defined
// shardUnique=true for the special property preferredLeader. That was removed at one point so we're explicitly
// testing that as well.
@Test
@ShardsFixed(num = 4)
public void test() throws Exception {
reps = random().nextInt(9) + 1; // make sure and do at least one.
try (CloudSolrClient client = createCloudClient(null)) {
// Mix up a bunch of different combinations of shards and replicas in order to exercise boundary cases.
// shards, replicationfactor, maxreplicaspernode
int shards = random().nextInt(7);
if (shards < 2) shards = 2;
int rFactor = random().nextInt(4);
if (rFactor < 2) rFactor = 2;
createCollection(null, COLLECTION_NAME, shards, rFactor, shards * rFactor + 1, client, null, "conf1");
}
waitForCollection(cloudClient.getZkStateReader(), COLLECTION_NAME, 2);
waitForRecoveriesToFinish(COLLECTION_NAME, false);
listCollection();
rebalanceLeaderTest();
public void testSetArbitraryPropertySliceUnique() throws IOException, SolrServerException, InterruptedException, KeeperException {
// Check both special (preferredLeader) and something arbitrary.
doTestSetArbitraryPropertySliceUnique("foo" + random().nextInt(1_000_000));
removeAllProperties();
doTestSetArbitraryPropertySliceUnique("preferredleader");
}
private void listCollection() throws IOException, SolrServerException {
//CloudSolrServer client = createCloudClient(null);
try {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.LIST.toString());
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
NamedList<Object> rsp = cloudClient.request(request);
List<String> collections = (List<String>) rsp.get("collections");
assertTrue("control_collection was not found in list", collections.contains("control_collection"));
assertTrue(DEFAULT_COLLECTION + " was not found in list", collections.contains(DEFAULT_COLLECTION));
assertTrue(COLLECTION_NAME + " was not found in list", collections.contains(COLLECTION_NAME));
} finally {
//remove collections
//client.shutdown();
// Test that automatically distributing a slice unique property un-sets that property if it's in any other replica
// on that slice.
// This is different than the test above. The test above sets individual properties on individual nodes. This one
// relies on Solr to pick which replicas to set the property on
@Test
public void testBalancePropertySliceUnique() throws KeeperException, InterruptedException, IOException, SolrServerException {
// Check both cases of "special" property preferred(Ll)eader
doTestBalancePropertySliceUnique("foo" + random().nextInt(1_000_000));
removeAllProperties();
doTestBalancePropertySliceUnique("preferredleader");
}
// We've moved on from testing properties; we need to check if rebalancing the leaders actually changes the
// leader appropriately.
@Test
public void testRebalanceLeaders() throws Exception {
// First let's unbalance the preferredLeader property, do all the leaders get reassigned properly?
concentrateProp("preferredLeader");
sendRebalanceCommand();
checkPreferredsAreLeaders();
// Now follow up by evenly distributing the property as well as possible.
doTestBalancePropertySliceUnique("preferredLeader");
sendRebalanceCommand();
checkPreferredsAreLeaders();
// Now check the condition we saw "in the wild" where you could not rebalance properly when Jetty was restarted.
concentratePropByRestartingJettys();
sendRebalanceCommand();
checkPreferredsAreLeaders();
}
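The preferredLeader flag this test flips is an ordinary replica property. A short sketch of setting it by hand with the same SolrJ call used further down in this file; the shard and replica names are placeholders:

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;

class PreferredLeaderSketch {
  static void markPreferredLeader(CloudSolrClient client) throws Exception {
    // "TestColl", "shard1", and "core_node3" stand in for real names.
    CollectionAdminResponse resp = CollectionAdminRequest
        .addReplicaProperty("TestColl", "shard1", "core_node3",
            "preferredLeader", "true")
        .process(client);
    if (resp.getStatus() != 0) {
      throw new IllegalStateException("ADDREPLICAPROP failed: " + resp);
    }
  }
}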
// Ensure that the property is set on only one replica per slice when changing a unique property on an individual
// replica.
private void doTestSetArbitraryPropertySliceUnique(String propIn) throws InterruptedException, KeeperException, IOException, SolrServerException {
final String prop = (random().nextBoolean()) ? propIn : propIn.toUpperCase(Locale.ROOT);
// First set the property in some replica in some slice
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
Slice[] slices = docCollection.getSlices().toArray(new Slice[0]);
Slice slice = slices[random().nextInt(slices.length)];
// Bounce around a bit setting this property and ensure it's only set on one replica.
Replica[] reps = slice.getReplicas().toArray(new Replica[0]);
for (int idx = 0; idx < 4; ++idx) {
Replica rep = reps[random().nextInt(reps.length)];
// Set the property on a particular replica
setProp(slice, rep, prop);
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
long count = 0;
boolean rightRep = false;
Slice modSlice;
DocCollection modColl = null; // keeps IDE happy
// ensure that no other replica in that slice has the property when we return.
while (timeout.hasTimedOut() == false) {
forceUpdateCollectionStatus();
modColl = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
modSlice = modColl.getSlice(slice.getName());
rightRep = modSlice.getReplica(rep.getName()).getBool("property." + prop.toLowerCase(Locale.ROOT), false);
count = modSlice.getReplicas().stream().filter(thisRep -> thisRep.getBool("property." + prop.toLowerCase(Locale.ROOT), false)).count();
if (count == 1 && rightRep) {
break;
}
TimeUnit.MILLISECONDS.sleep(100);
}
if (count != 1 || rightRep == false) {
fail("The property " + prop + " was not uniquely distributed in slice " + slice.getName()
+ " " + modColl.toString());
}
}
}
void recordInitialState() throws InterruptedException {
Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap();
// Assemble a list of all the replicas for all the shards in a convenient way to look at them.
for (Map.Entry<String, Slice> ent : slices.entrySet()) {
initial.put(ent.getKey(), new ArrayList<>(ent.getValue().getReplicas()));
}
}
void rebalanceLeaderTest() throws InterruptedException, IOException, SolrServerException, KeeperException {
recordInitialState();
for (int idx = 0; idx < reps; ++idx) {
issueCommands();
checkConsistency();
}
}
// After we've called the rebalance command, we want to insure that:
// 1> all replicas appear once and only once in the respective leader election queue
// 2> All the replicas we _think_ are leaders are in the 0th position in the leader election queue.
// 3> The node that ZooKeeper thinks is the leader is the one we think should be the leader.
void checkConsistency() throws InterruptedException, KeeperException {
// Fail if the replicas with the preferredLeader property are _not_ also the leaders.
private void checkPreferredsAreLeaders() throws InterruptedException, KeeperException {
// Make sure that the shard-unique properties are where you expect.
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
boolean checkAppearOnce = false;
boolean checkElectionZero = false;
boolean checkZkLeadersAgree = false;
while (!timeout.hasTimedOut()) {
checkAppearOnce = checkAppearOnce();
checkElectionZero = checkElectionZero();
checkZkLeadersAgree = checkZkLeadersAgree();
if (checkAppearOnce && checkElectionZero && checkZkLeadersAgree) {
while (timeout.hasTimedOut() == false) {
if (checkPreferredsAreLeaders(false)) {
// OK, all preferreds are leaders. Let's also get the election queue and guarantee that every
// live replica is in the queue and none are repeated.
checkElectionQueues();
return;
}
Thread.sleep(1000);
TimeUnit.MILLISECONDS.sleep(100);
}
fail("Checking the rebalance leader command failed, checkAppearOnce=" + checkAppearOnce + " checkElectionZero="
+ checkElectionZero + " checkZkLeadersAgree=" + checkZkLeadersAgree);
log.error("Leaders are not all preferres {}", cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME));
// Show the errors
checkPreferredsAreLeaders(true);
}
// Do all active nodes in each slice appear exactly once in the slice's leader election queue?
// Since we assert that the number of live replicas is the same size as the leader election queue, we only
// have to compare one way.
private void checkElectionQueues() throws KeeperException, InterruptedException {
// Do all the nodes appear exactly once in the leader election queue and vice-versa?
Boolean checkAppearOnce() throws KeeperException, InterruptedException {
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
for (Map.Entry<String, List<Replica>> ent : initial.entrySet()) {
List<String> leaderQueue = cloudClient.getZkStateReader().getZkClient().getChildren("/collections/" + COLLECTION_NAME +
"/leader_elect/" + ent.getKey() + "/election", null, true);
if (leaderQueue.size() != ent.getValue().size()) {
return false;
}
// Check that each election node has a corresponding replica.
for (String electionNode : leaderQueue) {
if (checkReplicaName(LeaderElector.getNodeName(electionNode), ent.getValue())) {
continue;
for (Slice slice : docCollection.getSlices()) {
Set<Replica> liveReplicas = new HashSet<>();
slice.getReplicas().forEach(replica -> {
if (replica.isActive(liveNodes)) {
liveReplicas.add(replica);
}
return false;
}
// Check that each replica has an election node.
for (Replica rep : ent.getValue()) {
if (checkElectionNode(rep.getName(), leaderQueue)) {
continue;
}
return false;
}
});
checkOneQueue(docCollection, slice, liveReplicas);
}
return true;
}
// Check that the given name is in the leader election queue
Boolean checkElectionNode(String repName, List<String> leaderQueue) {
// Helper method to check one leader election queue's consistency.
private void checkOneQueue(DocCollection coll, Slice slice, Set<Replica> liveReplicas) throws KeeperException, InterruptedException {
List<String> leaderQueue = cluster.getSolrClient().getZkStateReader().getZkClient().getChildren("/collections/" + COLLECTION_NAME +
"/leader_elect/" + slice.getName() + "/election", null, true);
if (leaderQueue.size() != liveReplicas.size()) {
log.error("One or more replicas is missing from the leader election queue! Slice {}, election queue: {}, collection: {}"
, slice.getName(), leaderQueue, coll);
fail("One or more replicas is missing from the leader election queue");
}
// Check that each election node has a corresponding live replica.
for (String electionNode : leaderQueue) {
if (repName.equals(LeaderElector.getNodeName(electionNode))) {
return true;
String replica = LeaderElector.getNodeName(electionNode);
if (slice.getReplica(replica) == null) {
log.error("Replica {} is not in the election queue: {}", replica, leaderQueue);
fail("Replica is not in the election queue!");
}
}
return false;
}
// Check that the name passed in corresponds to a replica.
Boolean checkReplicaName(String toCheck, List<Replica> replicas) {
for (Replica rep : replicas) {
if (toCheck.equals(rep.getName())) {
return true;
}
}
return false;
}
// Get the shard leader election from ZK and sort it. The node may not actually be there, so retry
List<String> getOverseerSort(String key) {
List<String> ret = null;
try {
ret = OverseerCollectionConfigSetProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
"/collections/" + COLLECTION_NAME + "/leader_elect/" + key + "/election");
return ret;
} catch (KeeperException e) {
cloudClient.connect();
} catch (InterruptedException e) {
return null;
}
return null;
}
// Is every node we think is the leader in the zeroth position in the leader election queue?
Boolean checkElectionZero() {
for (Map.Entry<String, Replica> ent : expected.entrySet()) {
List<String> leaderQueue = getOverseerSort(ent.getKey());
if (leaderQueue == null) return false;
String electName = LeaderElector.getNodeName(leaderQueue.get(0));
String coreName = ent.getValue().getName();
if (electName.equals(coreName) == false) {
return false;
// Just an encapsulation for checkPreferredsAreLeaders to make returning easier.
// The doAsserts var is to actually print the problem and fail the test if the condition is not met.
private boolean checkPreferredsAreLeaders(boolean doAsserts) throws KeeperException, InterruptedException {
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
for (Slice slice : docCollection.getSlices()) {
for (Replica rep : slice.getReplicas()) {
if (rep.getBool("property.preferredleader", false)) {
boolean isLeader = rep.getBool("leader", false);
if (doAsserts) {
assertTrue("PreferredLeader should be the leader: ", isLeader);
} else if (isLeader == false) {
return false;
}
}
}
}
return true;
}
// Does the node we _think_ should be the leader agree with the leader nodes in ZK?
Boolean checkZkLeadersAgree() throws KeeperException, InterruptedException {
for (Map.Entry<String,Replica> ent : expected.entrySet()) {
String path = "/collections/" + COLLECTION_NAME + "/leaders/" + ent.getKey() + "/leader";
byte[] data = getZkData(cloudClient, path);
if (data == null) {
log.warn("path to check not found {}", path);
return false;
}
String repCore = null;
String zkCore = null;
Map m = (Map) Utils.fromJSON(data);
zkCore = (String) m.get("core");
repCore = ent.getValue().getStr("core");
if (zkCore.equals(repCore) == false) {
log.warn("leader in zk does not match what we expect: {} != {}", zkCore, repCore);
return false;
}
}
return true;
}
byte[] getZkData(CloudSolrClient client, String path) {
org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
try {
byte[] data = client.getZkStateReader().getZkClient().getData(path, null, stat, true);
if (data != null) {
return data;
}
} catch (KeeperException.NoNodeException e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
return null;
}
} catch (InterruptedException | KeeperException e) {
return null;
}
return null;
}
// It's OK not to check the return here since the subsequent tests will fail.
void issueCommands() throws IOException, SolrServerException, KeeperException, InterruptedException {
// Find a replica to make the preferredLeader. NOTE: may be one that's _already_ leader!
expected.clear();
for (Map.Entry<String, List<Replica>> ent : initial.entrySet()) {
List<Replica> replicas = ent.getValue();
Replica rep = replicas.get(Math.abs(random().nextInt()) % replicas.size());
expected.put(ent.getKey(), rep);
issuePreferred(ent.getKey(), rep);
}
if (waitForAllPreferreds() == false) {
fail("Waited for timeout for preferredLeader assignments to be made and they werent.");
}
//fillExpectedWithCurrent();
// Now rebalance the leaders randomly using SolrJ or direct call
if(random().nextBoolean())
// Arbitrarily send the rebalance command either with the SolrJ interface or with an HTTP request.
private void sendRebalanceCommand() throws SolrServerException, InterruptedException, IOException {
if (random().nextBoolean()) {
rebalanceLeaderUsingSolrJAPI();
else
rebalanceLeaderUsingDirectCall();
} else {
rebalanceLeaderUsingStandardRequest();
}
}
// Helper method to make sure the property is _unbalanced_ first, then it gets properly re-assigned with the
// BALANCESHARDUNIQUE command.
private void doTestBalancePropertySliceUnique(String propIn) throws InterruptedException, IOException, KeeperException, SolrServerException {
final String prop = (random().nextBoolean()) ? propIn : propIn.toUpperCase(Locale.ROOT);
// Concentrate the properties on as few replicas as possible
concentrateProp(prop);
// issue the BALANCESHARDUNIQUE command
rebalancePropAndCheck(prop);
// Verify that there are no more than one replica with the property per shard.
verifyPropUniquePerShard(prop);
// Verify that the property is reasonably evenly distributed
verifyPropCorrectlyDistributed(prop);
}
private void rebalanceLeaderUsingSolrJAPI() throws IOException, SolrServerException {
CollectionAdminRequest.RebalanceLeaders rebalanceLeaders = CollectionAdminRequest.rebalanceLeaders(COLLECTION_NAME);
rebalanceLeaders.setMaxAtOnce(10)
.process(cloudClient);
private void verifyPropCorrectlyDistributed(String prop) throws KeeperException, InterruptedException {
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
String propLC = prop.toLowerCase(Locale.ROOT);
DocCollection docCollection = null;
while (timeout.hasTimedOut() == false) {
forceUpdateCollectionStatus();
docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
int maxPropCount = Integer.MIN_VALUE; // start low so Math.max can raise it
int minPropCount = Integer.MAX_VALUE; // start high so Math.min can lower it
for (Slice slice : docCollection.getSlices()) {
int repCount = 0;
for (Replica rep : slice.getReplicas()) {
if (rep.getBool("property." + propLC, false)) {
repCount++;
}
}
maxPropCount = Math.max(maxPropCount, repCount);
minPropCount = Math.min(minPropCount, repCount);
}
if (Math.abs(maxPropCount - minPropCount) < 2) return;
}
log.error("Property {} is not distributed evenly. {}", prop, docCollection);
fail("Property is not distributed evenly " + prop);
}
private void rebalanceLeaderUsingDirectCall() throws IOException, SolrServerException {
// Used when we concentrate the property on a few nodes.
private void verifyPropDistributedAsExpected(Map<String, String> expectedShardReplicaMap, String prop) throws InterruptedException, KeeperException {
// Make sure that the shard-unique properties are where you expect.
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
String propLC = prop.toLowerCase(Locale.ROOT);
boolean failure = false;
DocCollection docCollection = null;
while (timeout.hasTimedOut() == false) {
forceUpdateCollectionStatus();
docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
failure = false;
for (Map.Entry<String, String> ent : expectedShardReplicaMap.entrySet()) {
Replica rep = docCollection.getSlice(ent.getKey()).getReplica(ent.getValue());
if (rep.getBool("property." + propLC, false) == false) {
failure = true;
}
}
if (failure == false) {
return;
}
TimeUnit.MILLISECONDS.sleep(100);
}
fail(prop + " properties are not on the expected replicas: " + docCollection.toString()
+ System.lineSeparator() + "Expected " + expectedShardReplicaMap.toString());
}
// Just check that the property is distributed as expected. This does _not_ rebalance the leaders.
private void rebalancePropAndCheck(String prop) throws IOException, SolrServerException, InterruptedException, KeeperException {
if (random().nextBoolean()) {
rebalancePropUsingSolrJAPI(prop);
} else {
rebalancePropUsingStandardRequest(prop);
}
}
private void rebalanceLeaderUsingSolrJAPI() throws IOException, SolrServerException, InterruptedException {
CollectionAdminResponse resp = CollectionAdminRequest
.rebalanceLeaders(COLLECTION_NAME)
.process(cluster.getSolrClient());
assertTrue("All leaders should have been verified", resp.getResponse().get("Summary").toString().contains("Success"));
assertEquals("Admin request failed; ", 0, resp.getStatus());
}
private void rebalanceLeaderUsingStandardRequest() throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString());
// Ensure we get error returns when omitting required parameters
params.set("collection", COLLECTION_NAME);
params.set("maxAtOnce", "10");
SolrRequest request = new QueryRequest(params);
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
cloudClient.request(request);
QueryResponse resp = request.process(cluster.getSolrClient());
assertTrue("All leaders should have been verified", resp.getResponse().get("Summary").toString().contains("Success"));
assertEquals("Call to rebalanceLeaders failed ", 0, resp.getStatus());
}
void issuePreferred(String slice, Replica rep) throws IOException, SolrServerException, InterruptedException {
private void rebalancePropUsingSolrJAPI(String prop) throws IOException, SolrServerException, InterruptedException {
// Don't set the value, that should be done automatically.
CollectionAdminResponse resp;
if (prop.toLowerCase(Locale.ROOT).contains("preferredleader")) {
resp = CollectionAdminRequest
.balanceReplicaProperty(COLLECTION_NAME, prop)
.process(cluster.getSolrClient());
} else {
resp = CollectionAdminRequest
.balanceReplicaProperty(COLLECTION_NAME, prop)
.setShardUnique(true)
.process(cluster.getSolrClient());
}
assertEquals("Admin request failed; ", 0, resp.getStatus());
}
private void rebalancePropUsingStandardRequest(String prop) throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.BALANCESHARDUNIQUE.toString());
params.set("property", prop);
params.set("collection", COLLECTION_NAME);
if (prop.toLowerCase(Locale.ROOT).contains("preferredleader") == false) {
params.set("shardUnique", true);
}
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
QueryResponse resp = request.process(cluster.getSolrClient());
assertEquals("Call to rebalanceLeaders failed ", 0, resp.getStatus());
}
// This is important. I've (Erick Erickson) run across a situation where the "standard request" causes failures, but
// never the Admin request. So let's test both all the time for a given test.
//
// This sets an _individual_ replica to have the property, not collection-wide
private void setProp(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
if (useAdminToSetProps) {
setPropWithAdminRequest(slice, rep, prop);
} else {
setPropWithStandardRequest(slice, rep, prop);
}
}
void setPropWithStandardRequest(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.ADDREPLICAPROP.toString());
// Ensure we get error returns when omitting required parameters
params.set("collection", COLLECTION_NAME);
params.set("shard", slice);
params.set("shard", slice.getName());
params.set("replica", rep.getName());
params.set("property", "preferredLeader");
params.set("property", prop);
params.set("property.value", "true");
// Test to ensure that implicit shardUnique is added for preferredLeader.
if (prop.toLowerCase(Locale.ROOT).equals("preferredleader") == false) {
params.set("shardUnique", "true");
}
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
cloudClient.request(request);
cluster.getSolrClient().request(request);
String propLC = prop.toLowerCase(Locale.ROOT);
waitForState("Expecting property '" + prop + "'to appear on replica " + rep.getName(), COLLECTION_NAME,
(n, c) -> "true".equals(c.getReplica(rep.getName()).getProperty(propLC)));
}
boolean waitForAllPreferreds() throws KeeperException, InterruptedException {
boolean goAgain = true;
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
while (! timeout.hasTimedOut()) {
goAgain = false;
Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap();
void setPropWithAdminRequest(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
boolean setUnique = (prop.toLowerCase(Locale.ROOT).equals("preferredleader") == false);
CollectionAdminRequest.AddReplicaProp addProp =
CollectionAdminRequest.addReplicaProperty(COLLECTION_NAME, slice.getName(), rep.getName(), prop, "true");
if (setUnique) {
addProp.setShardUnique(true);
}
CollectionAdminResponse resp = addProp.process(cluster.getSolrClient());
assertEquals(0, resp.getStatus());
String propLC = prop.toLowerCase(Locale.ROOT);
waitForState("Expecting property '" + prop + "'to appear on replica " + rep.getName(), COLLECTION_NAME,
(n, c) -> "true".equals(c.getReplica(rep.getName()).getProperty(propLC)));
for (Map.Entry<String, Replica> ent : expected.entrySet()) {
Replica me = slices.get(ent.getKey()).getReplica(ent.getValue().getName());
if (me.getBool("property.preferredleader", false) == false) {
goAgain = true;
break;
}
private void delProp(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
String propLC = prop.toLowerCase(Locale.ROOT);
CollectionAdminResponse resp = CollectionAdminRequest.deleteReplicaProperty(COLLECTION_NAME, slice.getName(), rep.getName(), propLC)
.process(cluster.getSolrClient());
assertEquals("Admin request failed; ", 0, resp.getStatus());
waitForState("Expecting property '" + prop + "' to be removed from replica " + rep.getName(), COLLECTION_NAME,
(n, c) -> c.getReplica(rep.getName()).getProperty(prop) == null);
}
// Intentionally un-balance the property to ensure that BALANCESHARDUNIQUE does its job. There was an odd case
// where rebalancing didn't work very well if the Solr nodes were stopped and restarted, yet worked perfectly
// when the nodes were _not_ restarted in the test. So we have to test that too.
private void concentratePropByRestartingJettys() throws Exception {
List<JettySolrRunner> jettys = new ArrayList<>(cluster.getJettySolrRunners());
Collections.shuffle(jettys, random());
jettys.remove(random().nextInt(jettys.size()));
// Now we have a list of jettys, and there is one missing. Stop all of the remaining jettys, then start them again
// to concentrate the leaders. It's not necessary that all shards have a leader.
for (JettySolrRunner jetty : jettys) {
cluster.stopJettySolrRunner(jetty);
cluster.waitForJettyToStop(jetty);
}
checkReplicasInactive(jettys);
for (int idx = 0; idx < jettys.size(); ++idx) {
cluster.startJettySolrRunner(jettys.get(idx));
}
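    // Wait (up to 60 seconds) for all the restarted nodes to report as live.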
cluster.waitForAllNodes(60);
    // The nodes are present, but are all the replicas active?
checkAllReplicasActive();
}
  // While banging my head against a wall, I put a lot of force-refresh statements in. I want to leave them in,
  // but have this be a no-op so that if we start to get failures we can re-enable them with minimal effort.
private void forceUpdateCollectionStatus() throws KeeperException, InterruptedException {
// cluster.getSolrClient().getZkStateReader().forceUpdateCollection(COLLECTION_NAME);
}
// Since we have to restart jettys, we don't want to try rebalancing etc. until we're sure all jettys that should
// be up are up and all replicas are active.
private void checkReplicasInactive(List<JettySolrRunner> downJettys) throws KeeperException, InterruptedException {
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
DocCollection docCollection = null;
Set<String> liveNodes = null;
Set<String> downJettyNodes = new TreeSet<>();
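    // Node names recorded in the cluster state have the form "host:port_solr".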
for (JettySolrRunner jetty : downJettys) {
downJettyNodes.add(jetty.getBaseUrl().getHost() + ":" + jetty.getBaseUrl().getPort() + "_solr");
}
while (timeout.hasTimedOut() == false) {
forceUpdateCollectionStatus();
docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
boolean expectedInactive = true;
for (Slice slice : docCollection.getSlices()) {
for (Replica rep : slice.getReplicas()) {
if (downJettyNodes.contains(rep.getNodeName()) == false) {
continue; // We are on a live node
}
// A replica on an allegedly down node is reported as active.
if (rep.isActive(liveNodes)) {
expectedInactive = false;
}
}
}
if (expectedInactive) {
return;
}
TimeUnit.MILLISECONDS.sleep(100);
}
fail("timed out waiting for all replicas to become inactive: livenodes: " + liveNodes +
" Collection state: " + docCollection.toString());
}
// We need to wait around until all replicas are active before expecting rebalancing or distributing shard-unique
// properties to work.
private void checkAllReplicasActive() throws KeeperException, InterruptedException {
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
while (timeout.hasTimedOut() == false) {
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
boolean allActive = true;
for (Slice slice : docCollection.getSlices()) {
for (Replica rep : slice.getReplicas()) {
if (rep.isActive(liveNodes) == false) {
allActive = false;
}
}
}
if (allActive) {
return;
}
TimeUnit.MILLISECONDS.sleep(100);
}
fail("timed out waiting for all replicas to become active");
}
// use a simple heuristic to put as many replicas with the property on as few nodes as possible. The point is that
// then we can execute BALANCESHARDUNIQUE and be sure it worked correctly
private void concentrateProp(String prop) throws KeeperException, InterruptedException, IOException, SolrServerException {
    // Find all the live nodes. For each slice, set the property on the replica whose node sits in the
    // lowest position in the (shuffled) live_nodes list, concentrating the property on a few nodes.
List<String> liveNodes = new ArrayList<>(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes());
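    // Shuffle so successive test runs don't always concentrate the property on the same nodes.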
Collections.shuffle(liveNodes, random());
Map<String, String> uniquePropMap = new TreeMap<>();
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
for (Slice slice : docCollection.getSlices()) {
Replica changedRep = null;
int livePos = Integer.MAX_VALUE;
for (Replica rep : slice.getReplicas()) {
int pos = liveNodes.indexOf(rep.getNodeName());
if (pos >= 0 && pos < livePos) {
livePos = pos;
changedRep = rep;
}
}
if (livePos == Integer.MAX_VALUE) {
fail("Invalid state! We should have a replica to add the property to! " + docCollection.toString());
}
uniquePropMap.put(slice.getName(), changedRep.getName());
// Now set the property on the "lowest" node in live_nodes.
setProp(slice, changedRep, prop);
}
verifyPropDistributedAsExpected(uniquePropMap, prop);
}
// make sure that the property in question is unique per shard.
private Map<String, String> verifyPropUniquePerShard(String prop) throws InterruptedException, KeeperException {
Map<String, String> uniquePropMaps = new TreeMap<>();
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
while (timeout.hasTimedOut() == false) {
uniquePropMaps.clear();
      if (checkUniquePropPerShard(uniquePropMaps, prop)) {
return uniquePropMaps;
}
TimeUnit.MILLISECONDS.sleep(100);
}
fail("There should be exactly one replica with value " + prop + " set to true per shard: "
+ cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).toString());
return null; // keeps IDE happy.
}
// return true if every shard has exactly one replica with the unique property set to "true"
  private boolean checkUniquePropPerShard(Map<String, String> uniques, String prop) throws KeeperException, InterruptedException {
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
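    // Count, per slice, how many replicas carry the property; exactly one per slice is expected.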
for (Slice slice : docCollection.getSlices()) {
      int propCount = 0;
for (Replica rep : slice.getReplicas()) {
if (rep.getBool("property." + prop.toLowerCase(Locale.ROOT), false)) {
          propCount++;
uniques.put(slice.getName(), rep.getName());
}
}
      if (1 != propCount) {
return false;
}
}
return true;
}
}
@ -2163,6 +2163,8 @@ http://localhost:8983/solr/admin/collections?action=REBALANCELEADERS&collection=
In this example, two replicas in the "alreadyLeaders" section already had the leader assigned to the same node as the `preferredLeader` property, so no action was taken.
The "Success" tag indicates that the command rebalanced all leaders. If, for any reason, some replicas with preferredLeader=true are not leaders, this tag will be "Failure" rather than "Success". If a replica cannot be made leader because it is not "Active", that is also counted as a failure.
The replica in the "inactivePreferreds" section had the `preferredLeader` property set, but the node was down, so no action was taken. The three nodes in the "successes" section were made leaders because they had the `preferredLeader` property set, were not leaders, and were active.
[source,xml]
@ -2172,6 +2174,9 @@ The replica in the "inactivePreferreds" section had the `preferredLeader` proper
<int name="status">0</int>
<int name="QTime">123</int>
</lst>
<lst>
<str name="Success">All replicas with the preferredLeader property set are leaders</str>
</lst>
<lst name="alreadyLeaders">
<lst name="core_node1">
<str name="status">success</str>
@ -2219,6 +2224,10 @@ The replica in the "inactivePreferreds" section had the `preferredLeader` proper
Examining the cluster state after issuing this call should show that every live node that has the `preferredLeader` property also has the "leader" property set to _true_.
NOTE: The added work done by an NRT leader during indexing is quite small. The primary use-case is to redistribute the leader role if there are a large number of leaders concentrated on a small number of nodes. Rebalancing will likely not improve performance unless the imbalance of leadership roles is measured in multiples of 10.
NOTE: The BALANCESHARDUNIQUE command that distributes the preferredLeader property does not guarantee perfect distribution, and in some collection topologies it is impossible to make that guarantee.
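One way to issue this command from SolrJ is through a generic `QueryRequest` against the Collections API path, the same pattern the tests accompanying this change use. The sketch below makes some assumptions: the ZooKeeper address, the collection name "collection1", and the `maxAtOnce` value are illustrative, not defaults.

[source,java]
----
import java.util.Collections;
import java.util.Optional;

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;

public class RebalanceLeadersExample {
  public static void main(String[] args) throws Exception {
    // Assumed ZooKeeper address; substitute your own ensemble.
    try (CloudSolrClient client = new CloudSolrClient.Builder(
        Collections.singletonList("localhost:2181"), Optional.empty()).build()) {
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString());
      params.set("collection", "collection1"); // assumed collection name
      params.set("maxAtOnce", "5");            // optional: limit on leaders moved at once
      SolrRequest request = new QueryRequest(params);
      request.setPath("/admin/collections");
      // The response mirrors the structure shown in the XML examples above.
      System.out.println(client.request(request));
    }
  }
}
----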
[[forceleader]]
== FORCELEADER: Force Shard Leader