From a692d05a909e1ce385c56c087cb62911c27b5f5b Mon Sep 17 00:00:00 2001 From: Erick Erickson Date: Sat, 19 Jan 2019 19:20:39 -0800 Subject: [PATCH] SOLR-13091: REBALANCELEADERS is broken --- solr/CHANGES.txt | 2 + .../solr/cloud/ExclusiveSliceProperty.java | 20 + .../solr/handler/admin/RebalanceLeaders.java | 383 ++++++--- .../solr/cloud/TestRebalanceLeaders.java | 782 ++++++++++++------ solr/solr-ref-guide/src/collections-api.adoc | 9 + 5 files changed, 843 insertions(+), 353 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 65604c2e6c4..5f770a60353 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -284,6 +284,8 @@ Bug Fixes * SOLR-13137: NPE when /admin/zookeeper/status endpoint hit in standalone mode (janhoy) +* SOLR-13091: REBALANCELEADERS is broken (Erick Erickson) + Improvements ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java index 953023f9153..f5672ba31cb 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java +++ b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java @@ -17,6 +17,7 @@ package org.apache.solr.cloud; +import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -39,6 +40,8 @@ import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES; import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SHARD_UNIQUE; @@ -46,6 +49,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.BA // Class to encapsulate processing replica properties that have at most one replica hosting a property per slice. class ExclusiveSliceProperty { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private ClusterState clusterState; private final boolean onlyActiveNodes; private final String property; @@ -235,6 +239,15 @@ class ExclusiveSliceProperty { adjustLimits(nodesHostingProp.get(nodeName)); removeSliceAlreadyHostedFromPossibles(srToChange.slice.getName()); addProp(srToChange.slice, srToChange.replica.getName()); + // When you set the property, you must insure that it is _removed_ from any other replicas. 
+ for (Replica rep : srToChange.slice.getReplicas()) { + if (rep.getName().equals(srToChange.replica.getName())) { + continue; + } + if (rep.getProperty(property) != null) { + removeProp(srToChange.slice, srToChange.replica.getName()); + } + } } } @@ -266,10 +279,12 @@ class ExclusiveSliceProperty { } private void removeProp(Slice origSlice, String replicaName) { + log.debug("Removing property {} from slice {}, replica {}", property, origSlice.getName(), replicaName); getReplicaFromChanged(origSlice, replicaName).getProperties().remove(property); } private void addProp(Slice origSlice, String replicaName) { + log.debug("Adding property {} to slice {}, replica {}", property, origSlice.getName(), replicaName); getReplicaFromChanged(origSlice, replicaName).getProperties().put(property, "true"); } @@ -342,5 +357,10 @@ class ExclusiveSliceProperty { this.slice = slice; this.replica = replica; } + public String toString() { + StringBuilder sb = new StringBuilder(System.lineSeparator()).append(System.lineSeparator()).append("******EOE20 starting toString of SliceReplica"); + sb.append(" :").append(System.lineSeparator()).append("slice: ").append(slice.toString()).append(System.lineSeparator()).append(" replica: ").append(replica.toString()).append(System.lineSeparator()); + return sb.toString(); + } } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java index f0819bdded9..522a432fe05 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java @@ -18,10 +18,13 @@ package org.apache.solr.handler.admin; import java.lang.invoke.MethodHandles; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.solr.cloud.LeaderElector; @@ -55,13 +58,62 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + +/** + * The end point for the collections API REBALANCELEADERS call that actually does the work. + *

+ * Overview: + *

+ * In the leader election process, each replica of a shard watches one, and only one, other replica via + * ephemeral nodes in ZooKeeper. When the node being watched goes down, the node watching it is sent a notification + * and, if the node being watched is the leader, the node getting the notification assumes leadership. + *
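+ * As a purely illustrative sketch (hypothetical core names, not taken from this patch), with distinct sequence
+ * numbers each node watches the node just ahead of it, so a healthy three-replica shard forms a watch chain like:
+ * <pre>
+ *   core_node1   (leader, watches nobody)
+ *   core_node2   watches core_node1
+ *   core_node3   watches core_node2
+ * </pre>
+ *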

+ * Each ZooKeeper ephemeral election node gets a monotonically increasing "sequence number" that defines its position in the queue. + *

+ * So to force a particular node to become the leader, it must have a watch on the leader. This can lead to two nodes + * having the same sequence number. Say the process is this: + * replica1 is the leader (seq 1) + * replica3 is on a Solr node that happens to be started next; it watches the leader (seq 2) + * replica2 is on the next Solr node started. It will _also_ watch the leader; its sequence number is 2, exactly + * like replica3's + *
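+ * For illustration only (hypothetical session IDs and core names; the actual znode layout is whatever
+ * LeaderElector creates), the election znodes for that scenario might look like:
+ * <pre>
+ *   .../leader_elect/shard1/election/72057620000000001-core_node1-n_0000000001   (replica1, the leader)
+ *   .../leader_elect/shard1/election/72057620000000003-core_node3-n_0000000002   (replica3, watches the leader)
+ *   .../leader_elect/shard1/election/72057620000000002-core_node2-n_0000000002   (replica2, also watches the leader)
+ * </pre>
+ *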

+ * This is true on startup, but can also be a consequence of, say, a replica going into recovery. It's no longer + * eligible to become leader, so it will be put at the end of the queue by default. So there's code to put it back in the + * queue with the same sequence number as the current second replica. + *

+ * To complicate matters further, when the nodes are sorted (see OverseerTaskProcessor.getSortedElectionNodes) + * the primary sort is on the sequence number and the secondary sort is on the session ID. So the preferredLeader may + * or may not be second in that list. + *

+ * What all this means is that when the REBALANCELEADERS command is issued, this class examines the election queue and + * performs just three things for each shard in the collection (a worked example follows the list): + *

+ * 1> insures that the preferredLeader is watching the leader (rejoins the election queue at the head) + *

+ * 2> if there are two ephemeral nodes with the same sequence number watching the leader, and if one of them is the + * preferredLeader, it will send the _other_ node to the end of the queue (rejoins it) + *

+ * 3> rejoins the zeroth entry in the list at the end of the queue, which triggers the watch on the preferredLeader + * replica which then takes over leadership + *
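+ * As a worked example (hypothetical queue, preferredLeader set on replica2), starting from the sorted queue
+ * <pre>
+ *   [replica1 (leader, seq 1), replica3 (seq 2), replica2 (seq 2)]
+ * </pre>
+ * step 1> is already satisfied (replica2 has the second-lowest sequence number, so it watches the leader),
+ * step 2> sends replica3, which shares that sequence number, to the end of the queue, and step 3> re-queues
+ * replica1; removing replica1's ephemeral node fires replica2's watch and replica2 takes over leadership.
+ *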

+ * All this of course assuming the preferedLeader is alive and well and is assigned for any given shard. + */ + class RebalanceLeaders { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - + final SolrQueryRequest req; final SolrQueryResponse rsp; final CollectionsHandler collectionsHandler; final CoreContainer coreContainer; + private final Set asyncRequests = new HashSet<>(); + final static String INACTIVE_PREFERREDS = "inactivePreferreds"; + final static String ALREADY_LEADERS = "alreadyLeaders"; + final static String SUMMARY = "Summary"; + final NamedList results = new NamedList<>(); + final Map pendingOps = new HashMap<>(); + private String collectionName; + RebalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler collectionsHandler) { this.req = req; @@ -71,38 +123,29 @@ class RebalanceLeaders { } void execute() throws KeeperException, InterruptedException { - req.getParams().required().check(COLLECTION_PROP); + DocCollection dc = checkParams(); + - String collectionName = req.getParams().get(COLLECTION_PROP); - if (StringUtils.isBlank(collectionName)) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the Rebalance Leaders command.")); - } - coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName); - ClusterState clusterState = coreContainer.getZkController().getClusterState(); - DocCollection dc = clusterState.getCollection(collectionName); - if (dc == null) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken."); - } - Map currentRequests = new HashMap<>(); int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE); if (max <= 0) max = Integer.MAX_VALUE; int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60); - NamedList results = new NamedList<>(); + + // If there are a maximum number of simultaneous requests specified, we have to pause when we have that many + // outstanding requests and wait for at least one to finish before going on the the next rebalance. boolean keepGoing = true; for (Slice slice : dc.getSlices()) { - ensurePreferredIsLeader(results, slice, currentRequests); - if (currentRequests.size() == max) { + ensurePreferredIsLeader(slice); + if (asyncRequests.size() == max) { log.info("Queued " + max + " leader reassignments, waiting for some to complete."); - keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results); + keepGoing = waitAsyncRequests(maxWaitSecs, false); if (keepGoing == false) { break; // If we've waited longer than specified, don't continue to wait! } } } if (keepGoing == true) { - keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results); + keepGoing = waitAsyncRequests(maxWaitSecs, true); } if (keepGoing == true) { log.info("All leader reassignments completed."); @@ -110,15 +153,72 @@ class RebalanceLeaders { log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned"); } + checkLeaderStatus(); + NamedList summary = new NamedList<>(); + if (pendingOps.size() == 0) { + summary.add("Success", "All active replicas with the preferredLeader property set are leaders"); + } else { + summary.add("Failure", "Not all active replicas with preferredLeader property are leaders"); + } + rsp.getValues().add(SUMMARY, summary); // we want this first. 
+ rsp.getValues().addAll(results); } - private void ensurePreferredIsLeader(NamedList results, - Slice slice, Map currentRequests) throws KeeperException, InterruptedException { - final String inactivePreferreds = "inactivePreferreds"; - final String alreadyLeaders = "alreadyLeaders"; - String collectionName = req.getParams().get(COLLECTION_PROP); + // Insure that ll required parameters are there and the doc colection exists. + private DocCollection checkParams() throws KeeperException, InterruptedException { + req.getParams().required().check(COLLECTION_PROP); + collectionName = req.getParams().get(COLLECTION_PROP); + if (StringUtils.isBlank(collectionName)) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the Rebalance Leaders command.")); + } + coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName); + ClusterState clusterState = coreContainer.getZkController().getClusterState(); + + DocCollection dc = clusterState.getCollection(collectionName); + if (dc == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken."); + } + return dc; + } + + // Once we've done all the fiddling with the queues, check on the way out to see if all the active preferred + // leaders that we intended to change are in fact the leaders. + private void checkLeaderStatus() throws InterruptedException, KeeperException { + for (int idx = 0; pendingOps.size() > 0 && idx < 600; ++idx) { + ClusterState clusterState = coreContainer.getZkController().getClusterState(); + Set liveNodes = clusterState.getLiveNodes(); + DocCollection dc = clusterState.getCollection(collectionName); + for (Slice slice : dc.getSlices()) { + for (Replica replica : slice.getReplicas()) { + if (replica.isActive(liveNodes) && replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false)) { + if (replica.getBool(LEADER_PROP, false)) { + if (pendingOps.containsKey(slice.getName())) { + // Record for return that the leader changed successfully + pendingOps.remove(slice.getName()); + addToSuccesses(slice, replica); + break; + } + } + } + } + } + TimeUnit.MILLISECONDS.sleep(100); + coreContainer.getZkController().getZkStateReader().forciblyRefreshAllClusterStateSlow(); + } + addAnyFailures(); + } + + // The process is: + // if the replica with preferredLeader is already the leader, do nothing + // Otherwise: + // > if two nodes have the same sequence number and both point to the current leader, we presume that we've just + // moved it, move the one that does _not_ have the preferredLeader to the end of the list. + // > move the current leader to the end of the list. This _should_ mean that the current ephemeral node in the + // leader election queue is removed and the only remaining node watching it is triggered to become leader. + private void ensurePreferredIsLeader(Slice slice) throws KeeperException, InterruptedException { for (Replica replica : slice.getReplicas()) { // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) { @@ -127,87 +227,126 @@ class RebalanceLeaders { // OK, we are the preferred leader, are we the actual leader? if (replica.getBool(LEADER_PROP, false)) { //We're a preferred leader, but we're _also_ the leader, don't need to do anything. 
- NamedList noops = (NamedList) results.get(alreadyLeaders); - if (noops == null) { - noops = new NamedList<>(); - results.add(alreadyLeaders, noops); - } - NamedList res = new NamedList<>(); - res.add("status", "success"); - res.add("msg", "Already leader"); - res.add("shard", slice.getName()); - res.add("nodeName", replica.getNodeName()); - noops.add(replica.getName(), res); + addAlreadyLeaderToResults(slice, replica); return; // already the leader, do nothing. } - + ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader(); // We're the preferred leader, but someone else is leader. Only become leader if we're active. - if (replica.getState() != Replica.State.ACTIVE) { - NamedList inactives = (NamedList) results.get(inactivePreferreds); - if (inactives == null) { - inactives = new NamedList<>(); - results.add(inactivePreferreds, inactives); - } - NamedList res = new NamedList<>(); - res.add("status", "skipped"); - res.add("msg", "Node is a referredLeader, but it's inactive. Skipping"); - res.add("shard", slice.getName()); - res.add("nodeName", replica.getNodeName()); - inactives.add(replica.getName(), res); + if (replica.isActive(zkStateReader.getClusterState().getLiveNodes()) == false) { + addInactiveToResults(slice, replica); return; // Don't try to become the leader if we're not active! } + List electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), + ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); + + if (electionQueueInBadState(electionNodes, slice, replica)) { + return; + } + // Replica is the preferred leader but not the actual leader, do something about that. // "Something" is // 1> if the preferred leader isn't first in line, tell it to re-queue itself. // 2> tell the actual leader to re-queue itself. - ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader(); - - List electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), - ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); - - if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway. - log.info("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " + - "election queue, but replica " + replica.getName() + " doesn't think it's the leader."); - return; - } - // Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole // string is used, but that sorts nodes with the same sequence number by their session IDs from ZK. // While this is determinate, it's not quite what we need, so re-queue nodes that aren't us and are // watching the leader node.. 
+ String firstWatcher = electionNodes.get(1); if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) { - makeReplicaFirstWatcher(collectionName, slice, replica); + makeReplicaFirstWatcher(slice, replica); } - String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP); - rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false); - waitForNodeChange(collectionName, slice, electionNodes.get(0)); - + // This replica should be the leader at the end of the day, so let's record that information to check at the end + pendingOps.put(slice.getName(), replica.getName()); + String leaderElectionNode = electionNodes.get(0); + String coreName = slice.getReplica(LeaderElector.getNodeName(leaderElectionNode)).getStr(CORE_NAME_PROP); + rejoinElectionQueue(slice, leaderElectionNode, coreName, false); + waitForNodeChange(slice, leaderElectionNode); return; // Done with this slice, skip the rest of the replicas. } } + + // Check that the election queue has some members! There really should be two or more for this to make any sense, + // if there's only one we can't change anything. + private boolean electionQueueInBadState(List electionNodes, Slice slice, Replica replica) { + if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway. + log.warn("Rebalancing leaders and slice {} has less than two elements in the leader " + + "election queue, but replica {} doesn't think it's the leader.", slice.getName(), replica.getName()); + return true; + } + + return false; + } + + // Provide some feedback to the user about what actually happened, or in this case where no action was + // possible + private void addInactiveToResults(Slice slice, Replica replica) { + NamedList inactives = (NamedList) results.get(INACTIVE_PREFERREDS); + if (inactives == null) { + inactives = new NamedList<>(); + results.add(INACTIVE_PREFERREDS, inactives); + } + NamedList res = new NamedList<>(); + res.add("status", "skipped"); + res.add("msg", "Replica " + replica.getName() + " is a referredLeader for shard " + slice.getName() + ", but is inactive. No change necessary"); + inactives.add(replica.getName(), res); + } + + // Provide some feedback to the user about what actually happened, or in this case where no action was + // necesary since this preferred replica was already the leader + private void addAlreadyLeaderToResults(Slice slice, Replica replica) { + NamedList alreadyLeaders = (NamedList) results.get(ALREADY_LEADERS); + if (alreadyLeaders == null) { + alreadyLeaders = new NamedList<>(); + results.add(ALREADY_LEADERS, alreadyLeaders); + } + NamedList res = new NamedList<>(); + res.add("status", "skipped"); + res.add("msg", "Replica " + replica.getName() + " is already the leader for shard " + slice.getName() + ". No change necessary"); + alreadyLeaders.add(replica.getName(), res); + } + // Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list - void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica) + // There can be "ties", i.e. replicas in the queue with the same sequence number. Sorting doesn't necessarily sort + // the one we most care about first. 
So put the node we _don't care about at the end of the election queuel + + void makeReplicaFirstWatcher(Slice slice, Replica replica) throws KeeperException, InterruptedException { ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader(); List electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); - // First, queue up the preferred leader at the head of the queue. + // First, queue up the preferred leader watching the leader if it isn't already + int secondSeq = Integer.MAX_VALUE; + + int candidateSeq = -1; + for (int idx = 1; idx < electionNodes.size(); ++idx) { + String candidate = electionNodes.get(idx); + secondSeq = Math.min(secondSeq, LeaderElector.getSeq(candidate)); + if (LeaderElector.getNodeName(candidate).equals(replica.getName())) { + candidateSeq = LeaderElector.getSeq(candidate); + } + } int newSeq = -1; - for (String electionNode : electionNodes) { - if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) { - String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP); - rejoinElection(collectionName, slice, electionNode, coreName, true); - newSeq = waitForNodeChange(collectionName, slice, electionNode); - break; + if (candidateSeq == secondSeq) { + // the preferredLeader is already watching the leader, no need to move it around. + newSeq = secondSeq; + } else { + for (String electionNode : electionNodes) { + if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) { + // Make the preferred leader watch the leader. + String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP); + rejoinElectionQueue(slice, electionNode, coreName, true); + newSeq = waitForNodeChange(slice, electionNode); + break; + } } } if (newSeq == -1) { @@ -225,18 +364,22 @@ class RebalanceLeaders { if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) { continue; } + // We won't get here for the preferredLeader node if (LeaderElector.getSeq(thisNode) == newSeq) { String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP); - rejoinElection(collectionName, slice, thisNode, coreName, false); - waitForNodeChange(collectionName, slice, thisNode); + rejoinElectionQueue(slice, thisNode, coreName, false); + waitForNodeChange(slice, thisNode); } } } - int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException { + // We're just waiting for the electionNode to rejoin the queue with a _different_ node, indicating that any + // requeueing we've done has happened. 
+ int waitForNodeChange(Slice slice, String electionNode) throws InterruptedException, KeeperException { String nodeName = LeaderElector.getNodeName(electionNode); int oldSeq = LeaderElector.getSeq(electionNode); for (int idx = 0; idx < 600; ++idx) { + ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader(); List electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); @@ -245,14 +388,16 @@ class RebalanceLeaders { return LeaderElector.getSeq(testNode); } } - - Thread.sleep(100); + TimeUnit.MILLISECONDS.sleep(100); + zkStateReader.forciblyRefreshAllClusterStateSlow(); } return -1; } - - private void rejoinElection(String collectionName, Slice slice, String electionNode, String core, - boolean rejoinAtHead) throws KeeperException, InterruptedException { + + // Move an election node to some other place in the queue. If rejoinAtHead==false, then at the end, otherwise + // the new node should point at the leader. + private void rejoinElectionQueue(Slice slice, String electionNode, String core, boolean rejoinAtHead) + throws KeeperException, InterruptedException { Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode)); Map propMap = new HashMap<>(); propMap.put(COLLECTION_PROP, collectionName); @@ -265,64 +410,84 @@ class RebalanceLeaders { propMap.put(ELECTION_NODE_PROP, electionNode); String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime()); propMap.put(ASYNC, asyncId); + asyncRequests.add(asyncId); + collectionsHandler.sendToOCPQueue(new ZkNodeProps(propMap)); // ignore response; we construct our own } - // currentAsyncIds - map of request IDs and reporting data (value) // maxWaitSecs - How long are we going to wait? Defaults to 30 seconds. - // waitForAll - if true, do not return until all assignments have been made. - // results - a place to stash results for reporting back to the user. + // waitForAll - if true, do not return until all requests have been processed. "Processed" could mean failure! 
// - private boolean waitForLeaderChange(Map currentAsyncIds, final int maxWaitSecs, - Boolean waitForAll, NamedList results) + + private boolean waitAsyncRequests(final int maxWaitSecs, Boolean waitForAll) throws KeeperException, InterruptedException { - if (currentAsyncIds.size() == 0) return true; + if (asyncRequests.size() == 0) { + return true; + } for (int idx = 0; idx < maxWaitSecs * 10; ++idx) { - Iterator> iter = currentAsyncIds.entrySet().iterator(); + Iterator iter = asyncRequests.iterator(); boolean foundChange = false; while (iter.hasNext()) { - Map.Entry pair = iter.next(); - String asyncId = pair.getKey(); + String asyncId = iter.next(); if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) { coreContainer.getZkController().getOverseerFailureMap().remove(asyncId); coreContainer.getZkController().clearAsyncId(asyncId); - NamedList fails = (NamedList) results.get("failures"); - if (fails == null) { - fails = new NamedList<>(); - results.add("failures", fails); - } - NamedList res = new NamedList<>(); - res.add("status", "failed"); - res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader"); - fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res); iter.remove(); foundChange = true; } else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) { coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId); coreContainer.getZkController().clearAsyncId(asyncId); - NamedList successes = (NamedList) results.get("successes"); - if (successes == null) { - successes = new NamedList<>(); - results.add("successes", successes); - } - NamedList res = new NamedList<>(); - res.add("status", "success"); - res.add("msg", "Assigned '" + pair.getValue() + "' to be leader"); - successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res); iter.remove(); foundChange = true; } } // We're done if we're processing a few at a time or all requests are processed. - if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) { + // We don't want to change, say, 100s of leaders simultaneously. So if the request specifies some limit, + // and we're at that limit, we want to return to the caller so it can immediately add another request. + // That's the purpose of the first clause here. Otherwise, of course, just return if all requests are + // processed. + if ((foundChange && waitForAll == false) || asyncRequests.size() == 0) { return true; } - Thread.sleep(100); //TODO: Is there a better thing to do than sleep here? + TimeUnit.MILLISECONDS.sleep(100); } + // If we get here, we've timed out waiting. return false; } + // If we actually changed the leader, we should send that fact back in the response. + private void addToSuccesses(Slice slice, Replica replica) { + NamedList successes = (NamedList) results.get("successes"); + if (successes == null) { + successes = new NamedList<>(); + results.add("successes", successes); + } + log.info("Successfully changed leader of shard {} to replica {}", slice.getName(), replica.getName()); + NamedList res = new NamedList<>(); + res.add("status", "success"); + res.add("msg", "Successfully changed leader of slice " + slice.getName() + " to " + replica.getName()); + successes.add(slice.getName(), res); + } + // If for any reason we were supposed to change leadership, that should be recorded in changingLeaders. Any + // time we verified that the change actually occurred, that entry should have been removed. 
So report anything + // left over as a failure. + private void addAnyFailures() { + if (pendingOps.size() == 0) { + return; + } + NamedList fails = (NamedList) new NamedList<>(); + results.add("failures", fails); + + for (Map.Entry ent : pendingOps.entrySet()) { + log.info("Failed to change leader of shard {} to replica {}", ent.getKey(), ent.getValue()); + NamedList res = new NamedList<>(); + res.add("status", "failed"); + res.add("msg", String.format(Locale.ROOT, "Could not change leder for slice %s to %s", ent.getKey(), ent.getValue())); + fails.add(ent.getKey(), res); + + } + } } diff --git a/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java b/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java index b47424fe956..b207fa3c91c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java @@ -15,335 +15,629 @@ * limitations under the License. */ package org.apache.solr.cloud; + import java.io.IOException; import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; +import org.apache.lucene.util.LuceneTestCase; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.TimeSource; -import org.apache.solr.common.util.Utils; import org.apache.solr.util.TimeOut; import org.apache.zookeeper.KeeperException; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -public class TestRebalanceLeaders extends AbstractFullDistribZkTestBase { +@LuceneTestCase.Slow +public class TestRebalanceLeaders extends SolrCloudTestCase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static final String COLLECTION_NAME = "testcollection"; + private static final String COLLECTION_NAME = "TestColl"; + + private static int numNodes; + private static int numShards; + private static int numReplicas; + + private static boolean useAdminToSetProps = false; + + @BeforeClass + public static void setupCluster() throws Exception { + + numNodes = random().nextInt(4) + 4; + numShards = random().nextInt(3) + 3; + numReplicas = random().nextInt(2) + 2; + useAdminToSetProps = random().nextBoolean(); + + configureCluster(numNodes) + .addConfig(COLLECTION_NAME, configset("cloud-minimal")) + .configure(); + + CollectionAdminResponse resp = CollectionAdminRequest.createCollection(COLLECTION_NAME, COLLECTION_NAME, + numShards, numReplicas, 0, 0) + .setMaxShardsPerNode((numShards * numReplicas) / numNodes + 1) + .process(cluster.getSolrClient()); + assertEquals("Admin request failed; ", 
0, resp.getStatus()); + cluster.waitForActiveCollection(COLLECTION_NAME, numShards, numShards * numReplicas); - public TestRebalanceLeaders() { - schemaString = "schema15.xml"; // we need a string id - sliceCount = 4; } - int reps = 10; + @Before + public void removeAllProperties() throws KeeperException, InterruptedException { + forceUpdateCollectionStatus(); + DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME); + for (Slice slice : docCollection.getSlices()) { + for (Replica rep : slice.getReplicas()) { + rep.getProperties().forEach((key, value) -> { + if (key.startsWith("property.")) { + try { + delProp(slice, rep, key); + } catch (IOException | SolrServerException e) { + fail("Caught unexpected exception in @Before " + e.getMessage()); + } + } + }); + } + } + } + int timeoutMs = 60000; - Map> initial = new HashMap<>(); - Map expected = new HashMap<>(); + // test that setting an arbitrary "slice unique" property un-sets the property if it's on another replica in the + // slice. This is testing when the property is set on an _individual_ replica whereas testBalancePropertySliceUnique + // tests whether changing an individual _replica_ un-sets the property on other replicas _in that slice_. + // + // NOTE: There were significant problems because at one point the code implicitly defined + // shardUnique=true for the special property preferredLeader. That was removed at one point so we're explicitly + // testing that as well. @Test - @ShardsFixed(num = 4) - public void test() throws Exception { - reps = random().nextInt(9) + 1; // make sure and do at least one. - try (CloudSolrClient client = createCloudClient(null)) { - // Mix up a bunch of different combinations of shards and replicas in order to exercise boundary cases. - // shards, replicationfactor, maxreplicaspernode - int shards = random().nextInt(7); - if (shards < 2) shards = 2; - int rFactor = random().nextInt(4); - if (rFactor < 2) rFactor = 2; - createCollection(null, COLLECTION_NAME, shards, rFactor, shards * rFactor + 1, client, null, "conf1"); - } - - waitForCollection(cloudClient.getZkStateReader(), COLLECTION_NAME, 2); - waitForRecoveriesToFinish(COLLECTION_NAME, false); - - listCollection(); - rebalanceLeaderTest(); + public void testSetArbitraryPropertySliceUnique() throws IOException, SolrServerException, InterruptedException, KeeperException { + // Check both special (preferredLeader) and something arbitrary. 
+ doTestSetArbitraryPropertySliceUnique("foo" + random().nextInt(1_000_000)); + removeAllProperties(); + doTestSetArbitraryPropertySliceUnique("preferredleader"); } - private void listCollection() throws IOException, SolrServerException { - //CloudSolrServer client = createCloudClient(null); - try { - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set("action", CollectionParams.CollectionAction.LIST.toString()); - SolrRequest request = new QueryRequest(params); - request.setPath("/admin/collections"); - NamedList rsp = cloudClient.request(request); - List collections = (List) rsp.get("collections"); - assertTrue("control_collection was not found in list", collections.contains("control_collection")); - assertTrue(DEFAULT_COLLECTION + " was not found in list", collections.contains(DEFAULT_COLLECTION)); - assertTrue(COLLECTION_NAME + " was not found in list", collections.contains(COLLECTION_NAME)); - } finally { - //remove collections - //client.shutdown(); + // Test that automatically distributing a slice unique property un-sets that property if it's in any other replica + // on that slice. + // This is different than the test above. The test above sets individual properties on individual nodes. This one + // relies on Solr to pick which replicas to set the property on + @Test + public void testBalancePropertySliceUnique() throws KeeperException, InterruptedException, IOException, SolrServerException { + // Check both cases of "special" property preferred(Ll)eader + doTestBalancePropertySliceUnique("foo" + random().nextInt(1_000_000)); + removeAllProperties(); + doTestBalancePropertySliceUnique("preferredleader"); + } + + // We've moved on from a property being tested, we need to check if rebalancing the leaders actually chantges the + // leader appropriately. + @Test + public void testRebalanceLeaders() throws Exception { + + // First let's unbalance the preferredLeader property, do all the leaders get reassigned properly? + concentrateProp("preferredLeader"); + sendRebalanceCommand(); + checkPreferredsAreLeaders(); + + // Now follow up by evenly distributing the property as well as possible. + doTestBalancePropertySliceUnique("preferredLeader"); + sendRebalanceCommand(); + checkPreferredsAreLeaders(); + + // Now check the condition we saw "in the wild" where you could not rebalance properly when Jetty was restarted. + concentratePropByRestartingJettys(); + sendRebalanceCommand(); + checkPreferredsAreLeaders(); + } + + // Insure that the property is set on only one replica per slice when changing a unique property on an individual + // replica. + private void doTestSetArbitraryPropertySliceUnique(String propIn) throws InterruptedException, KeeperException, IOException, SolrServerException { + final String prop = (random().nextBoolean()) ? propIn : propIn.toUpperCase(Locale.ROOT); + // First set the property in some replica in some slice + forceUpdateCollectionStatus(); + DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME); + + Slice[] slices = docCollection.getSlices().toArray(new Slice[0]); + Slice slice = slices[random().nextInt(slices.length)]; + + // Bounce around a bit setting this property and insure it's only set in one replica. 
+ Replica[] reps = slice.getReplicas().toArray(new Replica[0]); + for (int idx = 0; idx < 4; ++idx) { + Replica rep = reps[random().nextInt(reps.length)]; + // Set the property on a particular replica + setProp(slice, rep, prop); + TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); + + long count = 0; + boolean rightRep = false; + Slice modSlice; + DocCollection modColl = null; // keeps IDE happy + + // insure that no other replica in that slice has the property when we return. + while (timeout.hasTimedOut() == false) { + forceUpdateCollectionStatus(); + modColl = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME); + modSlice = modColl.getSlice(slice.getName()); + rightRep = modSlice.getReplica(rep.getName()).getBool("property." + prop.toLowerCase(Locale.ROOT), false); + count = modSlice.getReplicas().stream().filter(thisRep -> thisRep.getBool("property." + prop.toLowerCase(Locale.ROOT), false)).count(); + + if (count == 1 && rightRep) { + break; + } + + TimeUnit.MILLISECONDS.sleep(100); + } + if (count != 1 || rightRep == false) { + fail("The property " + prop + " was not uniquely distributed in slice " + slice.getName() + + " " + modColl.toString()); + } } } - void recordInitialState() throws InterruptedException { - Map slices = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap(); - // Assemble a list of all the replicas for all the shards in a convenient way to look at them. - for (Map.Entry ent : slices.entrySet()) { - initial.put(ent.getKey(), new ArrayList<>(ent.getValue().getReplicas())); - } - } - - void rebalanceLeaderTest() throws InterruptedException, IOException, SolrServerException, KeeperException { - recordInitialState(); - for (int idx = 0; idx < reps; ++idx) { - issueCommands(); - checkConsistency(); - } - } - - // After we've called the rebalance command, we want to insure that: - // 1> all replicas appear once and only once in the respective leader election queue - // 2> All the replicas we _think_ are leaders are in the 0th position in the leader election queue. - // 3> The node that ZooKeeper thinks is the leader is the one we think should be the leader. - void checkConsistency() throws InterruptedException, KeeperException { + // Fail if we the replicas with the preferredLeader property are _not_ also the leaders. + private void checkPreferredsAreLeaders() throws InterruptedException, KeeperException { + // Make sure that the shard unique are where you expect. TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); - boolean checkAppearOnce = false; - boolean checkElectionZero = false; - boolean checkZkLeadersAgree = false; - while (!timeout.hasTimedOut()) { - checkAppearOnce = checkAppearOnce(); - checkElectionZero = checkElectionZero(); - checkZkLeadersAgree = checkZkLeadersAgree(); - if (checkAppearOnce && checkElectionZero && checkZkLeadersAgree) { + + while (timeout.hasTimedOut() == false) { + if (checkPreferredsAreLeaders(false)) { + // Ok, all preferreds are leaders. Just for Let's also get the election queue and guarantee that every + // live replica is in the queue and none are repeated. 
+ checkElectionQueues(); return; } - Thread.sleep(1000); + TimeUnit.MILLISECONDS.sleep(100); } - fail("Checking the rebalance leader command failed, checkAppearOnce=" + checkAppearOnce + " checkElectionZero=" - + checkElectionZero + " checkZkLeadersAgree=" + checkZkLeadersAgree); + log.error("Leaders are not all preferres {}", cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME)); + // Show the errors + checkPreferredsAreLeaders(true); } + // Do all active nodes in each slice appear exactly once in the slice's leader election queue? + // Since we assert that the number of live replicas is the same size as the leader election queue, we only + // have to compare one way. + private void checkElectionQueues() throws KeeperException, InterruptedException { - // Do all the nodes appear exactly once in the leader election queue and vice-versa? - Boolean checkAppearOnce() throws KeeperException, InterruptedException { + DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME); + Set liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes(); - for (Map.Entry> ent : initial.entrySet()) { - List leaderQueue = cloudClient.getZkStateReader().getZkClient().getChildren("/collections/" + COLLECTION_NAME + - "/leader_elect/" + ent.getKey() + "/election", null, true); - - if (leaderQueue.size() != ent.getValue().size()) { - return false; - } - // Check that each election node has a corresponding replica. - for (String electionNode : leaderQueue) { - if (checkReplicaName(LeaderElector.getNodeName(electionNode), ent.getValue())) { - continue; + for (Slice slice : docCollection.getSlices()) { + Set liveReplicas = new HashSet<>(); + slice.getReplicas().forEach(replica -> { + if (replica.isActive(liveNodes)) { + liveReplicas.add(replica); } - return false; - } - // Check that each replica has an election node. - for (Replica rep : ent.getValue()) { - if (checkElectionNode(rep.getName(), leaderQueue)) { - continue; - } - return false; - } + }); + checkOneQueue(docCollection, slice, liveReplicas); } - return true; } - // Check that the given name is in the leader election queue - Boolean checkElectionNode(String repName, List leaderQueue) { + // Helper method to check one leader election queue's consistency. + private void checkOneQueue(DocCollection coll, Slice slice, Set liveReplicas) throws KeeperException, InterruptedException { + + List leaderQueue = cluster.getSolrClient().getZkStateReader().getZkClient().getChildren("/collections/" + COLLECTION_NAME + + "/leader_elect/" + slice.getName() + "/election", null, true); + + if (leaderQueue.size() != liveReplicas.size()) { + + log.error("One or more replicas is missing from the leader election queue! Slice {}, election queue: {}, collection: {}" + , slice.getName(), leaderQueue, coll); + fail("One or more replicas is missing from the leader election queue"); + } + // Check that each election node has a corresponding live replica. for (String electionNode : leaderQueue) { - if (repName.equals(LeaderElector.getNodeName(electionNode))) { - return true; + String replica = LeaderElector.getNodeName(electionNode); + if (slice.getReplica(replica) == null) { + log.error("Replica {} is not in the election queue: {}", replica, leaderQueue); + fail("Replica is not in the election queue!"); } } - return false; } - // Check that the name passed in corresponds to a replica. 
- Boolean checkReplicaName(String toCheck, List replicas) { - for (Replica rep : replicas) { - if (toCheck.equals(rep.getName())) { - return true; - } - } - return false; - } - - // Get the shard leader election from ZK and sort it. The node may not actually be there, so retry - List getOverseerSort(String key) { - List ret = null; - try { - ret = OverseerCollectionConfigSetProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(), - "/collections/" + COLLECTION_NAME + "/leader_elect/" + key + "/election"); - return ret; - } catch (KeeperException e) { - cloudClient.connect(); - } catch (InterruptedException e) { - return null; - } - return null; - } - - // Is every node we think is the leader in the zeroth position in the leader election queue? - Boolean checkElectionZero() { - for (Map.Entry ent : expected.entrySet()) { - - List leaderQueue = getOverseerSort(ent.getKey()); - if (leaderQueue == null) return false; - - String electName = LeaderElector.getNodeName(leaderQueue.get(0)); - String coreName = ent.getValue().getName(); - if (electName.equals(coreName) == false) { - return false; + // Just an encapsulation for checkPreferredsAreLeaders to make returning easier. + // the doAsserts var is to actually print the problem and fail the test if the condition is not met. + private boolean checkPreferredsAreLeaders(boolean doAsserts) throws KeeperException, InterruptedException { + forceUpdateCollectionStatus(); + DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME); + for (Slice slice : docCollection.getSlices()) { + for (Replica rep : slice.getReplicas()) { + if (rep.getBool("property.preferredleader", false)) { + boolean isLeader = rep.getBool("leader", false); + if (doAsserts) { + assertTrue("PreferredLeader should be the leader: ", isLeader); + } else if (isLeader == false) { + return false; + } + } } } return true; } - // Do who we _think_ should be the leader agree with the leader nodes? - Boolean checkZkLeadersAgree() throws KeeperException, InterruptedException { - for (Map.Entry ent : expected.entrySet()) { - - String path = "/collections/" + COLLECTION_NAME + "/leaders/" + ent.getKey() + "/leader"; - byte[] data = getZkData(cloudClient, path); - if (data == null) { - log.warn("path to check not found {}", path); - return false; - } - - String repCore = null; - String zkCore = null; - - Map m = (Map) Utils.fromJSON(data); - zkCore = (String) m.get("core"); - repCore = ent.getValue().getStr("core"); - if (zkCore.equals(repCore) == false) { - log.warn("leader in zk does not match what we expect: {} != {}", zkCore, repCore); - return false; - } - - } - return true; - } - - byte[] getZkData(CloudSolrClient client, String path) { - org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat(); - try { - byte[] data = client.getZkStateReader().getZkClient().getData(path, null, stat, true); - if (data != null) { - return data; - } - } catch (KeeperException.NoNodeException e) { - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - return null; - } - } catch (InterruptedException | KeeperException e) { - return null; - } - return null; - } - - // It's OK not to check the return here since the subsequent tests will fail. - void issueCommands() throws IOException, SolrServerException, KeeperException, InterruptedException { - - // Find a replica to make the preferredLeader. NOTE: may be one that's _already_ leader! 
- expected.clear(); - for (Map.Entry> ent : initial.entrySet()) { - List replicas = ent.getValue(); - Replica rep = replicas.get(Math.abs(random().nextInt()) % replicas.size()); - expected.put(ent.getKey(), rep); - issuePreferred(ent.getKey(), rep); - } - - if (waitForAllPreferreds() == false) { - fail("Waited for timeout for preferredLeader assignments to be made and they werent."); - } - //fillExpectedWithCurrent(); - // Now rebalance the leaders randomly using SolrJ or direct call - if(random().nextBoolean()) + // Arbitrarily send the rebalance command either with the SolrJ interface or with an HTTP request. + private void sendRebalanceCommand() throws SolrServerException, InterruptedException, IOException { + if (random().nextBoolean()) { rebalanceLeaderUsingSolrJAPI(); - else - rebalanceLeaderUsingDirectCall(); + } else { + rebalanceLeaderUsingStandardRequest(); + } + } + + // Helper method to make sure the property is _unbalanced_ first, then it gets properly re-assigned with the + // BALANCESHARDUNIQUE command. + private void doTestBalancePropertySliceUnique(String propIn) throws InterruptedException, IOException, KeeperException, SolrServerException { + final String prop = (random().nextBoolean()) ? propIn : propIn.toUpperCase(Locale.ROOT); + + // Concentrate the properties on as few replicas a possible + concentrateProp(prop); + + // issue the BALANCESHARDUNIQUE command + rebalancePropAndCheck(prop); + + // Verify that there are no more than one replica with the property per shard. + verifyPropUniquePerShard(prop); + + // Verify that the property is reasonably evenly distributed + verifyPropCorrectlyDistributed(prop); } - private void rebalanceLeaderUsingSolrJAPI() throws IOException, SolrServerException { - CollectionAdminRequest.RebalanceLeaders rebalanceLeaders = CollectionAdminRequest.rebalanceLeaders(COLLECTION_NAME); - rebalanceLeaders.setMaxAtOnce(10) - .process(cloudClient); + private void verifyPropCorrectlyDistributed(String prop) throws KeeperException, InterruptedException { + + TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); + + String propLC = prop.toLowerCase(Locale.ROOT); + DocCollection docCollection = null; + while (timeout.hasTimedOut() == false) { + forceUpdateCollectionStatus(); + docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME); + int maxPropCount = Integer.MAX_VALUE; + int minPropCount = Integer.MIN_VALUE; + for (Slice slice : docCollection.getSlices()) { + int repCount = 0; + for (Replica rep : slice.getReplicas()) { + if (rep.getBool("property." + propLC, false)) { + repCount++; + } + } + maxPropCount = Math.max(maxPropCount, repCount); + minPropCount = Math.min(minPropCount, repCount); + } + if (Math.abs(maxPropCount - minPropCount) < 2) return; + } + log.error("Property {} is not distributed evenly. {}", prop, docCollection); + fail("Property is not distributed evenly " + prop); } - private void rebalanceLeaderUsingDirectCall() throws IOException, SolrServerException { + // Used when we concentrate the leader on a few nodes. + private void verifyPropDistributedAsExpected(Map expectedShardReplicaMap, String prop) throws InterruptedException, KeeperException { + // Make sure that the shard unique are where you expect. 
+ TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); + + String propLC = prop.toLowerCase(Locale.ROOT); + boolean failure = false; + DocCollection docCollection = null; + while (timeout.hasTimedOut() == false) { + forceUpdateCollectionStatus(); + docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME); + failure = false; + for (Map.Entry ent : expectedShardReplicaMap.entrySet()) { + Replica rep = docCollection.getSlice(ent.getKey()).getReplica(ent.getValue()); + if (rep.getBool("property." + propLC, false) == false) { + failure = true; + } + } + if (failure == false) { + return; + } + TimeUnit.MILLISECONDS.sleep(100); + } + + fail(prop + " properties are not on the expected replicas: " + docCollection.toString() + + System.lineSeparator() + "Expected " + expectedShardReplicaMap.toString()); + } + + // Just check that the property is distributed as expectecd. This does _not_ rebalance the leaders + private void rebalancePropAndCheck(String prop) throws IOException, SolrServerException, InterruptedException, KeeperException { + + if (random().nextBoolean()) { + rebalancePropUsingSolrJAPI(prop); + } else { + rebalancePropUsingStandardRequest(prop); + } + } + + + private void rebalanceLeaderUsingSolrJAPI() throws IOException, SolrServerException, InterruptedException { + CollectionAdminResponse resp = CollectionAdminRequest + .rebalanceLeaders(COLLECTION_NAME) + .process(cluster.getSolrClient()); + assertTrue("All leaders should have been verified", resp.getResponse().get("Summary").toString().contains("Success")); + assertEquals("Admin request failed; ", 0, resp.getStatus()); + } + + private void rebalanceLeaderUsingStandardRequest() throws IOException, SolrServerException { ModifiableSolrParams params = new ModifiableSolrParams(); params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString()); - - // Insure we get error returns when omitting required parameters params.set("collection", COLLECTION_NAME); - params.set("maxAtOnce", "10"); - SolrRequest request = new QueryRequest(params); + QueryRequest request = new QueryRequest(params); request.setPath("/admin/collections"); - cloudClient.request(request); - + QueryResponse resp = request.process(cluster.getSolrClient()); + assertTrue("All leaders should have been verified", resp.getResponse().get("Summary").toString().contains("Success")); + assertEquals("Call to rebalanceLeaders failed ", 0, resp.getStatus()); } - void issuePreferred(String slice, Replica rep) throws IOException, SolrServerException, InterruptedException { + + private void rebalancePropUsingSolrJAPI(String prop) throws IOException, SolrServerException, InterruptedException { + // Don't set the value, that should be done automatically. 
+ CollectionAdminResponse resp; + + if (prop.toLowerCase(Locale.ROOT).contains("preferredleader")) { + resp = CollectionAdminRequest + .balanceReplicaProperty(COLLECTION_NAME, prop) + .process(cluster.getSolrClient()); + + } else { + resp = CollectionAdminRequest + .balanceReplicaProperty(COLLECTION_NAME, prop) + .setShardUnique(true) + .process(cluster.getSolrClient()); + + } + assertEquals("Admin request failed; ", 0, resp.getStatus()); + } + + private void rebalancePropUsingStandardRequest(String prop) throws IOException, SolrServerException { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set("action", CollectionParams.CollectionAction.BALANCESHARDUNIQUE.toString()); + params.set("property", prop); + + params.set("collection", COLLECTION_NAME); + if (prop.toLowerCase(Locale.ROOT).contains("preferredleader") == false) { + params.set("shardUnique", true); + } + QueryRequest request = new QueryRequest(params); + request.setPath("/admin/collections"); + QueryResponse resp = request.process(cluster.getSolrClient()); + assertEquals("Call to rebalanceLeaders failed ", 0, resp.getStatus()); + } + + // This important. I've (Erick Erickson) run across a situation where the "standard request" causes failures, but + // never the Admin request. So let's test both all the time for a given test. + // + // This sets an _individual_ replica to have the property, not collection-wide + private void setProp(Slice slice, Replica rep, String prop) throws IOException, SolrServerException { + if (useAdminToSetProps) { + setPropWithAdminRequest(slice, rep, prop); + } else { + setPropWithStandardRequest(slice, rep, prop); + } + } + + void setPropWithStandardRequest(Slice slice, Replica rep, String prop) throws IOException, SolrServerException { ModifiableSolrParams params = new ModifiableSolrParams(); params.set("action", CollectionParams.CollectionAction.ADDREPLICAPROP.toString()); - // Insure we get error returns when omitting required parameters - params.set("collection", COLLECTION_NAME); - params.set("shard", slice); + params.set("shard", slice.getName()); params.set("replica", rep.getName()); - params.set("property", "preferredLeader"); + params.set("property", prop); params.set("property.value", "true"); + // Test to insure that implicit shardUnique is added for preferredLeader. + if (prop.toLowerCase(Locale.ROOT).equals("preferredleader") == false) { + params.set("shardUnique", "true"); + } SolrRequest request = new QueryRequest(params); request.setPath("/admin/collections"); - cloudClient.request(request); + cluster.getSolrClient().request(request); + String propLC = prop.toLowerCase(Locale.ROOT); + waitForState("Expecting property '" + prop + "'to appear on replica " + rep.getName(), COLLECTION_NAME, + (n, c) -> "true".equals(c.getReplica(rep.getName()).getProperty(propLC))); + } - boolean waitForAllPreferreds() throws KeeperException, InterruptedException { - boolean goAgain = true; - TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); - while (! 
- goAgain = false;
- Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap();
+ void setPropWithAdminRequest(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
+ boolean setUnique = (prop.toLowerCase(Locale.ROOT).equals("preferredleader") == false);
+ CollectionAdminRequest.AddReplicaProp addProp =
+ CollectionAdminRequest.addReplicaProperty(COLLECTION_NAME, slice.getName(), rep.getName(), prop, "true");
+ if (setUnique) {
+ addProp.setShardUnique(true);
+ }
+ CollectionAdminResponse resp = addProp.process(cluster.getSolrClient());
+ assertEquals(0, resp.getStatus());
+ String propLC = prop.toLowerCase(Locale.ROOT);
+ waitForState("Expecting property '" + prop + "' to appear on replica " + rep.getName(), COLLECTION_NAME,
+ (n, c) -> "true".equals(c.getReplica(rep.getName()).getProperty(propLC)));

- for (Map.Entry<String, Replica> ent : expected.entrySet()) {
- Replica me = slices.get(ent.getKey()).getReplica(ent.getValue().getName());
- if (me.getBool("property.preferredleader", false) == false) {
- goAgain = true;
- break;
+ }
+
+ private void delProp(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
+ String propLC = prop.toLowerCase(Locale.ROOT);
+ CollectionAdminResponse resp = CollectionAdminRequest.deleteReplicaProperty(COLLECTION_NAME, slice.getName(), rep.getName(), propLC)
+ .process(cluster.getSolrClient());
+ assertEquals("Admin request failed; ", 0, resp.getStatus());
+ waitForState("Expecting property '" + prop + "' to be removed from replica " + rep.getName(), COLLECTION_NAME,
+ (n, c) -> c.getReplica(rep.getName()).getProperty(propLC) == null);
+ }
+
+ // Intentionally un-balance the property to insure that BALANCESHARDUNIQUE does its job. There was an odd case
+ // where rebalancing didn't work very well if the Solr nodes were stopped and restarted, but worked perfectly
+ // if the nodes were _not_ restarted in the test. So we have to test that too.
+ private void concentratePropByRestartingJettys() throws Exception {
+
+ List<JettySolrRunner> jettys = new ArrayList<>(cluster.getJettySolrRunners());
+ Collections.shuffle(jettys, random());
+ jettys.remove(random().nextInt(jettys.size()));
+ // Now we have a list of jettys, and there is one missing. Stop all of the remaining jettys, then start them again
+ // to concentrate the leaders. It's not necessary that all shards have a leader.
+
+ for (JettySolrRunner jetty : jettys) {
+ cluster.stopJettySolrRunner(jetty);
+ cluster.waitForJettyToStop(jetty);
+ }
+ checkReplicasInactive(jettys);
+
+ for (int idx = 0; idx < jettys.size(); ++idx) {
+ cluster.startJettySolrRunner(jettys.get(idx));
+ }
+ cluster.waitForAllNodes(60);
+ // the nodes are present, but are all replicas active?
+ checkAllReplicasActive();
+ }
+
+ // While banging my head against a wall, I put a lot of force refresh statements in. Want to leave them in
+ // but have this be a no-op so if we start to get failures, we can re-enable with minimal effort.
+ private void forceUpdateCollectionStatus() throws KeeperException, InterruptedException {
+ // cluster.getSolrClient().getZkStateReader().forceUpdateCollection(COLLECTION_NAME);
+ }
+
+ // Since we have to restart jettys, we don't want to try rebalancing etc. until we're sure all jettys that should
+ // be up are up and all replicas are active.
+ private void checkReplicasInactive(List<JettySolrRunner> downJettys) throws KeeperException, InterruptedException {
+ TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ DocCollection docCollection = null;
+ Set<String> liveNodes = null;
+
+ Set<String> downJettyNodes = new TreeSet<>();
+ for (JettySolrRunner jetty : downJettys) {
+ downJettyNodes.add(jetty.getBaseUrl().getHost() + ":" + jetty.getBaseUrl().getPort() + "_solr");
+ }
+ while (timeout.hasTimedOut() == false) {
+ forceUpdateCollectionStatus();
+ docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
+ boolean expectedInactive = true;
+
+ for (Slice slice : docCollection.getSlices()) {
+ for (Replica rep : slice.getReplicas()) {
+ if (downJettyNodes.contains(rep.getNodeName()) == false) {
+ continue; // We are on a live node
+ }
+ // A replica on an allegedly down node is still reported as active; keep waiting.
+ if (rep.isActive(liveNodes)) {
+ expectedInactive = false;
+ }
 }
 }
- if (goAgain) {
- Thread.sleep(250);
- } else {
- return true;
+ if (expectedInactive) {
+ return;
 }
+ TimeUnit.MILLISECONDS.sleep(100);
 }
- return false;
+ fail("timed out waiting for all replicas to become inactive: livenodes: " + liveNodes +
+ " Collection state: " + docCollection.toString());
 }
-}
+ // We need to wait around until all replicas are active before expecting rebalancing or distributing shard-unique
+ // properties to work.
+ private void checkAllReplicasActive() throws KeeperException, InterruptedException {
+ TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ while (timeout.hasTimedOut() == false) {
+ forceUpdateCollectionStatus();
+ DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
+ boolean allActive = true;
+ for (Slice slice : docCollection.getSlices()) {
+ for (Replica rep : slice.getReplicas()) {
+ if (rep.isActive(liveNodes) == false) {
+ allActive = false;
+ }
+ }
+ }
+ if (allActive) {
+ return;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ fail("timed out waiting for all replicas to become active");
+ }
+ // use a simple heuristic to concentrate the replicas that have the property on as few nodes as possible. The point is that
+ // then we can execute BALANCESHARDUNIQUE and be sure it worked correctly.
+ private void concentrateProp(String prop) throws KeeperException, InterruptedException, IOException, SolrServerException {
+ // find all the live nodes
+ // for each slice, set the property on the replica that is in the lowest position on live_nodes
+ List<String> liveNodes = new ArrayList<>(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes());
+ Collections.shuffle(liveNodes, random());
+
+ Map<String, String> uniquePropMap = new TreeMap<>();
+ forceUpdateCollectionStatus();
+ DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ for (Slice slice : docCollection.getSlices()) {
+ Replica changedRep = null;
+ int livePos = Integer.MAX_VALUE;
+ for (Replica rep : slice.getReplicas()) {
+ int pos = liveNodes.indexOf(rep.getNodeName());
+ if (pos >= 0 && pos < livePos) {
+ livePos = pos;
+ changedRep = rep;
+ }
+ }
+ if (livePos == Integer.MAX_VALUE) {
+ fail("Invalid state! We should have a replica to add the property to! " + docCollection.toString());
+ }
+
+ uniquePropMap.put(slice.getName(), changedRep.getName());
+ // Now set the property on the "lowest" node in live_nodes.
+ setProp(slice, changedRep, prop);
+ }
+ verifyPropDistributedAsExpected(uniquePropMap, prop);
+ }
+
+ // make sure that the property in question is unique per shard.
+ private Map<String, String> verifyPropUniquePerShard(String prop) throws InterruptedException, KeeperException {
+ Map<String, String> uniquePropMaps = new TreeMap<>();
+
+ TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ while (timeout.hasTimedOut() == false) {
+ uniquePropMaps.clear();
+ if (checkdUniquePropPerShard(uniquePropMaps, prop)) {
+ return uniquePropMaps;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ fail("There should be exactly one replica with value " + prop + " set to true per shard: "
+ + cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).toString());
+ return null; // keeps IDE happy.
+ }
+
+ // return true if every shard has exactly one replica with the unique property set to "true"
+ private boolean checkdUniquePropPerShard(Map<String, String> uniques, String prop) throws KeeperException, InterruptedException {
+ forceUpdateCollectionStatus();
+ DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+
+ for (Slice slice : docCollection.getSlices()) {
+ int propfCount = 0;
+ for (Replica rep : slice.getReplicas()) {
+ if (rep.getBool("property." + prop.toLowerCase(Locale.ROOT), false)) {
+ propfCount++;
+ uniques.put(slice.getName(), rep.getName());
+ }
+ }
+ if (1 != propfCount) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc
index f650cdc7d6e..ddaaeb8257f 100644
--- a/solr/solr-ref-guide/src/collections-api.adoc
+++ b/solr/solr-ref-guide/src/collections-api.adoc
@@ -2163,6 +2163,8 @@ http://localhost:8983/solr/admin/collections?action=REBALANCELEADERS&collection=
 In this example, two replicas in the "alreadyLeaders" section already had the leader assigned to the same node as the `preferredLeader` property so no action was taken.
 
+The "Success" tag indicates that the command rebalanced all leaders. If, for any reason, some replicas with `preferredLeader=true` are not leaders, this will be "Failure" rather than "Success". If a replica cannot be made leader because it is not "Active", it is also considered a failure.
+
 The replica in the "inactivePreferreds" section had the `preferredLeader` property set but the node was down and no action was taken. The three nodes in the "successes" section were made leaders because they had the `preferredLeader` property set but were not leaders and they were active.
 [source,xml]
@@ -2172,6 +2174,9 @@ The replica in the "inactivePreferreds" section had the `preferredLeader` proper
 0
 123
+  <lst name="Summary">
+    <str name="Success">All replicas with the preferredLeader property set are leaders</str>
+  </lst>
 success
@@ -2219,6 +2224,10 @@ The replica in the "inactivePreferreds" section had the `preferredLeader` proper
 
 Examining the clusterstate after issuing this call should show that every live node that has the `preferredLeader` property should also have the "leader" property set to _true_.
 
+NOTE: The added work done by an NRT leader during indexing is quite small. The primary use case is to redistribute the leader role if there are a large number of leaders concentrated on a small number of nodes. Rebalancing will likely not improve performance unless the imbalance of leadership roles is large, on the order of a tenfold difference between nodes.
+
+NOTE: The BALANCESHARDUNIQUE command that distributes the `preferredLeader` property does not guarantee perfect distribution, and in some collection topologies it is impossible to make that guarantee.
+
 [[forceleader]]
 == FORCELEADER: Force Shard Leader