Allocation: Shard Started messages should be matched using an exact match
When a node sends a shard started message to the master, the master goes through the routing table looking for the shard to start. At the moment we validate the index UUID, the node the shard is assigned to, and the fact that the shard is initializing. This check goes wrong if a relocating replica shard finishes recovery just as its source node leaves the cluster: the master cancels the recovery and will likely assign a new initializing replica to the same target node, and the shard started message from the cancelled relocation recovery can then wrongfully activate the new replica. On top of that, the logic for deciding whether an incoming shard started message should be applied was split between ShardStateAction and the AllocationService.

This commit does the following:

1) Lets ShardStateAction filter only basic things, like index existence and the index UUID.
2) Moves the trickier shard started matching logic into the AllocationService and makes it stricter.
3) Unifies the ShardStateAction filtering logic for shard started and shard failed.
4) Adds unit tests for all of the above.

For an example test failure see: http://build-us-00.elastic.co/job/es_core_16_centos/388/

Closes #11999
commit 28090b3d73 (parent 5b4e86389c)
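To make the race concrete before the diff: below is a minimal sketch of the scenario, written against the TestShardRouting helper that the new tests in this commit use (the helper and its signatures come from the test code below; the sketch itself is illustrative and not part of the change).

    // The master cancelled a relocation because the source node left, and
    // assigned a fresh initializing replica to the same target node:
    ShardRouting inTable = TestShardRouting.newShardRouting("test", 0, "node2", null, false, ShardRoutingState.INITIALIZING, 1);
    // A late "shard started" message from the cancelled relocation recovery
    // still names the departed node as its relocation source:
    ShardRouting staleMessage = TestShardRouting.newShardRouting("test", 0, "node2", "node1", false, ShardRoutingState.INITIALIZING, 1);

    // Old check: same shard id, same node, shard is initializing -- the stale
    // message matches and would wrongfully activate the new replica.
    boolean oldMatch = staleMessage.shardId().equals(inTable.shardId()) && inTable.initializing(); // true
    // New check: exact match of the routing entry -- the differing relocation
    // source rejects the stale message.
    boolean newMatch = inTable.equals(staleMessage); // false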
ShardStateAction.java
@@ -35,6 +35,7 @@ import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -145,26 +146,17 @@ public class ShardStateAction extends AbstractComponent {
                     return currentState;
                 }

-                MetaData metaData = currentState.getMetaData();
+                final MetaData metaData = currentState.getMetaData();

                 List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size());
-                for (int i = 0; i < shardRoutingEntries.size(); i++) {
-                    ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
-                    shardRoutingEntry.processed = true;
-                    ShardRouting shardRouting = shardRoutingEntry.shardRouting;
-                    IndexMetaData indexMetaData = metaData.index(shardRouting.index());
-                    // if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated
-                    // which is fine, we should just ignore this
-                    if (indexMetaData == null) {
-                        continue;
-                    }
-                    if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) {
-                        logger.debug("{} ignoring shard failed, different index uuid, current {}, got {}", shardRouting.shardId(), indexMetaData.getUUID(), shardRoutingEntry);
-                        continue;
+                for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "failed", metaData, logger)) {
+                    shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.reason));
                 }
-
-                logger.debug("{} will apply shard failed {}", shardRouting.shardId(), shardRoutingEntry);
-                shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(shardRouting, shardRoutingEntry.reason));
+                // mark all entries as processed
+                for (ShardRoutingEntry entry : shardRoutingEntries) {
+                    entry.processed = true;
+                }

                 RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied);
@@ -189,6 +181,31 @@ public class ShardStateAction extends AbstractComponent {
         });
     }

+    static List<ShardRoutingEntry> extractShardsToBeApplied(List<ShardRoutingEntry> shardRoutingEntries, String type, MetaData metaData, ESLogger logger) {
+        List<ShardRoutingEntry> shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size());
+        for (int i = 0; i < shardRoutingEntries.size(); i++) {
+            ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
+            ShardRouting shardRouting = shardRoutingEntry.shardRouting;
+            IndexMetaData indexMetaData = metaData.index(shardRouting.index());
+            // if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated
+            // which is fine, we should just ignore this
+            if (indexMetaData == null) {
+                logger.debug("{} ignoring shard {}, unknown index in {}", shardRouting.shardId(), type, shardRoutingEntry);
+                continue;
+            }
+            if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) {
+                logger.debug("{} ignoring shard {}, different index uuid, current {}, got {}", shardRouting.shardId(), type, indexMetaData.getUUID(), shardRoutingEntry);
+                continue;
+            }
+
+            // more debug info will be logged by the allocation service
+            logger.trace("{} will apply shard {} {}", shardRouting.shardId(), type, shardRoutingEntry);
+            shardRoutingsToBeApplied.add(shardRoutingEntry);
+        }
+        return shardRoutingsToBeApplied;
+    }
+
     private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
         logger.debug("received shard started for {}", shardRoutingEntry);
         // buffer shard started requests, and the state update tasks will simply drain it
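Both the shard-started and shard-failed handlers now funnel through this one helper (see the hunks above and below), and the new ShardStateActionTest at the end of the commit drives it directly. A minimal usage sketch, assuming a `shardRouting`, a `clusterState`, and a `logger` in scope plus package-level access, as in the new test:

    // Entries for an unknown index, or for a previous (deleted) incarnation
    // of the index with a different UUID, are dropped before they ever reach
    // the AllocationService; survivors keep their order.
    List<ShardStateAction.ShardRoutingEntry> entries = new ArrayList<>();
    entries.add(new ShardStateAction.ShardRoutingEntry(shardRouting, "uuid_of_deleted_index", "test reason"));
    List<ShardStateAction.ShardRoutingEntry> applied =
            ShardStateAction.extractShardsToBeApplied(entries, "started", clusterState.metaData(), logger);
    // applied is empty here: "uuid_of_deleted_index" does not match the UUID
    // currently stored in the metadata for shardRouting.index()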
|
@@ -217,56 +234,15 @@ public class ShardStateAction extends AbstractComponent {
                 RoutingTable routingTable = currentState.routingTable();
                 MetaData metaData = currentState.getMetaData();

                 List<ShardRouting> shardRoutingToBeApplied = new ArrayList<>(shardRoutingEntries.size());

-                for (int i = 0; i < shardRoutingEntries.size(); i++) {
-                    ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
-                    shardRoutingEntry.processed = true;
-                    ShardRouting shardRouting = shardRoutingEntry.shardRouting;
-                    try {
-                        IndexMetaData indexMetaData = metaData.index(shardRouting.index());
-                        IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index());
-                        // if there is no metadata, no routing table or the current index is not of the right uuid, the index has been deleted while it was being allocated
-                        // which is fine, we should just ignore this
-                        if (indexMetaData == null) {
-                            continue;
-                        }
-                        if (indexRoutingTable == null) {
-                            continue;
+                for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "started", metaData, logger)) {
+                    shardRoutingToBeApplied.add(entry.shardRouting);
                 }

-                        if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) {
-                            logger.debug("{} ignoring shard started, different index uuid, current {}, got {}", shardRouting.shardId(), indexMetaData.getUUID(), shardRoutingEntry);
-                            continue;
-                        }
-
-                        // find the one that maps to us, if its already started, no need to do anything...
-                        // the shard might already be started since the nodes that is starting the shards might get cluster events
-                        // with the shard still initializing, and it will try and start it again (until the verification comes)
-
-                        IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardRouting.id());
-
-                        boolean applyShardEvent = true;
-
-                        for (ShardRouting entry : indexShardRoutingTable) {
-                            if (shardRouting.currentNodeId().equals(entry.currentNodeId())) {
-                                // we found the same shard that exists on the same node id
-                                if (!entry.initializing()) {
-                                    // shard is in initialized state, skipping event (probable already started)
-                                    logger.debug("{} ignoring shard started event for {}, current state: {}", shardRouting.shardId(), shardRoutingEntry, entry.state());
-                                    applyShardEvent = false;
-                                }
-                            }
-                        }
-
-                        if (applyShardEvent) {
-                            shardRoutingToBeApplied.add(shardRouting);
-                            logger.debug("{} will apply shard started {}", shardRouting.shardId(), shardRoutingEntry);
-                        }
-
-                    } catch (Throwable t) {
-                        logger.error("{} unexpected failure while processing shard started [{}]", t, shardRouting.shardId(), shardRouting);
-                    }
+                // mark all entries as processed
+                for (ShardRoutingEntry entry : shardRoutingEntries) {
+                    entry.processed = true;
                 }

                 if (shardRoutingToBeApplied.isEmpty()) {
@@ -307,18 +283,18 @@ public class ShardStateAction extends AbstractComponent {

     static class ShardRoutingEntry extends TransportRequest {

-        private ShardRouting shardRouting;
+        ShardRouting shardRouting;

-        private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
+        String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;

-        private String reason;
+        String reason;

         volatile boolean processed; // state field, no need to serialize

         ShardRoutingEntry() {
         }

-        private ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String reason) {
+        ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String reason) {
             this.shardRouting = shardRouting;
             this.reason = reason;
             this.indexUUID = indexUUID;
AllocationService.java
@@ -331,14 +331,20 @@ public class AllocationService extends AbstractComponent {
             if (currentRoutingNode != null) {
                 for (ShardRouting shard : currentRoutingNode) {
-                    if (shard.shardId().equals(startedShard.shardId())) {
+                    if (shard.equals(startedShard)) {
                         relocatingNodeId = shard.relocatingNodeId();
                         if (!shard.started()) {
                             dirty = true;
                             routingNodes.started(shard);
+                            logger.trace("{} marked as started", shard);
+                        } else {
+                            logger.debug("failed to find shard [{}] in order to start it [no matching shard on node], ignoring", startedShard);
                         }
                         break;
                     }
                 }
+            } else {
+                logger.debug("failed to find shard [{}] in order to start it [failed to find node], ignoring", startedShard);
+
             }

             // startedShard is the current state of the shard (post relocation for example)
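A subtlety of the new exact match, inferred from the StartedShardsRoutingTests added below rather than from ShardRouting internals: node ids, relocation source/target, the primary flag, and the shard state all have to line up, but the shard version does not (the tests apply started messages carrying a random version and still expect the shard to start). A small illustration using the test helper:

    // Same shard copy, different version: under the matching exercised by
    // the tests below, this message still starts an initializing primary
    // that sits on node1 in the routing table...
    ShardRouting message = TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.INITIALIZING, randomInt());
    // ...while flipping the primary flag or naming a relocation source that
    // the routing table does not know about makes the message a no-op.
    ShardRouting wrongPrimary = TestShardRouting.newShardRouting("test", 0, "node1", null, false, ShardRoutingState.INITIALIZING, 1);
    ShardRouting wrongSource = TestShardRouting.newShardRouting("test", 0, "node1", "some_node", true, ShardRoutingState.INITIALIZING, 1);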
|
@@ -404,6 +410,7 @@ public class AllocationService extends AbstractComponent {
                 }
             }
             if (dirty) {
+                logger.debug("failed shard {} found in routingNodes, failing it", failedShard);
                 // now, find the node that we are relocating *from*, and cancel its relocation
                 RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId());
                 if (relocatingFromNode != null) {
@@ -440,6 +447,7 @@ public class AllocationService extends AbstractComponent {
                 }
             }
             if (dirty) {
+                logger.debug("failed shard {} found in routingNodes, failing it", failedShard);
                 // next, we need to find the target initializing shard that is recovering from, and remove it...
                 RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.relocatingNodeId());
                 if (initializingNode != null) {
@@ -490,7 +498,9 @@ public class AllocationService extends AbstractComponent {
                 }
             }
         }
-        if (!dirty) {
+        if (dirty) {
+            logger.debug("failed shard {} found in routingNodes and failed", failedShard);
+        } else {
             logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard);
         }
     }
FailedRerouteAllocation.java
@@ -45,6 +45,14 @@ public class FailedRerouteAllocation extends RoutingAllocation {
             this.shard = shard;
             this.details = details;
         }
+
+        @Override
+        public String toString() {
+            return "FailedShard{" +
+                    "shard=" + shard +
+                    ", details='" + details + '\'' +
+                    '}';
+        }
     }

     private final List<FailedShard> failedShards;
ShardStateActionTest.java (new file)
@@ -0,0 +1,90 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.cluster.action.shard;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.test.ElasticsearchTestCase;

import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.Matchers.equalTo;


public class ShardStateActionTest extends ElasticsearchTestCase {

    public void testShardFiltering() {
        final IndexMetaData indexMetaData = IndexMetaData.builder("test")
                .settings(Settings.builder()
                        .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
                        .put(IndexMetaData.SETTING_UUID, "test_uuid"))
                .numberOfShards(3).numberOfReplicas(0)
                .build();
        ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT)
                .nodes(DiscoveryNodes.builder()
                        .put(new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT)).masterNodeId("node1")
                        .put(new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT))
                )
                .metaData(MetaData.builder().put(indexMetaData, false));

        final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1);
        final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", randomBoolean(), ShardRoutingState.STARTED, 1);
        final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", randomBoolean(), ShardRoutingState.RELOCATING, 1);
        stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test")
                .addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId(), true).addShard(initShard).build())
                .addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId(), true).addShard(startedShard).build())
                .addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId(), true).addShard(relocatingShard).build())));

        ClusterState state = stateBuilder.build();

        ArrayList<ShardStateAction.ShardRoutingEntry> listToFilter = new ArrayList<>();
        ArrayList<ShardStateAction.ShardRoutingEntry> expectedToBeApplied = new ArrayList<>();

        listToFilter.add(new ShardStateAction.ShardRoutingEntry(initShard, indexMetaData.uuid() + "_suffix", "wrong_uuid"));

        listToFilter.add(new ShardStateAction.ShardRoutingEntry(relocatingShard.buildTargetRelocatingShard(), indexMetaData.uuid(), "relocating_to_node"));
        expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1));

        listToFilter.add(new ShardStateAction.ShardRoutingEntry(startedShard, indexMetaData.uuid(), "started shard"));
        expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1));

        listToFilter.add(new ShardStateAction.ShardRoutingEntry(TestShardRouting.newShardRouting(initShard.index() + "_NA", initShard.id(),
                initShard.currentNodeId(), initShard.primary(), initShard.state(), initShard.version()), indexMetaData.uuid(), "wrong_uuid"));

        List<ShardStateAction.ShardRoutingEntry> toBeApplied = ShardStateAction.extractShardsToBeApplied(listToFilter, "for testing", state.metaData(), logger);
        if (toBeApplied.size() != expectedToBeApplied.size()) {
            fail("size mismatch.\n Got: \n [" + toBeApplied + "], \n expected: \n [" + expectedToBeApplied + "]");
        }
        for (int i = 0; i < toBeApplied.size(); i++) {
            final ShardStateAction.ShardRoutingEntry found = toBeApplied.get(i);
            final ShardStateAction.ShardRoutingEntry expected = expectedToBeApplied.get(i);
            assertThat(found, equalTo(expected));
        }
    }
}
StartedShardsRoutingTests.java (new file)
@@ -0,0 +1,131 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;

import java.util.Arrays;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isEmptyOrNullString;
import static org.hamcrest.Matchers.nullValue;


public class StartedShardsRoutingTests extends ElasticsearchAllocationTestCase {

    @Test
    public void testStartedShardsMatching() {
        AllocationService allocation = createAllocationService();

        logger.info("--> building initial cluster state");
        final IndexMetaData indexMetaData = IndexMetaData.builder("test")
                .settings(settings(Version.CURRENT))
                .numberOfShards(3).numberOfReplicas(0)
                .build();
        ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT)
                .nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")))
                .metaData(MetaData.builder().put(indexMetaData, false));

        final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1);
        final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", randomBoolean(), ShardRoutingState.STARTED, 1);
        final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", randomBoolean(), ShardRoutingState.RELOCATING, 1);
        stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test")
                .addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId(), true).addShard(initShard).build())
                .addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId(), true).addShard(startedShard).build())
                .addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId(), true).addShard(relocatingShard).build())));

        ClusterState state = stateBuilder.build();

        logger.info("--> test starting of shard");

        RoutingAllocation.Result result = allocation.applyStartedShards(state, Arrays.asList(
                TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(),
                        ShardRoutingState.INITIALIZING, randomInt())), false);
        assertTrue("failed to start " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
        assertTrue(initShard + " isn't started \ncurrent routing table:" + result.routingTable().prettyPrint(),
                result.routingTable().index("test").shard(initShard.id()).allShardsStarted());


        logger.info("--> testing shard variants that shouldn't match the started shard");

        result = allocation.applyStartedShards(state, Arrays.asList(
                TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), !initShard.primary(),
                        ShardRoutingState.INITIALIZING, 1)), false);
        assertFalse("wrong primary flag shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());

        result = allocation.applyStartedShards(state, Arrays.asList(
                TestShardRouting.newShardRouting(initShard.index(), initShard.id(), "some_node", initShard.currentNodeId(), initShard.primary(),
                        ShardRoutingState.INITIALIZING, 1)), false);
        assertFalse("relocating shard from node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());

        result = allocation.applyStartedShards(state, Arrays.asList(
                TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), "some_node", initShard.primary(),
                        ShardRoutingState.INITIALIZING, 1)), false);
        assertFalse("relocating shard to node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());


        logger.info("--> testing double starting");

        result = allocation.applyStartedShards(state, Arrays.asList(
                TestShardRouting.newShardRouting(startedShard.index(), startedShard.id(), startedShard.currentNodeId(), startedShard.relocatingNodeId(), startedShard.primary(),
                        ShardRoutingState.INITIALIZING, 1)), false);
        assertFalse("duplicate starting of the same shard should be ignored \ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());

        logger.info("--> testing starting of relocating shards");
        result = allocation.applyStartedShards(state, Arrays.asList(
                TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(),
                        ShardRoutingState.INITIALIZING, randomInt())), false);
        assertTrue("failed to start " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
        ShardRouting shardRouting = result.routingTable().index("test").shard(relocatingShard.id()).getShards().get(0);
        assertThat(shardRouting.state(), equalTo(ShardRoutingState.STARTED));
        assertThat(shardRouting.currentNodeId(), equalTo("node2"));
        assertThat(shardRouting.relocatingNodeId(), nullValue());

        logger.info("--> testing shard variants that shouldn't match the relocating shard");

        result = allocation.applyStartedShards(state, Arrays.asList(
                TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), !relocatingShard.primary(),
                        ShardRoutingState.INITIALIZING, 1)), false);
        assertFalse("wrong primary flag shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());

        result = allocation.applyStartedShards(state, Arrays.asList(
                TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), "some_node", relocatingShard.currentNodeId(), relocatingShard.primary(),
                        ShardRoutingState.INITIALIZING, 1)), false);
        assertFalse("relocating shard to a different node shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());

        result = allocation.applyStartedShards(state, Arrays.asList(
                TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), "some_node", relocatingShard.primary(),
                        ShardRoutingState.INITIALIZING, 1)), false);
        assertFalse("relocating shard from a different node shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());

        result = allocation.applyStartedShards(state, Arrays.asList(
                TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.primary(),
                        ShardRoutingState.INITIALIZING, 1)), false);
        assertFalse("non-relocating shard shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());

    }
}