Simplify handling of ignored unassigned shards
Fold ignored unassigned to a UnassignedShards and have simpler handling of them. Also remove the trapy way of adding an ignored unassigned shards today directly to the list, and have dedicated methods for it. This change also removes the useless moving of unassigned shards to the end, since anyhow we first, sort those unassigned shards, and second, we now have persistent "store exceptions" that should not cause "dead letter" shard allocation.
This commit is contained in:
parent
ca3e0c6d49
commit
32445bbc3a
|
@ -52,9 +52,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
|
|
||||||
private final Map<String, RoutingNode> nodesToShards = newHashMap();
|
private final Map<String, RoutingNode> nodesToShards = newHashMap();
|
||||||
|
|
||||||
private final UnassignedShards unassignedShards = new UnassignedShards();
|
private final UnassignedShards unassignedShards = new UnassignedShards(this);
|
||||||
|
|
||||||
private final List<ShardRouting> ignoredUnassignedShards = newArrayList();
|
|
||||||
|
|
||||||
private final Map<ShardId, List<ShardRouting>> assignedShards = newHashMap();
|
private final Map<ShardId, List<ShardRouting>> assignedShards = newHashMap();
|
||||||
|
|
||||||
|
@ -185,10 +183,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
return !unassignedShards.isEmpty();
|
return !unassignedShards.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<ShardRouting> ignoredUnassigned() {
|
|
||||||
return this.ignoredUnassignedShards;
|
|
||||||
}
|
|
||||||
|
|
||||||
public UnassignedShards unassigned() {
|
public UnassignedShards unassigned() {
|
||||||
return this.unassignedShards;
|
return this.unassignedShards;
|
||||||
}
|
}
|
||||||
|
@ -526,9 +520,11 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public final static class UnassignedShards implements Iterable<ShardRouting> {
|
public static final class UnassignedShards implements Iterable<ShardRouting> {
|
||||||
|
|
||||||
|
private final RoutingNodes nodes;
|
||||||
private final List<ShardRouting> unassigned;
|
private final List<ShardRouting> unassigned;
|
||||||
|
private final List<ShardRouting> ignored;
|
||||||
|
|
||||||
private int primaries = 0;
|
private int primaries = 0;
|
||||||
private long transactionId = 0;
|
private long transactionId = 0;
|
||||||
|
@ -536,14 +532,18 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
private final long sourceTransactionId;
|
private final long sourceTransactionId;
|
||||||
|
|
||||||
public UnassignedShards(UnassignedShards other) {
|
public UnassignedShards(UnassignedShards other) {
|
||||||
|
this.nodes = other.nodes;
|
||||||
source = other;
|
source = other;
|
||||||
sourceTransactionId = other.transactionId;
|
sourceTransactionId = other.transactionId;
|
||||||
unassigned = new ArrayList<>(other.unassigned);
|
unassigned = new ArrayList<>(other.unassigned);
|
||||||
|
ignored = new ArrayList<>(other.ignored);
|
||||||
primaries = other.primaries;
|
primaries = other.primaries;
|
||||||
}
|
}
|
||||||
|
|
||||||
public UnassignedShards() {
|
public UnassignedShards(RoutingNodes nodes) {
|
||||||
|
this.nodes = nodes;
|
||||||
unassigned = new ArrayList<>();
|
unassigned = new ArrayList<>();
|
||||||
|
ignored = new ArrayList<>();
|
||||||
source = null;
|
source = null;
|
||||||
sourceTransactionId = -1;
|
sourceTransactionId = -1;
|
||||||
}
|
}
|
||||||
|
@ -556,12 +556,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
transactionId++;
|
transactionId++;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addAll(Collection<ShardRouting> mutableShardRoutings) {
|
|
||||||
for (ShardRouting r : mutableShardRoutings) {
|
|
||||||
add(r);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void sort(Comparator<ShardRouting> comparator) {
|
public void sort(Comparator<ShardRouting> comparator) {
|
||||||
CollectionUtil.timSort(unassigned, comparator);
|
CollectionUtil.timSort(unassigned, comparator);
|
||||||
}
|
}
|
||||||
|
@ -575,29 +569,87 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<ShardRouting> iterator() {
|
public UnassignedIterator iterator() {
|
||||||
final Iterator<ShardRouting> iterator = unassigned.iterator();
|
return new UnassignedIterator();
|
||||||
return new Iterator<ShardRouting>() {
|
}
|
||||||
private ShardRouting current;
|
|
||||||
@Override
|
|
||||||
public boolean hasNext() {
|
|
||||||
return iterator.hasNext();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
/**
|
||||||
public ShardRouting next() {
|
* The list of ignored unassigned shards (read only). The ignored unassigned shards
|
||||||
return current = iterator.next();
|
* are not part of the formal unassigned list, but are kept around and used to build
|
||||||
}
|
* back the list of unassigned shards as part of the routing table.
|
||||||
|
*/
|
||||||
|
public List<ShardRouting> ignored() {
|
||||||
|
return Collections.unmodifiableList(ignored);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
/**
|
||||||
public void remove() {
|
* Adds a shard to the ignore unassigned list. Should be used with caution, typically,
|
||||||
iterator.remove();
|
* the correct usage is to removeAndIgnore from the iterator.
|
||||||
if (current.primary()) {
|
*/
|
||||||
primaries--;
|
public void ignoreShard(ShardRouting shard) {
|
||||||
}
|
ignored.add(shard);
|
||||||
transactionId++;
|
transactionId++;
|
||||||
|
}
|
||||||
|
|
||||||
|
public class UnassignedIterator implements Iterator<ShardRouting> {
|
||||||
|
|
||||||
|
private final Iterator<ShardRouting> iterator;
|
||||||
|
private ShardRouting current;
|
||||||
|
|
||||||
|
public UnassignedIterator() {
|
||||||
|
this.iterator = unassigned.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return iterator.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ShardRouting next() {
|
||||||
|
return current = iterator.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes the current unassigned shard and moves it from the unassigned list.
|
||||||
|
*/
|
||||||
|
public void initialize(String nodeId) {
|
||||||
|
initialize(nodeId, current.version());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes the current unassigned shard and moves it from the unassigned list.
|
||||||
|
*/
|
||||||
|
public void initialize(String nodeId, long version) {
|
||||||
|
innerRemove();
|
||||||
|
nodes.initialize(new ShardRouting(current, version), nodeId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes and ignores the unassigned shard (will be ignored for this run, but
|
||||||
|
* will be added back to unassigned once the metadata is constructed again).
|
||||||
|
*/
|
||||||
|
public void removeAndIgnore() {
|
||||||
|
innerRemove();
|
||||||
|
ignoreShard(current);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unsupported operation, just there for the interface. Use {@link #removeAndIgnore()} or
|
||||||
|
* {@link #initialize(String)}.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException("remove is not supported in unassigned iterator, use removeAndIgnore or initialize");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void innerRemove() {
|
||||||
|
iterator.remove();
|
||||||
|
if (current.primary()) {
|
||||||
|
primaries--;
|
||||||
}
|
}
|
||||||
};
|
transactionId++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isEmpty() {
|
public boolean isEmpty() {
|
||||||
|
@ -611,16 +663,19 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
public void clear() {
|
public void clear() {
|
||||||
transactionId++;
|
transactionId++;
|
||||||
unassigned.clear();
|
unassigned.clear();
|
||||||
|
ignored.clear();
|
||||||
primaries = 0;
|
primaries = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void transactionEnd(UnassignedShards shards) {
|
public void transactionEnd(UnassignedShards shards) {
|
||||||
assert shards.source == this && shards.sourceTransactionId == transactionId :
|
assert shards.source == this && shards.sourceTransactionId == transactionId :
|
||||||
"Expected ID: " + shards.sourceTransactionId + " actual: " + transactionId + " Expected Source: " + shards.source + " actual: " + this;
|
"Expected ID: " + shards.sourceTransactionId + " actual: " + transactionId + " Expected Source: " + shards.source + " actual: " + this;
|
||||||
transactionId++;
|
transactionId++;
|
||||||
this.unassigned.clear();
|
this.unassigned.clear();
|
||||||
this.unassigned.addAll(shards.unassigned);
|
this.unassigned.addAll(shards.unassigned);
|
||||||
this.primaries = shards.primaries;
|
this.ignored.clear();
|
||||||
|
this.ignored.addAll(shards.ignored);
|
||||||
|
this.primaries = shards.primaries;
|
||||||
}
|
}
|
||||||
|
|
||||||
public UnassignedShards transactionBegin() {
|
public UnassignedShards transactionBegin() {
|
||||||
|
|
|
@ -362,7 +362,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
|
||||||
indexBuilder.addShard(refData, shardRoutingEntry);
|
indexBuilder.addShard(refData, shardRoutingEntry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (ShardRouting shardRoutingEntry : Iterables.concat(routingNodes.unassigned(), routingNodes.ignoredUnassigned())) {
|
for (ShardRouting shardRoutingEntry : Iterables.concat(routingNodes.unassigned(), routingNodes.unassigned().ignored())) {
|
||||||
String index = shardRoutingEntry.index();
|
String index = shardRoutingEntry.index();
|
||||||
IndexRoutingTable.Builder indexBuilder = indexRoutingTableBuilders.get(index);
|
IndexRoutingTable.Builder indexBuilder = indexRoutingTableBuilders.get(index);
|
||||||
if (indexBuilder == null) {
|
if (indexBuilder == null) {
|
||||||
|
|
|
@ -460,22 +460,6 @@ public class AllocationService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// move all the shards matching the failed shard to the end of the unassigned list
|
|
||||||
// so we give a chance for other allocations and won't create poison failed allocations
|
|
||||||
// that can keep other shards from being allocated (because of limits applied on how many
|
|
||||||
// shards we can start per node)
|
|
||||||
List<ShardRouting> shardsToMove = Lists.newArrayList();
|
|
||||||
for (Iterator<ShardRouting> unassignedIt = routingNodes.unassigned().iterator(); unassignedIt.hasNext(); ) {
|
|
||||||
ShardRouting unassignedShardRouting = unassignedIt.next();
|
|
||||||
if (unassignedShardRouting.shardId().equals(failedShard.shardId())) {
|
|
||||||
unassignedIt.remove();
|
|
||||||
shardsToMove.add(unassignedShardRouting);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!shardsToMove.isEmpty()) {
|
|
||||||
routingNodes.unassigned().addAll(shardsToMove);
|
|
||||||
}
|
|
||||||
|
|
||||||
matchedNode.moveToUnassigned(unassignedInfo);
|
matchedNode.moveToUnassigned(unassignedInfo);
|
||||||
}
|
}
|
||||||
assert matchedNode.isRemoved() : "failedShard " + failedShard + " was matched but wasn't removed";
|
assert matchedNode.isRemoved() : "failedShard " + failedShard + " was matched but wasn't removed";
|
||||||
|
|
|
@ -41,7 +41,6 @@ import org.elasticsearch.node.settings.NodeSettingsService;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
|
||||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -292,7 +291,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
indices.addAll(allocation.routingTable().indicesRouting().keySet());
|
indices.addAll(allocation.routingTable().indicesRouting().keySet());
|
||||||
buildModelFromAssigned(routing.shards(assignedFilter));
|
buildModelFromAssigned(routing.shards(assignedFilter));
|
||||||
return allocateUnassigned(unassigned, routing.ignoredUnassigned());
|
return allocateUnassigned(unassigned);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static float absDelta(float lower, float higher) {
|
private static float absDelta(float lower, float higher) {
|
||||||
|
@ -551,7 +550,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
* Allocates all given shards on the minimal eligable node for the shards index
|
* Allocates all given shards on the minimal eligable node for the shards index
|
||||||
* with respect to the weight function. All given shards must be unassigned.
|
* with respect to the weight function. All given shards must be unassigned.
|
||||||
*/
|
*/
|
||||||
private boolean allocateUnassigned(RoutingNodes.UnassignedShards unassigned, List<ShardRouting> ignoredUnassigned) {
|
private boolean allocateUnassigned(RoutingNodes.UnassignedShards unassigned) {
|
||||||
assert !nodes.isEmpty();
|
assert !nodes.isEmpty();
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Start allocating unassigned shards");
|
logger.trace("Start allocating unassigned shards");
|
||||||
|
@ -600,9 +599,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
if (!shard.primary()) {
|
if (!shard.primary()) {
|
||||||
boolean drop = deciders.canAllocate(shard, allocation).type() == Type.NO;
|
boolean drop = deciders.canAllocate(shard, allocation).type() == Type.NO;
|
||||||
if (drop) {
|
if (drop) {
|
||||||
ignoredUnassigned.add(shard);
|
unassigned.ignoreShard(shard);
|
||||||
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) {
|
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) {
|
||||||
ignoredUnassigned.add(primary[++i]);
|
unassigned.ignoreShard(primary[++i]);
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -706,10 +705,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
} else if (logger.isTraceEnabled()) {
|
} else if (logger.isTraceEnabled()) {
|
||||||
logger.trace("No Node found to assign shard [{}]", shard);
|
logger.trace("No Node found to assign shard [{}]", shard);
|
||||||
}
|
}
|
||||||
ignoredUnassigned.add(shard);
|
unassigned.ignoreShard(shard);
|
||||||
if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas
|
if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas
|
||||||
while(secondaryLength > 0 && comparator.compare(shard, secondary[secondaryLength-1]) == 0) {
|
while(secondaryLength > 0 && comparator.compare(shard, secondary[secondaryLength-1]) == 0) {
|
||||||
ignoredUnassigned.add(secondary[--secondaryLength]);
|
unassigned.ignoreShard(secondary[--secondaryLength]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,12 +220,11 @@ public class AllocateAllocationCommand implements AllocationCommand {
|
||||||
throw new IllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
|
throw new IllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
|
||||||
}
|
}
|
||||||
// go over and remove it from the unassigned
|
// go over and remove it from the unassigned
|
||||||
for (Iterator<ShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
|
for (RoutingNodes.UnassignedShards.UnassignedIterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
|
||||||
if (it.next() != shardRouting) {
|
if (it.next() != shardRouting) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
it.remove();
|
it.initialize(routingNode.nodeId());
|
||||||
routingNodes.initialize(shardRouting, routingNode.nodeId());
|
|
||||||
if (shardRouting.primary()) {
|
if (shardRouting.primary()) {
|
||||||
// we need to clear the post allocation flag, since its an explicit allocation of the primary shard
|
// we need to clear the post allocation flag, since its an explicit allocation of the primary shard
|
||||||
// and we want to force allocate it (and create a new index for it)
|
// and we want to force allocate it (and create a new index for it)
|
||||||
|
|
|
@ -58,7 +58,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
||||||
final RoutingNodes routingNodes = allocation.routingNodes();
|
final RoutingNodes routingNodes = allocation.routingNodes();
|
||||||
final MetaData metaData = routingNodes.metaData();
|
final MetaData metaData = routingNodes.metaData();
|
||||||
|
|
||||||
final Iterator<ShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
|
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
|
||||||
while (unassignedIterator.hasNext()) {
|
while (unassignedIterator.hasNext()) {
|
||||||
ShardRouting shard = unassignedIterator.next();
|
ShardRouting shard = unassignedIterator.next();
|
||||||
|
|
||||||
|
@ -69,8 +69,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
||||||
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
|
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
|
||||||
if (shardState.hasData() == false) {
|
if (shardState.hasData() == false) {
|
||||||
logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
|
logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
|
||||||
unassignedIterator.remove();
|
unassignedIterator.removeAndIgnore();
|
||||||
routingNodes.ignoredUnassigned().add(shard);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,8 +82,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
||||||
// if we are restoring this shard we still can allocate
|
// if we are restoring this shard we still can allocate
|
||||||
if (shard.restoreSource() == null) {
|
if (shard.restoreSource() == null) {
|
||||||
// we can't really allocate, so ignore it and continue
|
// we can't really allocate, so ignore it and continue
|
||||||
unassignedIterator.remove();
|
unassignedIterator.removeAndIgnore();
|
||||||
routingNodes.ignoredUnassigned().add(shard);
|
|
||||||
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound);
|
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound);
|
||||||
} else {
|
} else {
|
||||||
logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
|
logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
|
||||||
|
@ -97,19 +95,16 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
||||||
DiscoveryNode node = nodesToAllocate.yesNodes.get(0);
|
DiscoveryNode node = nodesToAllocate.yesNodes.get(0);
|
||||||
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
|
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
|
||||||
changed = true;
|
changed = true;
|
||||||
routingNodes.initialize(new ShardRouting(shard, nodesAndVersions.highestVersion), node.id());
|
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion);
|
||||||
unassignedIterator.remove();
|
|
||||||
} else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) {
|
} else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) {
|
||||||
DiscoveryNode node = nodesToAllocate.noNodes.get(0);
|
DiscoveryNode node = nodesToAllocate.noNodes.get(0);
|
||||||
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
|
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
|
||||||
changed = true;
|
changed = true;
|
||||||
routingNodes.initialize(new ShardRouting(shard, nodesAndVersions.highestVersion), node.id());
|
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion);
|
||||||
unassignedIterator.remove();
|
|
||||||
} else {
|
} else {
|
||||||
// we are throttling this, but we have enough to allocate to this node, ignore it for now
|
// we are throttling this, but we have enough to allocate to this node, ignore it for now
|
||||||
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes);
|
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes);
|
||||||
unassignedIterator.remove();
|
unassignedIterator.removeAndIgnore();
|
||||||
routingNodes.ignoredUnassigned().add(shard);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return changed;
|
return changed;
|
||||||
|
|
|
@ -52,7 +52,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
||||||
final RoutingNodes routingNodes = allocation.routingNodes();
|
final RoutingNodes routingNodes = allocation.routingNodes();
|
||||||
final MetaData metaData = routingNodes.metaData();
|
final MetaData metaData = routingNodes.metaData();
|
||||||
|
|
||||||
final Iterator<ShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
|
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
|
||||||
while (unassignedIterator.hasNext()) {
|
while (unassignedIterator.hasNext()) {
|
||||||
ShardRouting shard = unassignedIterator.next();
|
ShardRouting shard = unassignedIterator.next();
|
||||||
if (shard.primary()) {
|
if (shard.primary()) {
|
||||||
|
@ -77,16 +77,14 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
||||||
|
|
||||||
if (!canBeAllocatedToAtLeastOneNode) {
|
if (!canBeAllocatedToAtLeastOneNode) {
|
||||||
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
|
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
|
||||||
unassignedIterator.remove();
|
unassignedIterator.removeAndIgnore();
|
||||||
routingNodes.ignoredUnassigned().add(shard);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(shard, allocation);
|
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(shard, allocation);
|
||||||
if (shardStores.hasData() == false) {
|
if (shardStores.hasData() == false) {
|
||||||
logger.trace("{}: ignoring allocation, still fetching shard stores", shard);
|
logger.trace("{}: ignoring allocation, still fetching shard stores", shard);
|
||||||
unassignedIterator.remove();
|
unassignedIterator.removeAndIgnore();
|
||||||
routingNodes.ignoredUnassigned().add(shard);
|
|
||||||
continue; // still fetching
|
continue; // still fetching
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,16 +173,14 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
||||||
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
|
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
|
||||||
}
|
}
|
||||||
// we are throttling this, but we have enough to allocate to this node, ignore it for now
|
// we are throttling this, but we have enough to allocate to this node, ignore it for now
|
||||||
unassignedIterator.remove();
|
unassignedIterator.removeAndIgnore();
|
||||||
routingNodes.ignoredUnassigned().add(shard);
|
|
||||||
} else {
|
} else {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
|
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
|
||||||
}
|
}
|
||||||
// we found a match
|
// we found a match
|
||||||
changed = true;
|
changed = true;
|
||||||
routingNodes.initialize(shard, lastNodeMatched.nodeId());
|
unassignedIterator.initialize(lastNodeMatched.nodeId());
|
||||||
unassignedIterator.remove();
|
|
||||||
}
|
}
|
||||||
} else if (hasReplicaData == false) {
|
} else if (hasReplicaData == false) {
|
||||||
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation
|
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation
|
||||||
|
@ -200,8 +196,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
||||||
* see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}).
|
* see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}).
|
||||||
*/
|
*/
|
||||||
changed = true;
|
changed = true;
|
||||||
unassignedIterator.remove();
|
unassignedIterator.removeAndIgnore();
|
||||||
routingNodes.ignoredUnassigned().add(shard);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,8 +77,8 @@ public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase
|
||||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
|
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -90,8 +90,8 @@ public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase
|
||||||
testAllocator.addData(node1, -1);
|
testAllocator.addData(node1, -1);
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -103,8 +103,8 @@ public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase
|
||||||
testAllocator.addData(node1, 3, new CorruptIndexException("test", "test"));
|
testAllocator.addData(node1, 3, new CorruptIndexException("test", "test"));
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -116,7 +116,7 @@ public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase
|
||||||
testAllocator.addData(node1, 10);
|
testAllocator.addData(node1, 10);
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
|
||||||
}
|
}
|
||||||
|
@ -131,8 +131,8 @@ public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase
|
||||||
testAllocator.addData(node1, 10);
|
testAllocator.addData(node1, 10);
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -145,7 +145,7 @@ public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase
|
||||||
testAllocator.addData(node1, 10);
|
testAllocator.addData(node1, 10);
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
|
||||||
}
|
}
|
||||||
|
@ -159,7 +159,7 @@ public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase
|
||||||
testAllocator.addData(node1, 10).addData(node2, 12);
|
testAllocator.addData(node1, 10).addData(node2, 12);
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
|
||||||
}
|
}
|
||||||
|
@ -185,7 +185,7 @@ public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase
|
||||||
testAllocator.addData(node1, -1).addData(node2, -1);
|
testAllocator.addData(node1, -1).addData(node2, -1);
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -208,23 +208,23 @@ public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase
|
||||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||||
|
|
||||||
testAllocator.addData(node1, 1);
|
testAllocator.addData(node1, 1);
|
||||||
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
||||||
changed = testAllocator.allocateUnassigned(allocation);
|
changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||||
|
|
||||||
testAllocator.addData(node2, 1);
|
testAllocator.addData(node2, 1);
|
||||||
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
||||||
changed = testAllocator.allocateUnassigned(allocation);
|
changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(0));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), anyOf(equalTo(node2.id()), equalTo(node1.id())));
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), anyOf(equalTo(node2.id()), equalTo(node1.id())));
|
||||||
|
@ -250,23 +250,23 @@ public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase
|
||||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||||
|
|
||||||
testAllocator.addData(node1, 1);
|
testAllocator.addData(node1, 1);
|
||||||
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
||||||
changed = testAllocator.allocateUnassigned(allocation);
|
changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||||
|
|
||||||
testAllocator.addData(node2, 2);
|
testAllocator.addData(node2, 2);
|
||||||
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
|
||||||
changed = testAllocator.allocateUnassigned(allocation);
|
changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(0));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
|
||||||
|
|
|
@ -30,7 +30,7 @@ import java.util.Map;
|
||||||
public class PriorityComparatorTests extends ElasticsearchTestCase {
|
public class PriorityComparatorTests extends ElasticsearchTestCase {
|
||||||
|
|
||||||
public void testPriorityComparatorSort() {
|
public void testPriorityComparatorSort() {
|
||||||
RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards();
|
RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards((RoutingNodes) null);
|
||||||
int numIndices = randomIntBetween(3, 99);
|
int numIndices = randomIntBetween(3, 99);
|
||||||
IndexMeta[] indices = new IndexMeta[numIndices];
|
IndexMeta[] indices = new IndexMeta[numIndices];
|
||||||
final Map<String, IndexMeta> map = new HashMap<>();
|
final Map<String, IndexMeta> map = new HashMap<>();
|
||||||
|
|
Loading…
Reference in New Issue