Merge pull request #16530 from ywelsch/fix/reuse-alloc-id

Reuse existing allocation id for primary shard allocation
This commit is contained in:
Yannick Welsch 2016-02-11 14:02:15 +01:00
commit 092d381b69
15 changed files with 156 additions and 118 deletions

View File

@ -96,6 +96,13 @@ public class AllocationId implements ToXContent {
return new AllocationId(Strings.randomBase64UUID(), null);
}
/**
* Creates a new allocation id for initializing allocation based on an existing id.
*/
public static AllocationId newInitializing(String existingAllocationId) {
return new AllocationId(existingAllocationId, null);
}
/**
* Creates a new allocation id for the target initializing shard that is the result
* of a relocation.

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.Index;
@ -420,11 +421,13 @@ public class RoutingNodes implements Iterable<RoutingNode> {
/**
* Moves a shard from unassigned to initialize state
*
* @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated.
*/
public void initialize(ShardRouting shard, String nodeId, long expectedSize) {
public void initialize(ShardRouting shard, String nodeId, @Nullable String existingAllocationId, long expectedSize) {
ensureMutable();
assert shard.unassigned() : shard;
shard.initialize(nodeId, expectedSize);
shard.initialize(nodeId, existingAllocationId, expectedSize);
node(nodeId).add(shard);
inactiveShardCount++;
if (shard.primary()) {
@ -692,10 +695,12 @@ public class RoutingNodes implements Iterable<RoutingNode> {
/**
* Initializes the current unassigned shard and moves it from the unassigned list.
*
* @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated.
*/
public void initialize(String nodeId, long expectedShardSize) {
public void initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize) {
innerRemove();
nodes.initialize(new ShardRouting(current), nodeId, expectedShardSize);
nodes.initialize(new ShardRouting(current), nodeId, existingAllocationId, expectedShardSize);
}
/**
@ -711,7 +716,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
/**
* Unsupported operation, just there for the interface. Use {@link #removeAndIgnore()} or
* {@link #initialize(String, long)}.
* {@link #initialize(String, String, long)}.
*/
@Override
public void remove() {

View File

@ -410,14 +410,20 @@ public final class ShardRouting implements Streamable, ToXContent {
/**
* Initializes an unassigned shard on a node.
*
* @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated.
*/
void initialize(String nodeId, long expectedShardSize) {
void initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize) {
ensureNotFrozen();
assert state == ShardRoutingState.UNASSIGNED : this;
assert relocatingNodeId == null : this;
state = ShardRoutingState.INITIALIZING;
currentNodeId = nodeId;
allocationId = AllocationId.newInitializing();
if (existingAllocationId == null) {
allocationId = AllocationId.newInitializing();
} else {
allocationId = AllocationId.newInitializing(existingAllocationId);
}
this.expectedShardSize = expectedShardSize;
}

View File

@ -702,7 +702,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}
routingNodes.initialize(shard, minNode.getNodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
routingNodes.initialize(shard, minNode.getNodeId(), null, allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
changed = true;
continue; // don't add to ignoreUnassigned
} else {
@ -790,7 +790,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
routingNodes.relocate(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
} else {
routingNodes.initialize(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
routingNodes.initialize(candidate, minNode.getNodeId(), null, allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
return true;

View File

@ -242,7 +242,7 @@ public abstract class AbstractAllocateAllocationCommand implements AllocationCom
if (shardRoutingChanges != null) {
shardRoutingChanges.accept(unassigned);
}
it.initialize(routingNode.nodeId(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
it.initialize(routingNode.nodeId(), null, allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
return;
}
assert false : "shard to initialize not found in list of unassigned shards";

View File

@ -32,15 +32,14 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.elasticsearch.index.shard.ShardStateMetaData;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -98,7 +97,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
continue;
}
final AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
final AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
if (shardState.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
allocation.setHasPendingAsyncFetch();
@ -110,7 +109,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
final boolean snapshotRestore = shard.restoreSource() != null;
final boolean recoverOnAnyNode = recoverOnAnyNode(indexMetaData);
final NodesResult nodesResult;
final NodeShardsResult nodeShardsResult;
final boolean enoughAllocationsFound;
if (lastActiveAllocationIds.isEmpty()) {
@ -118,20 +117,20 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
// when we load an old index (after upgrading cluster) or restore a snapshot of an old index
// fall back to old version-based allocation mode
// Note that once the shard has been active, lastActiveAllocationIds will be non-empty
nodesResult = buildVersionBasedNodes(shard, snapshotRestore || recoverOnAnyNode, allocation.getIgnoreNodes(shard.shardId()), shardState);
nodeShardsResult = buildVersionBasedNodeShardsResult(shard, snapshotRestore || recoverOnAnyNode, allocation.getIgnoreNodes(shard.shardId()), shardState);
if (snapshotRestore || recoverOnAnyNode) {
enoughAllocationsFound = nodesResult.allocationsFound > 0;
enoughAllocationsFound = nodeShardsResult.allocationsFound > 0;
} else {
enoughAllocationsFound = isEnoughVersionBasedAllocationsFound(shard, indexMetaData, nodesResult);
enoughAllocationsFound = isEnoughVersionBasedAllocationsFound(indexMetaData, nodeShardsResult);
}
logger.debug("[{}][{}]: version-based allocation for pre-{} index found {} allocations of {}", shard.index(), shard.id(), Version.V_3_0_0, nodesResult.allocationsFound, shard);
logger.debug("[{}][{}]: version-based allocation for pre-{} index found {} allocations of {}", shard.index(), shard.id(), Version.V_3_0_0, nodeShardsResult.allocationsFound, shard);
} else {
assert lastActiveAllocationIds.isEmpty() == false;
// use allocation ids to select nodes
nodesResult = buildAllocationIdBasedNodes(shard, snapshotRestore || recoverOnAnyNode,
nodeShardsResult = buildAllocationIdBasedNodeShardsResult(shard, snapshotRestore || recoverOnAnyNode,
allocation.getIgnoreNodes(shard.shardId()), lastActiveAllocationIds, shardState);
enoughAllocationsFound = nodesResult.allocationsFound > 0;
logger.debug("[{}][{}]: found {} allocations of {} based on allocation ids: [{}]", shard.index(), shard.id(), nodesResult.allocationsFound, shard, lastActiveAllocationIds);
enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
logger.debug("[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", shard.index(), shard.id(), nodeShardsResult.orderedAllocationCandidates.size(), shard, lastActiveAllocationIds);
}
if (enoughAllocationsFound == false){
@ -144,25 +143,25 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
} else {
// we can't really allocate, so ignore it and continue
unassignedIterator.removeAndIgnore();
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodesResult.allocationsFound);
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodeShardsResult.allocationsFound);
}
continue;
}
final NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodesResult.nodes);
if (nodesToAllocate.yesNodes.isEmpty() == false) {
DiscoveryNode node = nodesToAllocate.yesNodes.get(0);
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
final NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodeShardsResult.orderedAllocationCandidates);
if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
NodeGatewayStartedShards nodeShardState = nodesToAllocate.yesNodeShards.get(0);
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode());
changed = true;
unassignedIterator.initialize(node.id(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) {
DiscoveryNode node = nodesToAllocate.noNodes.get(0);
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
unassignedIterator.initialize(nodeShardState.getNode().id(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else if (nodesToAllocate.throttleNodeShards.isEmpty() == true && nodesToAllocate.noNodeShards.isEmpty() == false) {
NodeGatewayStartedShards nodeShardState = nodesToAllocate.noNodeShards.get(0);
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode());
changed = true;
unassignedIterator.initialize(node.id(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
unassignedIterator.initialize(nodeShardState.getNode().id(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else {
// we are throttling this, but we have enough to allocate to this node, ignore it for now
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes);
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodeShards);
unassignedIterator.removeAndIgnore();
}
}
@ -174,11 +173,12 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
* lastActiveAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
* entries with matching allocation id are always at the front of the list.
*/
protected NodesResult buildAllocationIdBasedNodes(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
Set<String> lastActiveAllocationIds, AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) {
LinkedList<DiscoveryNode> matchingNodes = new LinkedList<>();
LinkedList<DiscoveryNode> nonMatchingNodes = new LinkedList<>();
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
protected NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
Set<String> lastActiveAllocationIds, AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState) {
LinkedList<NodeGatewayStartedShards> matchingNodeShardStates = new LinkedList<>();
LinkedList<NodeGatewayStartedShards> nonMatchingNodeShardStates = new LinkedList<>();
int numberOfAllocationsFound = 0;
for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
DiscoveryNode node = nodeShardState.getNode();
String allocationId = nodeShardState.allocationId();
@ -199,36 +199,37 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
}
if (allocationId != null) {
numberOfAllocationsFound++;
if (lastActiveAllocationIds.contains(allocationId)) {
if (nodeShardState.primary()) {
matchingNodes.addFirst(node);
matchingNodeShardStates.addFirst(nodeShardState);
} else {
matchingNodes.addLast(node);
matchingNodeShardStates.addLast(nodeShardState);
}
} else if (matchAnyShard) {
if (nodeShardState.primary()) {
nonMatchingNodes.addFirst(node);
nonMatchingNodeShardStates.addFirst(nodeShardState);
} else {
nonMatchingNodes.addLast(node);
nonMatchingNodeShardStates.addLast(nodeShardState);
}
}
}
}
List<DiscoveryNode> nodes = new ArrayList<>();
nodes.addAll(matchingNodes);
nodes.addAll(nonMatchingNodes);
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
nodeShardStates.addAll(matchingNodeShardStates);
nodeShardStates.addAll(nonMatchingNodeShardStates);
if (logger.isTraceEnabled()) {
logger.trace("{} candidates for allocation: {}", shard, nodes.stream().map(DiscoveryNode::name).collect(Collectors.joining(", ")));
logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")));
}
return new NodesResult(nodes, nodes.size());
return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
}
/**
* used by old version-based allocation
*/
private boolean isEnoughVersionBasedAllocationsFound(ShardRouting shard, IndexMetaData indexMetaData, NodesResult nodesAndVersions) {
private boolean isEnoughVersionBasedAllocationsFound(IndexMetaData indexMetaData, NodeShardsResult nodeShardsResult) {
// check if the counts meets the minimum set
int requiredAllocation = 1;
// if we restore from a repository one copy is more then enough
@ -253,45 +254,44 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
requiredAllocation = Integer.parseInt(initialShards);
}
return nodesAndVersions.allocationsFound >= requiredAllocation;
return nodeShardsResult.allocationsFound >= requiredAllocation;
}
/**
* Split the list of nodes to lists of yes/no/throttle nodes based on allocation deciders
* Split the list of node shard states into groups yes/no/throttle based on allocation deciders
*/
private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, List<DiscoveryNode> nodes) {
List<DiscoveryNode> yesNodes = new ArrayList<>();
List<DiscoveryNode> throttledNodes = new ArrayList<>();
List<DiscoveryNode> noNodes = new ArrayList<>();
for (DiscoveryNode discoNode : nodes) {
RoutingNode node = allocation.routingNodes().node(discoNode.id());
private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, List<NodeGatewayStartedShards> nodeShardStates) {
List<NodeGatewayStartedShards> yesNodeShards = new ArrayList<>();
List<NodeGatewayStartedShards> throttledNodeShards = new ArrayList<>();
List<NodeGatewayStartedShards> noNodeShards = new ArrayList<>();
for (NodeGatewayStartedShards nodeShardState : nodeShardStates) {
RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().id());
if (node == null) {
continue;
}
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
throttledNodes.add(discoNode);
throttledNodeShards.add(nodeShardState);
} else if (decision.type() == Decision.Type.NO) {
noNodes.add(discoNode);
noNodeShards.add(nodeShardState);
} else {
yesNodes.add(discoNode);
yesNodeShards.add(nodeShardState);
}
}
return new NodesToAllocate(Collections.unmodifiableList(yesNodes), Collections.unmodifiableList(throttledNodes), Collections.unmodifiableList(noNodes));
return new NodesToAllocate(Collections.unmodifiableList(yesNodeShards), Collections.unmodifiableList(throttledNodeShards), Collections.unmodifiableList(noNodeShards));
}
/**
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have the highest shard version
* are added to the list. Otherwise, any node that has a shard is added to the list, but entries with highest
* version are always at the front of the list.
* Builds a list of previously started shards. If matchAnyShard is set to false, only shards with the highest shard version are added to
* the list. Otherwise, any existing shard is added to the list, but entries with highest version are always at the front of the list.
*/
NodesResult buildVersionBasedNodes(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) {
final Map<DiscoveryNode, Long> nodesWithVersion = new HashMap<>();
NodeShardsResult buildVersionBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState) {
final List<NodeGatewayStartedShards> allocationCandidates = new ArrayList<>();
int numberOfAllocationsFound = 0;
long highestVersion = ShardStateMetaData.NO_VERSION;
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
long version = nodeShardState.legacyVersion();
DiscoveryNode node = nodeShardState.getNode();
@ -315,38 +315,29 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
if (version > highestVersion) {
highestVersion = version;
if (matchAnyShard == false) {
nodesWithVersion.clear();
allocationCandidates.clear();
}
nodesWithVersion.put(node, version);
allocationCandidates.add(nodeShardState);
} else if (version == highestVersion) {
// If the candidate is the same, add it to the
// list, but keep the current candidate
nodesWithVersion.put(node, version);
allocationCandidates.add(nodeShardState);
}
}
}
// Now that we have a map of nodes to versions along with the
// number of allocations found (and not ignored), we need to sort
// it so the node with the highest version is at the beginning
List<DiscoveryNode> nodesWithHighestVersion = new ArrayList<>();
nodesWithHighestVersion.addAll(nodesWithVersion.keySet());
CollectionUtil.timSort(nodesWithHighestVersion, new Comparator<DiscoveryNode>() {
@Override
public int compare(DiscoveryNode o1, DiscoveryNode o2) {
return Long.compare(nodesWithVersion.get(o2), nodesWithVersion.get(o1));
}
});
// sort array so the node with the highest version is at the beginning
CollectionUtil.timSort(allocationCandidates, Comparator.comparing(NodeGatewayStartedShards::legacyVersion).reversed());
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("[");
for (DiscoveryNode n : nodesWithVersion.keySet()) {
sb.append("[").append(n.getName()).append("]").append(" -> ").append(nodesWithVersion.get(n)).append(", ");
for (NodeGatewayStartedShards n : allocationCandidates) {
sb.append("[").append(n.getNode().getName()).append("]").append(" -> ").append(n.legacyVersion()).append(", ");
}
sb.append("]");
logger.trace("{} candidates for allocation: {}", shard, sb.toString());
}
return new NodesResult(Collections.unmodifiableList(nodesWithHighestVersion), numberOfAllocationsFound);
return new NodeShardsResult(Collections.unmodifiableList(allocationCandidates), numberOfAllocationsFound);
}
/**
@ -358,27 +349,28 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
&& IndexMetaData.INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING.get(metaData.getSettings(), this.settings);
}
protected abstract AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);
protected abstract AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);
static class NodesResult {
public final List<DiscoveryNode> nodes;
static class NodeShardsResult {
public final List<NodeGatewayStartedShards> orderedAllocationCandidates;
public final int allocationsFound;
public NodesResult(List<DiscoveryNode> nodes, int allocationsFound) {
this.nodes = nodes;
public NodeShardsResult(List<NodeGatewayStartedShards> orderedAllocationCandidates, int allocationsFound) {
this.orderedAllocationCandidates = orderedAllocationCandidates;
this.allocationsFound = allocationsFound;
}
}
static class NodesToAllocate {
final List<DiscoveryNode> yesNodes;
final List<DiscoveryNode> throttleNodes;
final List<DiscoveryNode> noNodes;
final List<NodeGatewayStartedShards> yesNodeShards;
final List<NodeGatewayStartedShards> throttleNodeShards;
final List<NodeGatewayStartedShards> noNodeShards;
public NodesToAllocate(List<DiscoveryNode> yesNodes, List<DiscoveryNode> throttleNodes, List<DiscoveryNode> noNodes) {
this.yesNodes = yesNodes;
this.throttleNodes = throttleNodes;
this.noNodes = noNodes;
public NodesToAllocate(List<NodeGatewayStartedShards> yesNodeShards, List<NodeGatewayStartedShards> throttleNodeShards,
List<NodeGatewayStartedShards> noNodeShards) {
this.yesNodeShards = yesNodeShards;
this.throttleNodeShards = throttleNodeShards;
this.noNodeShards = noNodeShards;
}
}
}

View File

@ -173,7 +173,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node());
// we found a match
changed = true;
unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), null, allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
} else if (matchingNodes.hasAnyData() == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed

View File

@ -335,5 +335,28 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
out.writeBoolean(false);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NodeGatewayStartedShards that = (NodeGatewayStartedShards) o;
if (legacyVersion != that.legacyVersion) return false;
if (primary != that.primary) return false;
if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) return false;
return storeException != null ? storeException.equals(that.storeException) : that.storeException == null;
}
@Override
public int hashCode() {
int result = Long.hashCode(legacyVersion);
result = 31 * result + (allocationId != null ? allocationId.hashCode() : 0);
result = 31 * result + (primary ? 1 : 0);
result = 31 * result + (storeException != null ? storeException.hashCode() : 0);
return result;
}
}
}

View File

@ -42,7 +42,7 @@ public class AllocationIdTests extends ESTestCase {
assertThat(shard.allocationId(), nullValue());
logger.info("-- initialize the shard");
shard.initialize("node1", -1);
shard.initialize("node1", null, -1);
AllocationId allocationId = shard.allocationId();
assertThat(allocationId, notNullValue());
assertThat(allocationId.getId(), notNullValue());
@ -59,7 +59,7 @@ public class AllocationIdTests extends ESTestCase {
public void testSuccessfulRelocation() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned(new Index("test","_na_"), 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1", -1);
shard.initialize("node1", null, -1);
shard.moveToStarted();
AllocationId allocationId = shard.allocationId();
@ -82,7 +82,7 @@ public class AllocationIdTests extends ESTestCase {
public void testCancelRelocation() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned(new Index("test","_na_"), 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1", -1);
shard.initialize("node1", null, -1);
shard.moveToStarted();
AllocationId allocationId = shard.allocationId();
@ -102,7 +102,7 @@ public class AllocationIdTests extends ESTestCase {
public void testMoveToUnassigned() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned(new Index("test","_na_"), 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1", -1);
shard.initialize("node1", null, -1);
shard.moveToStarted();
logger.info("-- move to unassigned");
@ -113,7 +113,7 @@ public class AllocationIdTests extends ESTestCase {
public void testReinitializing() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned(new Index("test","_na_"), 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1", -1);
shard.initialize("node1", null, -1);
shard.moveToStarted();
AllocationId allocationId = shard.allocationId();

View File

@ -42,7 +42,7 @@ public final class RandomShardRoutingMutator {
break;
case 1:
if (shardRouting.unassigned()) {
shardRouting.initialize(randomFrom(nodes), -1);
shardRouting.initialize(randomFrom(nodes), null, -1);
}
break;
case 2:

View File

@ -25,7 +25,7 @@ package org.elasticsearch.cluster.routing;
public class ShardRoutingHelper {
public static void relocate(ShardRouting routing, String nodeId) {
relocate(routing, nodeId, -1);
relocate(routing, nodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
}
public static void relocate(ShardRouting routing, String nodeId, long expectedByteSize) {
@ -37,11 +37,11 @@ public class ShardRoutingHelper {
}
public static void initialize(ShardRouting routing, String nodeId) {
initialize(routing, nodeId, -1);
initialize(routing, nodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
}
public static void initialize(ShardRouting routing, String nodeId, long expectedSize) {
routing.initialize(nodeId, expectedSize);
routing.initialize(nodeId, null, expectedSize);
}
public static void reinit(ShardRouting routing) {

View File

@ -254,7 +254,7 @@ public class ShardRoutingTests extends ESTestCase {
}
try {
routing.initialize("boom", -1);
routing.initialize("boom", null, -1);
fail("must be frozen");
} catch (IllegalStateException ex) {
// expected

View File

@ -187,7 +187,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
ShardRouting shard = TestShardRouting.newShardRouting("test", 1, null, null, null, true, ShardRoutingState.UNASSIGNED, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting mutable = new ShardRouting(shard);
assertThat(mutable.unassignedInfo(), notNullValue());
mutable.initialize("test_node", -1);
mutable.initialize("test_node", null, -1);
assertThat(mutable.state(), equalTo(ShardRoutingState.INITIALIZING));
assertThat(mutable.unassignedInfo(), notNullValue());
mutable.moveToStarted();

View File

@ -365,37 +365,37 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
switch (sr.id()) {
case 0:
if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node1", -1);
allocation.routingNodes().initialize(sr, "node1", null, -1);
} else {
allocation.routingNodes().initialize(sr, "node0", -1);
allocation.routingNodes().initialize(sr, "node0", null, -1);
}
break;
case 1:
if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node1", -1);
allocation.routingNodes().initialize(sr, "node1", null, -1);
} else {
allocation.routingNodes().initialize(sr, "node2", -1);
allocation.routingNodes().initialize(sr, "node2", null, -1);
}
break;
case 2:
if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node3", -1);
allocation.routingNodes().initialize(sr, "node3", null, -1);
} else {
allocation.routingNodes().initialize(sr, "node2", -1);
allocation.routingNodes().initialize(sr, "node2", null, -1);
}
break;
case 3:
if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node3", -1);
allocation.routingNodes().initialize(sr, "node3", null, -1);
} else {
allocation.routingNodes().initialize(sr, "node1", -1);
allocation.routingNodes().initialize(sr, "node1", null, -1);
}
break;
case 4:
if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node2", -1);
allocation.routingNodes().initialize(sr, "node2", null, -1);
} else {
allocation.routingNodes().initialize(sr, "node0", -1);
allocation.routingNodes().initialize(sr, "node0", null, -1);
}
break;
}

View File

@ -161,7 +161,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/
public void testFoundAllocationAndAllocating() {
final RoutingAllocation allocation;
if (randomBoolean()) {
boolean useAllocationIds = randomBoolean();
if (useAllocationIds) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, 1, "allocId1", randomBoolean());
} else {
@ -173,6 +174,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
if (useAllocationIds) {
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1"));
}
}
/**