Fix ConcurrentModificationException in ReplicaShardAllocator

This commit is contained in:
Yannick Welsch 2016-05-31 19:42:17 +02:00
parent 7df5fcd4b6
commit ad093a78c5
1 changed files with 14 additions and 4 deletions

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
@ -41,6 +42,8 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
@ -59,7 +62,9 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
public boolean processExistingRecoveries(RoutingAllocation allocation) { public boolean processExistingRecoveries(RoutingAllocation allocation) {
boolean changed = false; boolean changed = false;
MetaData metaData = allocation.metaData(); MetaData metaData = allocation.metaData();
for (RoutingNode routingNode : allocation.routingNodes()) { RoutingNodes routingNodes = allocation.routingNodes();
List<Tuple<ShardRouting, UnassignedInfo>> recoveriesToCancel = new ArrayList<>();
for (RoutingNode routingNode : routingNodes) {
for (ShardRouting shard : routingNode) { for (ShardRouting shard : routingNode) {
if (shard.primary() == true) { if (shard.primary() == true) {
continue; continue;
@ -104,14 +109,19 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
// so we found a better one, cancel this one // so we found a better one, cancel this one
logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]", logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]",
currentNode, nodeWithHighestMatch); currentNode, nodeWithHighestMatch);
allocation.routingNodes().moveToUnassigned(shard, new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]", "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+ nodeWithHighestMatch + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false)); null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false);
// don't cancel shard in the loop as it will cause a ConcurrentModificationException
recoveriesToCancel.add(new Tuple<>(shard, unassignedInfo));
changed = true; changed = true;
} }
} }
} }
} }
for (Tuple<ShardRouting, UnassignedInfo> cancellation : recoveriesToCancel) {
routingNodes.moveToUnassigned(cancellation.v1(), cancellation.v2());
}
return changed; return changed;
} }