Prefer nodes that previously held primary shard for primary shard allocation

This commit is contained in:
Yannick Welsch 2016-01-20 11:02:19 +01:00
parent 12df1e7a6f
commit 1eb8d5bd6f
4 changed files with 81 additions and 47 deletions

View File

@ -38,6 +38,7 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -175,8 +176,8 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
*/ */
protected NodesAndVersions buildAllocationIdBasedNodes(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes, protected NodesAndVersions buildAllocationIdBasedNodes(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
Set<String> lastActiveAllocationIds, AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) { Set<String> lastActiveAllocationIds, AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) {
List<DiscoveryNode> matchingNodes = new ArrayList<>(); LinkedList<DiscoveryNode> matchingNodes = new LinkedList<>();
List<DiscoveryNode> nonMatchingNodes = new ArrayList<>(); LinkedList<DiscoveryNode> nonMatchingNodes = new LinkedList<>();
long highestVersion = -1; long highestVersion = -1;
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
DiscoveryNode node = nodeShardState.getNode(); DiscoveryNode node = nodeShardState.getNode();
@ -200,10 +201,18 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
if (allocationId != null) { if (allocationId != null) {
if (lastActiveAllocationIds.contains(allocationId)) { if (lastActiveAllocationIds.contains(allocationId)) {
matchingNodes.add(node); if (nodeShardState.primary()) {
matchingNodes.addFirst(node);
} else {
matchingNodes.addLast(node);
}
highestVersion = Math.max(highestVersion, nodeShardState.version()); highestVersion = Math.max(highestVersion, nodeShardState.version());
} else if (matchAnyShard) { } else if (matchAnyShard) {
nonMatchingNodes.add(node); if (nodeShardState.primary()) {
nonMatchingNodes.addFirst(node);
} else {
nonMatchingNodes.addLast(node);
}
highestVersion = Math.max(highestVersion, nodeShardState.version()); highestVersion = Math.max(highestVersion, nodeShardState.version());
} }
} }

View File

@ -139,7 +139,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
} catch (Exception exception) { } catch (Exception exception) {
logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : ""); logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null; String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId, exception); return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId, shardStateMetaData.primary, exception);
} }
} }
// old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata // old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
@ -150,11 +150,11 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
} else { } else {
logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData); logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null; String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId); return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId, shardStateMetaData.primary);
} }
} }
logger.trace("{} no local shard info found", shardId); logger.trace("{} no local shard info found", shardId);
return new NodeGatewayStartedShards(clusterService.localNode(), -1, null); return new NodeGatewayStartedShards(clusterService.localNode(), -1, null, false);
} catch (Exception e) { } catch (Exception e) {
throw new ElasticsearchException("failed to load started shards", e); throw new ElasticsearchException("failed to load started shards", e);
} }
@ -279,18 +279,20 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
private long version = -1; private long version = -1;
private String allocationId = null; private String allocationId = null;
private boolean primary = false;
private Throwable storeException = null; private Throwable storeException = null;
public NodeGatewayStartedShards() { public NodeGatewayStartedShards() {
} }
public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId) { public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId, boolean primary) {
this(node, version, allocationId, null); this(node, version, allocationId, primary, null);
} }
public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId, Throwable storeException) { public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId, boolean primary, Throwable storeException) {
super(node); super(node);
this.version = version; this.version = version;
this.allocationId = allocationId; this.allocationId = allocationId;
this.primary = primary;
this.storeException = storeException; this.storeException = storeException;
} }
@ -302,6 +304,10 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
return this.allocationId; return this.allocationId;
} }
public boolean primary() {
return this.primary;
}
public Throwable storeException() { public Throwable storeException() {
return this.storeException; return this.storeException;
} }
@ -311,6 +317,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
super.readFrom(in); super.readFrom(in);
version = in.readLong(); version = in.readLong();
allocationId = in.readOptionalString(); allocationId = in.readOptionalString();
primary = in.readBoolean();
if (in.readBoolean()) { if (in.readBoolean()) {
storeException = in.readThrowable(); storeException = in.readThrowable();
} }
@ -321,6 +328,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
super.writeTo(out); super.writeTo(out);
out.writeLong(version); out.writeLong(version);
out.writeOptionalString(allocationId); out.writeOptionalString(allocationId);
out.writeBoolean(primary);
if (storeException != null) { if (storeException != null) {
out.writeBoolean(true); out.writeBoolean(true);
out.writeThrowable(storeException); out.writeThrowable(storeException);

View File

@ -35,15 +35,15 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.test.ESAllocationTestCase;
import org.junit.Before; import org.junit.Before;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
@ -104,7 +104,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
} else { } else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0);
} }
testAllocator.addData(node1, -1, null); testAllocator.addData(node1, -1, null, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false)); assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -116,7 +116,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/ */
public void testNoMatchingAllocationIdFound() { public void testNoMatchingAllocationIdFound() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2"); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2");
testAllocator.addData(node1, 1, "id1"); testAllocator.addData(node1, 1, "id1", randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false)); assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -129,7 +129,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/ */
public void testNoActiveAllocationIds() { public void testNoActiveAllocationIds() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
testAllocator.addData(node1, 1, null); testAllocator.addData(node1, 1, null, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true)); assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -144,10 +144,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
final RoutingAllocation allocation; final RoutingAllocation allocation;
if (randomBoolean()) { if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, 1, "allocId1", new CorruptIndexException("test", "test")); testAllocator.addData(node1, 1, "allocId1", randomBoolean(), new CorruptIndexException("test", "test"));
} else { } else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
testAllocator.addData(node1, 3, null, new CorruptIndexException("test", "test")); testAllocator.addData(node1, 3, null, randomBoolean(), new CorruptIndexException("test", "test"));
} }
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false)); assertThat(changed, equalTo(false));
@ -162,10 +162,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
final RoutingAllocation allocation; final RoutingAllocation allocation;
if (randomBoolean()) { if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, 1, "allocId1"); testAllocator.addData(node1, 1, "allocId1", randomBoolean());
} else { } else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0);
testAllocator.addData(node1, 3, null); testAllocator.addData(node1, 3, null, randomBoolean());
} }
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true)); assertThat(changed, equalTo(true));
@ -174,6 +174,24 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id())); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
} }
/**
* Tests that when there was a node that previously had the primary, it will be allocated to that same node again.
*/
public void testPreferAllocatingPreviousPrimary() {
String primaryAllocId = Strings.randomBase64UUID();
String replicaAllocId = Strings.randomBase64UUID();
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), primaryAllocId, replicaAllocId);
boolean node1HasPrimaryShard = randomBoolean();
testAllocator.addData(node1, 1, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard);
testAllocator.addData(node2, 1, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
DiscoveryNode allocatedNode = node1HasPrimaryShard ? node1 : node2;
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(allocatedNode.id()));
}
/** /**
* Tests that when there is a node to allocate to, but it is throttling (and it is the only one), * Tests that when there is a node to allocate to, but it is throttling (and it is the only one),
* it will be moved to ignore unassigned until it can be allocated to. * it will be moved to ignore unassigned until it can be allocated to.
@ -182,10 +200,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
final RoutingAllocation allocation; final RoutingAllocation allocation;
if (randomBoolean()) { if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, 1, "allocId1"); testAllocator.addData(node1, 1, "allocId1", randomBoolean());
} else { } else {
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0); allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0);
testAllocator.addData(node1, 3, null); testAllocator.addData(node1, 3, null, randomBoolean());
} }
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false)); assertThat(changed, equalTo(false));
@ -201,10 +219,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
final RoutingAllocation allocation; final RoutingAllocation allocation;
if (randomBoolean()) { if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, 1, "allocId1"); testAllocator.addData(node1, 1, "allocId1", randomBoolean());
} else { } else {
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0); allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0);
testAllocator.addData(node1, 3, null); testAllocator.addData(node1, 3, null, randomBoolean());
} }
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true)); assertThat(changed, equalTo(true));
@ -218,7 +236,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/ */
public void testAllocateToTheHighestVersionOnLegacyIndex() { public void testAllocateToTheHighestVersionOnLegacyIndex() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0);
testAllocator.addData(node1, 10, null).addData(node2, 12, null); testAllocator.addData(node1, 10, null, randomBoolean()).addData(node2, 12, null, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true)); assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -232,7 +250,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/ */
public void testRestore() { public void testRestore() {
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders()); RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "allocId")); testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true)); assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -245,7 +263,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/ */
public void testRestoreThrottle() { public void testRestoreThrottle() {
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders()); RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "allocId")); testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false)); assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
@ -257,7 +275,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/ */
public void testRestoreForcesAllocateIfShardAvailable() { public void testRestoreForcesAllocateIfShardAvailable() {
RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders()); RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "some allocId")); testAllocator.addData(node1, 1, randomFrom(null, "some allocId"), randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true)); assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -270,7 +288,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/ */
public void testRestoreDoesNotAssignIfNoShardAvailable() { public void testRestoreDoesNotAssignIfNoShardAvailable() {
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders()); RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
testAllocator.addData(node1, -1, null); testAllocator.addData(node1, -1, null, false);
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false)); assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -281,7 +299,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
Version version = randomFrom(Version.CURRENT, Version.V_2_0_0); Version version = randomFrom(Version.CURRENT, Version.V_2_0_0);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)).numberOfShards(1).numberOfReplicas(0) .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)).numberOfShards(1).numberOfReplicas(0)
.putActiveAllocationIds(0, version == Version.CURRENT ? new HashSet<>(Arrays.asList("allocId")) : Collections.emptySet())) .putActiveAllocationIds(0, version == Version.CURRENT ? Sets.newHashSet("allocId") : Collections.emptySet()))
.build(); .build();
RoutingTable routingTable = RoutingTable.builder() RoutingTable routingTable = RoutingTable.builder()
@ -300,7 +318,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/ */
public void testRecoverOnAnyNode() { public void testRecoverOnAnyNode() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders()); RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "allocId")); testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true)); assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -313,7 +331,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/ */
public void testRecoverOnAnyNodeThrottle() { public void testRecoverOnAnyNodeThrottle() {
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders()); RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "allocId")); testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false)); assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
@ -325,7 +343,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/ */
public void testRecoverOnAnyNodeForcesAllocateIfShardAvailable() { public void testRecoverOnAnyNodeForcesAllocateIfShardAvailable() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders()); RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "allocId")); testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true)); assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -338,7 +356,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/ */
public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() { public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders()); RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
testAllocator.addData(node1, -1, null); testAllocator.addData(node1, -1, null, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation); boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false)); assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -351,7 +369,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version) .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true)) .put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true))
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, version == Version.CURRENT ? new HashSet<>(Arrays.asList("allocId")) : Collections.emptySet())) .numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, version == Version.CURRENT ? Sets.newHashSet("allocId") : Collections.emptySet()))
.build(); .build();
RoutingTable routingTable = RoutingTable.builder() RoutingTable routingTable = RoutingTable.builder()
@ -387,7 +405,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node1, 1, null); testAllocator.addData(node1, 1, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
changed = testAllocator.allocateUnassigned(allocation); changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false)); assertThat(changed, equalTo(false));
@ -395,7 +413,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node2, 1, null); testAllocator.addData(node2, 1, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
changed = testAllocator.allocateUnassigned(allocation); changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true)); assertThat(changed, equalTo(true));
@ -428,7 +446,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node1, 1, null); testAllocator.addData(node1, 1, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
changed = testAllocator.allocateUnassigned(allocation); changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false)); assertThat(changed, equalTo(false));
@ -436,7 +454,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node2, 2, null); testAllocator.addData(node2, 2, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
changed = testAllocator.allocateUnassigned(allocation); changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true)); assertThat(changed, equalTo(true));
@ -449,7 +467,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, boolean asNew, Version version, String... activeAllocationIds) { private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, boolean asNew, Version version, String... activeAllocationIds) {
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)) .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version))
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, new HashSet<>(Arrays.asList(activeAllocationIds)))) .numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, Sets.newHashSet(activeAllocationIds)))
.build(); .build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
if (asNew) { if (asNew) {
@ -477,15 +495,15 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
return this; return this;
} }
public TestAllocator addData(DiscoveryNode node, long version, String allocationId) { public TestAllocator addData(DiscoveryNode node, long version, String allocationId, boolean primary) {
return addData(node, version, allocationId, null); return addData(node, version, allocationId, primary, null);
} }
public TestAllocator addData(DiscoveryNode node, long version, String allocationId, @Nullable Throwable storeException) { public TestAllocator addData(DiscoveryNode node, long version, String allocationId, boolean primary, @Nullable Throwable storeException) {
if (data == null) { if (data == null) {
data = new HashMap<>(); data = new HashMap<>();
} }
data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, allocationId, storeException)); data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, allocationId, primary, storeException));
return this; return this;
} }

View File

@ -43,6 +43,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
@ -51,11 +52,9 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.test.ESAllocationTestCase;
import org.junit.Before; import org.junit.Before;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -289,7 +288,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT).put(settings)) .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT).put(settings))
.numberOfShards(1).numberOfReplicas(1) .numberOfShards(1).numberOfReplicas(1)
.putActiveAllocationIds(0, new HashSet<>(Arrays.asList(primaryShard.allocationId().getId())))) .putActiveAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())))
.build(); .build();
RoutingTable routingTable = RoutingTable.builder() RoutingTable routingTable = RoutingTable.builder()
.add(IndexRoutingTable.builder(shardId.getIndex()) .add(IndexRoutingTable.builder(shardId.getIndex())
@ -311,7 +310,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)) .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT))
.numberOfShards(1).numberOfReplicas(1) .numberOfShards(1).numberOfReplicas(1)
.putActiveAllocationIds(0, new HashSet<>(Arrays.asList(primaryShard.allocationId().getId())))) .putActiveAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())))
.build(); .build();
RoutingTable routingTable = RoutingTable.builder() RoutingTable routingTable = RoutingTable.builder()
.add(IndexRoutingTable.builder(shardId.getIndex()) .add(IndexRoutingTable.builder(shardId.getIndex())