Allocation: use recently added allocation ids for shard started/failed

On top of that:
 1) A relocation target shard's allocation id is changed to include the allocation id of the source shard under relocationId (similar to shard routing semantics)
 2) The logic around state changes when finalizing a shard relocation is simplified - one simply starts the target shard (we previously had unused logic around the relocating state), as sketched below
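As a rough illustration of these two points, here is a minimal, self-contained sketch of the allocation id lifecycle during a relocation. It uses plain strings and made-up values rather than the actual AllocationId class, so it only models the pairing described above:

public class AllocationIdRelocationSketch {
    public static void main(String[] args) {
        // allocation id of the started source shard copy (value made up)
        String sourceId = "a1";
        // assigned to the source when the relocation starts (newRelocation)
        String sourceRelocationId = "b2";
        // the relocation target is built from the source (newTargetRelocation): its id is the
        // source's relocation id and, with this change, its relocation id points back at the source's id
        String targetId = sourceRelocationId;        // "b2"
        String targetRelocationId = sourceId;        // "a1"
        System.out.println("relocating: source[id=" + sourceId + ", rId=" + sourceRelocationId
                + "] target[id=" + targetId + ", rId=" + targetRelocationId + "]");
        // finalizing the relocation (finishRelocation) is applied to the *target* and only clears
        // its relocation id; the target keeps the id it already had
        targetRelocationId = null;
        System.out.println("started: target[id=" + targetId + ", rId=" + targetRelocationId + "]");
    }
}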

Closes #12299
Boaz Leskes 2015-07-16 16:45:37 +02:00
parent c56ce0e242
commit 275848eb9b
12 changed files with 224 additions and 329 deletions

View File

@@ -145,17 +145,12 @@ public class ShardStateAction extends AbstractComponent {
return currentState;
}
final MetaData metaData = currentState.getMetaData();
List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size());
for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "failed", metaData, logger)) {
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.message, entry.failure));
}
// mark all entries as processed
for (ShardRoutingEntry entry : shardRoutingEntries) {
entry.processed = true;
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.message, entry.failure));
}
RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied);
@@ -180,31 +175,6 @@ public class ShardStateAction extends AbstractComponent {
});
}
static List<ShardRoutingEntry> extractShardsToBeApplied(List<ShardRoutingEntry> shardRoutingEntries, String type, MetaData metaData, ESLogger logger) {
List<ShardRoutingEntry> shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size());
for (int i = 0; i < shardRoutingEntries.size(); i++) {
ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
ShardRouting shardRouting = shardRoutingEntry.shardRouting;
IndexMetaData indexMetaData = metaData.index(shardRouting.index());
// if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated
// which is fine, we should just ignore this
if (indexMetaData == null) {
logger.debug("{} ignoring shard {}, unknown index in {}", shardRouting.shardId(), type, shardRoutingEntry);
continue;
}
if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) {
logger.debug("{} ignoring shard {}, different index uuid, current {}, got {}", shardRouting.shardId(), type, indexMetaData.getIndexUUID(), shardRoutingEntry);
continue;
}
// more debug info will be logged by the allocation service
logger.trace("{} will apply shard {} {}", shardRouting.shardId(), type, shardRoutingEntry);
shardRoutingsToBeApplied.add(shardRoutingEntry);
}
return shardRoutingsToBeApplied;
}
private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
// buffer shard started requests, and the state update tasks will simply drain it
@@ -230,18 +200,12 @@ public class ShardStateAction extends AbstractComponent {
return currentState;
}
RoutingTable routingTable = currentState.routingTable();
MetaData metaData = currentState.getMetaData();
List<ShardRouting> shardRoutingToBeApplied = new ArrayList<>(shardRoutingEntries.size());
for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "started", metaData, logger)) {
shardRoutingToBeApplied.add(entry.shardRouting);
}
// mark all entries as processed
for (ShardRoutingEntry entry : shardRoutingEntries) {
entry.processed = true;
shardRoutingToBeApplied.add(entry.shardRouting);
}
if (shardRoutingToBeApplied.isEmpty()) {

View File

@@ -22,6 +22,8 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
@@ -34,7 +36,7 @@ import java.io.IOException;
* relocationId. Once relocation is done, the new allocation id is set to the relocationId. This is similar
* behavior to how ShardRouting#currentNodeId is used.
*/
public class AllocationId {
public class AllocationId implements ToXContent {
private final String id;
private final String relocationId;
@@ -67,7 +69,7 @@ public class AllocationId {
*/
public static AllocationId newTargetRelocation(AllocationId allocationId) {
assert allocationId.getRelocationId() != null;
return new AllocationId(allocationId.getRelocationId(), null);
return new AllocationId(allocationId.getRelocationId(), allocationId.getId());
}
/**
@@ -81,19 +83,24 @@ public class AllocationId {
/**
* Creates a new allocation id representing a cancelled relocation.
*/
*
* Note that this is expected to be called on the allocation id
* of the *source* shard
* */
public static AllocationId cancelRelocation(AllocationId allocationId) {
assert allocationId.getRelocationId() != null;
return new AllocationId(allocationId.getId(), null);
}
/**
* Creates a new allocation id finalizing a relocation, moving the transient
* relocation id to be the actual id.
* Creates a new allocation id finalizing a relocation.
*
* Note that this is expected to be called on the allocation id
* of the *target* shard and thus it only needs to clear the relocating id.
*/
public static AllocationId finishRelocation(AllocationId allocationId) {
assert allocationId.getRelocationId() != null;
return new AllocationId(allocationId.getRelocationId(), null);
return new AllocationId(allocationId.getId(), null);
}
/**
@@ -126,4 +133,20 @@ public class AllocationId {
result = 31 * result + (relocationId != null ? relocationId.hashCode() : 0);
return result;
}
@Override
public String toString() {
return "[id=" + id + (relocationId == null ? "" : ", rId=" + relocationId) + "]";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("allocation_id");
builder.field("id", id);
if (relocationId != null) {
builder.field("relocation_id", relocationId);
}
builder.endObject();
return builder;
}
}
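For reference, with the toString and toXContent methods added above, the allocation id of a relocation target would render roughly as follows (id values are made-up examples; once the relocation is finalized the relocation id part disappears):

// toString()  -> [id=b2, rId=a1]
// toXContent  -> "allocation_id" : { "id" : "b2", "relocation_id" : "a1" }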

View File

@@ -394,15 +394,14 @@ public class RoutingNodes implements Iterable<RoutingNode> {
* Mark a shard as started and adjusts internal statistics.
*/
public void started(ShardRouting shard) {
if (!shard.active() && shard.relocatingNodeId() == null) {
assert !shard.active() : "expected an initializing shard " + shard;
if (shard.relocatingNodeId() == null) {
// if this is not a target shard for relocation, we need to update statistics
inactiveShardCount--;
if (shard.primary()) {
inactivePrimaryCount--;
}
} else if (shard.relocating()) {
relocatingShards--;
}
assert !shard.started();
shard.moveToStarted();
}
@@ -757,6 +756,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
private final RoutingNode iterable;
private ShardRouting shard;
private final Iterator<ShardRouting> delegate;
private boolean removed = false;
public RoutingNodeIterator(RoutingNode iterable) {
this.delegate = iterable.mutableIterator();
@@ -770,6 +770,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
@Override
public ShardRouting next() {
removed = false;
return shard = delegate.next();
}
@@ -777,6 +778,13 @@ public class RoutingNodes implements Iterable<RoutingNode> {
public void remove() {
delegate.remove();
RoutingNodes.this.remove(shard);
removed = true;
}
/** returns true if {@link #remove()} or {@link #moveToUnassigned(UnassignedInfo)} were called on the current shard */
public boolean isRemoved() {
return removed;
}
@Override
@@ -785,10 +793,16 @@ public class RoutingNodes implements Iterable<RoutingNode> {
}
public void moveToUnassigned(UnassignedInfo unassignedInfo) {
remove();
if (isRemoved() == false) {
remove();
}
ShardRouting unassigned = new ShardRouting(shard); // protective copy of the mutable shard
unassigned.moveToUnassigned(unassignedInfo);
unassigned().add(unassigned);
}
public ShardRouting current() {
return shard;
}
}
}

View File

@@ -441,11 +441,12 @@ public final class ShardRouting implements Streamable, ToXContent {
void moveToStarted() {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : this;
assert state == ShardRoutingState.INITIALIZING : "expected an initializing shard " + this;
relocatingNodeId = null;
restoreSource = null;
unassignedInfo = null; // we keep the unassigned data until the shard is started
if (state == ShardRoutingState.RELOCATING) {
if (allocationId.getRelocationId() != null) {
// target relocation
allocationId = AllocationId.finishRelocation(allocationId);
}
state = ShardRoutingState.STARTED;
@@ -502,6 +503,9 @@ public final class ShardRouting implements Streamable, ToXContent {
if (relocatingNodeId != null ? !relocatingNodeId.equals(that.relocatingNodeId) : that.relocatingNodeId != null) {
return false;
}
if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) {
return false;
}
if (state != that.state) {
return false;
}
@@ -526,6 +530,7 @@ public final class ShardRouting implements Streamable, ToXContent {
result = 31 * result + (primary ? 1 : 0);
result = 31 * result + (state != null ? state.hashCode() : 0);
result = 31 * result + (restoreSource != null ? restoreSource.hashCode() : 0);
result = 31 * result + (allocationId != null ? allocationId.hashCode() : 0);
return hashCode = result;
}
@@ -549,10 +554,14 @@ public final class ShardRouting implements Streamable, ToXContent {
} else {
sb.append("[R]");
}
sb.append(", v[").append(version).append("]");
if (this.restoreSource != null) {
sb.append(", restoring[" + restoreSource + "]");
}
sb.append(", s[").append(state).append("]");
if (allocationId != null) {
sb.append(", a").append(allocationId);
}
if (this.unassignedInfo != null) {
sb.append(", ").append(unassignedInfo.toString());
}
@@ -567,11 +576,16 @@ public final class ShardRouting implements Streamable, ToXContent {
.field("node", currentNodeId())
.field("relocating_node", relocatingNodeId())
.field("shard", shardId().id())
.field("index", shardId().index().name());
.field("index", shardId().index().name())
.field("version", version);
if (restoreSource() != null) {
builder.field("restore_source");
restoreSource().toXContent(builder, params);
}
if (allocationId != null) {
allocationId.toXContent(builder, params);
}
if (unassignedInfo != null) {
unassignedInfo.toXContent(builder, params);
}

View File

@@ -257,19 +257,22 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
return nextDelay == Long.MAX_VALUE ? 0l : nextDelay;
}
@Override
public String toString() {
public String shortSummary() {
StringBuilder sb = new StringBuilder();
sb.append("unassigned_info[[reason=").append(reason).append("]");
sb.append("[reason=").append(reason).append("]");
sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(timestamp)).append("]");
String details = getDetails();
if (details != null) {
sb.append(", details[").append(details).append("]");
}
sb.append("]");
return sb.toString();
}
@Override
public String toString() {
return "unassigned_info[" + shortSummary() + "]";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("unassigned_info");

View File

@@ -22,7 +22,6 @@ package org.elasticsearch.cluster.routing.allocation;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -325,47 +324,50 @@ public class AllocationService extends AbstractComponent {
for (ShardRouting startedShard : startedShardEntries) {
assert startedShard.initializing();
// retrieve the relocating node id before calling startedShard().
String relocatingNodeId = null;
// validate index still exists. strictly speaking this is not needed but it gives clearer logs
if (routingNodes.routingTable().index(startedShard.index()) == null) {
logger.debug("{} ignoring shard started, unknown index (routing: {})", startedShard.shardId(), startedShard);
continue;
}
RoutingNodes.RoutingNodeIterator currentRoutingNode = routingNodes.routingNodeIter(startedShard.currentNodeId());
if (currentRoutingNode != null) {
for (ShardRouting shard : currentRoutingNode) {
if (shard.shardId().equals(startedShard.shardId())) {
if (shard.equals(startedShard)) {
relocatingNodeId = shard.relocatingNodeId();
dirty = true;
routingNodes.started(shard);
logger.trace("{} marked as started", shard);
} else {
logger.debug("failed to find shard [{}] in order to start it [no matching shard on node], ignoring", startedShard);
}
break;
}
}
} else {
logger.debug("failed to find shard [{}] in order to start it [failed to find node], ignoring", startedShard);
if (currentRoutingNode == null) {
logger.debug("{} failed to find shard in order to start it [failed to find node], ignoring (routing: {})", startedShard.shardId(), startedShard);
continue;
}
for (ShardRouting shard : currentRoutingNode) {
if (shard.allocationId().getId().equals(startedShard.allocationId().getId())) {
if (shard.active()) {
logger.trace("{} shard is already started, ignoring (routing: {})", startedShard.shardId(), startedShard);
} else {
dirty = true;
// override started shard with the latest copy. Capture it now, before starting the shard destroys it...
startedShard = new ShardRouting(shard);
routingNodes.started(shard);
logger.trace("{} marked shard as started (routing: {})", startedShard.shardId(), startedShard);
}
break;
}
}
// startedShard is the current state of the shard (post relocation for example)
// this means that after relocation, the state will be started and the currentNodeId will be
// the node we relocated to
if (relocatingNodeId == null) {
if (startedShard.relocatingNodeId() == null) {
continue;
}
RoutingNodes.RoutingNodeIterator sourceRoutingNode = routingNodes.routingNodeIter(relocatingNodeId);
RoutingNodes.RoutingNodeIterator sourceRoutingNode = routingNodes.routingNodeIter(startedShard.relocatingNodeId());
if (sourceRoutingNode != null) {
while (sourceRoutingNode.hasNext()) {
ShardRouting shard = sourceRoutingNode.next();
if (shard.shardId().equals(startedShard.shardId())) {
if (shard.relocating()) {
dirty = true;
sourceRoutingNode.remove();
break;
}
if (shard.allocationId().getId().equals(startedShard.allocationId().getRelocationId())) {
assert shard.relocating() : "source shard for relocation is not marked as relocating. source " + shard + ", started target " + startedShard;
dirty = true;
sourceRoutingNode.remove();
break;
}
}
}
@@ -378,133 +380,105 @@ public class AllocationService extends AbstractComponent {
* require relocation.
*/
private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, boolean addToIgnoreList, UnassignedInfo unassignedInfo) {
// create a copy of the failed shard, since we assume we can change possible references to it without
// changing the state of failed shard
failedShard = new ShardRouting(failedShard);
IndexRoutingTable indexRoutingTable = allocation.routingTable().index(failedShard.index());
if (indexRoutingTable == null) {
logger.debug("{} ignoring shard failure, unknown index in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
return false;
}
RoutingNodes routingNodes = allocation.routingNodes();
boolean dirty = false;
if (failedShard.relocatingNodeId() != null) {
// the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node)
if (failedShard.initializing()) {
// the shard is initializing and recovering from another node
// first, we need to cancel the current node that is being initialized
RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.currentNodeId());
if (initializingNode != null) {
while (initializingNode.hasNext()) {
ShardRouting shardRouting = initializingNode.next();
if (shardRouting.equals(failedShard)) {
dirty = true;
initializingNode.remove();
if (addToIgnoreList) {
// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
}
break;
}
}
}
if (dirty) {
logger.debug("failed shard {} found in routingNodes, failing it", failedShard);
// now, find the node that we are relocating *from*, and cancel its relocation
RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId());
if (relocatingFromNode != null) {
for (ShardRouting shardRouting : relocatingFromNode) {
if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.relocating()) {
dirty = true;
routingNodes.cancelRelocation(shardRouting);
break;
}
}
}
} else {
logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard);
}
return dirty;
} else if (failedShard.relocating()) {
// the shard is relocating, meaning its the source the shard is relocating from
// first, we need to cancel the current relocation from the current node
// now, find the node that we are recovering from, cancel the relocation, remove it from the node
// and add it to the unassigned shards list...
RoutingNodes.RoutingNodeIterator relocatingFromNode = routingNodes.routingNodeIter(failedShard.currentNodeId());
if (relocatingFromNode != null) {
while (relocatingFromNode.hasNext()) {
ShardRouting shardRouting = relocatingFromNode.next();
if (shardRouting.equals(failedShard)) {
dirty = true;
if (addToIgnoreList) {
// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
}
relocatingFromNode.moveToUnassigned(unassignedInfo);
break;
}
}
}
if (dirty) {
logger.debug("failed shard {} found in routingNodes, failing it", failedShard);
// next, we need to find the target initializing shard that is recovering from, and remove it...
RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.relocatingNodeId());
if (initializingNode != null) {
while (initializingNode.hasNext()) {
ShardRouting shardRouting = initializingNode.next();
if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.initializing()) {
dirty = true;
initializingNode.remove();
}
}
}
} else {
logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard);
}
} else {
throw new IllegalStateException("illegal state for a failed shard, relocating node id is set, but state does not match: " + failedShard);
RoutingNodes.RoutingNodeIterator matchedNode = routingNodes.routingNodeIter(failedShard.currentNodeId());
if (matchedNode == null) {
logger.debug("{} ignoring shard failure, unknown node in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
return false;
}
boolean matchedShard = false;
while (matchedNode.hasNext()) {
ShardRouting routing = matchedNode.next();
if (routing.allocationId().getId().equals(failedShard.allocationId().getId())) {
matchedShard = true;
logger.debug("{} failed shard {} found in routingNodes, failing it ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
break;
}
} else {
// the shard is not relocating, its either started, or initializing, just cancel it and move on...
RoutingNodes.RoutingNodeIterator node = routingNodes.routingNodeIter(failedShard.currentNodeId());
if (node != null) {
while (node.hasNext()) {
ShardRouting shardRouting = node.next();
if (shardRouting.equals(failedShard)) {
dirty = true;
if (addToIgnoreList) {
// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
}
// move all the shards matching the failed shard to the end of the unassigned list
// so we give a chance for other allocations and won't create poison failed allocations
// that can keep other shards from being allocated (because of limits applied on how many
// shards we can start per node)
List<ShardRouting> shardsToMove = Lists.newArrayList();
for (Iterator<ShardRouting> unassignedIt = routingNodes.unassigned().iterator(); unassignedIt.hasNext(); ) {
ShardRouting unassignedShardRouting = unassignedIt.next();
if (unassignedShardRouting.shardId().equals(failedShard.shardId())) {
unassignedIt.remove();
shardsToMove.add(unassignedShardRouting);
}
}
if (!shardsToMove.isEmpty()) {
routingNodes.unassigned().addAll(shardsToMove);
}
}
node.moveToUnassigned(unassignedInfo);
if (matchedShard == false) {
logger.debug("{} ignoring shard failure, unknown allocation id in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
return false;
}
// replace incoming instance to make sure we work on the latest one. Copy it to maintain information during modifications.
failedShard = new ShardRouting(matchedNode.current());
// remove the current copy of the shard
matchedNode.remove();
if (addToIgnoreList) {
// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
}
if (failedShard.relocatingNodeId() != null && failedShard.initializing()) {
// The shard is a target of a relocating shard. In that case we only
// need to remove the target shard and cancel the source relocation.
// No shard is left unassigned
logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", failedShard, unassignedInfo.shortSummary());
RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId());
if (relocatingFromNode != null) {
for (ShardRouting shardRouting : relocatingFromNode) {
if (shardRouting.allocationId().getId().equals(failedShard.allocationId().getRelocationId())) {
logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", failedShard.shardId(), shardRouting, unassignedInfo.shortSummary());
routingNodes.cancelRelocation(shardRouting);
break;
}
}
}
if (dirty) {
logger.debug("failed shard {} found in routingNodes and failed", failedShard);
} else {
logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard);
} else {
// The failed shard is the main copy of the current shard routing. Any
// relocation will be cancelled (and the target shard removed as well)
// and the shard copy needs to be marked as unassigned
if (failedShard.relocatingNodeId() != null) {
// handle relocation source shards. we need to find the target initializing shard that is recovering from it, and remove it...
assert failedShard.initializing() == false; // should have been dealt with and returned
assert failedShard.relocating();
RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.relocatingNodeId());
if (initializingNode != null) {
while (initializingNode.hasNext()) {
ShardRouting shardRouting = initializingNode.next();
if (shardRouting.allocationId().getId().equals(failedShard.allocationId().getRelocationId())) {
assert shardRouting.initializing() : shardRouting;
assert failedShard.allocationId().getId().equals(shardRouting.allocationId().getRelocationId())
: "found target shard's allocation relocation id is different than source";
logger.trace("{} is removed due to the failure of the source shard", shardRouting);
initializingNode.remove();
}
}
}
}
// move all the shards matching the failed shard to the end of the unassigned list
// so we give a chance for other allocations and won't create poison failed allocations
// that can keep other shards from being allocated (because of limits applied on how many
// shards we can start per node)
List<ShardRouting> shardsToMove = Lists.newArrayList();
for (Iterator<ShardRouting> unassignedIt = routingNodes.unassigned().iterator(); unassignedIt.hasNext(); ) {
ShardRouting unassignedShardRouting = unassignedIt.next();
if (unassignedShardRouting.shardId().equals(failedShard.shardId())) {
unassignedIt.remove();
shardsToMove.add(unassignedShardRouting);
}
}
if (!shardsToMove.isEmpty()) {
routingNodes.unassigned().addAll(shardsToMove);
}
matchedNode.moveToUnassigned(unassignedInfo);
}
return dirty;
assert matchedNode.isRemoved() : "failedShard " + failedShard + " was matched but wasn't removed";
return true;
}
}

View File

@@ -192,17 +192,18 @@ public class ExceptionSerializationTests extends ElasticsearchTestCase {
}
public void testIllegalShardRoutingStateException() throws IOException {
ShardRouting routing = TestShardRouting.newShardRouting("test", 0, "xyz", "def", false, ShardRoutingState.STARTED, 0);
final ShardRouting routing = TestShardRouting.newShardRouting("test", 0, "xyz", "def", false, ShardRoutingState.STARTED, 0);
final String routingAsString = routing.toString();
IllegalShardRoutingStateException serialize = serialize(new IllegalShardRoutingStateException(routing, "foo", new NullPointerException()));
assertNotNull(serialize.shard());
assertEquals(routing, serialize.shard());
assertEquals("[test][0], node[xyz], relocating [def], [R], s[STARTED]: foo", serialize.getMessage());
assertEquals(routingAsString + ": foo", serialize.getMessage());
assertTrue(serialize.getCause() instanceof NullPointerException);
serialize = serialize(new IllegalShardRoutingStateException(routing, "bar", null));
assertNotNull(serialize.shard());
assertEquals(routing, serialize.shard());
assertEquals("[test][0], node[xyz], relocating [def], [R], s[STARTED]: bar", serialize.getMessage());
assertEquals(routingAsString + ": bar", serialize.getMessage());
assertNull(serialize.getCause());
}

View File

@@ -1,90 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.shard;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
public class ShardStateActionTest extends ElasticsearchTestCase {
public void testShardFiltering() {
final IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, "test_uuid"))
.numberOfShards(2).numberOfReplicas(0)
.build();
ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder()
.put(new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT)).masterNodeId("node1")
.put(new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT))
)
.metaData(MetaData.builder().put(indexMetaData, false));
final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1);
final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", randomBoolean(), ShardRoutingState.STARTED, 1);
final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", randomBoolean(), ShardRoutingState.RELOCATING, 1);
stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test")
.addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId(), true).addShard(initShard).build())
.addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId(), true).addShard(startedShard).build())
.addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId(), true).addShard(relocatingShard).build())));
ClusterState state = stateBuilder.build();
ArrayList<ShardStateAction.ShardRoutingEntry> listToFilter = new ArrayList<>();
ArrayList<ShardStateAction.ShardRoutingEntry> expectedToBeApplied = new ArrayList<>();
listToFilter.add(new ShardStateAction.ShardRoutingEntry(initShard, indexMetaData.indexUUID() + "_suffix", "wrong_uuid", null));
listToFilter.add(new ShardStateAction.ShardRoutingEntry(relocatingShard.buildTargetRelocatingShard(), indexMetaData.indexUUID(), "relocating_to_node", null));
expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1));
listToFilter.add(new ShardStateAction.ShardRoutingEntry(startedShard, indexMetaData.indexUUID(), "started shard", null));
expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1));
listToFilter.add(new ShardStateAction.ShardRoutingEntry(TestShardRouting.newShardRouting(initShard.index() + "_NA", initShard.id(),
initShard.currentNodeId(), initShard.primary(), initShard.state(), initShard.version()), indexMetaData.indexUUID(), "wrong_uuid", null));
List<ShardStateAction.ShardRoutingEntry> toBeApplied = ShardStateAction.extractShardsToBeApplied(listToFilter, "for testing", state.metaData(), logger);
if (toBeApplied.size() != expectedToBeApplied.size()) {
fail("size mismatch.\n Got: \n [" + toBeApplied + "], \n expected: \n [" + expectedToBeApplied + "]");
}
for (int i = 0; i < toBeApplied.size(); i++) {
final ShardStateAction.ShardRoutingEntry found = toBeApplied.get(i);
final ShardStateAction.ShardRoutingEntry expected = expectedToBeApplied.get(i);
assertThat(found, equalTo(expected));
}
}
}

View File

@@ -62,16 +62,15 @@ public class AllocationIdTests extends ElasticsearchTestCase {
assertThat(shard.allocationId(), not(equalTo(allocationId)));
assertThat(shard.allocationId().getId(), equalTo(allocationId.getId()));
assertThat(shard.allocationId().getRelocationId(), notNullValue());
allocationId = shard.allocationId();
ShardRouting target = shard.buildTargetRelocatingShard();
assertThat(target.allocationId().getId(), equalTo(shard.allocationId().getRelocationId()));
assertThat(target.allocationId().getRelocationId(), nullValue());
assertThat(target.allocationId().getRelocationId(), equalTo(shard.allocationId().getId()));
logger.info("-- finalize the relocation");
shard.moveToStarted();
assertThat(shard.allocationId().getId(), equalTo(target.allocationId().getId()));
assertThat(shard.allocationId().getRelocationId(), nullValue());
target.moveToStarted();
assertThat(target.allocationId().getId(), equalTo(shard.allocationId().getRelocationId()));
assertThat(target.allocationId().getRelocationId(), nullValue());
}
@Test

View File

@@ -33,6 +33,10 @@ public class TestShardRouting {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, buildAllocationId(state), true);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, AllocationId allocationId, long version) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, allocationId, true);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, null, buildAllocationId(state), true);
}

View File

@@ -256,9 +256,9 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
logger.info("fail the first shard, will have no place to be rerouted to (single node), so stays unassigned");
prevRoutingTable = routingTable;
routingTable = strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, "node1", true, INITIALIZING, 0)).routingTable();
ShardRouting firstShard = clusterState.routingNodes().node("node1").get(0);
routingTable = strategy.applyFailedShard(clusterState, firstShard).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
@@ -272,7 +272,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
}
logger.info("fail the shard again, see that nothing happens");
assertThat(strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, "node1", true, INITIALIZING, 0)).changed(), equalTo(false));
assertThat(strategy.applyFailedShard(clusterState, firstShard).changed(), equalTo(false));
}
@Test
@@ -371,9 +371,9 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
logger.info("fail the first shard, will start INITIALIZING on the second node");
prevRoutingTable = routingTable;
routingTable = strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, nodeHoldingPrimary, true, INITIALIZING, 0)).routingTable();
final ShardRouting firstShard = clusterState.routingNodes().node("node1").get(0);
routingTable = strategy.applyFailedShard(clusterState, firstShard).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
@@ -387,7 +387,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
}
logger.info("fail the shard again, see that nothing happens");
assertThat(strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, nodeHoldingPrimary, true, INITIALIZING, 0)).changed(), equalTo(false));
assertThat(strategy.applyFailedShard(clusterState, firstShard).changed(), equalTo(false));
}
@Test

View File

@@ -64,68 +64,57 @@ public class StartedShardsRoutingTests extends ElasticsearchAllocationTestCase {
RoutingAllocation.Result result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(),
ShardRoutingState.INITIALIZING, randomInt())), false);
ShardRoutingState.INITIALIZING, initShard.allocationId(), randomInt())), false);
assertTrue("failed to start " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
assertTrue(initShard + "isn't started \ncurrent routing table:" + result.routingTable().prettyPrint(),
result.routingTable().index("test").shard(initShard.id()).allShardsStarted());
logger.info("--> testing shard variants that shouldn't match the started shard");
logger.info("--> testing shard variants that shouldn't match the initializing shard");
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), !initShard.primary(),
TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(),
ShardRoutingState.INITIALIZING, 1)), false);
assertFalse("wrong primary flag shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
assertFalse("wrong allocation id flag shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(initShard.index(), initShard.id(), "some_node", initShard.currentNodeId(), initShard.primary(),
ShardRoutingState.INITIALIZING, 1)), false);
ShardRoutingState.INITIALIZING, AllocationId.newTargetRelocation(AllocationId.newRelocation(initShard.allocationId()))
, 1)), false);
assertFalse("relocating shard from node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), "some_node", initShard.primary(),
ShardRoutingState.INITIALIZING, 1)), false);
assertFalse("relocating shard to node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
logger.info("--> testing double starting");
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(startedShard.index(), startedShard.id(), startedShard.currentNodeId(), startedShard.relocatingNodeId(), startedShard.primary(),
ShardRoutingState.INITIALIZING, 1)), false);
ShardRoutingState.INITIALIZING, startedShard.allocationId(), 1)), false);
assertFalse("duplicate starting of the same shard should be ignored \ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
logger.info("--> testing starting of relocating shards");
final AllocationId targetAllocationId = AllocationId.newTargetRelocation(relocatingShard.allocationId());
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(),
ShardRoutingState.INITIALIZING, randomInt())), false);
ShardRoutingState.INITIALIZING, targetAllocationId, randomInt())), false);
assertTrue("failed to start " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
ShardRouting shardRouting = result.routingTable().index("test").shard(relocatingShard.id()).getShards().get(0);
assertThat(shardRouting.state(), equalTo(ShardRoutingState.STARTED));
assertThat(shardRouting.currentNodeId(), equalTo("node2"));
assertThat(shardRouting.relocatingNodeId(), nullValue());
logger.info("--> testing shard variants that shouldn't match the relocating shard");
logger.info("--> testing shard variants that shouldn't match the initializing relocating shard");
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), !relocatingShard.primary(),
ShardRoutingState.INITIALIZING, 1)), false);
assertFalse("wrong primary flag shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(),
ShardRoutingState.INITIALIZING, relocatingShard.version())));
assertFalse("wrong allocation id shouldn't start shard" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), "some_node", relocatingShard.currentNodeId(), relocatingShard.primary(),
ShardRoutingState.INITIALIZING, 1)), false);
assertFalse("relocating shard to a different node shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), "some_node", relocatingShard.primary(),
ShardRoutingState.INITIALIZING, 1)), false);
assertFalse("relocating shard from a different node shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.primary(),
ShardRoutingState.INITIALIZING, 1)), false);
assertFalse("non-relocating shard shouldn't start shard" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(),
ShardRoutingState.INITIALIZING, relocatingShard.allocationId(), randomInt())), false);
assertFalse("wrong allocation id shouldn't start shard even if relocatingId==shard.id" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
}
}