diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 13d08590216..602a05c4321 100644 --- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -41,6 +42,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; +import java.util.ArrayList; +import java.util.List; import java.util.Map; /** @@ -59,7 +62,9 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { public boolean processExistingRecoveries(RoutingAllocation allocation) { boolean changed = false; MetaData metaData = allocation.metaData(); - for (RoutingNode routingNode : allocation.routingNodes()) { + RoutingNodes routingNodes = allocation.routingNodes(); + List<Tuple<ShardRouting, UnassignedInfo>> recoveriesToCancel = new ArrayList<>(); + for (RoutingNode routingNode : routingNodes) { for (ShardRouting shard : routingNode) { if (shard.primary() == true) { continue; @@ -104,14 +109,19 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { // so we found a better one, cancel this one logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]", currentNode, nodeWithHighestMatch); - allocation.routingNodes().moveToUnassigned(shard, new 
UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, - "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]", - null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false)); + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, + "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+ nodeWithHighestMatch + "]", + null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false); + // don't cancel shard in the loop as it will cause a ConcurrentModificationException + recoveriesToCancel.add(new Tuple<>(shard, unassignedInfo)); changed = true; } } } } + for (Tuple<ShardRouting, UnassignedInfo> cancellation : recoveriesToCancel) { + routingNodes.moveToUnassigned(cancellation.v1(), cancellation.v2()); + } return changed; }