From b39961b2a6ae880bfcfe17d5e80c834be705f15d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 3 Sep 2013 14:38:04 +0200 Subject: [PATCH] Prevent ShardAllocator to modify the unassigned while running allocations The unassigned list is used to make allocation decisions but is currently modified during allocation runs which causes primaries to be throttled during allocation. If this happens, newly allocated indices can be stalled for a long time turning a cluster into a RED state if concurrent relocations and / or recoveries are happening. Closes #3610 --- .../allocator/BalancedShardsAllocator.java | 52 ++++++++++++++++--- .../decider/ThrottlingAllocationDecider.java | 12 +---- .../PreferPrimaryAllocationTests.java | 2 - 3 files changed, 46 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index bc774aec1c2..6a0e50ef0c8 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -334,14 +334,13 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards return new NodeSorter(nodesArray(), weight, this); } - private boolean initialize(RoutingNodes routing) { + private boolean initialize(RoutingNodes routing, List unassigned) { if (logger.isTraceEnabled()) { logger.trace("Start distributing Shards"); } - indices.addAll(allocation.routingTable().indicesRouting().keySet()); buildModelFromAssigned(routing.shards(assignedFilter)); - return allocateUnassigned(allocation.routingNodes().unassigned(), allocation.routingNodes().ignoredUnassigned()); + return allocateUnassigned(unassigned, routing.ignoredUnassigned()); } /** @@ -366,8 +365,8 @@ public class BalancedShardsAllocator extends
AbstractComponent implements Shards if (logger.isTraceEnabled()) { logger.trace("Start balancing cluster"); } - - boolean changed = initialize(allocation.routingNodes()); + final TransactionalList unassigned = new TransactionalList(allocation.routingNodes().unassigned()); + boolean changed = initialize(allocation.routingNodes(), unassigned); NodeSorter sorter = newNodeSorter(); if (nodes.size() > 1) { /* skip if we only have one node */ for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) { @@ -445,6 +444,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } } } + unassigned.commit(); return changed; } @@ -519,7 +519,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if (logger.isTraceEnabled()) { logger.trace("Try moving shard [{}] from [{}]", shard, node); } - boolean changed = initialize(allocation.routingNodes()); + final TransactionalList unassigned = new TransactionalList(allocation.routingNodes().unassigned()); + boolean changed = initialize(allocation.routingNodes(), unassigned); final ModelNode sourceNode = nodes.get(node.nodeId()); assert sourceNode != null; @@ -533,6 +534,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * This is not guaranteed to be balanced after this operation we still try best effort to * allocate on the minimal eligible node. 
*/ + for (ModelNode currentNode : nodes) { if (currentNode.getNodeId().equals(node.nodeId())) { continue; @@ -549,10 +551,11 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if (logger.isTraceEnabled()) { logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId()); } - return true; + changed = true; + break; } } - + unassigned.commit(); return changed; } @@ -1039,4 +1042,37 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards return weights[weights.length - 1] - weights[0]; } } + + /** + * A list that makes a full copy of the original list and applies all + * modification to the copied list once {@link TransactionalList#commit()} + * is called. + * + */ + @SuppressWarnings("serial") + private static final class TransactionalList extends ArrayList { + + private final List originalList; + private List assertingList; // only with assert + + TransactionalList(List originalList) { + super(originalList); + assert copyAsseringList(originalList); + this.originalList = originalList; + } + + private boolean copyAsseringList(List orig) { + this.assertingList = new ArrayList(orig); + return true; + } + + public void commit() { + /* Ensure that the actual source list is not modified while + * the transaction is running */ + assert assertingList.equals(originalList) : "The list was modified outside of the scope"; + originalList.clear(); + originalList.addAll(this); + + } + } } diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 5c011d6e232..8af6d854776 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -74,16 +74,8 @@ public class ThrottlingAllocationDecider 
extends AllocationDecider { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (shardRouting.primary()) { - boolean primaryUnassigned = false; - List unassigned = allocation.routingNodes().unassigned(); - for (int i1 = 0; i1 < unassigned.size(); i1++) { - MutableShardRouting shard = unassigned.get(i1); - if (shard.shardId().equals(shardRouting.shardId())) { - primaryUnassigned = true; - break; - } - } - if (primaryUnassigned) { + assert shardRouting.unassigned() || shardRouting.active(); + if (shardRouting.unassigned()) { // primary is unassigned, means we are going to do recovery from gateway // count *just the primary* currently doing recovery on the node and check against concurrent_recoveries int primariesInRecovery = 0; diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/PreferPrimaryAllocationTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/PreferPrimaryAllocationTests.java index 8295d3e9ab6..3ff528eb8c4 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/PreferPrimaryAllocationTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/PreferPrimaryAllocationTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.test.unit.cluster.routing.allocation; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RoutingTable; @@ -45,7 +44,6 @@ public class PreferPrimaryAllocationTests extends ElasticsearchTestCase { private final ESLogger logger = Loggers.getLogger(PreferPrimaryAllocationTests.class); - @LuceneTestCase.AwaitsFix(bugUrl = "seems like unassigned is cleared so throttling for primaries is not properly working") @Test public void testPreferPrimaryAllocationOverReplicas() { logger.info("create an allocation with 1 initial recoveries");