Merge pull request #16096 from ywelsch/feature/prefer-previous-primary
Prefer nodes that previously held primary shard for primary shard allocation
This commit is contained in:
commit
303b430480
|
@ -38,6 +38,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -175,8 +176,8 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
||||||
*/
|
*/
|
||||||
protected NodesAndVersions buildAllocationIdBasedNodes(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
|
protected NodesAndVersions buildAllocationIdBasedNodes(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
|
||||||
Set<String> lastActiveAllocationIds, AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) {
|
Set<String> lastActiveAllocationIds, AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) {
|
||||||
List<DiscoveryNode> matchingNodes = new ArrayList<>();
|
LinkedList<DiscoveryNode> matchingNodes = new LinkedList<>();
|
||||||
List<DiscoveryNode> nonMatchingNodes = new ArrayList<>();
|
LinkedList<DiscoveryNode> nonMatchingNodes = new LinkedList<>();
|
||||||
long highestVersion = -1;
|
long highestVersion = -1;
|
||||||
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
|
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
|
||||||
DiscoveryNode node = nodeShardState.getNode();
|
DiscoveryNode node = nodeShardState.getNode();
|
||||||
|
@ -200,10 +201,18 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
||||||
|
|
||||||
if (allocationId != null) {
|
if (allocationId != null) {
|
||||||
if (lastActiveAllocationIds.contains(allocationId)) {
|
if (lastActiveAllocationIds.contains(allocationId)) {
|
||||||
matchingNodes.add(node);
|
if (nodeShardState.primary()) {
|
||||||
|
matchingNodes.addFirst(node);
|
||||||
|
} else {
|
||||||
|
matchingNodes.addLast(node);
|
||||||
|
}
|
||||||
highestVersion = Math.max(highestVersion, nodeShardState.version());
|
highestVersion = Math.max(highestVersion, nodeShardState.version());
|
||||||
} else if (matchAnyShard) {
|
} else if (matchAnyShard) {
|
||||||
nonMatchingNodes.add(node);
|
if (nodeShardState.primary()) {
|
||||||
|
nonMatchingNodes.addFirst(node);
|
||||||
|
} else {
|
||||||
|
nonMatchingNodes.addLast(node);
|
||||||
|
}
|
||||||
highestVersion = Math.max(highestVersion, nodeShardState.version());
|
highestVersion = Math.max(highestVersion, nodeShardState.version());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,7 +139,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
|
||||||
} catch (Exception exception) {
|
} catch (Exception exception) {
|
||||||
logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
|
logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
|
||||||
String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
|
String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
|
||||||
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId, exception);
|
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId, shardStateMetaData.primary, exception);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
|
// old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
|
||||||
|
@ -150,11 +150,11 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
|
||||||
} else {
|
} else {
|
||||||
logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
|
logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
|
||||||
String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
|
String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
|
||||||
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId);
|
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId, shardStateMetaData.primary);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.trace("{} no local shard info found", shardId);
|
logger.trace("{} no local shard info found", shardId);
|
||||||
return new NodeGatewayStartedShards(clusterService.localNode(), -1, null);
|
return new NodeGatewayStartedShards(clusterService.localNode(), -1, null, false);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new ElasticsearchException("failed to load started shards", e);
|
throw new ElasticsearchException("failed to load started shards", e);
|
||||||
}
|
}
|
||||||
|
@ -279,18 +279,20 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
|
||||||
|
|
||||||
private long version = -1;
|
private long version = -1;
|
||||||
private String allocationId = null;
|
private String allocationId = null;
|
||||||
|
private boolean primary = false;
|
||||||
private Throwable storeException = null;
|
private Throwable storeException = null;
|
||||||
|
|
||||||
public NodeGatewayStartedShards() {
|
public NodeGatewayStartedShards() {
|
||||||
}
|
}
|
||||||
public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId) {
|
public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId, boolean primary) {
|
||||||
this(node, version, allocationId, null);
|
this(node, version, allocationId, primary, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId, Throwable storeException) {
|
public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId, boolean primary, Throwable storeException) {
|
||||||
super(node);
|
super(node);
|
||||||
this.version = version;
|
this.version = version;
|
||||||
this.allocationId = allocationId;
|
this.allocationId = allocationId;
|
||||||
|
this.primary = primary;
|
||||||
this.storeException = storeException;
|
this.storeException = storeException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -302,6 +304,10 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
|
||||||
return this.allocationId;
|
return this.allocationId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean primary() {
|
||||||
|
return this.primary;
|
||||||
|
}
|
||||||
|
|
||||||
public Throwable storeException() {
|
public Throwable storeException() {
|
||||||
return this.storeException;
|
return this.storeException;
|
||||||
}
|
}
|
||||||
|
@ -311,6 +317,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
|
||||||
super.readFrom(in);
|
super.readFrom(in);
|
||||||
version = in.readLong();
|
version = in.readLong();
|
||||||
allocationId = in.readOptionalString();
|
allocationId = in.readOptionalString();
|
||||||
|
primary = in.readBoolean();
|
||||||
if (in.readBoolean()) {
|
if (in.readBoolean()) {
|
||||||
storeException = in.readThrowable();
|
storeException = in.readThrowable();
|
||||||
}
|
}
|
||||||
|
@ -321,6 +328,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
|
||||||
super.writeTo(out);
|
super.writeTo(out);
|
||||||
out.writeLong(version);
|
out.writeLong(version);
|
||||||
out.writeOptionalString(allocationId);
|
out.writeOptionalString(allocationId);
|
||||||
|
out.writeBoolean(primary);
|
||||||
if (storeException != null) {
|
if (storeException != null) {
|
||||||
out.writeBoolean(true);
|
out.writeBoolean(true);
|
||||||
out.writeThrowable(storeException);
|
out.writeThrowable(storeException);
|
||||||
|
|
|
@ -35,15 +35,15 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.util.set.Sets;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.test.ESAllocationTestCase;
|
import org.elasticsearch.test.ESAllocationTestCase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.anyOf;
|
import static org.hamcrest.Matchers.anyOf;
|
||||||
|
@ -104,7 +104,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
} else {
|
} else {
|
||||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0);
|
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0);
|
||||||
}
|
}
|
||||||
testAllocator.addData(node1, -1, null);
|
testAllocator.addData(node1, -1, null, randomBoolean());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||||
|
@ -116,7 +116,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
*/
|
*/
|
||||||
public void testNoMatchingAllocationIdFound() {
|
public void testNoMatchingAllocationIdFound() {
|
||||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2");
|
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2");
|
||||||
testAllocator.addData(node1, 1, "id1");
|
testAllocator.addData(node1, 1, "id1", randomBoolean());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||||
|
@ -129,7 +129,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
*/
|
*/
|
||||||
public void testNoActiveAllocationIds() {
|
public void testNoActiveAllocationIds() {
|
||||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
|
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
|
||||||
testAllocator.addData(node1, 1, null);
|
testAllocator.addData(node1, 1, null, randomBoolean());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
|
@ -144,10 +144,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
final RoutingAllocation allocation;
|
final RoutingAllocation allocation;
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
||||||
testAllocator.addData(node1, 1, "allocId1", new CorruptIndexException("test", "test"));
|
testAllocator.addData(node1, 1, "allocId1", randomBoolean(), new CorruptIndexException("test", "test"));
|
||||||
} else {
|
} else {
|
||||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
|
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
|
||||||
testAllocator.addData(node1, 3, null, new CorruptIndexException("test", "test"));
|
testAllocator.addData(node1, 3, null, randomBoolean(), new CorruptIndexException("test", "test"));
|
||||||
}
|
}
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
|
@ -162,10 +162,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
final RoutingAllocation allocation;
|
final RoutingAllocation allocation;
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
||||||
testAllocator.addData(node1, 1, "allocId1");
|
testAllocator.addData(node1, 1, "allocId1", randomBoolean());
|
||||||
} else {
|
} else {
|
||||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0);
|
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0);
|
||||||
testAllocator.addData(node1, 3, null);
|
testAllocator.addData(node1, 3, null, randomBoolean());
|
||||||
}
|
}
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
|
@ -174,6 +174,24 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that when there was a node that previously had the primary, it will be allocated to that same node again.
|
||||||
|
*/
|
||||||
|
public void testPreferAllocatingPreviousPrimary() {
|
||||||
|
String primaryAllocId = Strings.randomBase64UUID();
|
||||||
|
String replicaAllocId = Strings.randomBase64UUID();
|
||||||
|
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), primaryAllocId, replicaAllocId);
|
||||||
|
boolean node1HasPrimaryShard = randomBoolean();
|
||||||
|
testAllocator.addData(node1, 1, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard);
|
||||||
|
testAllocator.addData(node2, 1, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard);
|
||||||
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
|
assertThat(changed, equalTo(true));
|
||||||
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||||
|
DiscoveryNode allocatedNode = node1HasPrimaryShard ? node1 : node2;
|
||||||
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(allocatedNode.id()));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests that when there is a node to allocate to, but it is throttling (and it is the only one),
|
* Tests that when there is a node to allocate to, but it is throttling (and it is the only one),
|
||||||
* it will be moved to ignore unassigned until it can be allocated to.
|
* it will be moved to ignore unassigned until it can be allocated to.
|
||||||
|
@ -182,10 +200,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
final RoutingAllocation allocation;
|
final RoutingAllocation allocation;
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
||||||
testAllocator.addData(node1, 1, "allocId1");
|
testAllocator.addData(node1, 1, "allocId1", randomBoolean());
|
||||||
} else {
|
} else {
|
||||||
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0);
|
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0);
|
||||||
testAllocator.addData(node1, 3, null);
|
testAllocator.addData(node1, 3, null, randomBoolean());
|
||||||
}
|
}
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
|
@ -201,10 +219,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
final RoutingAllocation allocation;
|
final RoutingAllocation allocation;
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
||||||
testAllocator.addData(node1, 1, "allocId1");
|
testAllocator.addData(node1, 1, "allocId1", randomBoolean());
|
||||||
} else {
|
} else {
|
||||||
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0);
|
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0);
|
||||||
testAllocator.addData(node1, 3, null);
|
testAllocator.addData(node1, 3, null, randomBoolean());
|
||||||
}
|
}
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
|
@ -218,7 +236,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
*/
|
*/
|
||||||
public void testAllocateToTheHighestVersionOnLegacyIndex() {
|
public void testAllocateToTheHighestVersionOnLegacyIndex() {
|
||||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0);
|
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0);
|
||||||
testAllocator.addData(node1, 10, null).addData(node2, 12, null);
|
testAllocator.addData(node1, 10, null, randomBoolean()).addData(node2, 12, null, randomBoolean());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
|
@ -232,7 +250,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
*/
|
*/
|
||||||
public void testRestore() {
|
public void testRestore() {
|
||||||
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
|
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
|
||||||
testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
|
testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
|
@ -245,7 +263,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
*/
|
*/
|
||||||
public void testRestoreThrottle() {
|
public void testRestoreThrottle() {
|
||||||
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
|
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
|
||||||
testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
|
testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
|
||||||
|
@ -257,7 +275,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
*/
|
*/
|
||||||
public void testRestoreForcesAllocateIfShardAvailable() {
|
public void testRestoreForcesAllocateIfShardAvailable() {
|
||||||
RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders());
|
RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders());
|
||||||
testAllocator.addData(node1, 1, randomFrom(null, "some allocId"));
|
testAllocator.addData(node1, 1, randomFrom(null, "some allocId"), randomBoolean());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
|
@ -270,7 +288,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
*/
|
*/
|
||||||
public void testRestoreDoesNotAssignIfNoShardAvailable() {
|
public void testRestoreDoesNotAssignIfNoShardAvailable() {
|
||||||
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
|
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
|
||||||
testAllocator.addData(node1, -1, null);
|
testAllocator.addData(node1, -1, null, false);
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
|
@ -281,7 +299,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
Version version = randomFrom(Version.CURRENT, Version.V_2_0_0);
|
Version version = randomFrom(Version.CURRENT, Version.V_2_0_0);
|
||||||
MetaData metaData = MetaData.builder()
|
MetaData metaData = MetaData.builder()
|
||||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)).numberOfShards(1).numberOfReplicas(0)
|
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)).numberOfShards(1).numberOfReplicas(0)
|
||||||
.putActiveAllocationIds(0, version == Version.CURRENT ? new HashSet<>(Arrays.asList("allocId")) : Collections.emptySet()))
|
.putActiveAllocationIds(0, version == Version.CURRENT ? Sets.newHashSet("allocId") : Collections.emptySet()))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
RoutingTable routingTable = RoutingTable.builder()
|
RoutingTable routingTable = RoutingTable.builder()
|
||||||
|
@ -300,7 +318,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
*/
|
*/
|
||||||
public void testRecoverOnAnyNode() {
|
public void testRecoverOnAnyNode() {
|
||||||
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
|
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
|
||||||
testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
|
testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
|
@ -313,7 +331,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
*/
|
*/
|
||||||
public void testRecoverOnAnyNodeThrottle() {
|
public void testRecoverOnAnyNodeThrottle() {
|
||||||
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
|
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
|
||||||
testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
|
testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
|
||||||
|
@ -325,7 +343,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
*/
|
*/
|
||||||
public void testRecoverOnAnyNodeForcesAllocateIfShardAvailable() {
|
public void testRecoverOnAnyNodeForcesAllocateIfShardAvailable() {
|
||||||
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders());
|
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders());
|
||||||
testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
|
testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
|
@ -338,7 +356,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
*/
|
*/
|
||||||
public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() {
|
public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() {
|
||||||
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
|
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
|
||||||
testAllocator.addData(node1, -1, null);
|
testAllocator.addData(node1, -1, null, randomBoolean());
|
||||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||||
|
@ -351,7 +369,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)
|
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)
|
||||||
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
|
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
|
||||||
.put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true))
|
.put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true))
|
||||||
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, version == Version.CURRENT ? new HashSet<>(Arrays.asList("allocId")) : Collections.emptySet()))
|
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, version == Version.CURRENT ? Sets.newHashSet("allocId") : Collections.emptySet()))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
RoutingTable routingTable = RoutingTable.builder()
|
RoutingTable routingTable = RoutingTable.builder()
|
||||||
|
@ -387,7 +405,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||||
|
|
||||||
testAllocator.addData(node1, 1, null);
|
testAllocator.addData(node1, 1, null, randomBoolean());
|
||||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||||
changed = testAllocator.allocateUnassigned(allocation);
|
changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
|
@ -395,7 +413,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||||
|
|
||||||
testAllocator.addData(node2, 1, null);
|
testAllocator.addData(node2, 1, null, randomBoolean());
|
||||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||||
changed = testAllocator.allocateUnassigned(allocation);
|
changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
|
@ -428,7 +446,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||||
|
|
||||||
testAllocator.addData(node1, 1, null);
|
testAllocator.addData(node1, 1, null, randomBoolean());
|
||||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||||
changed = testAllocator.allocateUnassigned(allocation);
|
changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(false));
|
assertThat(changed, equalTo(false));
|
||||||
|
@ -436,7 +454,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||||
|
|
||||||
testAllocator.addData(node2, 2, null);
|
testAllocator.addData(node2, 2, null, randomBoolean());
|
||||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||||
changed = testAllocator.allocateUnassigned(allocation);
|
changed = testAllocator.allocateUnassigned(allocation);
|
||||||
assertThat(changed, equalTo(true));
|
assertThat(changed, equalTo(true));
|
||||||
|
@ -449,7 +467,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, boolean asNew, Version version, String... activeAllocationIds) {
|
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, boolean asNew, Version version, String... activeAllocationIds) {
|
||||||
MetaData metaData = MetaData.builder()
|
MetaData metaData = MetaData.builder()
|
||||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version))
|
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version))
|
||||||
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, new HashSet<>(Arrays.asList(activeAllocationIds))))
|
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, Sets.newHashSet(activeAllocationIds)))
|
||||||
.build();
|
.build();
|
||||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||||
if (asNew) {
|
if (asNew) {
|
||||||
|
@ -477,15 +495,15 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestAllocator addData(DiscoveryNode node, long version, String allocationId) {
|
public TestAllocator addData(DiscoveryNode node, long version, String allocationId, boolean primary) {
|
||||||
return addData(node, version, allocationId, null);
|
return addData(node, version, allocationId, primary, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestAllocator addData(DiscoveryNode node, long version, String allocationId, @Nullable Throwable storeException) {
|
public TestAllocator addData(DiscoveryNode node, long version, String allocationId, boolean primary, @Nullable Throwable storeException) {
|
||||||
if (data == null) {
|
if (data == null) {
|
||||||
data = new HashMap<>();
|
data = new HashMap<>();
|
||||||
}
|
}
|
||||||
data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, allocationId, storeException));
|
data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, allocationId, primary, storeException));
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.util.set.Sets;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
|
@ -51,11 +52,9 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
|
||||||
import org.elasticsearch.test.ESAllocationTestCase;
|
import org.elasticsearch.test.ESAllocationTestCase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
@ -289,7 +288,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
||||||
MetaData metaData = MetaData.builder()
|
MetaData metaData = MetaData.builder()
|
||||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT).put(settings))
|
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT).put(settings))
|
||||||
.numberOfShards(1).numberOfReplicas(1)
|
.numberOfShards(1).numberOfReplicas(1)
|
||||||
.putActiveAllocationIds(0, new HashSet<>(Arrays.asList(primaryShard.allocationId().getId()))))
|
.putActiveAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())))
|
||||||
.build();
|
.build();
|
||||||
RoutingTable routingTable = RoutingTable.builder()
|
RoutingTable routingTable = RoutingTable.builder()
|
||||||
.add(IndexRoutingTable.builder(shardId.getIndex())
|
.add(IndexRoutingTable.builder(shardId.getIndex())
|
||||||
|
@ -311,7 +310,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
||||||
MetaData metaData = MetaData.builder()
|
MetaData metaData = MetaData.builder()
|
||||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT))
|
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(1).numberOfReplicas(1)
|
.numberOfShards(1).numberOfReplicas(1)
|
||||||
.putActiveAllocationIds(0, new HashSet<>(Arrays.asList(primaryShard.allocationId().getId()))))
|
.putActiveAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())))
|
||||||
.build();
|
.build();
|
||||||
RoutingTable routingTable = RoutingTable.builder()
|
RoutingTable routingTable = RoutingTable.builder()
|
||||||
.add(IndexRoutingTable.builder(shardId.getIndex())
|
.add(IndexRoutingTable.builder(shardId.getIndex())
|
||||||
|
|
Loading…
Reference in New Issue