Fail restore when the shard allocations max retries count is reached (#27493)

This commit changes the RestoreService so that it now fails the snapshot 
restore if one of the shards to restore has failed to be allocated. It also adds
a new RestoreInProgressAllocationDecider that forbids such shards to be 
allocated again. This way, when a restore is impossible or failed too many 
times, the user is forced to take a manual action (like deleting the index 
which failed shards) in order to try to restore it again.

This behaviour has been implemented because when the allocation of a 
shard has been retried too many times, the MaxRetryDecider is engaged 
to prevent any future allocation of the failed shard. If it happens while 
restoring a snapshot, the restore hanged and was never completed because 
it stayed around waiting for the shards to be assigned (and that won't happen).
It also blocked future attempts to restore the snapshot again. With this commit,
the restore does not hang and is marked as failed, leaving failed shards 
around for investigation.

This is the second part of the #26865 issue.

Closes #26865
This commit is contained in:
Tanguy Leroux 2017-12-12 09:51:18 +01:00 committed by GitHub
parent cfc3b2d344
commit a1ed347110
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 541 additions and 29 deletions

View File

@ -54,6 +54,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDeci
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
@ -191,6 +192,7 @@ public class ClusterModule extends AbstractModule {
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings));
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider(settings));
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider(settings));
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));

View File

@ -48,6 +48,8 @@ import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
@ -135,13 +137,14 @@ public class AllocationService extends AbstractComponent {
return newState;
}
// Used for testing
public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
return applyFailedShards(clusterState, Collections.singletonList(new FailedShard(failedShard, null, null)),
Collections.emptyList());
return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null)), emptyList());
}
// Used for testing
public ClusterState applyFailedShards(ClusterState clusterState, List<FailedShard> failedShards) {
return applyFailedShards(clusterState, failedShards, Collections.emptyList());
return applyFailedShards(clusterState, failedShards, emptyList());
}
/**

View File

@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.Snapshot;
/**
* This {@link AllocationDecider} prevents shards that have failed to be
* restored from a snapshot to be allocated.
*/
public class RestoreInProgressAllocationDecider extends AllocationDecider {
public static final String NAME = "restore_in_progress";
/**
* Creates a new {@link RestoreInProgressAllocationDecider} instance from
* given settings
*
* @param settings {@link Settings} to use
*/
public RestoreInProgressAllocationDecider(Settings settings) {
super(settings);
}
@Override
public Decision canAllocate(final ShardRouting shardRouting, final RoutingNode node, final RoutingAllocation allocation) {
return canAllocate(shardRouting, allocation);
}
@Override
public Decision canAllocate(final ShardRouting shardRouting, final RoutingAllocation allocation) {
final RecoverySource recoverySource = shardRouting.recoverySource();
if (recoverySource == null || recoverySource.getType() != RecoverySource.Type.SNAPSHOT) {
return allocation.decision(Decision.YES, NAME, "ignored as shard is not being recovered from a snapshot");
}
final Snapshot snapshot = ((RecoverySource.SnapshotRecoverySource) recoverySource).snapshot();
final RestoreInProgress restoresInProgress = allocation.custom(RestoreInProgress.TYPE);
if (restoresInProgress != null) {
for (RestoreInProgress.Entry restoreInProgress : restoresInProgress.entries()) {
if (restoreInProgress.snapshot().equals(snapshot)) {
RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId());
if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) {
assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting
+ "] to be in initializing state but got [" + shardRestoreStatus.state() + "]";
return allocation.decision(Decision.YES, NAME, "shard is currently being restored");
}
break;
}
}
}
return allocation.decision(Decision.NO, NAME, "shard has failed to be restored from the snapshot [%s] because of [%s] - " +
"manually close or delete the index [%s] in order to retry to restore the snapshot again or use the reroute API to force the " +
"allocation of an empty primary shard", snapshot, shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName());
}
@Override
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard " + shardRouting;
return canAllocate(shardRouting, node, allocation);
}
}

View File

@ -64,7 +64,6 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
@ -534,7 +533,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
RecoverySource recoverySource = initializingShard.recoverySource();
if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot();
changes(snapshot).startedShards.put(initializingShard.shardId(),
changes(snapshot).shards.put(initializingShard.shardId(),
new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS));
}
}
@ -550,7 +549,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
// to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed,
// however, we only want to acknowledge the restore operation once it has been successfully restored on another node.
if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) {
changes(snapshot).failedShards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(),
changes(snapshot).shards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(),
RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage()));
}
}
@ -563,11 +562,24 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT &&
initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) {
Snapshot snapshot = ((SnapshotRecoverySource) unassignedShard.recoverySource()).snapshot();
changes(snapshot).failedShards.put(unassignedShard.shardId(), new ShardRestoreStatus(null,
changes(snapshot).shards.put(unassignedShard.shardId(), new ShardRestoreStatus(null,
RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource()));
}
}
@Override
public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) {
RecoverySource recoverySource = unassignedShard.recoverySource();
if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
if (newUnassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO) {
Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot();
String reason = "shard could not be allocated to any of the nodes";
changes(snapshot).shards.put(unassignedShard.shardId(),
new ShardRestoreStatus(unassignedShard.currentNodeId(), RestoreInProgress.State.FAILURE, reason));
}
}
}
/**
* Helper method that creates update entry for the given shard id if such an entry does not exist yet.
*/
@ -576,25 +588,21 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
}
private static class Updates {
private Map<ShardId, ShardRestoreStatus> failedShards = new HashMap<>();
private Map<ShardId, ShardRestoreStatus> startedShards = new HashMap<>();
private Map<ShardId, ShardRestoreStatus> shards = new HashMap<>();
}
public RestoreInProgress applyChanges(RestoreInProgress oldRestore) {
public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) {
if (shardChanges.isEmpty() == false) {
final List<RestoreInProgress.Entry> entries = new ArrayList<>();
for (RestoreInProgress.Entry entry : oldRestore.entries()) {
Snapshot snapshot = entry.snapshot();
Updates updates = shardChanges.get(snapshot);
assert Sets.haveEmptyIntersection(updates.startedShards.keySet(), updates.failedShards.keySet());
if (updates.startedShards.isEmpty() == false || updates.failedShards.isEmpty() == false) {
if (updates.shards.isEmpty() == false) {
ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder(entry.shards());
for (Map.Entry<ShardId, ShardRestoreStatus> startedShardEntry : updates.startedShards.entrySet()) {
shardsBuilder.put(startedShardEntry.getKey(), startedShardEntry.getValue());
}
for (Map.Entry<ShardId, ShardRestoreStatus> failedShardEntry : updates.failedShards.entrySet()) {
shardsBuilder.put(failedShardEntry.getKey(), failedShardEntry.getValue());
for (Map.Entry<ShardId, ShardRestoreStatus> shard : updates.shards.entrySet()) {
shardsBuilder.put(shard.getKey(), shard.getValue());
}
ImmutableOpenMap<ShardId, ShardRestoreStatus> shards = shardsBuilder.build();
RestoreInProgress.State newState = overallState(RestoreInProgress.State.STARTED, shards);
entries.add(new RestoreInProgress.Entry(entry.snapshot(), newState, entry.indices(), shards));
@ -607,7 +615,6 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
return oldRestore;
}
}
}
public static RestoreInProgress.Entry restoreInProgress(ClusterState state, Snapshot snapshot) {

View File

@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocatio
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
@ -183,6 +184,7 @@ public class ClusterModuleTests extends ModuleTestCase {
EnableAllocationDecider.class,
NodeVersionAllocationDecider.class,
SnapshotInProgressAllocationDecider.class,
RestoreInProgressAllocationDecider.class,
FilterAllocationDecider.class,
SameShardAllocationDecider.class,
DiskThresholdDecider.class,

View File

@ -25,6 +25,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -38,6 +39,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
@ -46,7 +48,10 @@ import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
@ -309,6 +314,8 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
DiscoveryNode node1 = newNode("node1");
MetaData.Builder metaDataBuilder = new MetaData.Builder(metaData);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Snapshot snapshot = new Snapshot("repo", new SnapshotId("snap", "randomId"));
Set<String> snapshotIndices = new HashSet<>();
for (ObjectCursor<IndexMetaData> cursor: metaData.indices().values()) {
Index index = cursor.value.getIndex();
IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(cursor.value);
@ -329,14 +336,14 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
routingTableBuilder.addAsFromDangling(indexMetaData);
break;
case 3:
snapshotIndices.add(index.getName());
routingTableBuilder.addAsNewRestore(indexMetaData,
new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT,
indexMetaData.getIndex().getName()), new IntHashSet());
new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName()), new IntHashSet());
break;
case 4:
snapshotIndices.add(index.getName());
routingTableBuilder.addAsRestore(indexMetaData,
new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT,
indexMetaData.getIndex().getName()));
new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName()));
break;
case 5:
routingTableBuilder.addAsNew(indexMetaData);
@ -345,10 +352,31 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
throw new IndexOutOfBoundsException();
}
}
final RoutingTable routingTable = routingTableBuilder.build();
final ImmutableOpenMap.Builder<String, ClusterState.Custom> restores = ImmutableOpenMap.builder();
if (snapshotIndices.isEmpty() == false) {
// Some indices are restored from snapshot, the RestoreInProgress must be set accordingly
ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> restoreShards = ImmutableOpenMap.builder();
for (ShardRouting shard : routingTable.allShards()) {
if (shard.primary() && shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
ShardId shardId = shard.shardId();
restoreShards.put(shardId, new RestoreInProgress.ShardRestoreStatus(node1.getId(), RestoreInProgress.State.INIT));
}
}
RestoreInProgress.Entry restore = new RestoreInProgress.Entry(snapshot, RestoreInProgress.State.INIT,
new ArrayList<>(snapshotIndices), restoreShards.build());
restores.put(RestoreInProgress.TYPE, new RestoreInProgress(restore));
}
return ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.nodes(DiscoveryNodes.builder().add(node1))
.metaData(metaDataBuilder.build())
.routingTable(routingTableBuilder.build()).build();
.routingTable(routingTable)
.customs(restores.build())
.build();
}
private void addInSyncAllocationIds(Index index, IndexMetaData.Builder indexMetaData,

View File

@ -0,0 +1,208 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.decider;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import java.io.IOException;
import java.util.Collections;
import static java.util.Collections.singletonList;
/**
* Test {@link RestoreInProgressAllocationDecider}
*/
public class RestoreInProgressAllocationDeciderTests extends ESAllocationTestCase {
public void testCanAllocatePrimary() {
ClusterState clusterState = createInitialClusterState();
ShardRouting shard;
if (randomBoolean()) {
shard = clusterState.getRoutingTable().shardRoutingTable("test", 0).primaryShard();
assertEquals(RecoverySource.Type.EMPTY_STORE, shard.recoverySource().getType());
} else {
shard = clusterState.getRoutingTable().shardRoutingTable("test", 0).replicaShards().get(0);
assertEquals(RecoverySource.Type.PEER, shard.recoverySource().getType());
}
final Decision decision = executeAllocation(clusterState, shard);
assertEquals(Decision.Type.YES, decision.type());
assertEquals("ignored as shard is not being recovered from a snapshot", decision.getExplanation());
}
public void testCannotAllocatePrimaryMissingInRestoreInProgress() {
ClusterState clusterState = createInitialClusterState();
RoutingTable routingTable = RoutingTable.builder(clusterState.getRoutingTable())
.addAsRestore(clusterState.getMetaData().index("test"), createSnapshotRecoverySource("_missing"))
.build();
clusterState = ClusterState.builder(clusterState)
.routingTable(routingTable)
.build();
ShardRouting primary = clusterState.getRoutingTable().shardRoutingTable("test", 0).primaryShard();
assertEquals(ShardRoutingState.UNASSIGNED, primary.state());
assertEquals(RecoverySource.Type.SNAPSHOT, primary.recoverySource().getType());
final Decision decision = executeAllocation(clusterState, primary);
assertEquals(Decision.Type.NO, decision.type());
assertEquals("shard has failed to be restored from the snapshot [_repository:_missing/_uuid] because of " +
"[restore_source[_repository/_missing]] - manually close or delete the index [test] in order to retry to restore " +
"the snapshot again or use the reroute API to force the allocation of an empty primary shard", decision.getExplanation());
}
public void testCanAllocatePrimaryExistingInRestoreInProgress() {
RecoverySource.SnapshotRecoverySource recoverySource = createSnapshotRecoverySource("_existing");
ClusterState clusterState = createInitialClusterState();
RoutingTable routingTable = RoutingTable.builder(clusterState.getRoutingTable())
.addAsRestore(clusterState.getMetaData().index("test"), recoverySource)
.build();
clusterState = ClusterState.builder(clusterState)
.routingTable(routingTable)
.build();
ShardRouting primary = clusterState.getRoutingTable().shardRoutingTable("test", 0).primaryShard();
assertEquals(ShardRoutingState.UNASSIGNED, primary.state());
assertEquals(RecoverySource.Type.SNAPSHOT, primary.recoverySource().getType());
routingTable = clusterState.routingTable();
final RestoreInProgress.State shardState;
if (randomBoolean()) {
shardState = randomFrom(RestoreInProgress.State.STARTED, RestoreInProgress.State.INIT);
} else {
shardState = RestoreInProgress.State.FAILURE;
UnassignedInfo currentInfo = primary.unassignedInfo();
UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"),
currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(),
currentInfo.getUnassignedTimeInMillis(), currentInfo.isDelayed(), currentInfo.getLastAllocationStatus());
primary = primary.updateUnassigned(newInfo, primary.recoverySource());
IndexRoutingTable indexRoutingTable = routingTable.index("test");
IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) {
final IndexShardRoutingTable shardRoutingTable = shardEntry.value;
for (ShardRouting shardRouting : shardRoutingTable.getShards()) {
if (shardRouting.primary()) {
newIndexRoutingTable.addShard(primary);
} else {
newIndexRoutingTable.addShard(shardRouting);
}
}
}
routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build();
}
ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shards = ImmutableOpenMap.builder();
shards.put(primary.shardId(), new RestoreInProgress.ShardRestoreStatus(clusterState.getNodes().getLocalNodeId(), shardState));
Snapshot snapshot = recoverySource.snapshot();
RestoreInProgress.State restoreState = RestoreInProgress.State.STARTED;
RestoreInProgress.Entry restore = new RestoreInProgress.Entry(snapshot, restoreState, singletonList("test"), shards.build());
clusterState = ClusterState.builder(clusterState)
.putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restore))
.routingTable(routingTable)
.build();
Decision decision = executeAllocation(clusterState, primary);
if (shardState == RestoreInProgress.State.FAILURE) {
assertEquals(Decision.Type.NO, decision.type());
assertEquals("shard has failed to be restored from the snapshot [_repository:_existing/_uuid] because of " +
"[restore_source[_repository/_existing], failure IOException[i/o failure]] - manually close or delete the index " +
"[test] in order to retry to restore the snapshot again or use the reroute API to force the allocation of " +
"an empty primary shard", decision.getExplanation());
} else {
assertEquals(Decision.Type.YES, decision.type());
assertEquals("shard is currently being restored", decision.getExplanation());
}
}
private ClusterState createInitialClusterState() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
.add(newNode("master", Collections.singleton(DiscoveryNode.Role.MASTER)))
.localNodeId("master")
.masterNodeId("master")
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(routingTable)
.nodes(discoveryNodes)
.build();
assertEquals(2, clusterState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size());
return clusterState;
}
private Decision executeAllocation(final ClusterState clusterState, final ShardRouting shardRouting) {
final AllocationDecider decider = new RestoreInProgressAllocationDecider(Settings.EMPTY);
final RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, Collections.singleton(decider)),
clusterState.getRoutingNodes(), clusterState, null, 0L);
allocation.debugDecision(true);
final Decision decision;
if (randomBoolean()) {
decision = decider.canAllocate(shardRouting, allocation);
} else {
DiscoveryNode node = clusterState.getNodes().getMasterNode();
decision = decider.canAllocate(shardRouting, new RoutingNode(node.getId(), node), allocation);
}
return decision;
}
private RecoverySource.SnapshotRecoverySource createSnapshotRecoverySource(final String snapshotName) {
Snapshot snapshot = new Snapshot("_repository", new SnapshotId(snapshotName, "_uuid"));
return new RecoverySource.SnapshotRecoverySource(snapshot, Version.CURRENT, "test");
}
}

View File

@ -46,6 +46,7 @@ import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
@ -55,6 +56,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@ -97,14 +102,15 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist;
@ -117,9 +123,11 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertInde
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
@ -824,6 +832,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
prepareCreate("test-idx").setSettings(Settings.builder().put("index.allocation.max_retries", Integer.MAX_VALUE)).get();
ensureGreen();
final NumShards numShards = getNumShards("test-idx");
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
@ -848,14 +858,31 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> delete index");
cluster().wipeIndices("test-idx");
logger.info("--> restore index after deletion");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
SearchResponse countResponse = client.prepareSearch("test-idx").setSize(0).get();
assertThat(countResponse.getHits().getTotalHits(), equalTo(100L));
final RestoreSnapshotResponse restoreResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.get();
logger.info("--> total number of simulated failures during restore: [{}]", getFailureCount("test-repo"));
final RestoreInfo restoreInfo = restoreResponse.getRestoreInfo();
assertThat(restoreInfo.totalShards(), equalTo(numShards.numPrimaries));
if (restoreInfo.successfulShards() == restoreInfo.totalShards()) {
// All shards were restored, we must find the exact number of hits
assertHitCount(client.prepareSearch("test-idx").setSize(0).get(), 100L);
} else {
// One or more shards failed to be restored. This can happen when there is
// only 1 data node: a shard failed because of the random IO exceptions
// during restore and then we don't allow the shard to be assigned on the
// same node again during the same reroute operation. Then another reroute
// operation is scheduled, but the RestoreInProgressAllocationDecider will
// block the shard to be assigned again because it failed during restore.
final ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().get();
assertEquals(1, clusterStateResponse.getState().getNodes().getDataNodes().size());
assertEquals(restoreInfo.failedShards(),
clusterStateResponse.getState().getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size());
}
}
@TestLogging("org.elasticsearch.cluster.routing:TRACE,org.elasticsearch.snapshots:TRACE")
public void testDataFileCorruptionDuringRestore() throws Exception {
Path repositoryLocation = randomRepoPath();
Client client = client();
@ -907,6 +934,155 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
cluster().wipeIndices("test-idx");
}
/**
* Test that restoring a snapshot whose files can't be downloaded at all is not stuck or
* does not hang indefinitely.
*/
public void testUnrestorableFilesDuringRestore() throws Exception {
final String indexName = "unrestorable-files";
final int maxRetries = randomIntBetween(1, 10);
Settings createIndexSettings = Settings.builder().put(SETTING_ALLOCATION_MAX_RETRY.getKey(), maxRetries).build();
Settings repositorySettings = Settings.builder()
.put("random", randomAlphaOfLength(10))
.put("max_failure_number", 10000000L)
// No lucene corruptions, we want to test retries
.put("use_lucene_corruption", false)
// Restoring a file will never complete
.put("random_data_file_io_exception_rate", 1.0)
.build();
Consumer<UnassignedInfo> checkUnassignedInfo = unassignedInfo -> {
assertThat(unassignedInfo.getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
assertThat(unassignedInfo.getNumFailedAllocations(), anyOf(equalTo(maxRetries), equalTo(1)));
};
unrestorableUseCase(indexName, createIndexSettings, repositorySettings, Settings.EMPTY, checkUnassignedInfo, () -> {});
}
/**
* Test that restoring an index with shard allocation filtering settings that prevents
* its allocation does not hang indefinitely.
*/
public void testUnrestorableIndexDuringRestore() throws Exception {
final String indexName = "unrestorable-index";
Settings restoreIndexSettings = Settings.builder().put("index.routing.allocation.include._name", randomAlphaOfLength(5)).build();
Consumer<UnassignedInfo> checkUnassignedInfo = unassignedInfo -> {
assertThat(unassignedInfo.getReason(), equalTo(UnassignedInfo.Reason.NEW_INDEX_RESTORED));
};
Runnable fixupAction =() -> {
// remove the shard allocation filtering settings and use the Reroute API to retry the failed shards
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder()
.putNull("index.routing.allocation.include._name")
.build()));
assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true));
};
unrestorableUseCase(indexName, Settings.EMPTY, Settings.EMPTY, restoreIndexSettings, checkUnassignedInfo, fixupAction);
}
/** Execute the unrestorable test use case **/
private void unrestorableUseCase(final String indexName,
final Settings createIndexSettings,
final Settings repositorySettings,
final Settings restoreIndexSettings,
final Consumer<UnassignedInfo> checkUnassignedInfo,
final Runnable fixUpAction) throws Exception {
// create a test repository
final Path repositoryLocation = randomRepoPath();
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs")
.setSettings(Settings.builder().put("location", repositoryLocation)));
// create a test index
assertAcked(prepareCreate(indexName, Settings.builder().put(createIndexSettings)));
// index some documents
final int nbDocs = scaledRandomIntBetween(10, 100);
for (int i = 0; i < nbDocs; i++) {
index(indexName, "doc", Integer.toString(i), "foo", "bar" + i);
}
flushAndRefresh(indexName);
assertThat(client().prepareSearch(indexName).setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs));
// create a snapshot
final NumShards numShards = getNumShards(indexName);
CreateSnapshotResponse snapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices(indexName)
.get();
assertThat(snapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotResponse.getSnapshotInfo().successfulShards(), equalTo(numShards.numPrimaries));
assertThat(snapshotResponse.getSnapshotInfo().failedShards(), equalTo(0));
// delete the test index
assertAcked(client().admin().indices().prepareDelete(indexName));
// update the test repository
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("mock")
.setSettings(Settings.builder()
.put("location", repositoryLocation)
.put(repositorySettings)
.build()));
// attempt to restore the snapshot with the given settings
RestoreSnapshotResponse restoreResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
.setIndices(indexName)
.setIndexSettings(restoreIndexSettings)
.setWaitForCompletion(true)
.get();
// check that all shards failed during restore
assertThat(restoreResponse.getRestoreInfo().totalShards(), equalTo(numShards.numPrimaries));
assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(0));
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setCustoms(true).setRoutingTable(true).get();
// check that there is no restore in progress
RestoreInProgress restoreInProgress = clusterStateResponse.getState().custom(RestoreInProgress.TYPE);
assertNotNull("RestoreInProgress must be not null", restoreInProgress);
assertThat("RestoreInProgress must be empty", restoreInProgress.entries(), hasSize(0));
// check that the shards have been created but are not assigned
assertThat(clusterStateResponse.getState().getRoutingTable().allShards(indexName), hasSize(numShards.totalNumShards));
// check that every primary shard is unassigned
for (ShardRouting shard : clusterStateResponse.getState().getRoutingTable().allShards(indexName)) {
if (shard.primary()) {
assertThat(shard.state(), equalTo(ShardRoutingState.UNASSIGNED));
assertThat(shard.recoverySource().getType(), equalTo(RecoverySource.Type.SNAPSHOT));
assertThat(shard.unassignedInfo().getLastAllocationStatus(), equalTo(UnassignedInfo.AllocationStatus.DECIDERS_NO));
checkUnassignedInfo.accept(shard.unassignedInfo());
}
}
// update the test repository in order to make it work
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs")
.setSettings(Settings.builder().put("location", repositoryLocation)));
// execute action to eventually fix the situation
fixUpAction.run();
// delete the index and restore again
assertAcked(client().admin().indices().prepareDelete(indexName));
restoreResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get();
assertThat(restoreResponse.getRestoreInfo().totalShards(), equalTo(numShards.numPrimaries));
assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(numShards.numPrimaries));
// Wait for the shards to be assigned
ensureGreen(indexName);
refresh(indexName);
assertThat(client().prepareSearch(indexName).setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs));
}
public void testDeletionOfFailingToRecoverIndexShouldStopRestore() throws Exception {
Path repositoryLocation = randomRepoPath();
Client client = client();