Include stale replica shard info when explaining an unassigned primary (#22826)

Currently, if a previously allocated shard has no in-sync copy in the
cluster but a stale replica copy exists, the explain API does not
include any information about that stale copy in its output.  This
commit includes whatever shard copy information is available (even for
stale copies) when explaining an unassigned primary shard that was
previously allocated in the cluster.

This situation can arise as follows: imagine an index with 1 primary
and 1 replica on a cluster of 2 nodes.  If the node holding the
replica is shut down while data continues to be indexed, only the
primary has the latest data and the offline replica is marked as
stale.  Now suppose the node holding the primary is also shut down, so
no copy of the shard data remains in the cluster.  Finally, the first
stopped node (the one holding the stale replica) is started back up.
The cluster is red because no in-sync copy is available.  Before this
change, running the explain API would tell the user that no valid
shard copy exists in the cluster for that shard, but it would not
provide any information about the stale replica present on the
restarted node.  With this commit, the explain API reports all the
stale replica copies it finds when explaining the unassigned primary.
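
As an illustration, this is roughly how the additional information can
be consumed from the Java API.  This is a minimal sketch only,
mirroring the integration test added in this commit; the Client
instance named "client" and the single-shard index "idx" are
assumptions taken from that test (imports and error handling elided):

ClusterAllocationExplanation explanation = client.admin().cluster().prepareAllocationExplain()
    .setIndex("idx").setShard(0).setPrimary(true)
    .get().getExplanation();
AllocateUnassignedDecision allocateDecision =
    explanation.getShardAllocationDecision().getAllocateDecision();
// the decision is still NO_VALID_SHARD_COPY, but the node decisions now contain
// an entry for every node that reported shard store data, including stale copies
assert allocateDecision.getAllocationDecision() == AllocationDecision.NO_VALID_SHARD_COPY;
List<NodeAllocationResult> nodeDecisions = allocateDecision.getNodeDecisions();
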
Author: Ali Beyad
Date: 2017-01-31 16:31:55 -06:00
Committed by: GitHub
Parent: c223457ba1
Commit: 547eb5c22f
4 changed files with 237 additions and 44 deletions

Changed file: AllocateUnassignedDecision.java

@@ -246,7 +246,7 @@ public class AllocateUnassignedDecision extends AbstractAllocationDecision {
} else if (allocationDecision == AllocationDecision.AWAITING_INFO) {
return "cannot allocate because information about existing shard data is still being retrieved from some of the nodes";
} else if (allocationDecision == AllocationDecision.NO_VALID_SHARD_COPY) {
if (getNodeDecisions() != null && getNodeDecisions().isEmpty() == false) {
if (hasNodeWithStaleOrCorruptShard()) {
return "cannot allocate because all found copies of the shard are either stale or corrupt";
} else {
return "cannot allocate because a previous copy of the primary shard existed but can no longer be found on " +
@@ -268,6 +268,13 @@ public class AllocateUnassignedDecision extends AbstractAllocationDecision {
}
}
private boolean hasNodeWithStaleOrCorruptShard() {
return getNodeDecisions() != null && getNodeDecisions().stream().anyMatch(result ->
result.getShardStoreInfo() != null
&& (result.getShardStoreInfo().getAllocationId() != null
|| result.getShardStoreInfo().getStoreException() != null));
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
checkDecisionState();
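
For reference, with this change the NO_VALID_SHARD_COPY case also
surfaces the stale or corrupt copies that were found.  Below is an
abridged, hypothetical fragment of the explain response (the node
names and the allocation id are invented, and per-node fields such as
node_id and transport_address are omitted); the integration test
further down verifies this structure:

"can_allocate" : "no_valid_shard_copy",
"allocate_explanation" : "cannot allocate because all found copies of the shard are either stale or corrupt",
"node_allocation_decisions" : [
  {
    "node_name" : "node_with_stale_copy",
    "node_decision" : "no",
    "store" : {
      "in_sync" : false,
      "allocation_id" : "NZdVKLFjSz-vOPiVNhb3pw"
    }
  },
  {
    "node_name" : "node_without_any_copy",
    "node_decision" : "no",
    "store" : {
      "found" : false
    }
  }
]
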

Changed file: NodeAllocationResult.java

@@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
@@ -48,14 +49,15 @@ public class NodeAllocationResult implements ToXContent, Writeable, Comparable<N
@Nullable
private final ShardStoreInfo shardStoreInfo;
private final AllocationDecision nodeDecision;
@Nullable
private final Decision canAllocateDecision;
private final int weightRanking;
public NodeAllocationResult(DiscoveryNode node, ShardStoreInfo shardStoreInfo, Decision decision) {
public NodeAllocationResult(DiscoveryNode node, ShardStoreInfo shardStoreInfo, @Nullable Decision decision) {
this.node = node;
this.shardStoreInfo = shardStoreInfo;
this.canAllocateDecision = decision;
this.nodeDecision = AllocationDecision.fromDecisionType(canAllocateDecision.type());
this.nodeDecision = decision != null ? AllocationDecision.fromDecisionType(canAllocateDecision.type()) : AllocationDecision.NO;
this.weightRanking = 0;
}
@@ -78,7 +80,11 @@ public class NodeAllocationResult implements ToXContent, Writeable, Comparable<N
public NodeAllocationResult(StreamInput in) throws IOException {
node = new DiscoveryNode(in);
shardStoreInfo = in.readOptionalWriteable(ShardStoreInfo::new);
if (in.getVersion().before(Version.V_5_2_1_UNRELEASED)) {
canAllocateDecision = Decision.readFrom(in);
} else {
canAllocateDecision = in.readOptionalWriteable(Decision::readFrom);
}
nodeDecision = AllocationDecision.readFrom(in);
weightRanking = in.readVInt();
}
@@ -87,7 +93,15 @@ public class NodeAllocationResult implements ToXContent, Writeable, Comparable<N
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeOptionalWriteable(shardStoreInfo);
if (out.getVersion().before(Version.V_5_2_1_UNRELEASED)) {
if (canAllocateDecision == null) {
Decision.NO.writeTo(out);
} else {
canAllocateDecision.writeTo(out);
}
} else {
out.writeOptionalWriteable(canAllocateDecision);
}
nodeDecision.writeTo(out);
out.writeVInt(weightRanking);
}
@@ -108,8 +122,11 @@ public class NodeAllocationResult implements ToXContent, Writeable, Comparable<N
}
/**
* The decision details for allocating to this node.
* The decision details for allocating to this node. Returns {@code null} if
* no allocation decision was taken on the node; in this case, {@link #getNodeDecision()}
* will return {@link AllocationDecision#NO}.
*/
@Nullable
public Decision getCanAllocateDecision() {
return canAllocateDecision;
}
@@ -153,7 +170,7 @@ public class NodeAllocationResult implements ToXContent, Writeable, Comparable<N
if (isWeightRanked()) {
builder.field("weight_ranking", getWeightRanking());
}
if (canAllocateDecision.getDecisions().isEmpty() == false) {
if (canAllocateDecision != null && canAllocateDecision.getDecisions().isEmpty() == false) {
builder.startArray("deciders");
canAllocateDecision.toXContent(builder, params);
builder.endArray();
@@ -234,6 +251,15 @@ public class NodeAllocationResult implements ToXContent, Writeable, Comparable<N
return matchingBytes;
}
/**
* Gets the store exception when trying to read the store, if there was an error. If
* there was no error, returns {@code null}.
*/
@Nullable
public Exception getStoreException() {
return storeException;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(inSync);
@@ -248,8 +274,13 @@ public class NodeAllocationResult implements ToXContent, Writeable, Comparable<N
{
if (matchingBytes < 0) {
// dealing with a primary shard
if (allocationId == null && storeException == null) {
// no information about any shard data on the node could be obtained
builder.field("found", false);
} else {
builder.field("in_sync", inSync);
}
}
if (allocationId != null) {
builder.field("allocation_id", allocationId);
}
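
A small sketch of how a caller might interpret the store information
exposed above for an unassigned primary.  It mirrors the
hasNodeWithStaleOrCorruptShard() check in AllocateUnassignedDecision
and the toXContent logic above; the NodeAllocationResult named
"nodeResult", taken from the explain response, is an assumption:

NodeAllocationResult.ShardStoreInfo info = nodeResult.getShardStoreInfo();
if (info != null) {
    if (info.getStoreException() != null) {
        // a copy was found on the node but its store could not be read (corrupt)
    } else if (info.getAllocationId() != null && info.isInSync() == false) {
        // a stale copy of the shard exists on the node
    } else if (info.getAllocationId() == null) {
        // no shard data was found on the node (rendered as "found": false)
    }
}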

Changed file: PrimaryShardAllocator.java

@@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
@@ -183,7 +184,8 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
// this shard will be picked up when the node joins and we do another allocation reroute
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]",
unassignedShard.index(), unassignedShard.id(), nodeShardsResult.allocationsFound);
return AllocateUnassignedDecision.no(AllocationStatus.NO_VALID_SHARD_COPY, explain ? new ArrayList<>() : null);
return AllocateUnassignedDecision.no(AllocationStatus.NO_VALID_SHARD_COPY,
explain ? buildNodeDecisions(null, shardState, inSyncAllocationIds) : null);
}
}
@@ -228,7 +230,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
List<NodeAllocationResult> nodeResults = null;
if (explain) {
nodeResults = buildNodeDecisions(nodesToAllocate, inSyncAllocationIds);
nodeResults = buildNodeDecisions(nodesToAllocate, shardState, inSyncAllocationIds);
}
if (allocation.hasPendingAsyncFetch()) {
return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeResults);
@@ -244,13 +246,35 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
/**
* Builds a list of allocation decisions for each node, including nodes holding shard copies that are not eligible for allocation (e.g. stale or corrupt copies).
*/
private static List<NodeAllocationResult> buildNodeDecisions(NodesToAllocate nodesToAllocate, Set<String> inSyncAllocationIds) {
return Stream.of(nodesToAllocate.yesNodeShards, nodesToAllocate.throttleNodeShards, nodesToAllocate.noNodeShards)
private static List<NodeAllocationResult> buildNodeDecisions(NodesToAllocate nodesToAllocate,
FetchResult<NodeGatewayStartedShards> fetchedShardData,
Set<String> inSyncAllocationIds) {
List<NodeAllocationResult> nodeResults = new ArrayList<>();
Collection<NodeGatewayStartedShards> ineligibleShards;
if (nodesToAllocate != null) {
final Set<DiscoveryNode> discoNodes = new HashSet<>();
nodeResults.addAll(Stream.of(nodesToAllocate.yesNodeShards, nodesToAllocate.throttleNodeShards, nodesToAllocate.noNodeShards)
.flatMap(Collection::stream)
.map(dnode -> new NodeAllocationResult(dnode.nodeShardState.getNode(),
.map(dnode -> {
discoNodes.add(dnode.nodeShardState.getNode());
return new NodeAllocationResult(dnode.nodeShardState.getNode(),
shardStoreInfo(dnode.nodeShardState, inSyncAllocationIds),
dnode.decision))
.collect(Collectors.toList());
dnode.decision);
}).collect(Collectors.toList()));
ineligibleShards = fetchedShardData.getData().values().stream().filter(shardData ->
discoNodes.contains(shardData.getNode()) == false
).collect(Collectors.toList());
} else {
// no shard copy was eligible to be assigned the allocation,
// so all fetched shard data represent ineligible copies
ineligibleShards = fetchedShardData.getData().values();
}
nodeResults.addAll(ineligibleShards.stream().map(shardData ->
new NodeAllocationResult(shardData.getNode(), shardStoreInfo(shardData, inSyncAllocationIds), null)
).collect(Collectors.toList()));
return nodeResults;
}
private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set<String> inSyncAllocationIds) {
@@ -389,7 +413,8 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
yesNodeShards.add(decidedNode);
}
}
return new NodesToAllocate(Collections.unmodifiableList(yesNodeShards), Collections.unmodifiableList(throttledNodeShards), Collections.unmodifiableList(noNodeShards));
return new NodesToAllocate(Collections.unmodifiableList(yesNodeShards), Collections.unmodifiableList(throttledNodeShards),
Collections.unmodifiableList(noNodeShards));
}
/**
@@ -486,9 +511,9 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
protected abstract FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);
static class NodeShardsResult {
public final List<NodeGatewayStartedShards> orderedAllocationCandidates;
public final int allocationsFound;
private static class NodeShardsResult {
final List<NodeGatewayStartedShards> orderedAllocationCandidates;
final int allocationsFound;
public NodeShardsResult(List<NodeGatewayStartedShards> orderedAllocationCandidates, int allocationsFound) {
this.orderedAllocationCandidates = orderedAllocationCandidates;
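
Nodes that only hold an ineligible (for example stale) copy are
reported with a null decider decision.  A minimal sketch of what that
implies for consumers; the DiscoveryNode "node" and the ShardStoreInfo
"storeInfo" (built by shardStoreInfo(...) above) are assumptions:

// ineligible copies carry no decider decision; the node-level decision defaults to NO
NodeAllocationResult result = new NodeAllocationResult(node, storeInfo, null);
assert result.getNodeDecision() == AllocationDecision.NO;
assert result.getCanAllocateDecision() == null;  // no "deciders" array is emitted for this node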

Changed file: ClusterAllocationExplainIT.java

@@ -124,7 +124,7 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
assertEquals(0L, allocateDecision.getRemainingDelayInMillis());
if (allocateDecision.getAllocationDecision() == AllocationDecision.NO_VALID_SHARD_COPY) {
assertEquals(0, allocateDecision.getNodeDecisions().size());
assertEquals(1, allocateDecision.getNodeDecisions().size());
// verify JSON output
try (XContentParser parser = getParser(explanation)) {
@@ -138,7 +138,7 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
parser.nextToken();
assertEquals("cannot allocate because a previous copy of the primary shard existed but can no longer be found " +
"on the nodes in the cluster", parser.text());
assertEquals(Token.END_OBJECT, parser.nextToken());
verifyStaleShardCopyNodeDecisions(parser, 1, Collections.emptySet());
}
}
}
@@ -1009,6 +1009,94 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
}
}
public void testCannotAllocateStaleReplicaExplanation() throws Exception {
logger.info("--> starting 3 nodes");
String masterNode = internalCluster().startNode();
internalCluster().startDataOnlyNodes(2);
logger.info("--> creating an index with 1 primary and 1 replica");
createIndexAndIndexData(1, 1,
Settings.builder().put("index.routing.allocation.exclude._name", masterNode).build(),
ActiveShardCount.ALL);
logger.info("--> stop node with the replica shard");
String stoppedNode = replicaNode().getName();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(stoppedNode));
logger.info("--> index more data, now the replica is stale");
indexData();
logger.info("--> stop the node with the primary");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName()));
logger.info("--> restart the node with the stale replica");
client().admin().indices().prepareUpdateSettings("idx").setSettings(
Settings.builder().put("index.routing.allocation.include._name", (String) null)).get();
String restartedNode = internalCluster().startNode(Settings.builder().put("node.name", stoppedNode).build());
// wait until the system has fetched shard data and we know there is no valid shard copy
assertBusy(() -> {
ClusterAllocationExplanation explanation = client().admin().cluster().prepareAllocationExplain()
.setIndex("idx").setShard(0).setPrimary(true).get().getExplanation();
assertEquals(AllocationDecision.NO_VALID_SHARD_COPY,
explanation.getShardAllocationDecision().getAllocateDecision().getAllocationDecision());
});
boolean includeYesDecisions = randomBoolean();
boolean includeDiskInfo = randomBoolean();
ClusterAllocationExplanation explanation = runExplain(true, includeYesDecisions, includeDiskInfo);
ShardId shardId = explanation.getShard();
boolean isPrimary = explanation.isPrimary();
ShardRoutingState shardRoutingState = explanation.getShardState();
DiscoveryNode currentNode = explanation.getCurrentNode();
UnassignedInfo unassignedInfo = explanation.getUnassignedInfo();
AllocateUnassignedDecision allocateDecision = explanation.getShardAllocationDecision().getAllocateDecision();
MoveDecision moveDecision = explanation.getShardAllocationDecision().getMoveDecision();
// verify shard info
assertEquals("idx", shardId.getIndexName());
assertEquals(0, shardId.getId());
assertTrue(isPrimary);
// verify current node info
assertEquals(ShardRoutingState.UNASSIGNED, shardRoutingState);
assertNull(currentNode);
// verify unassigned info
assertNotNull(unassignedInfo);
// verify decision object
assertTrue(allocateDecision.isDecisionTaken());
assertFalse(moveDecision.isDecisionTaken());
assertEquals(AllocationDecision.NO_VALID_SHARD_COPY, allocateDecision.getAllocationDecision());
assertEquals(2, allocateDecision.getNodeDecisions().size());
NodeAllocationResult nodeAllocationResult = allocateDecision.getNodeDecisions().get(0);
assertEquals(restartedNode, nodeAllocationResult.getNode().getName());
assertNotNull(nodeAllocationResult.getShardStoreInfo());
assertNotNull(nodeAllocationResult.getShardStoreInfo().getAllocationId());
assertFalse(nodeAllocationResult.getShardStoreInfo().isInSync());
assertNull(nodeAllocationResult.getShardStoreInfo().getStoreException());
nodeAllocationResult = allocateDecision.getNodeDecisions().get(1);
assertEquals(masterNode, nodeAllocationResult.getNode().getName());
assertNotNull(nodeAllocationResult.getShardStoreInfo());
assertNull(nodeAllocationResult.getShardStoreInfo().getAllocationId());
assertFalse(nodeAllocationResult.getShardStoreInfo().isInSync());
assertNull(nodeAllocationResult.getShardStoreInfo().getStoreException());
// verify JSON output
try (XContentParser parser = getParser(explanation)) {
verifyShardInfo(parser, true, includeDiskInfo, ShardRoutingState.UNASSIGNED);
parser.nextToken();
assertEquals("can_allocate", parser.currentName());
parser.nextToken();
assertEquals(AllocationDecision.NO_VALID_SHARD_COPY.toString(), parser.text());
parser.nextToken();
assertEquals("allocate_explanation", parser.currentName());
parser.nextToken();
assertEquals("cannot allocate because all found copies of the shard are either stale or corrupt", parser.text());
verifyStaleShardCopyNodeDecisions(parser, 2, Collections.singleton(restartedNode));
}
}
private void verifyClusterInfo(ClusterInfo clusterInfo, boolean includeDiskInfo, int numNodes) {
if (includeDiskInfo) {
assertThat(clusterInfo.getNodeMostAvailableDiskUsages().size(), greaterThanOrEqualTo(0));
@@ -1057,12 +1145,16 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
.setWaitForActiveShards(activeShardCount)
.get();
if (activeShardCount != ActiveShardCount.NONE) {
indexData();
}
}
private void indexData() {
for (int i = 0; i < 10; i++) {
index("idx", "t", Integer.toString(i), Collections.singletonMap("f1", Integer.toString(i)));
}
flushAndRefresh("idx");
}
}
private String primaryNodeName() {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
@@ -1154,6 +1246,39 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
}
}
private void verifyStaleShardCopyNodeDecisions(XContentParser parser, int numNodes, Set<String> foundStores) throws IOException {
parser.nextToken();
assertEquals("node_allocation_decisions", parser.currentName());
assertEquals(Token.START_ARRAY, parser.nextToken());
for (int i = 0; i < numNodes; i++) {
String nodeName = verifyNodeDecisionPrologue(parser);
assertEquals(AllocationDecision.NO.toString(), parser.text());
parser.nextToken();
assertEquals("store", parser.currentName());
assertEquals(Token.START_OBJECT, parser.nextToken());
parser.nextToken();
if (foundStores.contains(nodeName)) {
// shard data was found on the node, but it is stale
assertEquals("in_sync", parser.currentName());
parser.nextToken();
assertFalse(parser.booleanValue());
parser.nextToken();
assertEquals("allocation_id", parser.currentName());
parser.nextToken();
assertNotNull(parser.text());
} else {
// no shard data was found on the node
assertEquals("found", parser.currentName());
parser.nextToken();
assertFalse(parser.booleanValue());
}
assertEquals(Token.END_OBJECT, parser.nextToken());
parser.nextToken();
assertEquals(Token.END_OBJECT, parser.currentToken());
}
assertEquals(Token.END_ARRAY, parser.nextToken());
}
private void verifyNodeDecisions(XContentParser parser, Map<String, AllocationDecision> expectedNodeDecisions,
boolean includeYesDecisions, boolean reuseStore) throws IOException {
parser.nextToken();
@@ -1162,24 +1287,8 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
boolean encounteredNo = false;
final int numNodes = expectedNodeDecisions.size();
for (int i = 0; i < numNodes; i++) {
assertEquals(Token.START_OBJECT, parser.nextToken());
parser.nextToken();
assertEquals("node_id", parser.currentName());
parser.nextToken();
assertNotNull(parser.text());
parser.nextToken();
assertEquals("node_name", parser.currentName());
parser.nextToken();
String nodeName = parser.text();
String nodeName = verifyNodeDecisionPrologue(parser);
AllocationDecision allocationDecision = expectedNodeDecisions.get(nodeName);
assertNotNull(nodeName);
parser.nextToken();
assertEquals("transport_address", parser.currentName());
parser.nextToken();
assertNotNull(parser.text());
parser.nextToken();
assertEquals("node_decision", parser.currentName());
parser.nextToken();
assertEquals(allocationDecision.toString(), parser.text());
if (allocationDecision != AllocationDecision.YES) {
encounteredNo = true;
@@ -1217,6 +1326,27 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
assertEquals(Token.END_ARRAY, parser.nextToken());
}
private String verifyNodeDecisionPrologue(XContentParser parser) throws IOException {
assertEquals(Token.START_OBJECT, parser.nextToken());
parser.nextToken();
assertEquals("node_id", parser.currentName());
parser.nextToken();
assertNotNull(parser.text());
parser.nextToken();
assertEquals("node_name", parser.currentName());
parser.nextToken();
String nodeName = parser.text();
assertNotNull(nodeName);
parser.nextToken();
assertEquals("transport_address", parser.currentName());
parser.nextToken();
assertNotNull(parser.text());
parser.nextToken();
assertEquals("node_decision", parser.currentName());
parser.nextToken();
return nodeName;
}
private boolean verifyDeciders(XContentParser parser, AllocationDecision allocationDecision) throws IOException {
assertEquals(Token.START_ARRAY, parser.nextToken());
boolean atLeastOneMatchingDecisionFound = false;