mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-16 18:04:52 +00:00
Prioritize primary shard movement during shard allocation (#1445)
When some node or set of nodes is excluded (based on some cluster setting) BalancedShardsAllocator iterates over them in breadth first order picking 1 shard from each node and repeating the process until all shards are balanced. Since shards from each node are picked randomly it's possible the p and r of shard1 is relocated first leaving behind both p and r of shard2. If the excluded nodes were to go down the cluster becomes red. This commit introduces a new setting "cluster.routing.allocation.move.primary_first" that prioritizes the p of both shard1 and shard2 first so the cluster does not become red if the excluded nodes were to go down before relocating other shards. Note that with this setting enabled performance of this change is a direct function of number of indices, shards, replicas, and nodes. The larger the indices, replicas, and distribution scale, the slower the allocation becomes. This should be used with care. Signed-off-by: Ankit Jain <jain.ankitk@gmail.com>
This commit is contained in:
parent
8b8d04173c
commit
6eb8f6f307
@ -34,6 +34,7 @@ package org.opensearch.cluster.routing;
|
||||
|
||||
import org.opensearch.cluster.node.DiscoveryNode;
|
||||
import org.opensearch.common.Nullable;
|
||||
import org.opensearch.common.collect.Tuple;
|
||||
import org.opensearch.index.Index;
|
||||
import org.opensearch.index.shard.ShardId;
|
||||
|
||||
@ -48,6 +49,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
/**
|
||||
* A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards
|
||||
@ -55,11 +57,87 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class RoutingNode implements Iterable<ShardRouting> {
|
||||
|
||||
static class BucketedShards implements Iterable<ShardRouting> {
|
||||
private final Tuple<LinkedHashMap<ShardId, ShardRouting>, LinkedHashMap<ShardId, ShardRouting>> shardTuple; // LinkedHashMap to
|
||||
// preserve order
|
||||
|
||||
BucketedShards(LinkedHashMap<ShardId, ShardRouting> primaryShards, LinkedHashMap<ShardId, ShardRouting> replicaShards) {
|
||||
this.shardTuple = new Tuple(primaryShards, replicaShards);
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return this.shardTuple.v1().isEmpty() && this.shardTuple.v2().isEmpty();
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return this.shardTuple.v1().size() + this.shardTuple.v2().size();
|
||||
}
|
||||
|
||||
public boolean containsKey(ShardId shardId) {
|
||||
return this.shardTuple.v1().containsKey(shardId) || this.shardTuple.v2().containsKey(shardId);
|
||||
}
|
||||
|
||||
public ShardRouting get(ShardId shardId) {
|
||||
if (this.shardTuple.v1().containsKey(shardId)) {
|
||||
return this.shardTuple.v1().get(shardId);
|
||||
}
|
||||
return this.shardTuple.v2().get(shardId);
|
||||
}
|
||||
|
||||
public ShardRouting add(ShardRouting shardRouting) {
|
||||
return put(shardRouting.shardId(), shardRouting);
|
||||
}
|
||||
|
||||
public ShardRouting put(ShardId shardId, ShardRouting shardRouting) {
|
||||
ShardRouting ret;
|
||||
if (shardRouting.primary()) {
|
||||
ret = this.shardTuple.v1().put(shardId, shardRouting);
|
||||
if (this.shardTuple.v2().containsKey(shardId)) {
|
||||
ret = this.shardTuple.v2().remove(shardId);
|
||||
}
|
||||
} else {
|
||||
ret = this.shardTuple.v2().put(shardId, shardRouting);
|
||||
if (this.shardTuple.v1().containsKey(shardId)) {
|
||||
ret = this.shardTuple.v1().remove(shardId);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
public ShardRouting remove(ShardId shardId) {
|
||||
if (this.shardTuple.v1().containsKey(shardId)) {
|
||||
return this.shardTuple.v1().remove(shardId);
|
||||
}
|
||||
return this.shardTuple.v2().remove(shardId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<ShardRouting> iterator() {
|
||||
final Iterator<ShardRouting> primaryIterator = Collections.unmodifiableCollection(this.shardTuple.v1().values()).iterator();
|
||||
final Iterator<ShardRouting> replicaIterator = Collections.unmodifiableCollection(this.shardTuple.v2().values()).iterator();
|
||||
return new Iterator<ShardRouting>() {
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return primaryIterator.hasNext() || replicaIterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShardRouting next() {
|
||||
if (primaryIterator.hasNext()) {
|
||||
return primaryIterator.next();
|
||||
}
|
||||
return replicaIterator.next();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private final String nodeId;
|
||||
|
||||
private final DiscoveryNode node;
|
||||
|
||||
private final LinkedHashMap<ShardId, ShardRouting> shards; // LinkedHashMap to preserve order
|
||||
private final BucketedShards shards;
|
||||
|
||||
private final LinkedHashSet<ShardRouting> initializingShards;
|
||||
|
||||
@ -67,44 +145,44 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
||||
|
||||
private final HashMap<Index, LinkedHashSet<ShardRouting>> shardsByIndex;
|
||||
|
||||
public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
|
||||
this(nodeId, node, buildShardRoutingMap(shards));
|
||||
}
|
||||
|
||||
RoutingNode(String nodeId, DiscoveryNode node, LinkedHashMap<ShardId, ShardRouting> shards) {
|
||||
public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shardRoutings) {
|
||||
this.nodeId = nodeId;
|
||||
this.node = node;
|
||||
this.shards = shards;
|
||||
final LinkedHashMap<ShardId, ShardRouting> primaryShards = new LinkedHashMap<>();
|
||||
final LinkedHashMap<ShardId, ShardRouting> replicaShards = new LinkedHashMap<>();
|
||||
this.shards = new BucketedShards(primaryShards, replicaShards);
|
||||
this.relocatingShards = new LinkedHashSet<>();
|
||||
this.initializingShards = new LinkedHashSet<>();
|
||||
this.shardsByIndex = new LinkedHashMap<>();
|
||||
for (ShardRouting shardRouting : shards.values()) {
|
||||
|
||||
for (ShardRouting shardRouting : shardRoutings) {
|
||||
if (shardRouting.initializing()) {
|
||||
initializingShards.add(shardRouting);
|
||||
} else if (shardRouting.relocating()) {
|
||||
relocatingShards.add(shardRouting);
|
||||
}
|
||||
shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting);
|
||||
}
|
||||
assert invariant();
|
||||
}
|
||||
|
||||
private static LinkedHashMap<ShardId, ShardRouting> buildShardRoutingMap(ShardRouting... shardRoutings) {
|
||||
final LinkedHashMap<ShardId, ShardRouting> shards = new LinkedHashMap<>();
|
||||
for (ShardRouting shardRouting : shardRoutings) {
|
||||
ShardRouting previousValue = shards.put(shardRouting.shardId(), shardRouting);
|
||||
ShardRouting previousValue;
|
||||
if (shardRouting.primary()) {
|
||||
previousValue = primaryShards.put(shardRouting.shardId(), shardRouting);
|
||||
} else {
|
||||
previousValue = replicaShards.put(shardRouting.shardId(), shardRouting);
|
||||
}
|
||||
|
||||
if (previousValue != null) {
|
||||
throw new IllegalArgumentException(
|
||||
"Cannot have two different shards with same shard id " + shardRouting.shardId() + " on same node "
|
||||
);
|
||||
}
|
||||
}
|
||||
return shards;
|
||||
|
||||
assert invariant();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<ShardRouting> iterator() {
|
||||
return Collections.unmodifiableCollection(shards.values()).iterator();
|
||||
return shards.iterator();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -139,7 +217,7 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
||||
*/
|
||||
void add(ShardRouting shard) {
|
||||
assert invariant();
|
||||
if (shards.containsKey(shard.shardId())) {
|
||||
if (shards.add(shard) != null) {
|
||||
throw new IllegalStateException(
|
||||
"Trying to add a shard "
|
||||
+ shard.shardId()
|
||||
@ -152,7 +230,6 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
||||
+ "]"
|
||||
);
|
||||
}
|
||||
shards.put(shard.shardId(), shard);
|
||||
|
||||
if (shard.initializing()) {
|
||||
initializingShards.add(shard);
|
||||
@ -322,7 +399,7 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
||||
public String prettyPrint() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n");
|
||||
for (ShardRouting entry : shards.values()) {
|
||||
for (ShardRouting entry : shards) {
|
||||
sb.append("--------").append(entry.shortSummary()).append('\n');
|
||||
}
|
||||
return sb.toString();
|
||||
@ -345,7 +422,9 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
||||
}
|
||||
|
||||
public List<ShardRouting> copyShards() {
|
||||
return new ArrayList<>(shards.values());
|
||||
List<ShardRouting> result = new ArrayList<>();
|
||||
shards.forEach(result::add);
|
||||
return result;
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
@ -355,23 +434,20 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
||||
private boolean invariant() {
|
||||
|
||||
// initializingShards must consistent with that in shards
|
||||
Collection<ShardRouting> shardRoutingsInitializing = shards.values()
|
||||
.stream()
|
||||
Collection<ShardRouting> shardRoutingsInitializing = StreamSupport.stream(shards.spliterator(), false)
|
||||
.filter(ShardRouting::initializing)
|
||||
.collect(Collectors.toList());
|
||||
assert initializingShards.size() == shardRoutingsInitializing.size();
|
||||
assert initializingShards.containsAll(shardRoutingsInitializing);
|
||||
|
||||
// relocatingShards must consistent with that in shards
|
||||
Collection<ShardRouting> shardRoutingsRelocating = shards.values()
|
||||
.stream()
|
||||
Collection<ShardRouting> shardRoutingsRelocating = StreamSupport.stream(shards.spliterator(), false)
|
||||
.filter(ShardRouting::relocating)
|
||||
.collect(Collectors.toList());
|
||||
assert relocatingShards.size() == shardRoutingsRelocating.size();
|
||||
assert relocatingShards.containsAll(shardRoutingsRelocating);
|
||||
|
||||
final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = shards.values()
|
||||
.stream()
|
||||
final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = StreamSupport.stream(shards.spliterator(), false)
|
||||
.collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet()));
|
||||
assert shardRoutingsByIndex.equals(shardsByIndex);
|
||||
|
||||
|
@ -56,7 +56,6 @@ import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Map;
|
||||
@ -108,10 +107,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||
this.readOnly = readOnly;
|
||||
final RoutingTable routingTable = clusterState.routingTable();
|
||||
|
||||
Map<String, LinkedHashMap<ShardId, ShardRouting>> nodesToShards = new HashMap<>();
|
||||
// fill in the nodeToShards with the "live" nodes
|
||||
for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
|
||||
nodesToShards.put(cursor.value.getId(), new LinkedHashMap<>()); // LinkedHashMap to preserve order
|
||||
String nodeId = cursor.value.getId();
|
||||
this.nodesToShards.put(cursor.value.getId(), new RoutingNode(nodeId, clusterState.nodes().get(nodeId)));
|
||||
}
|
||||
|
||||
// fill in the inverse of node -> shards allocated
|
||||
@ -125,27 +124,23 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||
// by the ShardId, as this is common for primary and replicas.
|
||||
// A replica Set might have one (and not more) replicas with the state of RELOCATING.
|
||||
if (shard.assignedToNode()) {
|
||||
Map<ShardId, ShardRouting> entries = nodesToShards.computeIfAbsent(
|
||||
RoutingNode routingNode = this.nodesToShards.computeIfAbsent(
|
||||
shard.currentNodeId(),
|
||||
k -> new LinkedHashMap<>()
|
||||
); // LinkedHashMap to preserve order
|
||||
ShardRouting previousValue = entries.put(shard.shardId(), shard);
|
||||
if (previousValue != null) {
|
||||
throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node");
|
||||
}
|
||||
k -> new RoutingNode(shard.currentNodeId(), clusterState.nodes().get(shard.currentNodeId()))
|
||||
);
|
||||
routingNode.add(shard);
|
||||
assignedShardsAdd(shard);
|
||||
if (shard.relocating()) {
|
||||
relocatingShards++;
|
||||
// LinkedHashMap to preserve order.
|
||||
// Add the counterpart shard with relocatingNodeId reflecting the source from which
|
||||
// it's relocating from.
|
||||
entries = nodesToShards.computeIfAbsent(shard.relocatingNodeId(), k -> new LinkedHashMap<>());
|
||||
routingNode = nodesToShards.computeIfAbsent(
|
||||
shard.relocatingNodeId(),
|
||||
k -> new RoutingNode(shard.relocatingNodeId(), clusterState.nodes().get(shard.relocatingNodeId()))
|
||||
);
|
||||
ShardRouting targetShardRouting = shard.getTargetRelocatingShard();
|
||||
addInitialRecovery(targetShardRouting, indexShard.primary);
|
||||
previousValue = entries.put(targetShardRouting.shardId(), targetShardRouting);
|
||||
if (previousValue != null) {
|
||||
throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node");
|
||||
}
|
||||
routingNode.add(targetShardRouting);
|
||||
assignedShardsAdd(targetShardRouting);
|
||||
} else if (shard.initializing()) {
|
||||
if (shard.primary()) {
|
||||
@ -160,10 +155,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (Map.Entry<String, LinkedHashMap<ShardId, ShardRouting>> entry : nodesToShards.entrySet()) {
|
||||
String nodeId = entry.getKey();
|
||||
this.nodesToShards.put(nodeId, new RoutingNode(nodeId, clusterState.nodes().get(nodeId), entry.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
private void addRecovery(ShardRouting routing) {
|
||||
@ -1289,37 +1280,97 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
|
||||
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
|
||||
* all the nodes have been returned.
|
||||
* @param movePrimaryFirst if true, all primary shards are iterated over before iterating replica for any node
|
||||
* @return iterator of shard routings
|
||||
*/
|
||||
public Iterator<ShardRouting> nodeInterleavedShardIterator() {
|
||||
public Iterator<ShardRouting> nodeInterleavedShardIterator(boolean movePrimaryFirst) {
|
||||
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
|
||||
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
|
||||
queue.add(entry.getValue().copyShards().iterator());
|
||||
}
|
||||
return new Iterator<ShardRouting>() {
|
||||
public boolean hasNext() {
|
||||
while (!queue.isEmpty()) {
|
||||
if (queue.peek().hasNext()) {
|
||||
if (movePrimaryFirst) {
|
||||
return new Iterator<ShardRouting>() {
|
||||
private Queue<ShardRouting> replicaShards = new ArrayDeque<>();
|
||||
private Queue<Iterator<ShardRouting>> replicaIterators = new ArrayDeque<>();
|
||||
|
||||
public boolean hasNext() {
|
||||
while (!queue.isEmpty()) {
|
||||
if (queue.peek().hasNext()) {
|
||||
return true;
|
||||
}
|
||||
queue.poll();
|
||||
}
|
||||
if (!replicaShards.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
queue.poll();
|
||||
while (!replicaIterators.isEmpty()) {
|
||||
if (replicaIterators.peek().hasNext()) {
|
||||
return true;
|
||||
}
|
||||
replicaIterators.poll();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public ShardRouting next() {
|
||||
if (hasNext() == false) {
|
||||
throw new NoSuchElementException();
|
||||
public ShardRouting next() {
|
||||
if (hasNext() == false) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
while (!queue.isEmpty()) {
|
||||
Iterator<ShardRouting> iter = queue.poll();
|
||||
if (iter.hasNext()) {
|
||||
ShardRouting result = iter.next();
|
||||
if (result.primary()) {
|
||||
queue.offer(iter);
|
||||
return result;
|
||||
}
|
||||
replicaShards.offer(result);
|
||||
replicaIterators.offer(iter);
|
||||
}
|
||||
}
|
||||
if (!replicaShards.isEmpty()) {
|
||||
return replicaShards.poll();
|
||||
}
|
||||
Iterator<ShardRouting> replicaIterator = replicaIterators.poll();
|
||||
ShardRouting replicaShard = replicaIterator.next();
|
||||
replicaIterators.offer(replicaIterator);
|
||||
|
||||
assert !replicaShard.primary();
|
||||
return replicaShard;
|
||||
}
|
||||
Iterator<ShardRouting> iter = queue.poll();
|
||||
ShardRouting result = iter.next();
|
||||
queue.offer(iter);
|
||||
return result;
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new Iterator<ShardRouting>() {
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
while (!queue.isEmpty()) {
|
||||
if (queue.peek().hasNext()) {
|
||||
return true;
|
||||
}
|
||||
queue.poll();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShardRouting next() {
|
||||
if (hasNext() == false) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
Iterator<ShardRouting> iter = queue.poll();
|
||||
queue.offer(iter);
|
||||
return iter.next();
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private static final class Recoveries {
|
||||
|
@ -109,6 +109,12 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
||||
Property.Dynamic,
|
||||
Property.NodeScope
|
||||
);
|
||||
public static final Setting<Boolean> SHARD_MOVE_PRIMARY_FIRST_SETTING = Setting.boolSetting(
|
||||
"cluster.routing.allocation.move.primary_first",
|
||||
false,
|
||||
Property.Dynamic,
|
||||
Property.NodeScope
|
||||
);
|
||||
public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting(
|
||||
"cluster.routing.allocation.balance.threshold",
|
||||
1.0f,
|
||||
@ -117,6 +123,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
||||
Property.NodeScope
|
||||
);
|
||||
|
||||
private volatile boolean movePrimaryFirst;
|
||||
private volatile WeightFunction weightFunction;
|
||||
private volatile float threshold;
|
||||
|
||||
@ -128,10 +135,15 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
||||
public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) {
|
||||
setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
|
||||
setThreshold(THRESHOLD_SETTING.get(settings));
|
||||
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
|
||||
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
|
||||
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
|
||||
}
|
||||
|
||||
private void setMovePrimaryFirst(boolean movePrimaryFirst) {
|
||||
this.movePrimaryFirst = movePrimaryFirst;
|
||||
}
|
||||
|
||||
private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
|
||||
weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
|
||||
}
|
||||
@ -146,7 +158,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
||||
failAllocationOfNewPrimaries(allocation);
|
||||
return;
|
||||
}
|
||||
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
|
||||
final Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold);
|
||||
balancer.allocateUnassigned();
|
||||
balancer.moveShards();
|
||||
balancer.balance();
|
||||
@ -154,7 +166,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
||||
|
||||
@Override
|
||||
public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) {
|
||||
Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
|
||||
Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold);
|
||||
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
|
||||
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
|
||||
if (shard.unassigned()) {
|
||||
@ -283,6 +295,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
||||
private final Map<String, ModelNode> nodes;
|
||||
private final RoutingAllocation allocation;
|
||||
private final RoutingNodes routingNodes;
|
||||
private final boolean movePrimaryFirst;
|
||||
private final WeightFunction weight;
|
||||
|
||||
private final float threshold;
|
||||
@ -291,9 +304,10 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
||||
private final NodeSorter sorter;
|
||||
private final Set<RoutingNode> inEligibleTargetNode;
|
||||
|
||||
public Balancer(Logger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) {
|
||||
public Balancer(Logger logger, RoutingAllocation allocation, boolean movePrimaryFirst, WeightFunction weight, float threshold) {
|
||||
this.logger = logger;
|
||||
this.allocation = allocation;
|
||||
this.movePrimaryFirst = movePrimaryFirst;
|
||||
this.weight = weight;
|
||||
this.threshold = threshold;
|
||||
this.routingNodes = allocation.routingNodes();
|
||||
@ -725,7 +739,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
||||
for (ModelNode currentNode : sorter.modelNodes) {
|
||||
checkAndAddInEligibleTargetNode(currentNode.getRoutingNode());
|
||||
}
|
||||
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext();) {
|
||||
boolean primariesThrottled = false;
|
||||
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(movePrimaryFirst); it.hasNext();) {
|
||||
// Verify if the cluster concurrent recoveries have been reached.
|
||||
if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) {
|
||||
logger.info(
|
||||
@ -745,11 +760,23 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
||||
|
||||
ShardRouting shardRouting = it.next();
|
||||
|
||||
// Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled
|
||||
if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) {
|
||||
logger.info(
|
||||
"Cannot move any replica shard in the cluster as movePrimaryFirst is enabled and primary shards"
|
||||
+ "are being throttled. Skipping shard iteration"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Verify if the shard is allowed to move if outgoing recovery on the node hosting the primary shard
|
||||
// is not being throttled.
|
||||
Decision canMoveAwayDecision = allocation.deciders().canMoveAway(shardRouting, allocation);
|
||||
if (canMoveAwayDecision.type() != Decision.Type.YES) {
|
||||
if (logger.isDebugEnabled()) logger.debug("Cannot move away shard [{}] Skipping this shard", shardRouting);
|
||||
if (shardRouting.primary() && canMoveAwayDecision.type() == Type.THROTTLE) {
|
||||
primariesThrottled = true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -219,6 +219,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
|
||||
BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,
|
||||
BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING,
|
||||
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
|
||||
BalancedShardsAllocator.THRESHOLD_SETTING,
|
||||
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
|
||||
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
|
@ -0,0 +1,117 @@
|
||||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.cluster.routing;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
|
||||
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||
import org.opensearch.cluster.ClusterStateListener;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.test.InternalTestCluster;
|
||||
import org.opensearch.test.OpenSearchIntegTestCase;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
|
||||
|
||||
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
|
||||
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
|
||||
public class MovePrimaryFirstTests extends OpenSearchIntegTestCase {
|
||||
|
||||
protected String startDataOnlyNode(final String zone) {
|
||||
final Settings settings = Settings.builder().put("node.attr.zone", zone).build();
|
||||
return internalCluster().startDataOnlyNode(settings);
|
||||
}
|
||||
|
||||
protected void createAndIndex(String index, int replicaCount, int shardCount) {
|
||||
assertAcked(
|
||||
prepareCreate(
|
||||
index,
|
||||
-1,
|
||||
Settings.builder()
|
||||
.put("number_of_shards", shardCount)
|
||||
.put("number_of_replicas", replicaCount)
|
||||
.put("max_result_window", 20000)
|
||||
)
|
||||
);
|
||||
int startDocCountId = 0;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
index(index, "_doc", Integer.toString(startDocCountId), "foo", "bar" + startDocCountId);
|
||||
++startDocCountId;
|
||||
}
|
||||
flushAndRefresh(index);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates two nodes each in two zones and shuts down nodes in one zone
|
||||
* after relocating half the number of shards. Since, primaries are relocated
|
||||
* first, cluster should stay green as primary should have relocated
|
||||
*/
|
||||
public void testClusterGreenAfterPartialRelocation() throws InterruptedException {
|
||||
internalCluster().startMasterOnlyNodes(1);
|
||||
final String z1 = "zone-1", z2 = "zone-2";
|
||||
final int primaryShardCount = 100;
|
||||
final String z1n1 = startDataOnlyNode(z1);
|
||||
ensureGreen();
|
||||
createAndIndex("foo", 1, primaryShardCount);
|
||||
ensureYellow();
|
||||
// Start second node in same zone only after yellow cluster to ensure
|
||||
// that one gets all primaries and other all secondaries
|
||||
final String z1n2 = startDataOnlyNode(z1);
|
||||
ensureGreen();
|
||||
|
||||
// Enable cluster level setting for moving primaries first and keep new
|
||||
// zone nodes excluded to prevent any shard relocation
|
||||
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
|
||||
settingsRequest.persistentSettings(
|
||||
Settings.builder().put("cluster.routing.allocation.move.primary_first", true).put("cluster.routing.allocation.exclude.zone", z2)
|
||||
);
|
||||
client().admin().cluster().updateSettings(settingsRequest).actionGet();
|
||||
|
||||
final String z2n1 = startDataOnlyNode(z2);
|
||||
final String z2n2 = startDataOnlyNode(z2);
|
||||
|
||||
// Create cluster state listener to compute number of shards on new zone
|
||||
// nodes before counting down the latch
|
||||
final CountDownLatch primaryMoveLatch = new CountDownLatch(1);
|
||||
final ClusterStateListener listener = event -> {
|
||||
if (event.routingTableChanged()) {
|
||||
final RoutingNodes routingNodes = event.state().getRoutingNodes();
|
||||
int startedz2n1 = 0;
|
||||
int startedz2n2 = 0;
|
||||
for (Iterator<RoutingNode> it = routingNodes.iterator(); it.hasNext();) {
|
||||
RoutingNode routingNode = it.next();
|
||||
final String nodeName = routingNode.node().getName();
|
||||
if (nodeName.equals(z2n1)) {
|
||||
startedz2n1 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
|
||||
} else if (nodeName.equals(z2n2)) {
|
||||
startedz2n2 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
|
||||
}
|
||||
}
|
||||
if (startedz2n1 >= primaryShardCount / 2 && startedz2n2 >= primaryShardCount / 2) {
|
||||
primaryMoveLatch.countDown();
|
||||
}
|
||||
}
|
||||
};
|
||||
internalCluster().clusterService().addListener(listener);
|
||||
|
||||
// Exclude zone1 nodes for allocation and await latch count down
|
||||
settingsRequest = new ClusterUpdateSettingsRequest();
|
||||
settingsRequest.persistentSettings(Settings.builder().put("cluster.routing.allocation.exclude.zone", z1));
|
||||
client().admin().cluster().updateSettings(settingsRequest);
|
||||
primaryMoveLatch.await();
|
||||
|
||||
// Shutdown both nodes in zone and ensure cluster stays green
|
||||
try {
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(z1n1));
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(z1n2));
|
||||
} catch (Exception e) {}
|
||||
ensureGreen();
|
||||
}
|
||||
}
|
@ -41,6 +41,7 @@ import org.opensearch.index.shard.ShardId;
|
||||
import org.opensearch.test.OpenSearchTestCase;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.Iterator;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
@ -86,6 +87,29 @@ public class RoutingNodeTests extends OpenSearchTestCase {
|
||||
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetadata.INDEX_UUID_NA_VALUE, 4)), equalTo(relocatingShard0));
|
||||
}
|
||||
|
||||
public void testPrimaryFirstIterator() {
|
||||
ShardRouting initializingShard3 = TestShardRouting.newShardRouting("test", 3, "node-1", false, ShardRoutingState.INITIALIZING);
|
||||
ShardRouting relocatingShard4 = TestShardRouting.newShardRouting("test", 4, "node-1", "node-2", true, ShardRoutingState.RELOCATING);
|
||||
ShardRouting initializingShard5 = TestShardRouting.newShardRouting("test", 5, "node-1", true, ShardRoutingState.INITIALIZING);
|
||||
routingNode.add(initializingShard3);
|
||||
routingNode.add(relocatingShard4);
|
||||
routingNode.add(initializingShard5);
|
||||
final Iterator<ShardRouting> shardRoutingIterator = routingNode.iterator();
|
||||
assertTrue(shardRoutingIterator.hasNext());
|
||||
assertThat(shardRoutingIterator.next(), equalTo(relocatingShard4));
|
||||
assertTrue(shardRoutingIterator.hasNext());
|
||||
assertThat(shardRoutingIterator.next(), equalTo(initializingShard5));
|
||||
assertTrue(shardRoutingIterator.hasNext());
|
||||
assertThat(shardRoutingIterator.next(), equalTo(unassignedShard0));
|
||||
assertTrue(shardRoutingIterator.hasNext());
|
||||
assertThat(shardRoutingIterator.next(), equalTo(initializingShard0));
|
||||
assertTrue(shardRoutingIterator.hasNext());
|
||||
assertThat(shardRoutingIterator.next(), equalTo(relocatingShard0));
|
||||
assertTrue(shardRoutingIterator.hasNext());
|
||||
assertThat(shardRoutingIterator.next(), equalTo(initializingShard3));
|
||||
assertFalse(shardRoutingIterator.hasNext());
|
||||
}
|
||||
|
||||
public void testUpdate() {
|
||||
ShardRouting startedShard0 = TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED);
|
||||
ShardRouting startedShard1 = TestShardRouting.newShardRouting("test", 1, "node-1", "node-2", false, ShardRoutingState.RELOCATING);
|
||||
|
@ -0,0 +1,163 @@
|
||||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Modifications Copyright OpenSearch Contributors. See
|
||||
* GitHub history for details.
|
||||
*/
|
||||
|
||||
package org.opensearch.cluster.routing;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.opensearch.Version;
|
||||
import org.opensearch.cluster.ClusterState;
|
||||
import org.opensearch.cluster.OpenSearchAllocationTestCase;
|
||||
import org.opensearch.cluster.metadata.IndexMetadata;
|
||||
import org.opensearch.cluster.metadata.Metadata;
|
||||
import org.opensearch.cluster.node.DiscoveryNodes;
|
||||
import org.opensearch.cluster.routing.allocation.AllocationService;
|
||||
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
public class RoutingNodesTests extends OpenSearchAllocationTestCase {
|
||||
private static final String TEST_INDEX_1 = "test1";
|
||||
private static final String TEST_INDEX_2 = "test2";
|
||||
private RoutingTable emptyRoutingTable;
|
||||
private int numberOfShards;
|
||||
private int numberOfReplicas;
|
||||
private int shardsPerIndex;
|
||||
private int totalNumberOfShards;
|
||||
private static final Settings DEFAULT_SETTINGS = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
private final AllocationService ALLOCATION_SERVICE = createAllocationService(
|
||||
Settings.builder()
|
||||
.put("cluster.routing.allocation.node_concurrent_recoveries", Integer.MAX_VALUE) // don't limit recoveries
|
||||
.put("cluster.routing.allocation.node_initial_primaries_recoveries", Integer.MAX_VALUE)
|
||||
.put(
|
||||
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(),
|
||||
Integer.MAX_VALUE
|
||||
)
|
||||
.build()
|
||||
);
|
||||
private ClusterState clusterState;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
this.numberOfShards = 5;
|
||||
this.numberOfReplicas = 2;
|
||||
this.shardsPerIndex = this.numberOfShards * (this.numberOfReplicas + 1);
|
||||
this.totalNumberOfShards = this.shardsPerIndex * 2;
|
||||
logger.info("Setup test with {} shards and {} replicas.", this.numberOfShards, this.numberOfReplicas);
|
||||
this.emptyRoutingTable = new RoutingTable.Builder().build();
|
||||
Metadata metadata = Metadata.builder().put(createIndexMetadata(TEST_INDEX_1)).put(createIndexMetadata(TEST_INDEX_2)).build();
|
||||
|
||||
RoutingTable testRoutingTable = new RoutingTable.Builder().add(
|
||||
new IndexRoutingTable.Builder(metadata.index(TEST_INDEX_1).getIndex()).initializeAsNew(metadata.index(TEST_INDEX_1)).build()
|
||||
)
|
||||
.add(
|
||||
new IndexRoutingTable.Builder(metadata.index(TEST_INDEX_2).getIndex()).initializeAsNew(metadata.index(TEST_INDEX_2)).build()
|
||||
)
|
||||
.build();
|
||||
this.clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
|
||||
.metadata(metadata)
|
||||
.routingTable(testRoutingTable)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Puts primary shard index routings into initializing state
|
||||
*/
|
||||
private void initPrimaries() {
|
||||
logger.info("adding {} nodes and performing rerouting", this.numberOfReplicas + 1);
|
||||
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
|
||||
for (int i = 0; i < this.numberOfReplicas + 1; i++) {
|
||||
discoBuilder = discoBuilder.add(newNode("node" + i));
|
||||
}
|
||||
this.clusterState = ClusterState.builder(clusterState).nodes(discoBuilder).build();
|
||||
ClusterState rerouteResult = ALLOCATION_SERVICE.reroute(clusterState, "reroute");
|
||||
assertThat(rerouteResult, not(equalTo(this.clusterState)));
|
||||
this.clusterState = rerouteResult;
|
||||
}
|
||||
|
||||
/**
|
||||
* Moves initializing shards into started state
|
||||
*/
|
||||
private void startInitializingShards(String index) {
|
||||
clusterState = startInitializingShardsAndReroute(ALLOCATION_SERVICE, clusterState, index);
|
||||
}
|
||||
|
||||
private IndexMetadata.Builder createIndexMetadata(String indexName) {
|
||||
return new IndexMetadata.Builder(indexName).settings(DEFAULT_SETTINGS)
|
||||
.numberOfReplicas(this.numberOfReplicas)
|
||||
.numberOfShards(this.numberOfShards);
|
||||
}
|
||||
|
||||
public void testInterleavedShardIterator() {
|
||||
// Initialize all the shards for test index 1 and 2
|
||||
initPrimaries();
|
||||
startInitializingShards(TEST_INDEX_1);
|
||||
startInitializingShards(TEST_INDEX_1);
|
||||
startInitializingShards(TEST_INDEX_2);
|
||||
startInitializingShards(TEST_INDEX_2);
|
||||
|
||||
// Create primary shard count imbalance between two nodes
|
||||
final RoutingNode node0 = this.clusterState.getRoutingNodes().node("node0");
|
||||
final RoutingNode node1 = this.clusterState.getRoutingNodes().node("node1");
|
||||
final List<ShardRouting> shardRoutingList = node0.shardsWithState(TEST_INDEX_1, ShardRoutingState.STARTED);
|
||||
for (ShardRouting routing : shardRoutingList) {
|
||||
if (routing.primary()) {
|
||||
node0.remove(routing);
|
||||
ShardRouting swap = node1.getByShardId(routing.shardId());
|
||||
node0.add(swap);
|
||||
node1.remove(swap);
|
||||
node1.add(routing);
|
||||
}
|
||||
}
|
||||
|
||||
// Get primary first shard iterator and assert primary shards are iterated over first
|
||||
final Iterator<ShardRouting> iterator = this.clusterState.getRoutingNodes().nodeInterleavedShardIterator(true);
|
||||
boolean iteratingPrimary = true;
|
||||
int shardCount = 0;
|
||||
while (iterator.hasNext()) {
|
||||
final ShardRouting shard = iterator.next();
|
||||
if (iteratingPrimary) {
|
||||
iteratingPrimary = shard.primary();
|
||||
} else {
|
||||
assert shard.primary() == false;
|
||||
}
|
||||
shardCount++;
|
||||
}
|
||||
assert shardCount == this.totalNumberOfShards;
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user