mirror of https://github.com/apache/lucene.git
Extract out the ExclusiveSliceProperty as a top level class
This commit is contained in:
parent cd8592c850
commit 5b770b56d0
@@ -0,0 +1,345 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.cloud;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.CollectionMutator;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;

import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;

// Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.
class ExclusiveSliceProperty {
  private ClusterState clusterState;
  private final boolean onlyActiveNodes;
  private final String property;
  private final DocCollection collection;
  private final String collectionName;

  // Key structure. For each node, list all replicas on it regardless of whether they have the property or not.
  private final Map<String, List<SliceReplica>> nodesHostingReplicas = new HashMap<>();
  // Key structure. For each node, a list of the replicas _currently_ hosting the property.
  private final Map<String, List<SliceReplica>> nodesHostingProp = new HashMap<>();
  Set<String> shardsNeedingHosts = new HashSet<>();
  Map<String, Slice> changedSlices = new HashMap<>(); // Work on copies rather than the underlying cluster state.

  private int origMaxPropPerNode = 0;
  private int origModulo = 0;
  private int tmpMaxPropPerNode = 0;
  private int tmpModulo = 0;
  Random rand = new Random();

  private int assigned = 0;

  ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) {
    this.clusterState = clusterState;
    String tmp = message.getStr(ZkStateReader.PROPERTY_PROP);
    if (StringUtils.startsWith(tmp, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) {
      tmp = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + tmp;
    }
    this.property = tmp.toLowerCase(Locale.ROOT);
    collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);

    if (StringUtils.isBlank(collectionName) || StringUtils.isBlank(property)) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
          "Overseer '" + message.getStr(Overseer.QUEUE_OPERATION) + "' requires both the '" + ZkStateReader.COLLECTION_PROP + "' and '" +
              ZkStateReader.PROPERTY_PROP + "' parameters. No action taken ");
    }

    Boolean shardUnique = Boolean.parseBoolean(message.getStr(SHARD_UNIQUE));
    if (shardUnique == false &&
        SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(this.property) == false) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
          + " the property be a pre-defined property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true' " +
          " Property: " + this.property + " shardUnique: " + Boolean.toString(shardUnique));
    }

    collection = clusterState.getCollection(collectionName);
    if (collection == null) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
          "Could not find collection ' " + collectionName + "' for overseer operation '" +
              message.getStr(Overseer.QUEUE_OPERATION) + "'. No action taken.");
    }
    onlyActiveNodes = Boolean.parseBoolean(message.getStr(ONLY_ACTIVE_NODES, "true"));
  }

  DocCollection getDocCollection() {
    return collection;
  }

  private boolean isActive(Replica replica) {
    return replica.getState() == Replica.State.ACTIVE;
  }

  // Collect a list of all the nodes that _can_ host the indicated property. Along the way, also collect any of
  // the replicas on that node that _already_ host the property as well as any slices that do _not_ have the
  // property hosted.
  //
  // Return true if any node needs its property reassigned. False if the property is already balanced for
  // the collection.

  private boolean collectCurrentPropStats() {
    int maxAssigned = 0;
    // Get a list of potential replicas that can host the property _and_ their counts.
    // Move any obvious entries to a list of replicas to change the property on.
    Set<String> allHosts = new HashSet<>();
    for (Slice slice : collection.getSlices()) {
      boolean sliceHasProp = false;
      for (Replica replica : slice.getReplicas()) {
        if (onlyActiveNodes && isActive(replica) == false) {
          if (StringUtils.isNotBlank(replica.getStr(property))) {
            removeProp(slice, replica.getName()); // Note, we won't be committing this to ZK until later.
          }
          continue;
        }
        allHosts.add(replica.getNodeName());
        String nodeName = replica.getNodeName();
        if (StringUtils.isNotBlank(replica.getStr(property))) {
          if (sliceHasProp) {
            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
                "'" + BALANCESHARDUNIQUE + "' should only be called for properties that have at most one member " +
                    "in any slice with the property set. No action taken.");
          }
          if (nodesHostingProp.containsKey(nodeName) == false) {
            nodesHostingProp.put(nodeName, new ArrayList<>());
          }
          nodesHostingProp.get(nodeName).add(new SliceReplica(slice, replica));
          ++assigned;
          maxAssigned = Math.max(maxAssigned, nodesHostingProp.get(nodeName).size());
          sliceHasProp = true;
        }
        if (nodesHostingReplicas.containsKey(nodeName) == false) {
          nodesHostingReplicas.put(nodeName, new ArrayList<>());
        }
        nodesHostingReplicas.get(nodeName).add(new SliceReplica(slice, replica));
      }
    }

    // If the total number of already-hosted properties assigned to nodes
    // that have potential to host leaders is equal to the slice count _AND_ none of the current nodes has more than
    // the max number of properties, there's nothing to do.
    origMaxPropPerNode = collection.getSlices().size() / allHosts.size();

    // Some nodes can have one more of the property if the numbers aren't exactly even.
    origModulo = collection.getSlices().size() % allHosts.size();
    if (origModulo > 0) {
      origMaxPropPerNode++;  // have to have some nodes with 1 more property.
    }

    // We can say for sure that we need to rebalance if we don't have as many assigned properties as slices.
    if (assigned != collection.getSlices().size()) {
      return true;
    }

    // Make sure there are no more slices at the limit than the "leftovers".
    // Let's say there are 7 slices and 3 nodes. We need to distribute the property as 3 on node1, 2 on node2 and 2 on node3
    // (3, 2, 2). We need to be careful not to distribute them as 3, 3, 1; that's what this check is all about.
    int counter = origModulo;
    for (List<SliceReplica> list : nodesHostingProp.values()) {
      if (list.size() == origMaxPropPerNode) --counter;
    }
    if (counter == 0) return false; // nodes with 1 extra leader are exactly the needed number

    return true;
  }

  private void removeSliceAlreadyHostedFromPossibles(String sliceName) {
    for (Map.Entry<String, List<SliceReplica>> entReplica : nodesHostingReplicas.entrySet()) {

      ListIterator<SliceReplica> iter = entReplica.getValue().listIterator();
      while (iter.hasNext()) {
        SliceReplica sr = iter.next();
        if (sr.slice.getName().equals(sliceName))
          iter.remove();
      }
    }
  }

  private void balanceUnassignedReplicas() {
    tmpMaxPropPerNode = origMaxPropPerNode; // A bit clumsy, but don't want to duplicate code.
    tmpModulo = origModulo;

    // Get the nodeName and shardName for the node that has the least room for this.

    while (shardsNeedingHosts.size() > 0) {
      String nodeName = "";
      int minSize = Integer.MAX_VALUE;
      SliceReplica srToChange = null;
      for (String slice : shardsNeedingHosts) {
        for (Map.Entry<String, List<SliceReplica>> ent : nodesHostingReplicas.entrySet()) {
          // A little tricky. If we don't set this to something below, then it means all possible places to
          // put this property are full up, so just put it somewhere.
          if (srToChange == null && ent.getValue().size() > 0) {
            srToChange = ent.getValue().get(0);
          }
          ListIterator<SliceReplica> iter = ent.getValue().listIterator();
          while (iter.hasNext()) {
            SliceReplica sr = iter.next();
            if (StringUtils.equals(slice, sr.slice.getName()) == false) {
              continue;
            }
            if (nodesHostingProp.containsKey(ent.getKey()) == false) {
              nodesHostingProp.put(ent.getKey(), new ArrayList<SliceReplica>());
            }
            if (minSize > nodesHostingReplicas.get(ent.getKey()).size() && nodesHostingProp.get(ent.getKey()).size() < tmpMaxPropPerNode) {
              minSize = nodesHostingReplicas.get(ent.getKey()).size();
              srToChange = sr;
              nodeName = ent.getKey();
            }
          }
        }
      }
      // Now, you have a slice and node to put it on
      shardsNeedingHosts.remove(srToChange.slice.getName());
      if (nodesHostingProp.containsKey(nodeName) == false) {
        nodesHostingProp.put(nodeName, new ArrayList<SliceReplica>());
      }
      nodesHostingProp.get(nodeName).add(srToChange);
      adjustLimits(nodesHostingProp.get(nodeName));
      removeSliceAlreadyHostedFromPossibles(srToChange.slice.getName());
      addProp(srToChange.slice, srToChange.replica.getName());
    }
  }

  // Adjust the min/max counts allowed per node. Special handling here for dealing with the fact
  // that no node should have more than 1 more replica with this property than any other.
  private void adjustLimits(List<SliceReplica> changeList) {
    if (changeList.size() == tmpMaxPropPerNode) {
      if (tmpModulo < 0) return;

      --tmpModulo;
      if (tmpModulo == 0) {
        --tmpMaxPropPerNode;
        --tmpModulo;  // Prevent dropping tmpMaxPropPerNode again.
      }
    }
  }

  // Go through the list of presently-hosted properties and remove any that have too many replicas that host the property
  private void removeOverallocatedReplicas() {
    tmpMaxPropPerNode = origMaxPropPerNode; // A bit clumsy, but don't want to duplicate code.
    tmpModulo = origModulo;

    for (Map.Entry<String, List<SliceReplica>> ent : nodesHostingProp.entrySet()) {
      while (ent.getValue().size() > tmpMaxPropPerNode) { // remove delta nodes
        ent.getValue().remove(rand.nextInt(ent.getValue().size()));
      }
      adjustLimits(ent.getValue());
    }
  }

  private void removeProp(Slice origSlice, String replicaName) {
    getReplicaFromChanged(origSlice, replicaName).getProperties().remove(property);
  }

  private void addProp(Slice origSlice, String replicaName) {
    getReplicaFromChanged(origSlice, replicaName).getProperties().put(property, "true");
  }

  // Just a place to encapsulate the fact that we need to have new slices (copy) to update before we
  // put this all in the cluster state.
  private Replica getReplicaFromChanged(Slice origSlice, String replicaName) {
    Slice newSlice = changedSlices.get(origSlice.getName());
    Replica replica;
    if (newSlice != null) {
      replica = newSlice.getReplica(replicaName);
    } else {
      newSlice = new Slice(origSlice.getName(), origSlice.getReplicasCopy(), origSlice.shallowCopy());
      changedSlices.put(origSlice.getName(), newSlice);
      replica = newSlice.getReplica(replicaName);
    }
    if (replica == null) {
      throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Should have been able to find replica '" +
          replicaName + "' in slice '" + origSlice.getName() + "'. No action taken");
    }
    return replica;
  }

  // Main entry point for carrying out the action. Returns "true" if we have actually moved properties around.
  boolean balanceProperty() {
    if (collectCurrentPropStats() == false) {
      return false;
    }

    // We have two lists based on nodeName:
    // 1> all the nodes that _could_ host a property for the slice
    // 2> all the nodes that _currently_ host a property for the slice.

    // So, remove a replica from the nodes that have too many
    removeOverallocatedReplicas();

    // Prune replicas belonging to a slice that have the property currently assigned from the list of replicas
    // that could host the property.
    for (Map.Entry<String, List<SliceReplica>> entProp : nodesHostingProp.entrySet()) {
      for (SliceReplica srHosting : entProp.getValue()) {
        removeSliceAlreadyHostedFromPossibles(srHosting.slice.getName());
      }
    }

    // Assemble the list of slices that do not have any replica hosting the property:
    for (Map.Entry<String, List<SliceReplica>> ent : nodesHostingReplicas.entrySet()) {
      ListIterator<SliceReplica> iter = ent.getValue().listIterator();
      while (iter.hasNext()) {
        SliceReplica sr = iter.next();
        shardsNeedingHosts.add(sr.slice.getName());
      }
    }

    // At this point, nodesHostingProp should contain _only_ lists of replicas that belong to slices that do _not_
    // have any replica hosting the property. So let's assign them.

    balanceUnassignedReplicas();
    for (Slice newSlice : changedSlices.values()) {
      DocCollection docCollection = CollectionMutator.updateSlice(collectionName, clusterState.getCollection(collectionName), newSlice);
      clusterState = ClusterStateMutator.newState(clusterState, collectionName, docCollection);
    }
    return true;
  }

  private static class SliceReplica {
    Slice slice;
    Replica replica;

    SliceReplica(Slice slice, Replica replica) {
      this.slice = slice;
      this.replica = replica;
    }
  }
}
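Note (not part of the commit): the hunk above adds the extracted top-level class; the hunks below remove the old copy that was a non-static inner class of Overseer, so the balancing logic no longer needs an enclosing Overseer instance. As a minimal, illustrative sketch of how a caller in the same package might drive the extracted class, assuming a hypothetical wrapper class, method name, and the literal "balanceshardunique"/"preferredLeader" values:

package org.apache.solr.cloud;

import java.util.HashMap;
import java.util.Map;

import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;

// Hypothetical caller, for illustration only. ExclusiveSliceProperty and balanceProperty()
// are package-private, so a caller has to live in org.apache.solr.cloud.
class ExclusiveSlicePropertyExample {
  // Ask the balancer to spread the 'preferredLeader' property across the collection's slices.
  static boolean rebalancePreferredLeader(ClusterState clusterState, String collectionName) {
    Map<String, Object> props = new HashMap<>();
    props.put(Overseer.QUEUE_OPERATION, "balanceshardunique"); // only used in error messages
    props.put(ZkStateReader.COLLECTION_PROP, collectionName);  // required
    props.put(ZkStateReader.PROPERTY_PROP, "preferredLeader"); // constructor prepends COLL_PROP_PREFIX
    ZkNodeProps message = new ZkNodeProps(props);

    ExclusiveSliceProperty balancer = new ExclusiveSliceProperty(clusterState, message);
    // Returns true only if property assignments actually moved; writing the updated
    // state back to ZooKeeper remains the caller's responsibility.
    return balancer.balanceProperty();
  }
}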
@@ -21,20 +21,14 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.CollectionMutator;

@@ -44,11 +38,7 @@ import org.apache.solr.cloud.overseer.ReplicaMutator;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;

@@ -67,10 +57,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;

/**
 * Cluster leader. Responsible for processing state updates, node assignments, creating/deleting
 * collections, shards, replicas and setting various properties.
@@ -446,306 +432,6 @@ public class Overseer implements Closeable {
    }

  }
  // Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.
  private class ExclusiveSliceProperty {
    private ClusterState clusterState;
    private final boolean onlyActiveNodes;
    private final String property;
    private final DocCollection collection;
    private final String collectionName;

    // Key structure. For each node, list all replicas on it regardless of whether they have the property or not.
    private final Map<String, List<SliceReplica>> nodesHostingReplicas = new HashMap<>();
    // Key structure. For each node, a list of the replicas _currently_ hosting the property.
    private final Map<String, List<SliceReplica>> nodesHostingProp = new HashMap<>();
    Set<String> shardsNeedingHosts = new HashSet<String>();
    Map<String, Slice> changedSlices = new HashMap<>(); // Work on copies rather than the underlying cluster state.

    private int origMaxPropPerNode = 0;
    private int origModulo = 0;
    private int tmpMaxPropPerNode = 0;
    private int tmpModulo = 0;
    Random rand = new Random();

    private int assigned = 0;

    ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) {
      this.clusterState = clusterState;
      String tmp = message.getStr(ZkStateReader.PROPERTY_PROP);
      if (StringUtils.startsWith(tmp, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) {
        tmp = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + tmp;
      }
      this.property = tmp.toLowerCase(Locale.ROOT);
      collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);

      if (StringUtils.isBlank(collectionName) || StringUtils.isBlank(property)) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
            "Overseer '" + message.getStr(QUEUE_OPERATION) + "' requires both the '" + ZkStateReader.COLLECTION_PROP + "' and '" +
                ZkStateReader.PROPERTY_PROP + "' parameters. No action taken ");
      }

      Boolean shardUnique = Boolean.parseBoolean(message.getStr(SHARD_UNIQUE));
      if (shardUnique == false &&
          SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(this.property) == false) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
            + " the property be a pre-defined property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true' " +
            " Property: " + this.property + " shardUnique: " + Boolean.toString(shardUnique));
      }

      collection = clusterState.getCollection(collectionName);
      if (collection == null) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
            "Could not find collection ' " + collectionName + "' for overseer operation '" +
                message.getStr(QUEUE_OPERATION) + "'. No action taken.");
      }
      onlyActiveNodes = Boolean.parseBoolean(message.getStr(ONLY_ACTIVE_NODES, "true"));
    }

    private DocCollection getDocCollection() {
      return collection;
    }

    private boolean isActive(Replica replica) {
      return replica.getState() == Replica.State.ACTIVE;
    }

    // Collect a list of all the nodes that _can_ host the indicated property. Along the way, also collect any of
    // the replicas on that node that _already_ host the property as well as any slices that do _not_ have the
    // property hosted.
    //
    // Return true if any node needs its property reassigned. False if the property is already balanced for
    // the collection.

    private boolean collectCurrentPropStats() {
      int maxAssigned = 0;
      // Get a list of potential replicas that can host the property _and_ their counts.
      // Move any obvious entries to a list of replicas to change the property on.
      Set<String> allHosts = new HashSet<>();
      for (Slice slice : collection.getSlices()) {
        boolean sliceHasProp = false;
        for (Replica replica : slice.getReplicas()) {
          if (onlyActiveNodes && isActive(replica) == false) {
            if (StringUtils.isNotBlank(replica.getStr(property))) {
              removeProp(slice, replica.getName()); // Note, we won't be committing this to ZK until later.
            }
            continue;
          }
          allHosts.add(replica.getNodeName());
          String nodeName = replica.getNodeName();
          if (StringUtils.isNotBlank(replica.getStr(property))) {
            if (sliceHasProp) {
              throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
                  "'" + BALANCESHARDUNIQUE + "' should only be called for properties that have at most one member " +
                      "in any slice with the property set. No action taken.");
            }
            if (nodesHostingProp.containsKey(nodeName) == false) {
              nodesHostingProp.put(nodeName, new ArrayList<SliceReplica>());
            }
            nodesHostingProp.get(nodeName).add(new SliceReplica(slice, replica));
            ++assigned;
            maxAssigned = Math.max(maxAssigned, nodesHostingProp.get(nodeName).size());
            sliceHasProp = true;
          }
          if (nodesHostingReplicas.containsKey(nodeName) == false) {
            nodesHostingReplicas.put(nodeName, new ArrayList<SliceReplica>());
          }
          nodesHostingReplicas.get(nodeName).add(new SliceReplica(slice, replica));
        }
      }

      // If the total number of already-hosted properties assigned to nodes
      // that have potential to host leaders is equal to the slice count _AND_ none of the current nodes has more than
      // the max number of properties, there's nothing to do.
      origMaxPropPerNode = collection.getSlices().size() / allHosts.size();

      // Some nodes can have one more of the property if the numbers aren't exactly even.
      origModulo = collection.getSlices().size() % allHosts.size();
      if (origModulo > 0) {
        origMaxPropPerNode++;  // have to have some nodes with 1 more property.
      }

      // We can say for sure that we need to rebalance if we don't have as many assigned properties as slices.
      if (assigned != collection.getSlices().size()) {
        return true;
      }

      // Make sure there are no more slices at the limit than the "leftovers".
      // Let's say there are 7 slices and 3 nodes. We need to distribute the property as 3 on node1, 2 on node2 and 2 on node3
      // (3, 2, 2). We need to be careful not to distribute them as 3, 3, 1; that's what this check is all about.
      int counter = origModulo;
      for (List<SliceReplica> list : nodesHostingProp.values()) {
        if (list.size() == origMaxPropPerNode) --counter;
      }
      if (counter == 0) return false; // nodes with 1 extra leader are exactly the needed number

      return true;
    }

    private void removeSliceAlreadyHostedFromPossibles(String sliceName) {
      for (Map.Entry<String, List<SliceReplica>> entReplica : nodesHostingReplicas.entrySet()) {

        ListIterator<SliceReplica> iter = entReplica.getValue().listIterator();
        while (iter.hasNext()) {
          SliceReplica sr = iter.next();
          if (sr.slice.getName().equals(sliceName))
            iter.remove();
        }
      }
    }

    private void balanceUnassignedReplicas() {
      tmpMaxPropPerNode = origMaxPropPerNode; // A bit clumsy, but don't want to duplicate code.
      tmpModulo = origModulo;

      // Get the nodeName and shardName for the node that has the least room for this.

      while (shardsNeedingHosts.size() > 0) {
        String nodeName = "";
        int minSize = Integer.MAX_VALUE;
        SliceReplica srToChange = null;
        for (String slice : shardsNeedingHosts) {
          for (Map.Entry<String, List<SliceReplica>> ent : nodesHostingReplicas.entrySet()) {
            // A little tricky. If we don't set this to something below, then it means all possible places to
            // put this property are full up, so just put it somewhere.
            if (srToChange == null && ent.getValue().size() > 0) {
              srToChange = ent.getValue().get(0);
            }
            ListIterator<SliceReplica> iter = ent.getValue().listIterator();
            while (iter.hasNext()) {
              SliceReplica sr = iter.next();
              if (StringUtils.equals(slice, sr.slice.getName()) == false) {
                continue;
              }
              if (nodesHostingProp.containsKey(ent.getKey()) == false) {
                nodesHostingProp.put(ent.getKey(), new ArrayList<SliceReplica>());
              }
              if (minSize > nodesHostingReplicas.get(ent.getKey()).size() && nodesHostingProp.get(ent.getKey()).size() < tmpMaxPropPerNode) {
                minSize = nodesHostingReplicas.get(ent.getKey()).size();
                srToChange = sr;
                nodeName = ent.getKey();
              }
            }
          }
        }
        // Now, you have a slice and node to put it on
        shardsNeedingHosts.remove(srToChange.slice.getName());
        if (nodesHostingProp.containsKey(nodeName) == false) {
          nodesHostingProp.put(nodeName, new ArrayList<SliceReplica>());
        }
        nodesHostingProp.get(nodeName).add(srToChange);
        adjustLimits(nodesHostingProp.get(nodeName));
        removeSliceAlreadyHostedFromPossibles(srToChange.slice.getName());
        addProp(srToChange.slice, srToChange.replica.getName());
      }
    }

    // Adjust the min/max counts allowed per node. Special handling here for dealing with the fact
    // that no node should have more than 1 more replica with this property than any other.
    private void adjustLimits(List<SliceReplica> changeList) {
      if (changeList.size() == tmpMaxPropPerNode) {
        if (tmpModulo < 0) return;

        --tmpModulo;
        if (tmpModulo == 0) {
          --tmpMaxPropPerNode;
          --tmpModulo;  // Prevent dropping tmpMaxPropPerNode again.
        }
      }
    }

    // Go through the list of presently-hosted properties and remove any that have too many replicas that host the property
    private void removeOverallocatedReplicas() {
      tmpMaxPropPerNode = origMaxPropPerNode; // A bit clumsy, but don't want to duplicate code.
      tmpModulo = origModulo;

      for (Map.Entry<String, List<SliceReplica>> ent : nodesHostingProp.entrySet()) {
        while (ent.getValue().size() > tmpMaxPropPerNode) { // remove delta nodes
          ent.getValue().remove(rand.nextInt(ent.getValue().size()));
        }
        adjustLimits(ent.getValue());
      }
    }

    private void removeProp(Slice origSlice, String replicaName) {
      getReplicaFromChanged(origSlice, replicaName).getProperties().remove(property);
    }

    private void addProp(Slice origSlice, String replicaName) {
      getReplicaFromChanged(origSlice, replicaName).getProperties().put(property, "true");
    }

    // Just a place to encapsulate the fact that we need to have new slices (copy) to update before we
    // put this all in the cluster state.
    private Replica getReplicaFromChanged(Slice origSlice, String replicaName) {
      Slice newSlice = changedSlices.get(origSlice.getName());
      Replica replica;
      if (newSlice != null) {
        replica = newSlice.getReplica(replicaName);
      } else {
        newSlice = new Slice(origSlice.getName(), origSlice.getReplicasCopy(), origSlice.shallowCopy());
        changedSlices.put(origSlice.getName(), newSlice);
        replica = newSlice.getReplica(replicaName);
      }
      if (replica == null) {
        throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Should have been able to find replica '" +
            replicaName + "' in slice '" + origSlice.getName() + "'. No action taken");
      }
      return replica;
    }

    // Main entry point for carrying out the action. Returns "true" if we have actually moved properties around.
    private boolean balanceProperty() {
      if (collectCurrentPropStats() == false) {
        return false;
      }

      // We have two lists based on nodeName:
      // 1> all the nodes that _could_ host a property for the slice
      // 2> all the nodes that _currently_ host a property for the slice.

      // So, remove a replica from the nodes that have too many
      removeOverallocatedReplicas();

      // Prune replicas belonging to a slice that have the property currently assigned from the list of replicas
      // that could host the property.
      for (Map.Entry<String, List<SliceReplica>> entProp : nodesHostingProp.entrySet()) {
        for (SliceReplica srHosting : entProp.getValue()) {
          removeSliceAlreadyHostedFromPossibles(srHosting.slice.getName());
        }
      }

      // Assemble the list of slices that do not have any replica hosting the property:
      for (Map.Entry<String, List<SliceReplica>> ent : nodesHostingReplicas.entrySet()) {
        ListIterator<SliceReplica> iter = ent.getValue().listIterator();
        while (iter.hasNext()) {
          SliceReplica sr = iter.next();
          shardsNeedingHosts.add(sr.slice.getName());
        }
      }

      // At this point, nodesHostingProp should contain _only_ lists of replicas that belong to slices that do _not_
      // have any replica hosting the property. So let's assign them.

      balanceUnassignedReplicas();
      for (Slice newSlice : changedSlices.values()) {
        DocCollection docCollection = CollectionMutator.updateSlice(collectionName, clusterState.getCollection(collectionName), newSlice);
        clusterState = ClusterStateMutator.newState(clusterState, collectionName, docCollection);
      }
      return true;
    }
  }

  private class SliceReplica {
    private Slice slice;
    private Replica replica;

    SliceReplica(Slice slice, Replica replica) {
      this.slice = slice;
      this.replica = replica;
    }
  }

class OverseerThread extends Thread implements Closeable {
