Do not cancel recovery for copy on broken node (#48265)

This change fixes a poisonous situation where an ongoing recovery was
repeatedly canceled because a seemingly better copy was found on a node
to which the cluster had previously tried, and failed, to allocate the
shard. The fix is to track the set of nodes on which allocation of a
shard has failed, so that we avoid canceling the current recovery in
favor of a copy on one of those failed nodes.

Closes #47974
Nhat Nguyen 2019-11-01 09:23:46 -04:00
parent 5e4501eb3f
commit 9a42e71dd9
12 changed files with 281 additions and 34 deletions
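To illustrate the approach described above, here is a simplified sketch with stand-in types (names mirror the real classes, but this is not the actual Elasticsearch API): each allocation failure records the failing node id in the shard's unassigned info, and the replica allocator refuses to cancel an ongoing recovery in favor of a copy on any node already in that set.

// Simplified sketch of the mechanism in this change; illustrative stand-ins only.
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class UnassignedInfoSketch {
    private final Set<String> failedNodeIds; // nodes on which allocation of this shard has already failed

    UnassignedInfoSketch(Set<String> failedNodeIds) {
        this.failedNodeIds = Collections.unmodifiableSet(new HashSet<>(failedNodeIds));
    }

    Set<String> failedNodeIds() {
        return failedNodeIds;
    }

    // On each allocation failure the failing node is added to the set carried over from the
    // previous unassigned info (compare the AllocationService changes in the diff below).
    static UnassignedInfoSketch onAllocationFailure(UnassignedInfoSketch previous, String failedNodeId) {
        Set<String> ids = new HashSet<>();
        if (previous != null) {
            ids.addAll(previous.failedNodeIds());
        }
        ids.add(failedNodeId);
        return new UnassignedInfoSketch(ids);
    }
}

final class ReplicaShardAllocatorSketch {
    // While scanning ongoing recoveries, a copy on a node that already failed this shard is never
    // treated as a better match, so the current recovery is not canceled in its favor.
    static boolean mayCancelCurrentRecoveryFor(UnassignedInfoSketch info, String candidateNodeId,
                                               boolean candidateOffersNoopRecovery) {
        if (info != null && info.failedNodeIds().contains(candidateNodeId)) {
            return false;
        }
        return candidateOffersNoopRecovery;
    }
}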

View File

@@ -547,7 +547,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
"primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT);
unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, Collections.emptySet());
failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver);
}
}
@@ -873,7 +873,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(),
currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(),
currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(),
allocationStatus);
allocationStatus, currInfo.getFailedNodeIds());
ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource());
changes.unassignedInfoUpdated(shard, newInfo);
shard = updatedShard;

View File

@@ -23,6 +23,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -39,8 +40,11 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
/**
* Holds additional information as to why the shard is in unassigned state.
@@ -214,6 +218,7 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
private final String message;
private final Exception failure;
private final int failedAllocations;
private final Set<String> failedNodeIds;
private final AllocationStatus lastAllocationStatus; // result of the last allocation attempt for this shard
/**
@@ -224,7 +229,7 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
**/
public UnassignedInfo(Reason reason, String message) {
this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
AllocationStatus.NO_ATTEMPT, Collections.emptySet());
}
/**
@@ -235,9 +240,11 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
* @param unassignedTimeMillis the time of unassignment used to display to in our reporting.
* @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.
* @param lastAllocationStatus the result of the last allocation attempt for this shard
* @param failedNodeIds a set of nodeIds that failed to complete allocations for this shard
*/
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Exception failure, int failedAllocations,
long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus) {
long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus,
Set<String> failedNodeIds) {
this.reason = Objects.requireNonNull(reason);
this.unassignedTimeMillis = unassignedTimeMillis;
this.unassignedTimeNanos = unassignedTimeNanos;
@@ -246,6 +253,7 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
this.failure = failure;
this.failedAllocations = failedAllocations;
this.lastAllocationStatus = Objects.requireNonNull(lastAllocationStatus);
this.failedNodeIds = Collections.unmodifiableSet(failedNodeIds);
assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) :
"failedAllocations: " + failedAllocations + " for reason " + reason;
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
@@ -263,6 +271,11 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
this.failure = in.readException();
this.failedAllocations = in.readVInt();
this.lastAllocationStatus = AllocationStatus.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
this.failedNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString));
} else {
this.failedNodeIds = Collections.emptySet();
}
}
public void writeTo(StreamOutput out) throws IOException {
@@ -280,6 +293,9 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
out.writeException(failure);
out.writeVInt(failedAllocations);
lastAllocationStatus.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeCollection(failedNodeIds, StreamOutput::writeString);
}
}
/**
@@ -354,6 +370,19 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
return lastAllocationStatus;
}
/**
* A set of nodeIds that failed to complete allocations for this shard. {@link org.elasticsearch.gateway.ReplicaShardAllocator}
* uses this set to avoid repeatedly canceling an ongoing recovery in favor of a copy on one of those nodes, even though that copy could perform a noop recovery.
* This set is discarded when the shard moves to started. If a shard fails while started (i.e., it goes from started back to unassigned),
* the node it was assigned to is not added to this set.
*
* @see org.elasticsearch.gateway.ReplicaShardAllocator#processExistingRecoveries(RoutingAllocation)
* @see org.elasticsearch.cluster.routing.allocation.AllocationService#applyFailedShards(ClusterState, List, List)
*/
public Set<String> getFailedNodeIds() {
return failedNodeIds;
}
/**
* Calculates the delay left based on current time (in nanoseconds) and the delay defined by the index settings.
* Only relevant if shard is effectively delayed (see {@link #isDelayed()})
@@ -410,6 +439,9 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
if (failedAllocations > 0) {
sb.append(", failed_attempts[").append(failedAllocations).append("]");
}
if (failedNodeIds.isEmpty() == false) {
sb.append(", failed_nodes[").append(failedNodeIds).append("]");
}
sb.append(", delayed=").append(delayed);
String details = getDetails();
@@ -433,6 +465,9 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
if (failedAllocations > 0) {
builder.field("failed_attempts", failedAllocations);
}
if (failedNodeIds.isEmpty() == false) {
builder.field("failed_nodes", failedNodeIds);
}
builder.field("delayed", delayed);
String details = getDetails();
if (details != null) {
@@ -466,13 +501,16 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
if (reason != that.reason) {
return false;
}
if (message != null ? !message.equals(that.message) : that.message != null) {
if (Objects.equals(message, that.message) == false) {
return false;
}
if (lastAllocationStatus != that.lastAllocationStatus) {
return false;
}
return !(failure != null ? !failure.equals(that.failure) : that.failure != null);
if (Objects.equals(failure, that.failure) == false) {
return false;
}
return failedNodeIds.equals(that.failedNodeIds);
}
@Override
@@ -484,6 +522,7 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
result = 31 * result + (message != null ? message.hashCode() : 0);
result = 31 * result + (failure != null ? failure.hashCode() : 0);
result = 31 * result + lastAllocationStatus.hashCode();
result = 31 * result + failedNodeIds.hashCode();
return result;
}

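The new field is only put on the wire when both sides understand it; when reading from an older stream it falls back to an empty set, as the V_7_5_0 checks above show. Roughly, the version-gating pattern looks like the following sketch (illustrative stand-in types, not the real StreamInput/StreamOutput API):

// Illustrative version-gating sketch; Out/In are stand-ins for the real stream abstractions.
import java.util.Collections;
import java.util.Set;

final class FailedNodeIdsWireSketch {
    static final int V_7_5_0 = 7_05_00; // stand-in for the wire version that introduced the field (7.5.0)

    interface Out { int version(); void writeStringCollection(Set<String> values); }
    interface In  { int version(); Set<String> readStringSet(); }

    // Senders include the failed-node set only for 7.5.0+ receivers.
    static void writeFailedNodeIds(Out out, Set<String> failedNodeIds) {
        if (out.version() >= V_7_5_0) {
            out.writeStringCollection(failedNodeIds);
        }
    }

    // Readers on a pre-7.5.0 stream never see the field and degrade to an empty set.
    static Set<String> readFailedNodeIds(In in) {
        return in.version() >= V_7_5_0 ? in.readStringSet() : Collections.emptySet();
    }
}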
View File

@@ -45,9 +45,11 @@ import org.elasticsearch.gateway.GatewayAllocator;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -193,10 +195,18 @@ public class AllocationService {
shardToFail.shardId(), shardToFail, failedShard);
}
int failedAllocations = failedShard.unassignedInfo() != null ? failedShard.unassignedInfo().getNumFailedAllocations() : 0;
final Set<String> failedNodeIds;
if (failedShard.unassignedInfo() != null) {
failedNodeIds = new HashSet<>(failedShard.unassignedInfo().getFailedNodeIds().size() + 1);
failedNodeIds.addAll(failedShard.unassignedInfo().getFailedNodeIds());
failedNodeIds.add(failedShard.currentNodeId());
} else {
failedNodeIds = Collections.emptySet();
}
String message = "failed shard on node [" + shardToFail.currentNodeId() + "]: " + failedShardEntry.getMessage();
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message,
failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
AllocationStatus.NO_ATTEMPT, failedNodeIds);
if (failedShardEntry.markAsStale()) {
allocation.removeAllocationId(failedShard);
}
@@ -288,8 +298,8 @@ public class AllocationService {
if (newComputedLeftDelayNanos == 0) {
unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(),
unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus()),
shardRouting.recoverySource(), allocation.changes());
unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(),
unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes());
}
}
}
@@ -307,7 +317,7 @@ public class AllocationService {
UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(),
unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(),
unassignedInfo.getLastAllocationStatus()), shardRouting.recoverySource(), allocation.changes());
unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), shardRouting.recoverySource(), allocation.changes());
}
}
@@ -416,7 +426,8 @@ public class AllocationService {
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT,
Collections.emptySet());
allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes());
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard

View File

@@ -152,7 +152,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
if (shardRouting.primary() && unassignedInfo.getLastAllocationStatus() == AllocationStatus.NO_ATTEMPT) {
unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(),
unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO),
unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO,
unassignedInfo.getFailedNodeIds()),
shardRouting.recoverySource(), allocation.changes());
}
}

View File

@@ -38,6 +38,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
/**
@@ -139,7 +140,7 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation
", " + shardRouting.unassignedInfo().getMessage();
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage,
shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false,
shardRouting.unassignedInfo().getLastAllocationStatus());
shardRouting.unassignedInfo().getLastAllocationStatus(), Collections.emptySet());
}
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate,

View File

@@ -43,10 +43,12 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
@@ -95,7 +97,7 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
continue;
}
MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryNode, primaryStore, shardStores, false);
MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, shardStores, false);
if (matchingNodes.getNodeWithHighestMatch() != null) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
@@ -106,11 +108,13 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
// we found a better match that can perform noop recovery, cancel the existing allocation.
logger.debug("cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]",
currentNode, nodeWithHighestMatch);
final Set<String> failedNodeIds =
shard.unassignedInfo() == null ? Collections.emptySet() : shard.unassignedInfo().getFailedNodeIds();
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+
nodeWithHighestMatch + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false,
UnassignedInfo.AllocationStatus.NO_ATTEMPT);
UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodeIds);
// don't cancel shard in the loop as it will cause a ConcurrentModificationException
shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo,
metaData.getIndexSafe(shard.index()), allocation.changes()));
@@ -186,7 +190,8 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
return AllocateUnassignedDecision.NOT_TAKEN;
}
MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryNode, primaryStore, shardStores, explain);
MatchingNodes matchingNodes = findMatchingNodes(
unassignedShard, allocation, false, primaryNode, primaryStore, shardStores, explain);
assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions";
List<NodeAllocationResult> nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions);
@@ -297,7 +302,7 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
return nodeFilesStore.storeFilesMetaData();
}
private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation,
private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, boolean noMatchFailedNodes,
DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data,
boolean explain) {
@@ -305,6 +310,10 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
Map<String, NodeAllocationResult> nodeDecisions = explain ? new HashMap<>() : null;
for (Map.Entry<DiscoveryNode, NodeStoreFilesMetaData> nodeStoreEntry : data.getData().entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey();
if (noMatchFailedNodes && shard.unassignedInfo() != null &&
shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
continue;
}
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
// we don't have any files at all, it is an empty index
if (storeFilesMetaData.isEmpty()) {

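Note the asymmetry of the new noMatchFailedNodes flag: it is true on the cancellation path (processExistingRecoveries) and false when allocating a still-unassigned replica, so a previously failed node can still receive the shard if nothing better exists, but it can never justify canceling a recovery that is already running. A minimal sketch of that filter, using a hypothetical helper rather than the real ReplicaShardAllocator code:

// Hypothetical helper showing which store-metadata nodes remain eligible as matching copies.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class MatchFilterSketch {
    static List<String> candidateNodes(List<String> nodesWithStore, Set<String> failedNodeIds,
                                       boolean noMatchFailedNodes) {
        return nodesWithStore.stream()
            // On the cancellation path (noMatchFailedNodes == true) skip nodes that already failed
            // this shard; on the unassigned-allocation path they stay eligible.
            .filter(nodeId -> noMatchFailedNodes == false || failedNodeIds.contains(nodeId) == false)
            .collect(Collectors.toList());
    }
}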
View File

@@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
@@ -45,6 +46,9 @@ import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
@@ -81,9 +85,12 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
public void testSerialization() throws Exception {
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values());
int failedAllocations = randomIntBetween(1, 100);
Set<String> failedNodes = IntStream.range(0, between(0, failedAllocations))
.mapToObj(n -> "failed-node-" + n).collect(Collectors.toSet());
UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ?
new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null, null,
randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT):
failedAllocations, System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT, failedNodes):
new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null);
BytesStreamOutput out = new BytesStreamOutput();
meta.writeTo(out);
@@ -95,17 +102,25 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
assertThat(read.getMessage(), equalTo(meta.getMessage()));
assertThat(read.getDetails(), equalTo(meta.getDetails()));
assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations()));
assertThat(read.getFailedNodeIds(), equalTo(meta.getFailedNodeIds()));
}
public void testBwcSerialization() throws Exception {
final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CLOSED, "message");
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
Version version = VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.CURRENT);
out.setVersion(version);
unassignedInfo.writeTo(out);
out.close();
UnassignedInfo read = new UnassignedInfo(out.bytes().streamInput());
assertThat(read.getReason(), equalTo(UnassignedInfo.Reason.REINITIALIZED));
StreamInput in = out.bytes().streamInput();
in.setVersion(version);
UnassignedInfo read = new UnassignedInfo(in);
if (version.before(Version.V_7_0_0)) {
assertThat(read.getReason(), equalTo(UnassignedInfo.Reason.REINITIALIZED));
} else {
assertThat(read.getReason(), equalTo(UnassignedInfo.Reason.INDEX_CLOSED));
}
assertThat(read.getUnassignedTimeInMillis(), equalTo(unassignedInfo.getUnassignedTimeInMillis()));
assertThat(read.getMessage(), equalTo(unassignedInfo.getMessage()));
assertThat(read.getDetails(), equalTo(unassignedInfo.getDetails()));
@@ -312,7 +327,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
public void testRemainingDelayCalculation() throws Exception {
final long baseTime = System.nanoTime();
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime,
System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT);
System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT, Collections.emptySet());
final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos();
final Settings indexSettings = Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build();

View File

@@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
public class TrackFailedAllocationNodesTests extends ESAllocationTestCase {
public void testTrackFailedNodes() {
int maxRetries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY);
AllocationService allocationService = createAllocationService();
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("idx").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.build();
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
for (int i = 0; i < 5; i++) {
discoNodes.add(newNode("node-" + i));
}
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.nodes(discoNodes)
.metaData(metaData).routingTable(RoutingTable.builder().addAsNew(metaData.index("idx")).build())
.build();
clusterState = allocationService.reroute(clusterState, "reroute");
Set<String> failedNodeIds = new HashSet<>();
// track the failed nodes if shard is not started
for (int i = 0; i < maxRetries; i++) {
failedNodeIds.add(clusterState.routingTable().index("idx").shard(0).shards().get(0).currentNodeId());
clusterState = allocationService.applyFailedShard(
clusterState, clusterState.routingTable().index("idx").shard(0).shards().get(0), randomBoolean());
assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(),
equalTo(failedNodeIds));
}
// reroute with retryFailed=true should discard the failedNodes
assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).state(), equalTo(ShardRoutingState.UNASSIGNED));
clusterState = allocationService.reroute(clusterState, new AllocationCommands(), false, true).getClusterState();
assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty());
// do not track the failed nodes while shard is started
clusterState = startInitializingShardsAndReroute(allocationService, clusterState);
assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).state(), equalTo(ShardRoutingState.STARTED));
clusterState = allocationService.applyFailedShard(
clusterState, clusterState.routingTable().index("idx").shard(0).shards().get(0), false);
assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty());
}
}

View File

@@ -117,8 +117,8 @@ public class RestoreInProgressAllocationDeciderTests extends ESAllocationTestCas
UnassignedInfo currentInfo = primary.unassignedInfo();
UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"),
currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(),
currentInfo.getUnassignedTimeInMillis(), currentInfo.isDelayed(), currentInfo.getLastAllocationStatus());
currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), currentInfo.getUnassignedTimeInMillis(),
currentInfo.isDelayed(), currentInfo.getLastAllocationStatus(), currentInfo.getFailedNodeIds());
primary = primary.updateUnassigned(newInfo, primary.recoverySource());
IndexRoutingTable indexRoutingTable = routingTable.index("test");

View File

@@ -26,6 +26,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
@@ -282,6 +284,45 @@ public class ReplicaShardAllocatorIT extends ESIntegTestCase {
assertThat(internalCluster().nodesInclude(indexName), allOf(hasItem(nodeWithHigherMatching), not(hasItem(nodeWithLowerMatching))));
}
/**
* Make sure that we do not repeatedly cancel an ongoing recovery for a noop copy on a broken node.
*/
public void testDoNotCancelRecoveryForBrokenNode() throws Exception {
internalCluster().startMasterOnlyNode();
String nodeWithPrimary = internalCluster().startDataOnlyNode();
String indexName = "test";
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")));
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList()));
client().admin().indices().prepareFlush(indexName).get();
String brokenNode = internalCluster().startDataOnlyNode();
MockTransportService transportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary);
CountDownLatch newNodeStarted = new CountDownLatch(1);
transportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.TRANSLOG_OPS)) {
if (brokenNode.equals(connection.getNode().getName())) {
try {
newNodeStarted.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
throw new CircuitBreakingException("not enough memory for indexing", 100, 50, CircuitBreaker.Durability.TRANSIENT);
}
}
connection.sendRequest(requestId, action, request, options);
});
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)));
internalCluster().startDataOnlyNode();
newNodeStarted.countDown();
ensureGreen(indexName);
transportService.clearAllRules();
}
private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) throws Exception {
assertBusy(() -> {
Index index = resolveIndex(indexName);

View File

@@ -61,12 +61,16 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.unmodifiableMap;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
private static final org.apache.lucene.util.Version MIN_SUPPORTED_LUCENE_VERSION = org.elasticsearch.Version.CURRENT
@@ -182,7 +186,17 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
}
public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() {
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders());
final UnassignedInfo unassignedInfo;
final Set<String> failedNodes;
if (randomBoolean()) {
failedNodes = Collections.emptySet();
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null);
} else {
failedNodes = new HashSet<>(randomSubsetOf(Arrays.asList("node-4", "node-5", "node-6")));
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10),
System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodes);
}
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo);
long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE);
testAllocator.addData(node1, Arrays.asList(newRetentionLease(node1, retainingSeqNo), newRetentionLease(node3, retainingSeqNo)),
"MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
@@ -190,8 +204,11 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
testAllocator.processExistingRecoveries(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId));
List<ShardRouting> unassignedShards = allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED);
assertThat(unassignedShards, hasSize(1));
assertThat(unassignedShards.get(0).shardId(), equalTo(shardId));
assertThat(unassignedShards.get(0).unassignedInfo().getNumFailedAllocations(), equalTo(0));
assertThat(unassignedShards.get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodes));
}
public void testNotCancellingRecoveryIfCurrentRecoveryHasRetentionLease() {
@@ -357,7 +374,15 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
}
public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() {
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders());
final UnassignedInfo unassignedInfo;
if (randomBoolean()) {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null);
} else {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10),
System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT,
Collections.singleton("node-4"));
}
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo);
List<RetentionLease> retentionLeases = new ArrayList<>();
if (randomBoolean()) {
long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE);
@@ -388,6 +413,28 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0));
}
public void testDoNotCancelForBrokenNode() {
Set<String> failedNodes = new HashSet<>();
failedNodes.add(node3.getId());
if (randomBoolean()) {
failedNodes.add("node4");
}
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null,
randomIntBetween(failedNodes.size(), 10), System.nanoTime(), System.currentTimeMillis(), false,
UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodes);
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo);
long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE);
List<RetentionLease> retentionLeases = Arrays.asList(
newRetentionLease(node1, retainingSeqNoOnPrimary), newRetentionLease(node3, retainingSeqNoOnPrimary));
testAllocator
.addData(node1, retentionLeases, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION))
.addData(node2, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION))
.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
testAllocator.processExistingRecoveries(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED), empty());
}
private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) {
return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.CLUSTER_RECOVERED);
}
@@ -410,8 +457,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.addShard(ShardRouting.newUnassigned(shardId, false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(reason, null, null, failedAllocations, System.nanoTime(),
System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT)
))
System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT,
Collections.emptySet())))
.build())
)
.build();
@@ -422,7 +469,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime());
}
private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders, UnassignedInfo unassignedInfo) {
ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId, node1.getId(), true, ShardRoutingState.STARTED);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT))
@@ -434,8 +481,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
.addShard(primaryShard)
.addShard(TestShardRouting.newShardRouting(shardId, node2.getId(), null, false,
ShardRoutingState.INITIALIZING,
new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)))
ShardRoutingState.INITIALIZING, unassignedInfo))
.build())
)
.build();
@@ -446,6 +492,10 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime());
}
private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
return onePrimaryOnNode1And1ReplicaRecovering(deciders, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
}
static RetentionLease newRetentionLease(DiscoveryNode node, long retainingSeqNo) {
return new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()),
retainingSeqNo, randomNonNegativeLong(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE);

View File

@@ -997,7 +997,6 @@ public class IndexRecoveryIT extends ESIntegTestCase {
assertThat(recoveryStates.get(0).getTranslog().recoveredOperations(), greaterThan(0));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47974")
public void testDoNotInfinitelyWaitForMapping() {
internalCluster().ensureAtLeastNumDataNodes(3);
createIndex("test", Settings.builder()