From 28090b3d7366044a8cadb0e6d47c324f94c1684a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 1 Jul 2015 17:13:39 +0200 Subject: [PATCH] Allocation: Shard Started messages should be matched using an exact match When a node sends a shard started message to the master, the master goes through the routing table looking for the shard to start. At the moment we validate the indexUUID, the node the shard is assigned to and the fact that the shard is initializing. This check goes wrong if a relocating replica shard finishes recovery just at the moment the source node leaves the cluster. In this case the master will cancel the recovery and will likely assign a new initializing replica to the same target node. In this case the message from the relocation recovery can activate the new replica wrongfully. Also, the logic for decided whether an incoming shard started message will be applied was split between ShardStateAction and the AllocationService. This commit does the following: 1) Let ShardStateAction only filter basic stuff like index existence and indexUUID. 2) Move the trickier shard started matching logic to the AllocationService and make it stricter 3) Unify ShardStateAction filtering logic for both shard started and shard failed. 4) Add unit tests for all of the above. For an example test failure see: http://build-us-00.elastic.co/job/es_core_16_centos/388/ Closes #11999 --- .../action/shard/ShardStateAction.java | 114 ++++++--------- .../routing/allocation/AllocationService.java | 18 ++- .../allocation/FailedRerouteAllocation.java | 8 ++ .../action/shard/ShardStateActionTest.java | 90 ++++++++++++ .../allocation/StartedShardsRoutingTests.java | 131 ++++++++++++++++++ 5 files changed, 288 insertions(+), 73 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTest.java create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 80420e325eb..34b2b6194b9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.threadpool.ThreadPool; @@ -145,26 +146,17 @@ public class ShardStateAction extends AbstractComponent { return currentState; } - MetaData metaData = currentState.getMetaData(); + final MetaData metaData = currentState.getMetaData(); + List shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size()); - for (int i = 0; i < shardRoutingEntries.size(); i++) { - ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i); - shardRoutingEntry.processed = true; - ShardRouting shardRouting = shardRoutingEntry.shardRouting; - IndexMetaData indexMetaData = metaData.index(shardRouting.index()); - // if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated - // which is fine, we should just ignore this - if (indexMetaData == null) { - continue; - } - if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) { - logger.debug("{} ignoring shard failed, different index uuid, current {}, got {}", shardRouting.shardId(), indexMetaData.getUUID(), shardRoutingEntry); - continue; - } + for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "failed", metaData, logger)) { + shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.reason)); + } - logger.debug("{} will apply shard failed {}", shardRouting.shardId(), shardRoutingEntry); - shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(shardRouting, shardRoutingEntry.reason)); + // mark all entries as processed + for (ShardRoutingEntry entry : shardRoutingEntries) { + entry.processed = true; } RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied); @@ -189,6 +181,31 @@ public class ShardStateAction extends AbstractComponent { }); } + static List extractShardsToBeApplied(List shardRoutingEntries, String type, MetaData metaData, ESLogger logger) { + List shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size()); + for (int i = 0; i < shardRoutingEntries.size(); i++) { + ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i); + ShardRouting shardRouting = shardRoutingEntry.shardRouting; + IndexMetaData indexMetaData = metaData.index(shardRouting.index()); + // if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated + // which is fine, we should just ignore this + if (indexMetaData == null) { + logger.debug("{} ignoring shard {}, unknown index in {}", shardRouting.shardId(), type, shardRoutingEntry); + continue; + } + if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) { + logger.debug("{} ignoring shard {}, different index uuid, current {}, got {}", shardRouting.shardId(), type, indexMetaData.getUUID(), shardRoutingEntry); + continue; + } + + // more debug info will be logged by the allocation service + logger.trace("{} will apply shard {} {}", shardRouting.shardId(), type, shardRoutingEntry); + shardRoutingsToBeApplied.add(shardRoutingEntry); + } + return shardRoutingsToBeApplied; + + } + private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { logger.debug("received shard started for {}", shardRoutingEntry); // buffer shard started requests, and the state update tasks will simply drain it @@ -217,56 +234,15 @@ public class ShardStateAction extends AbstractComponent { RoutingTable routingTable = currentState.routingTable(); MetaData metaData = currentState.getMetaData(); + List shardRoutingToBeApplied = new ArrayList<>(shardRoutingEntries.size()); + for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "started", metaData, logger)) { + shardRoutingToBeApplied.add(entry.shardRouting); + } - for (int i = 0; i < shardRoutingEntries.size(); i++) { - ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i); - shardRoutingEntry.processed = true; - ShardRouting shardRouting = shardRoutingEntry.shardRouting; - try { - IndexMetaData indexMetaData = metaData.index(shardRouting.index()); - IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index()); - // if there is no metadata, no routing table or the current index is not of the right uuid, the index has been deleted while it was being allocated - // which is fine, we should just ignore this - if (indexMetaData == null) { - continue; - } - if (indexRoutingTable == null) { - continue; - } - - if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) { - logger.debug("{} ignoring shard started, different index uuid, current {}, got {}", shardRouting.shardId(), indexMetaData.getUUID(), shardRoutingEntry); - continue; - } - - // find the one that maps to us, if its already started, no need to do anything... - // the shard might already be started since the nodes that is starting the shards might get cluster events - // with the shard still initializing, and it will try and start it again (until the verification comes) - - IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardRouting.id()); - - boolean applyShardEvent = true; - - for (ShardRouting entry : indexShardRoutingTable) { - if (shardRouting.currentNodeId().equals(entry.currentNodeId())) { - // we found the same shard that exists on the same node id - if (!entry.initializing()) { - // shard is in initialized state, skipping event (probable already started) - logger.debug("{} ignoring shard started event for {}, current state: {}", shardRouting.shardId(), shardRoutingEntry, entry.state()); - applyShardEvent = false; - } - } - } - - if (applyShardEvent) { - shardRoutingToBeApplied.add(shardRouting); - logger.debug("{} will apply shard started {}", shardRouting.shardId(), shardRoutingEntry); - } - - } catch (Throwable t) { - logger.error("{} unexpected failure while processing shard started [{}]", t, shardRouting.shardId(), shardRouting); - } + // mark all entries as processed + for (ShardRoutingEntry entry : shardRoutingEntries) { + entry.processed = true; } if (shardRoutingToBeApplied.isEmpty()) { @@ -307,18 +283,18 @@ public class ShardStateAction extends AbstractComponent { static class ShardRoutingEntry extends TransportRequest { - private ShardRouting shardRouting; + ShardRouting shardRouting; - private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; + String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; - private String reason; + String reason; volatile boolean processed; // state field, no need to serialize ShardRoutingEntry() { } - private ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String reason) { + ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String reason) { this.shardRouting = shardRouting; this.reason = reason; this.indexUUID = indexUUID; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index c02ad4ea075..ca073815194 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -247,7 +247,7 @@ public class AllocationService extends AbstractComponent { } } for (ShardRouting shardToFail : shardsToFail) { - changed |= applyFailedShard(allocation, shardToFail, false, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing")); + changed |= applyFailedShard(allocation, shardToFail, false, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing")); } // now, go over and elect a new primary if possible, not, from this code block on, if one is elected, @@ -331,14 +331,20 @@ public class AllocationService extends AbstractComponent { if (currentRoutingNode != null) { for (ShardRouting shard : currentRoutingNode) { if (shard.shardId().equals(startedShard.shardId())) { - relocatingNodeId = shard.relocatingNodeId(); - if (!shard.started()) { + if (shard.equals(startedShard)) { + relocatingNodeId = shard.relocatingNodeId(); dirty = true; routingNodes.started(shard); + logger.trace("{} marked as started", shard); + } else { + logger.debug("failed to find shard [{}] in order to start it [no matching shard on node], ignoring", startedShard); } break; } } + } else { + logger.debug("failed to find shard [{}] in order to start it [failed to find node], ignoring", startedShard); + } // startedShard is the current state of the shard (post relocation for example) @@ -404,6 +410,7 @@ public class AllocationService extends AbstractComponent { } } if (dirty) { + logger.debug("failed shard {} found in routingNodes, failing it", failedShard); // now, find the node that we are relocating *from*, and cancel its relocation RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId()); if (relocatingFromNode != null) { @@ -440,6 +447,7 @@ public class AllocationService extends AbstractComponent { } } if (dirty) { + logger.debug("failed shard {} found in routingNodes, failing it", failedShard); // next, we need to find the target initializing shard that is recovering from, and remove it... RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.relocatingNodeId()); if (initializingNode != null) { @@ -490,7 +498,9 @@ public class AllocationService extends AbstractComponent { } } } - if (!dirty) { + if (dirty) { + logger.debug("failed shard {} found in routingNodes and failed", failedShard); + } else { logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java index 305768c8d28..1daf70d57e5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java @@ -45,6 +45,14 @@ public class FailedRerouteAllocation extends RoutingAllocation { this.shard = shard; this.details = details; } + + @Override + public String toString() { + return "FailedShard{" + + "shard=" + shard + + ", details='" + details + '\'' + + '}'; + } } private final List failedShards; diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTest.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTest.java new file mode 100644 index 00000000000..899fac79e8b --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.action.shard; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.test.ElasticsearchTestCase; + +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + + +public class ShardStateActionTest extends ElasticsearchTestCase { + + public void testShardFiltering() { + final IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_UUID, "test_uuid")) + .numberOfShards(2).numberOfReplicas(0) + .build(); + ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder() + .put(new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT)).masterNodeId("node1") + .put(new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT)) + ) + .metaData(MetaData.builder().put(indexMetaData, false)); + + final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1); + final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", randomBoolean(), ShardRoutingState.STARTED, 1); + final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", randomBoolean(), ShardRoutingState.RELOCATING, 1); + stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test") + .addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId(), true).addShard(initShard).build()) + .addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId(), true).addShard(startedShard).build()) + .addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId(), true).addShard(relocatingShard).build()))); + + ClusterState state = stateBuilder.build(); + + ArrayList listToFilter = new ArrayList<>(); + ArrayList expectedToBeApplied = new ArrayList<>(); + + listToFilter.add(new ShardStateAction.ShardRoutingEntry(initShard, indexMetaData.uuid() + "_suffix", "wrong_uuid")); + + listToFilter.add(new ShardStateAction.ShardRoutingEntry(relocatingShard.buildTargetRelocatingShard(), indexMetaData.uuid(), "relocating_to_node")); + expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1)); + + listToFilter.add(new ShardStateAction.ShardRoutingEntry(startedShard, indexMetaData.uuid(), "started shard")); + expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1)); + + listToFilter.add(new ShardStateAction.ShardRoutingEntry(TestShardRouting.newShardRouting(initShard.index() + "_NA", initShard.id(), + initShard.currentNodeId(), initShard.primary(), initShard.state(), initShard.version()), indexMetaData.uuid(), "wrong_uuid")); + + List toBeApplied = ShardStateAction.extractShardsToBeApplied(listToFilter, "for testing", state.metaData(), logger); + if (toBeApplied.size() != expectedToBeApplied.size()) { + fail("size mismatch.\n Got: \n [" + toBeApplied + "], \n expected: \n [" + expectedToBeApplied + "]"); + } + for (int i = 0; i < toBeApplied.size(); i++) { + final ShardStateAction.ShardRoutingEntry found = toBeApplied.get(i); + final ShardStateAction.ShardRoutingEntry expected = expectedToBeApplied.get(i); + assertThat(found, equalTo(expected)); + } + } +} \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java new file mode 100644 index 00000000000..9a36ba3d3a0 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java @@ -0,0 +1,131 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ElasticsearchAllocationTestCase; +import org.junit.Test; + +import java.util.Arrays; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.isEmptyOrNullString; +import static org.hamcrest.Matchers.nullValue; + +public class StartedShardsRoutingTests extends ElasticsearchAllocationTestCase { + + @Test + public void tesStartedShardsMatching() { + AllocationService allocation = createAllocationService(); + + logger.info("--> building initial cluster state"); + final IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(3).numberOfReplicas(0) + .build(); + ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))) + .metaData(MetaData.builder().put(indexMetaData, false)); + + final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1); + final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", randomBoolean(), ShardRoutingState.STARTED, 1); + final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", randomBoolean(), ShardRoutingState.RELOCATING, 1); + stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test") + .addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId(), true).addShard(initShard).build()) + .addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId(), true).addShard(startedShard).build()) + .addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId(), true).addShard(relocatingShard).build()))); + + ClusterState state = stateBuilder.build(); + + logger.info("--> test starting of shard"); + + RoutingAllocation.Result result = allocation.applyStartedShards(state, Arrays.asList( + TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(), + ShardRoutingState.INITIALIZING, randomInt())), false); + assertTrue("failed to start " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + assertTrue(initShard + "isn't started \ncurrent routing table:" + result.routingTable().prettyPrint(), + result.routingTable().index("test").shard(initShard.id()).allShardsStarted()); + + + logger.info("--> testing shard variants that shouldn't match the started shard"); + + result = allocation.applyStartedShards(state, Arrays.asList( + TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), !initShard.primary(), + ShardRoutingState.INITIALIZING, 1)), false); + assertFalse("wrong primary flag shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + + result = allocation.applyStartedShards(state, Arrays.asList( + TestShardRouting.newShardRouting(initShard.index(), initShard.id(), "some_node", initShard.currentNodeId(), initShard.primary(), + ShardRoutingState.INITIALIZING, 1)), false); + assertFalse("relocating shard from node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + + result = allocation.applyStartedShards(state, Arrays.asList( + TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), "some_node", initShard.primary(), + ShardRoutingState.INITIALIZING, 1)), false); + assertFalse("relocating shard to node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + + + logger.info("--> testing double starting"); + + result = allocation.applyStartedShards(state, Arrays.asList( + TestShardRouting.newShardRouting(startedShard.index(), startedShard.id(), startedShard.currentNodeId(), startedShard.relocatingNodeId(), startedShard.primary(), + ShardRoutingState.INITIALIZING, 1)), false); + assertFalse("duplicate starting of the same shard should be ignored \ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + + logger.info("--> testing starting of relocating shards"); + result = allocation.applyStartedShards(state, Arrays.asList( + TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(), + ShardRoutingState.INITIALIZING, randomInt())), false); + assertTrue("failed to start " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + ShardRouting shardRouting = result.routingTable().index("test").shard(relocatingShard.id()).getShards().get(0); + assertThat(shardRouting.state(), equalTo(ShardRoutingState.STARTED)); + assertThat(shardRouting.currentNodeId(), equalTo("node2")); + assertThat(shardRouting.relocatingNodeId(), nullValue()); + + logger.info("--> testing shard variants that shouldn't match the relocating shard"); + + result = allocation.applyStartedShards(state, Arrays.asList( + TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), !relocatingShard.primary(), + ShardRoutingState.INITIALIZING, 1)), false); + assertFalse("wrong primary flag shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + + result = allocation.applyStartedShards(state, Arrays.asList( + TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), "some_node", relocatingShard.currentNodeId(), relocatingShard.primary(), + ShardRoutingState.INITIALIZING, 1)), false); + assertFalse("relocating shard to a different node shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + + result = allocation.applyStartedShards(state, Arrays.asList( + TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), "some_node", relocatingShard.primary(), + ShardRoutingState.INITIALIZING, 1)), false); + assertFalse("relocating shard from a different node shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + + result = allocation.applyStartedShards(state, Arrays.asList( + TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.primary(), + ShardRoutingState.INITIALIZING, 1)), false); + assertFalse("non-relocating shard shouldn't start shard" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + + } +}