From 94f19d7e3761b86a443c9303cc7d5192b8bc3465 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 9 Feb 2016 11:59:10 +0100 Subject: [PATCH] Reuse existing allocation id for primary shard allocation Closes #16530 --- .../cluster/routing/AllocationId.java | 7 + .../cluster/routing/RoutingNodes.java | 15 +- .../cluster/routing/ShardRouting.java | 10 +- .../allocator/BalancedShardsAllocator.java | 4 +- .../AbstractAllocateAllocationCommand.java | 2 +- .../gateway/PrimaryShardAllocator.java | 162 +++++++++--------- .../gateway/ReplicaShardAllocator.java | 2 +- ...ransportNodesListGatewayStartedShards.java | 23 +++ .../cluster/routing/AllocationIdTests.java | 10 +- .../routing/RandomShardRoutingMutator.java | 2 +- .../cluster/routing/ShardRoutingHelper.java | 6 +- .../cluster/routing/ShardRoutingTests.java | 2 +- .../cluster/routing/UnassignedInfoTests.java | 2 +- .../allocation/BalanceConfigurationTests.java | 20 +-- .../gateway/PrimaryShardAllocatorTests.java | 7 +- 15 files changed, 156 insertions(+), 118 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java index 528ed8b1c3f..a5e96e60e64 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java @@ -96,6 +96,13 @@ public class AllocationId implements ToXContent { return new AllocationId(Strings.randomBase64UUID(), null); } + /** + * Creates a new allocation id for initializing allocation based on an existing id. + */ + public static AllocationId newInitializing(String existingAllocationId) { + return new AllocationId(existingAllocationId, null); + } + /** * Creates a new allocation id for the target initializing shard that is the result * of a relocation. diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 02bcea4ff2d..c1a5f3ff208 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.index.Index; @@ -420,11 +421,13 @@ public class RoutingNodes implements Iterable { /** * Moves a shard from unassigned to initialize state + * + * @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated. */ - public void initialize(ShardRouting shard, String nodeId, long expectedSize) { + public void initialize(ShardRouting shard, String nodeId, @Nullable String existingAllocationId, long expectedSize) { ensureMutable(); assert shard.unassigned() : shard; - shard.initialize(nodeId, expectedSize); + shard.initialize(nodeId, existingAllocationId, expectedSize); node(nodeId).add(shard); inactiveShardCount++; if (shard.primary()) { @@ -692,10 +695,12 @@ public class RoutingNodes implements Iterable { /** * Initializes the current unassigned shard and moves it from the unassigned list. + * + * @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated. */ - public void initialize(String nodeId, long expectedShardSize) { + public void initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize) { innerRemove(); - nodes.initialize(new ShardRouting(current), nodeId, expectedShardSize); + nodes.initialize(new ShardRouting(current), nodeId, existingAllocationId, expectedShardSize); } /** @@ -711,7 +716,7 @@ public class RoutingNodes implements Iterable { /** * Unsupported operation, just there for the interface. Use {@link #removeAndIgnore()} or - * {@link #initialize(String, long)}. + * {@link #initialize(String, String, long)}. */ @Override public void remove() { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 7535aa1226e..336b6547de4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -410,14 +410,20 @@ public final class ShardRouting implements Streamable, ToXContent { /** * Initializes an unassigned shard on a node. + * + * @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated. */ - void initialize(String nodeId, long expectedShardSize) { + void initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize) { ensureNotFrozen(); assert state == ShardRoutingState.UNASSIGNED : this; assert relocatingNodeId == null : this; state = ShardRoutingState.INITIALIZING; currentNodeId = nodeId; - allocationId = AllocationId.newInitializing(); + if (existingAllocationId == null) { + allocationId = AllocationId.newInitializing(); + } else { + allocationId = AllocationId.newInitializing(existingAllocationId); + } this.expectedShardSize = expectedShardSize; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 15c303a2f70..e12020cfa74 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -702,7 +702,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if (logger.isTraceEnabled()) { logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); } - routingNodes.initialize(shard, minNode.getNodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + routingNodes.initialize(shard, minNode.getNodeId(), null, allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); changed = true; continue; // don't add to ignoreUnassigned } else { @@ -790,7 +790,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards routingNodes.relocate(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); } else { - routingNodes.initialize(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + routingNodes.initialize(candidate, minNode.getNodeId(), null, allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); } return true; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AbstractAllocateAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AbstractAllocateAllocationCommand.java index 5a13b3b9683..3a89507871e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AbstractAllocateAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AbstractAllocateAllocationCommand.java @@ -242,7 +242,7 @@ public abstract class AbstractAllocateAllocationCommand implements AllocationCom if (shardRoutingChanges != null) { shardRoutingChanges.accept(unassigned); } - it.initialize(routingNode.nodeId(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + it.initialize(routingNode.nodeId(), null, allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); return; } assert false : "shard to initialize not found in list of unassigned shards"; diff --git a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java index 012b33d7571..8809f68853b 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java @@ -32,15 +32,14 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; import org.elasticsearch.index.shard.ShardStateMetaData; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -98,7 +97,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { continue; } - final AsyncShardFetch.FetchResult shardState = fetchData(shard, allocation); + final AsyncShardFetch.FetchResult shardState = fetchData(shard, allocation); if (shardState.hasData() == false) { logger.trace("{}: ignoring allocation, still fetching shard started state", shard); allocation.setHasPendingAsyncFetch(); @@ -110,7 +109,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { final boolean snapshotRestore = shard.restoreSource() != null; final boolean recoverOnAnyNode = recoverOnAnyNode(indexMetaData); - final NodesResult nodesResult; + final NodeShardsResult nodeShardsResult; final boolean enoughAllocationsFound; if (lastActiveAllocationIds.isEmpty()) { @@ -118,20 +117,20 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { // when we load an old index (after upgrading cluster) or restore a snapshot of an old index // fall back to old version-based allocation mode // Note that once the shard has been active, lastActiveAllocationIds will be non-empty - nodesResult = buildVersionBasedNodes(shard, snapshotRestore || recoverOnAnyNode, allocation.getIgnoreNodes(shard.shardId()), shardState); + nodeShardsResult = buildVersionBasedNodeShardsResult(shard, snapshotRestore || recoverOnAnyNode, allocation.getIgnoreNodes(shard.shardId()), shardState); if (snapshotRestore || recoverOnAnyNode) { - enoughAllocationsFound = nodesResult.allocationsFound > 0; + enoughAllocationsFound = nodeShardsResult.allocationsFound > 0; } else { - enoughAllocationsFound = isEnoughVersionBasedAllocationsFound(shard, indexMetaData, nodesResult); + enoughAllocationsFound = isEnoughVersionBasedAllocationsFound(indexMetaData, nodeShardsResult); } - logger.debug("[{}][{}]: version-based allocation for pre-{} index found {} allocations of {}", shard.index(), shard.id(), Version.V_3_0_0, nodesResult.allocationsFound, shard); + logger.debug("[{}][{}]: version-based allocation for pre-{} index found {} allocations of {}", shard.index(), shard.id(), Version.V_3_0_0, nodeShardsResult.allocationsFound, shard); } else { assert lastActiveAllocationIds.isEmpty() == false; // use allocation ids to select nodes - nodesResult = buildAllocationIdBasedNodes(shard, snapshotRestore || recoverOnAnyNode, + nodeShardsResult = buildAllocationIdBasedNodeShardsResult(shard, snapshotRestore || recoverOnAnyNode, allocation.getIgnoreNodes(shard.shardId()), lastActiveAllocationIds, shardState); - enoughAllocationsFound = nodesResult.allocationsFound > 0; - logger.debug("[{}][{}]: found {} allocations of {} based on allocation ids: [{}]", shard.index(), shard.id(), nodesResult.allocationsFound, shard, lastActiveAllocationIds); + enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0; + logger.debug("[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", shard.index(), shard.id(), nodeShardsResult.orderedAllocationCandidates.size(), shard, lastActiveAllocationIds); } if (enoughAllocationsFound == false){ @@ -144,25 +143,25 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { } else { // we can't really allocate, so ignore it and continue unassignedIterator.removeAndIgnore(); - logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodesResult.allocationsFound); + logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodeShardsResult.allocationsFound); } continue; } - final NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodesResult.nodes); - if (nodesToAllocate.yesNodes.isEmpty() == false) { - DiscoveryNode node = nodesToAllocate.yesNodes.get(0); - logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node); + final NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodeShardsResult.orderedAllocationCandidates); + if (nodesToAllocate.yesNodeShards.isEmpty() == false) { + NodeGatewayStartedShards nodeShardState = nodesToAllocate.yesNodeShards.get(0); + logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode()); changed = true; - unassignedIterator.initialize(node.id(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); - } else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) { - DiscoveryNode node = nodesToAllocate.noNodes.get(0); - logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node); + unassignedIterator.initialize(nodeShardState.getNode().id(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + } else if (nodesToAllocate.throttleNodeShards.isEmpty() == true && nodesToAllocate.noNodeShards.isEmpty() == false) { + NodeGatewayStartedShards nodeShardState = nodesToAllocate.noNodeShards.get(0); + logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode()); changed = true; - unassignedIterator.initialize(node.id(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + unassignedIterator.initialize(nodeShardState.getNode().id(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); } else { // we are throttling this, but we have enough to allocate to this node, ignore it for now - logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes); + logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodeShards); unassignedIterator.removeAndIgnore(); } } @@ -174,11 +173,12 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { * lastActiveAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but * entries with matching allocation id are always at the front of the list. */ - protected NodesResult buildAllocationIdBasedNodes(ShardRouting shard, boolean matchAnyShard, Set ignoreNodes, - Set lastActiveAllocationIds, AsyncShardFetch.FetchResult shardState) { - LinkedList matchingNodes = new LinkedList<>(); - LinkedList nonMatchingNodes = new LinkedList<>(); - for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { + protected NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard, Set ignoreNodes, + Set lastActiveAllocationIds, AsyncShardFetch.FetchResult shardState) { + LinkedList matchingNodeShardStates = new LinkedList<>(); + LinkedList nonMatchingNodeShardStates = new LinkedList<>(); + int numberOfAllocationsFound = 0; + for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { DiscoveryNode node = nodeShardState.getNode(); String allocationId = nodeShardState.allocationId(); @@ -199,36 +199,37 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { } if (allocationId != null) { + numberOfAllocationsFound++; if (lastActiveAllocationIds.contains(allocationId)) { if (nodeShardState.primary()) { - matchingNodes.addFirst(node); + matchingNodeShardStates.addFirst(nodeShardState); } else { - matchingNodes.addLast(node); + matchingNodeShardStates.addLast(nodeShardState); } } else if (matchAnyShard) { if (nodeShardState.primary()) { - nonMatchingNodes.addFirst(node); + nonMatchingNodeShardStates.addFirst(nodeShardState); } else { - nonMatchingNodes.addLast(node); + nonMatchingNodeShardStates.addLast(nodeShardState); } } } } - List nodes = new ArrayList<>(); - nodes.addAll(matchingNodes); - nodes.addAll(nonMatchingNodes); + List nodeShardStates = new ArrayList<>(); + nodeShardStates.addAll(matchingNodeShardStates); + nodeShardStates.addAll(nonMatchingNodeShardStates); if (logger.isTraceEnabled()) { - logger.trace("{} candidates for allocation: {}", shard, nodes.stream().map(DiscoveryNode::name).collect(Collectors.joining(", "))); + logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", "))); } - return new NodesResult(nodes, nodes.size()); + return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); } /** * used by old version-based allocation */ - private boolean isEnoughVersionBasedAllocationsFound(ShardRouting shard, IndexMetaData indexMetaData, NodesResult nodesAndVersions) { + private boolean isEnoughVersionBasedAllocationsFound(IndexMetaData indexMetaData, NodeShardsResult nodeShardsResult) { // check if the counts meets the minimum set int requiredAllocation = 1; // if we restore from a repository one copy is more then enough @@ -253,45 +254,44 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { requiredAllocation = Integer.parseInt(initialShards); } - return nodesAndVersions.allocationsFound >= requiredAllocation; + return nodeShardsResult.allocationsFound >= requiredAllocation; } /** - * Split the list of nodes to lists of yes/no/throttle nodes based on allocation deciders + * Split the list of node shard states into groups yes/no/throttle based on allocation deciders */ - private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, List nodes) { - List yesNodes = new ArrayList<>(); - List throttledNodes = new ArrayList<>(); - List noNodes = new ArrayList<>(); - for (DiscoveryNode discoNode : nodes) { - RoutingNode node = allocation.routingNodes().node(discoNode.id()); + private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, List nodeShardStates) { + List yesNodeShards = new ArrayList<>(); + List throttledNodeShards = new ArrayList<>(); + List noNodeShards = new ArrayList<>(); + for (NodeGatewayStartedShards nodeShardState : nodeShardStates) { + RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().id()); if (node == null) { continue; } Decision decision = allocation.deciders().canAllocate(shard, node, allocation); if (decision.type() == Decision.Type.THROTTLE) { - throttledNodes.add(discoNode); + throttledNodeShards.add(nodeShardState); } else if (decision.type() == Decision.Type.NO) { - noNodes.add(discoNode); + noNodeShards.add(nodeShardState); } else { - yesNodes.add(discoNode); + yesNodeShards.add(nodeShardState); } } - return new NodesToAllocate(Collections.unmodifiableList(yesNodes), Collections.unmodifiableList(throttledNodes), Collections.unmodifiableList(noNodes)); + return new NodesToAllocate(Collections.unmodifiableList(yesNodeShards), Collections.unmodifiableList(throttledNodeShards), Collections.unmodifiableList(noNodeShards)); } /** - * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have the highest shard version - * are added to the list. Otherwise, any node that has a shard is added to the list, but entries with highest - * version are always at the front of the list. + * Builds a list of previously started shards. If matchAnyShard is set to false, only shards with the highest shard version are added to + * the list. Otherwise, any existing shard is added to the list, but entries with highest version are always at the front of the list. */ - NodesResult buildVersionBasedNodes(ShardRouting shard, boolean matchAnyShard, Set ignoreNodes, - AsyncShardFetch.FetchResult shardState) { - final Map nodesWithVersion = new HashMap<>(); + NodeShardsResult buildVersionBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard, Set ignoreNodes, + AsyncShardFetch.FetchResult shardState) { + final List allocationCandidates = new ArrayList<>(); int numberOfAllocationsFound = 0; long highestVersion = ShardStateMetaData.NO_VERSION; - for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { + for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { long version = nodeShardState.legacyVersion(); DiscoveryNode node = nodeShardState.getNode(); @@ -315,38 +315,29 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { if (version > highestVersion) { highestVersion = version; if (matchAnyShard == false) { - nodesWithVersion.clear(); + allocationCandidates.clear(); } - nodesWithVersion.put(node, version); + allocationCandidates.add(nodeShardState); } else if (version == highestVersion) { // If the candidate is the same, add it to the // list, but keep the current candidate - nodesWithVersion.put(node, version); + allocationCandidates.add(nodeShardState); } } } - // Now that we have a map of nodes to versions along with the - // number of allocations found (and not ignored), we need to sort - // it so the node with the highest version is at the beginning - List nodesWithHighestVersion = new ArrayList<>(); - nodesWithHighestVersion.addAll(nodesWithVersion.keySet()); - CollectionUtil.timSort(nodesWithHighestVersion, new Comparator() { - @Override - public int compare(DiscoveryNode o1, DiscoveryNode o2) { - return Long.compare(nodesWithVersion.get(o2), nodesWithVersion.get(o1)); - } - }); + // sort array so the node with the highest version is at the beginning + CollectionUtil.timSort(allocationCandidates, Comparator.comparing(NodeGatewayStartedShards::legacyVersion).reversed()); if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder("["); - for (DiscoveryNode n : nodesWithVersion.keySet()) { - sb.append("[").append(n.getName()).append("]").append(" -> ").append(nodesWithVersion.get(n)).append(", "); + for (NodeGatewayStartedShards n : allocationCandidates) { + sb.append("[").append(n.getNode().getName()).append("]").append(" -> ").append(n.legacyVersion()).append(", "); } sb.append("]"); logger.trace("{} candidates for allocation: {}", shard, sb.toString()); } - return new NodesResult(Collections.unmodifiableList(nodesWithHighestVersion), numberOfAllocationsFound); + return new NodeShardsResult(Collections.unmodifiableList(allocationCandidates), numberOfAllocationsFound); } /** @@ -358,27 +349,28 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { && IndexMetaData.INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING.get(metaData.getSettings(), this.settings); } - protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); + protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); - static class NodesResult { - public final List nodes; + static class NodeShardsResult { + public final List orderedAllocationCandidates; public final int allocationsFound; - public NodesResult(List nodes, int allocationsFound) { - this.nodes = nodes; + public NodeShardsResult(List orderedAllocationCandidates, int allocationsFound) { + this.orderedAllocationCandidates = orderedAllocationCandidates; this.allocationsFound = allocationsFound; } } static class NodesToAllocate { - final List yesNodes; - final List throttleNodes; - final List noNodes; + final List yesNodeShards; + final List throttleNodeShards; + final List noNodeShards; - public NodesToAllocate(List yesNodes, List throttleNodes, List noNodes) { - this.yesNodes = yesNodes; - this.throttleNodes = throttleNodes; - this.noNodes = noNodes; + public NodesToAllocate(List yesNodeShards, List throttleNodeShards, + List noNodeShards) { + this.yesNodeShards = yesNodeShards; + this.throttleNodeShards = throttleNodeShards; + this.noNodeShards = noNodeShards; } } } diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 2c25ce50365..e2b6f0d27ed 100644 --- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -173,7 +173,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node()); // we found a match changed = true; - unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), null, allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); } } else if (matchingNodes.hasAnyData() == false) { // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index 79e9c53a72e..03f8dc81703 100644 --- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -335,5 +335,28 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction out.writeBoolean(false); } } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + NodeGatewayStartedShards that = (NodeGatewayStartedShards) o; + + if (legacyVersion != that.legacyVersion) return false; + if (primary != that.primary) return false; + if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) return false; + return storeException != null ? storeException.equals(that.storeException) : that.storeException == null; + + } + + @Override + public int hashCode() { + int result = Long.hashCode(legacyVersion); + result = 31 * result + (allocationId != null ? allocationId.hashCode() : 0); + result = 31 * result + (primary ? 1 : 0); + result = 31 * result + (storeException != null ? storeException.hashCode() : 0); + return result; + } } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java index 00acf1ebabc..53a0faf0705 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java @@ -42,7 +42,7 @@ public class AllocationIdTests extends ESTestCase { assertThat(shard.allocationId(), nullValue()); logger.info("-- initialize the shard"); - shard.initialize("node1", -1); + shard.initialize("node1", null, -1); AllocationId allocationId = shard.allocationId(); assertThat(allocationId, notNullValue()); assertThat(allocationId.getId(), notNullValue()); @@ -59,7 +59,7 @@ public class AllocationIdTests extends ESTestCase { public void testSuccessfulRelocation() { logger.info("-- build started shard"); ShardRouting shard = ShardRouting.newUnassigned(new Index("test","_na_"), 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); - shard.initialize("node1", -1); + shard.initialize("node1", null, -1); shard.moveToStarted(); AllocationId allocationId = shard.allocationId(); @@ -82,7 +82,7 @@ public class AllocationIdTests extends ESTestCase { public void testCancelRelocation() { logger.info("-- build started shard"); ShardRouting shard = ShardRouting.newUnassigned(new Index("test","_na_"), 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); - shard.initialize("node1", -1); + shard.initialize("node1", null, -1); shard.moveToStarted(); AllocationId allocationId = shard.allocationId(); @@ -102,7 +102,7 @@ public class AllocationIdTests extends ESTestCase { public void testMoveToUnassigned() { logger.info("-- build started shard"); ShardRouting shard = ShardRouting.newUnassigned(new Index("test","_na_"), 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); - shard.initialize("node1", -1); + shard.initialize("node1", null, -1); shard.moveToStarted(); logger.info("-- move to unassigned"); @@ -113,7 +113,7 @@ public class AllocationIdTests extends ESTestCase { public void testReinitializing() { logger.info("-- build started shard"); ShardRouting shard = ShardRouting.newUnassigned(new Index("test","_na_"), 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); - shard.initialize("node1", -1); + shard.initialize("node1", null, -1); shard.moveToStarted(); AllocationId allocationId = shard.allocationId(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java b/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java index 72ecc171eed..2b581a9d82e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java @@ -42,7 +42,7 @@ public final class RandomShardRoutingMutator { break; case 1: if (shardRouting.unassigned()) { - shardRouting.initialize(randomFrom(nodes), -1); + shardRouting.initialize(randomFrom(nodes), null, -1); } break; case 2: diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java index 5d3466b5e43..7299cbdf590 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java @@ -25,7 +25,7 @@ package org.elasticsearch.cluster.routing; public class ShardRoutingHelper { public static void relocate(ShardRouting routing, String nodeId) { - relocate(routing, nodeId, -1); + relocate(routing, nodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); } public static void relocate(ShardRouting routing, String nodeId, long expectedByteSize) { @@ -37,11 +37,11 @@ public class ShardRoutingHelper { } public static void initialize(ShardRouting routing, String nodeId) { - initialize(routing, nodeId, -1); + initialize(routing, nodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); } public static void initialize(ShardRouting routing, String nodeId, long expectedSize) { - routing.initialize(nodeId, expectedSize); + routing.initialize(nodeId, null, expectedSize); } public static void reinit(ShardRouting routing) { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java index dd38b0c7ea3..4e70761b169 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java @@ -254,7 +254,7 @@ public class ShardRoutingTests extends ESTestCase { } try { - routing.initialize("boom", -1); + routing.initialize("boom", null, -1); fail("must be frozen"); } catch (IllegalStateException ex) { // expected diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 95f69a768a8..ba73181ad97 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -187,7 +187,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase { ShardRouting shard = TestShardRouting.newShardRouting("test", 1, null, null, null, true, ShardRoutingState.UNASSIGNED, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); ShardRouting mutable = new ShardRouting(shard); assertThat(mutable.unassignedInfo(), notNullValue()); - mutable.initialize("test_node", -1); + mutable.initialize("test_node", null, -1); assertThat(mutable.state(), equalTo(ShardRoutingState.INITIALIZING)); assertThat(mutable.unassignedInfo(), notNullValue()); mutable.moveToStarted(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index 08cbdc09fe0..68706d96df7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -365,37 +365,37 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { switch (sr.id()) { case 0: if (sr.primary()) { - allocation.routingNodes().initialize(sr, "node1", -1); + allocation.routingNodes().initialize(sr, "node1", null, -1); } else { - allocation.routingNodes().initialize(sr, "node0", -1); + allocation.routingNodes().initialize(sr, "node0", null, -1); } break; case 1: if (sr.primary()) { - allocation.routingNodes().initialize(sr, "node1", -1); + allocation.routingNodes().initialize(sr, "node1", null, -1); } else { - allocation.routingNodes().initialize(sr, "node2", -1); + allocation.routingNodes().initialize(sr, "node2", null, -1); } break; case 2: if (sr.primary()) { - allocation.routingNodes().initialize(sr, "node3", -1); + allocation.routingNodes().initialize(sr, "node3", null, -1); } else { - allocation.routingNodes().initialize(sr, "node2", -1); + allocation.routingNodes().initialize(sr, "node2", null, -1); } break; case 3: if (sr.primary()) { - allocation.routingNodes().initialize(sr, "node3", -1); + allocation.routingNodes().initialize(sr, "node3", null, -1); } else { - allocation.routingNodes().initialize(sr, "node1", -1); + allocation.routingNodes().initialize(sr, "node1", null, -1); } break; case 4: if (sr.primary()) { - allocation.routingNodes().initialize(sr, "node2", -1); + allocation.routingNodes().initialize(sr, "node2", null, -1); } else { - allocation.routingNodes().initialize(sr, "node0", -1); + allocation.routingNodes().initialize(sr, "node0", null, -1); } break; } diff --git a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java index e2830b1e226..d1cd8d974c6 100644 --- a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -161,7 +161,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { */ public void testFoundAllocationAndAllocating() { final RoutingAllocation allocation; - if (randomBoolean()) { + boolean useAllocationIds = randomBoolean(); + if (useAllocationIds) { allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); testAllocator.addData(node1, 1, "allocId1", randomBoolean()); } else { @@ -173,6 +174,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id())); + if (useAllocationIds) { + // check that allocation id is reused + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1")); + } } /**