From a1ed3471103ef0f230c4fb2721ef9dbb901fcc2b Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 12 Dec 2017 09:51:18 +0100 Subject: [PATCH] Fail restore when the shard allocations max retries count is reached (#27493) This commit changes the RestoreService so that it now fails the snapshot restore if one of the shards to restore has failed to be allocated. It also adds a new RestoreInProgressAllocationDecider that forbids such shards to be allocated again. This way, when a restore is impossible or failed too many times, the user is forced to take a manual action (like deleting the index which failed shards) in order to try to restore it again. This behaviour has been implemented because when the allocation of a shard has been retried too many times, the MaxRetryDecider is engaged to prevent any future allocation of the failed shard. If it happens while restoring a snapshot, the restore hanged and was never completed because it stayed around waiting for the shards to be assigned (and that won't happen). It also blocked future attempts to restore the snapshot again. With this commit, the restore does not hang and is marked as failed, leaving failed shards around for investigation. This is the second part of the #26865 issue. Closes #26865 --- .../elasticsearch/cluster/ClusterModule.java | 2 + .../routing/allocation/AllocationService.java | 9 +- .../RestoreInProgressAllocationDecider.java | 86 ++++++++ .../snapshots/RestoreService.java | 37 ++-- .../cluster/ClusterModuleTests.java | 2 + .../allocation/ThrottlingAllocationTests.java | 38 +++- ...storeInProgressAllocationDeciderTests.java | 208 ++++++++++++++++++ .../SharedClusterSnapshotRestoreIT.java | 188 +++++++++++++++- 8 files changed, 541 insertions(+), 29 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index a4bb6a55925..9baa47fbc26 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -54,6 +54,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDeci import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; @@ -191,6 +192,7 @@ public class ClusterModule extends AbstractModule { addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings)); addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider(settings)); + addAllocationDecider(deciders, new RestoreInProgressAllocationDecider(settings)); addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings)); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 774e4b9301c..d79237b8a65 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -48,6 +48,8 @@ import java.util.List; import java.util.function.Function; import java.util.stream.Collectors; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; @@ -135,13 +137,14 @@ public class AllocationService extends AbstractComponent { return newState; } + // Used for testing public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard) { - return applyFailedShards(clusterState, Collections.singletonList(new FailedShard(failedShard, null, null)), - Collections.emptyList()); + return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null)), emptyList()); } + // Used for testing public ClusterState applyFailedShards(ClusterState clusterState, List failedShards) { - return applyFailedShards(clusterState, failedShards, Collections.emptyList()); + return applyFailedShards(clusterState, failedShards, emptyList()); } /** diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java new file mode 100644 index 00000000000..3fefd4e0abb --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.snapshots.Snapshot; + +/** + * This {@link AllocationDecider} prevents shards that have failed to be + * restored from a snapshot to be allocated. + */ +public class RestoreInProgressAllocationDecider extends AllocationDecider { + + public static final String NAME = "restore_in_progress"; + + /** + * Creates a new {@link RestoreInProgressAllocationDecider} instance from + * given settings + * + * @param settings {@link Settings} to use + */ + public RestoreInProgressAllocationDecider(Settings settings) { + super(settings); + } + + @Override + public Decision canAllocate(final ShardRouting shardRouting, final RoutingNode node, final RoutingAllocation allocation) { + return canAllocate(shardRouting, allocation); + } + + @Override + public Decision canAllocate(final ShardRouting shardRouting, final RoutingAllocation allocation) { + final RecoverySource recoverySource = shardRouting.recoverySource(); + if (recoverySource == null || recoverySource.getType() != RecoverySource.Type.SNAPSHOT) { + return allocation.decision(Decision.YES, NAME, "ignored as shard is not being recovered from a snapshot"); + } + + final Snapshot snapshot = ((RecoverySource.SnapshotRecoverySource) recoverySource).snapshot(); + final RestoreInProgress restoresInProgress = allocation.custom(RestoreInProgress.TYPE); + + if (restoresInProgress != null) { + for (RestoreInProgress.Entry restoreInProgress : restoresInProgress.entries()) { + if (restoreInProgress.snapshot().equals(snapshot)) { + RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId()); + if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) { + assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting + + "] to be in initializing state but got [" + shardRestoreStatus.state() + "]"; + return allocation.decision(Decision.YES, NAME, "shard is currently being restored"); + } + break; + } + } + } + return allocation.decision(Decision.NO, NAME, "shard has failed to be restored from the snapshot [%s] because of [%s] - " + + "manually close or delete the index [%s] in order to retry to restore the snapshot again or use the reroute API to force the " + + "allocation of an empty primary shard", snapshot, shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName()); + } + + @Override + public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard " + shardRouting; + return canAllocate(shardRouting, node, allocation); + } +} diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index a7d68a8197b..63b461afbd7 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -64,7 +64,6 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -534,7 +533,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp RecoverySource recoverySource = initializingShard.recoverySource(); if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) { Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot(); - changes(snapshot).startedShards.put(initializingShard.shardId(), + changes(snapshot).shards.put(initializingShard.shardId(), new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS)); } } @@ -550,7 +549,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp // to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed, // however, we only want to acknowledge the restore operation once it has been successfully restored on another node. if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) { - changes(snapshot).failedShards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(), + changes(snapshot).shards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(), RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage())); } } @@ -563,11 +562,24 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT && initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) { Snapshot snapshot = ((SnapshotRecoverySource) unassignedShard.recoverySource()).snapshot(); - changes(snapshot).failedShards.put(unassignedShard.shardId(), new ShardRestoreStatus(null, + changes(snapshot).shards.put(unassignedShard.shardId(), new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource())); } } + @Override + public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) { + RecoverySource recoverySource = unassignedShard.recoverySource(); + if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) { + if (newUnassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO) { + Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot(); + String reason = "shard could not be allocated to any of the nodes"; + changes(snapshot).shards.put(unassignedShard.shardId(), + new ShardRestoreStatus(unassignedShard.currentNodeId(), RestoreInProgress.State.FAILURE, reason)); + } + } + } + /** * Helper method that creates update entry for the given shard id if such an entry does not exist yet. */ @@ -576,25 +588,21 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp } private static class Updates { - private Map failedShards = new HashMap<>(); - private Map startedShards = new HashMap<>(); + private Map shards = new HashMap<>(); } - public RestoreInProgress applyChanges(RestoreInProgress oldRestore) { + public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) { if (shardChanges.isEmpty() == false) { final List entries = new ArrayList<>(); for (RestoreInProgress.Entry entry : oldRestore.entries()) { Snapshot snapshot = entry.snapshot(); Updates updates = shardChanges.get(snapshot); - assert Sets.haveEmptyIntersection(updates.startedShards.keySet(), updates.failedShards.keySet()); - if (updates.startedShards.isEmpty() == false || updates.failedShards.isEmpty() == false) { + if (updates.shards.isEmpty() == false) { ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(entry.shards()); - for (Map.Entry startedShardEntry : updates.startedShards.entrySet()) { - shardsBuilder.put(startedShardEntry.getKey(), startedShardEntry.getValue()); - } - for (Map.Entry failedShardEntry : updates.failedShards.entrySet()) { - shardsBuilder.put(failedShardEntry.getKey(), failedShardEntry.getValue()); + for (Map.Entry shard : updates.shards.entrySet()) { + shardsBuilder.put(shard.getKey(), shard.getValue()); } + ImmutableOpenMap shards = shardsBuilder.build(); RestoreInProgress.State newState = overallState(RestoreInProgress.State.STARTED, shards); entries.add(new RestoreInProgress.Entry(entry.snapshot(), newState, entry.indices(), shards)); @@ -607,7 +615,6 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp return oldRestore; } } - } public static RestoreInProgress.Entry restoreInProgress(ClusterState state, Snapshot snapshot) { diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index 6fd3d66c8f8..176616690f0 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocatio import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider; @@ -183,6 +184,7 @@ public class ClusterModuleTests extends ModuleTestCase { EnableAllocationDecider.class, NodeVersionAllocationDecider.class, SnapshotInProgressAllocationDecider.class, + RestoreInProgressAllocationDecider.class, FilterAllocationDecider.class, SameShardAllocationDecider.class, DiskThresholdDecider.class, diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java index 5cafe410d56..8be4c858655 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -38,6 +39,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; @@ -46,7 +48,10 @@ import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.gateway.TestGatewayAllocator; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -309,6 +314,8 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { DiscoveryNode node1 = newNode("node1"); MetaData.Builder metaDataBuilder = new MetaData.Builder(metaData); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Snapshot snapshot = new Snapshot("repo", new SnapshotId("snap", "randomId")); + Set snapshotIndices = new HashSet<>(); for (ObjectCursor cursor: metaData.indices().values()) { Index index = cursor.value.getIndex(); IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(cursor.value); @@ -329,14 +336,14 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { routingTableBuilder.addAsFromDangling(indexMetaData); break; case 3: + snapshotIndices.add(index.getName()); routingTableBuilder.addAsNewRestore(indexMetaData, - new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT, - indexMetaData.getIndex().getName()), new IntHashSet()); + new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName()), new IntHashSet()); break; case 4: + snapshotIndices.add(index.getName()); routingTableBuilder.addAsRestore(indexMetaData, - new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT, - indexMetaData.getIndex().getName())); + new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName())); break; case 5: routingTableBuilder.addAsNew(indexMetaData); @@ -345,10 +352,31 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { throw new IndexOutOfBoundsException(); } } + + final RoutingTable routingTable = routingTableBuilder.build(); + + final ImmutableOpenMap.Builder restores = ImmutableOpenMap.builder(); + if (snapshotIndices.isEmpty() == false) { + // Some indices are restored from snapshot, the RestoreInProgress must be set accordingly + ImmutableOpenMap.Builder restoreShards = ImmutableOpenMap.builder(); + for (ShardRouting shard : routingTable.allShards()) { + if (shard.primary() && shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) { + ShardId shardId = shard.shardId(); + restoreShards.put(shardId, new RestoreInProgress.ShardRestoreStatus(node1.getId(), RestoreInProgress.State.INIT)); + } + } + + RestoreInProgress.Entry restore = new RestoreInProgress.Entry(snapshot, RestoreInProgress.State.INIT, + new ArrayList<>(snapshotIndices), restoreShards.build()); + restores.put(RestoreInProgress.TYPE, new RestoreInProgress(restore)); + } + return ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .nodes(DiscoveryNodes.builder().add(node1)) .metaData(metaDataBuilder.build()) - .routingTable(routingTableBuilder.build()).build(); + .routingTable(routingTable) + .customs(restores.build()) + .build(); } private void addInSyncAllocationIds(Index index, IndexMetaData.Builder indexMetaData, diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java new file mode 100644 index 00000000000..49d69272af6 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java @@ -0,0 +1,208 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.routing.allocation.decider; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; + +import java.io.IOException; +import java.util.Collections; + +import static java.util.Collections.singletonList; + +/** + * Test {@link RestoreInProgressAllocationDecider} + */ +public class RestoreInProgressAllocationDeciderTests extends ESAllocationTestCase { + + public void testCanAllocatePrimary() { + ClusterState clusterState = createInitialClusterState(); + ShardRouting shard; + if (randomBoolean()) { + shard = clusterState.getRoutingTable().shardRoutingTable("test", 0).primaryShard(); + assertEquals(RecoverySource.Type.EMPTY_STORE, shard.recoverySource().getType()); + } else { + shard = clusterState.getRoutingTable().shardRoutingTable("test", 0).replicaShards().get(0); + assertEquals(RecoverySource.Type.PEER, shard.recoverySource().getType()); + } + + final Decision decision = executeAllocation(clusterState, shard); + assertEquals(Decision.Type.YES, decision.type()); + assertEquals("ignored as shard is not being recovered from a snapshot", decision.getExplanation()); + } + + public void testCannotAllocatePrimaryMissingInRestoreInProgress() { + ClusterState clusterState = createInitialClusterState(); + RoutingTable routingTable = RoutingTable.builder(clusterState.getRoutingTable()) + .addAsRestore(clusterState.getMetaData().index("test"), createSnapshotRecoverySource("_missing")) + .build(); + + clusterState = ClusterState.builder(clusterState) + .routingTable(routingTable) + .build(); + + ShardRouting primary = clusterState.getRoutingTable().shardRoutingTable("test", 0).primaryShard(); + assertEquals(ShardRoutingState.UNASSIGNED, primary.state()); + assertEquals(RecoverySource.Type.SNAPSHOT, primary.recoverySource().getType()); + + final Decision decision = executeAllocation(clusterState, primary); + assertEquals(Decision.Type.NO, decision.type()); + assertEquals("shard has failed to be restored from the snapshot [_repository:_missing/_uuid] because of " + + "[restore_source[_repository/_missing]] - manually close or delete the index [test] in order to retry to restore " + + "the snapshot again or use the reroute API to force the allocation of an empty primary shard", decision.getExplanation()); + } + + public void testCanAllocatePrimaryExistingInRestoreInProgress() { + RecoverySource.SnapshotRecoverySource recoverySource = createSnapshotRecoverySource("_existing"); + + ClusterState clusterState = createInitialClusterState(); + RoutingTable routingTable = RoutingTable.builder(clusterState.getRoutingTable()) + .addAsRestore(clusterState.getMetaData().index("test"), recoverySource) + .build(); + + clusterState = ClusterState.builder(clusterState) + .routingTable(routingTable) + .build(); + + ShardRouting primary = clusterState.getRoutingTable().shardRoutingTable("test", 0).primaryShard(); + assertEquals(ShardRoutingState.UNASSIGNED, primary.state()); + assertEquals(RecoverySource.Type.SNAPSHOT, primary.recoverySource().getType()); + + routingTable = clusterState.routingTable(); + + final RestoreInProgress.State shardState; + if (randomBoolean()) { + shardState = randomFrom(RestoreInProgress.State.STARTED, RestoreInProgress.State.INIT); + } else { + shardState = RestoreInProgress.State.FAILURE; + + UnassignedInfo currentInfo = primary.unassignedInfo(); + UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"), + currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), + currentInfo.getUnassignedTimeInMillis(), currentInfo.isDelayed(), currentInfo.getLastAllocationStatus()); + primary = primary.updateUnassigned(newInfo, primary.recoverySource()); + + IndexRoutingTable indexRoutingTable = routingTable.index("test"); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + newIndexRoutingTable.addShard(primary); + } else { + newIndexRoutingTable.addShard(shardRouting); + } + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + } + + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + shards.put(primary.shardId(), new RestoreInProgress.ShardRestoreStatus(clusterState.getNodes().getLocalNodeId(), shardState)); + + Snapshot snapshot = recoverySource.snapshot(); + RestoreInProgress.State restoreState = RestoreInProgress.State.STARTED; + RestoreInProgress.Entry restore = new RestoreInProgress.Entry(snapshot, restoreState, singletonList("test"), shards.build()); + + clusterState = ClusterState.builder(clusterState) + .putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restore)) + .routingTable(routingTable) + .build(); + + Decision decision = executeAllocation(clusterState, primary); + if (shardState == RestoreInProgress.State.FAILURE) { + assertEquals(Decision.Type.NO, decision.type()); + assertEquals("shard has failed to be restored from the snapshot [_repository:_existing/_uuid] because of " + + "[restore_source[_repository/_existing], failure IOException[i/o failure]] - manually close or delete the index " + + "[test] in order to retry to restore the snapshot again or use the reroute API to force the allocation of " + + "an empty primary shard", decision.getExplanation()); + } else { + assertEquals(Decision.Type.YES, decision.type()); + assertEquals("shard is currently being restored", decision.getExplanation()); + } + } + + private ClusterState createInitialClusterState() { + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(newNode("master", Collections.singleton(DiscoveryNode.Role.MASTER))) + .localNodeId("master") + .masterNodeId("master") + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(discoveryNodes) + .build(); + + assertEquals(2, clusterState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size()); + return clusterState; + } + + private Decision executeAllocation(final ClusterState clusterState, final ShardRouting shardRouting) { + final AllocationDecider decider = new RestoreInProgressAllocationDecider(Settings.EMPTY); + final RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, Collections.singleton(decider)), + clusterState.getRoutingNodes(), clusterState, null, 0L); + allocation.debugDecision(true); + + final Decision decision; + if (randomBoolean()) { + decision = decider.canAllocate(shardRouting, allocation); + } else { + DiscoveryNode node = clusterState.getNodes().getMasterNode(); + decision = decider.canAllocate(shardRouting, new RoutingNode(node.getId(), node), allocation); + } + return decision; + } + + private RecoverySource.SnapshotRecoverySource createSnapshotRecoverySource(final String snapshotName) { + Snapshot snapshot = new Snapshot("_repository", new SnapshotId(snapshotName, "_uuid")); + return new RecoverySource.SnapshotRecoverySource(snapshot, Version.CURRENT, "test"); + } +} diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index fa3920c1e8d..3f9c80f3ffa 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -46,6 +46,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.Entry; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; @@ -55,6 +56,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -97,14 +102,15 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING; -import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist; @@ -117,9 +123,11 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertInde import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; @@ -824,6 +832,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas prepareCreate("test-idx").setSettings(Settings.builder().put("index.allocation.max_retries", Integer.MAX_VALUE)).get(); ensureGreen(); + final NumShards numShards = getNumShards("test-idx"); + logger.info("--> indexing some data"); for (int i = 0; i < 100; i++) { index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); @@ -848,14 +858,31 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> delete index"); cluster().wipeIndices("test-idx"); logger.info("--> restore index after deletion"); - RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - SearchResponse countResponse = client.prepareSearch("test-idx").setSize(0).get(); - assertThat(countResponse.getHits().getTotalHits(), equalTo(100L)); + final RestoreSnapshotResponse restoreResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap") + .setWaitForCompletion(true) + .get(); + logger.info("--> total number of simulated failures during restore: [{}]", getFailureCount("test-repo")); + final RestoreInfo restoreInfo = restoreResponse.getRestoreInfo(); + assertThat(restoreInfo.totalShards(), equalTo(numShards.numPrimaries)); + + if (restoreInfo.successfulShards() == restoreInfo.totalShards()) { + // All shards were restored, we must find the exact number of hits + assertHitCount(client.prepareSearch("test-idx").setSize(0).get(), 100L); + } else { + // One or more shards failed to be restored. This can happen when there is + // only 1 data node: a shard failed because of the random IO exceptions + // during restore and then we don't allow the shard to be assigned on the + // same node again during the same reroute operation. Then another reroute + // operation is scheduled, but the RestoreInProgressAllocationDecider will + // block the shard to be assigned again because it failed during restore. + final ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().get(); + assertEquals(1, clusterStateResponse.getState().getNodes().getDataNodes().size()); + assertEquals(restoreInfo.failedShards(), + clusterStateResponse.getState().getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size()); + } } - @TestLogging("org.elasticsearch.cluster.routing:TRACE,org.elasticsearch.snapshots:TRACE") public void testDataFileCorruptionDuringRestore() throws Exception { Path repositoryLocation = randomRepoPath(); Client client = client(); @@ -907,6 +934,155 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas cluster().wipeIndices("test-idx"); } + /** + * Test that restoring a snapshot whose files can't be downloaded at all is not stuck or + * does not hang indefinitely. + */ + public void testUnrestorableFilesDuringRestore() throws Exception { + final String indexName = "unrestorable-files"; + final int maxRetries = randomIntBetween(1, 10); + + Settings createIndexSettings = Settings.builder().put(SETTING_ALLOCATION_MAX_RETRY.getKey(), maxRetries).build(); + + Settings repositorySettings = Settings.builder() + .put("random", randomAlphaOfLength(10)) + .put("max_failure_number", 10000000L) + // No lucene corruptions, we want to test retries + .put("use_lucene_corruption", false) + // Restoring a file will never complete + .put("random_data_file_io_exception_rate", 1.0) + .build(); + + Consumer checkUnassignedInfo = unassignedInfo -> { + assertThat(unassignedInfo.getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED)); + assertThat(unassignedInfo.getNumFailedAllocations(), anyOf(equalTo(maxRetries), equalTo(1))); + }; + + unrestorableUseCase(indexName, createIndexSettings, repositorySettings, Settings.EMPTY, checkUnassignedInfo, () -> {}); + } + + /** + * Test that restoring an index with shard allocation filtering settings that prevents + * its allocation does not hang indefinitely. + */ + public void testUnrestorableIndexDuringRestore() throws Exception { + final String indexName = "unrestorable-index"; + Settings restoreIndexSettings = Settings.builder().put("index.routing.allocation.include._name", randomAlphaOfLength(5)).build(); + + Consumer checkUnassignedInfo = unassignedInfo -> { + assertThat(unassignedInfo.getReason(), equalTo(UnassignedInfo.Reason.NEW_INDEX_RESTORED)); + }; + + Runnable fixupAction =() -> { + // remove the shard allocation filtering settings and use the Reroute API to retry the failed shards + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder() + .putNull("index.routing.allocation.include._name") + .build())); + assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true)); + }; + + unrestorableUseCase(indexName, Settings.EMPTY, Settings.EMPTY, restoreIndexSettings, checkUnassignedInfo, fixupAction); + } + + /** Execute the unrestorable test use case **/ + private void unrestorableUseCase(final String indexName, + final Settings createIndexSettings, + final Settings repositorySettings, + final Settings restoreIndexSettings, + final Consumer checkUnassignedInfo, + final Runnable fixUpAction) throws Exception { + // create a test repository + final Path repositoryLocation = randomRepoPath(); + assertAcked(client().admin().cluster().preparePutRepository("test-repo") + .setType("fs") + .setSettings(Settings.builder().put("location", repositoryLocation))); + // create a test index + assertAcked(prepareCreate(indexName, Settings.builder().put(createIndexSettings))); + + // index some documents + final int nbDocs = scaledRandomIntBetween(10, 100); + for (int i = 0; i < nbDocs; i++) { + index(indexName, "doc", Integer.toString(i), "foo", "bar" + i); + } + flushAndRefresh(indexName); + assertThat(client().prepareSearch(indexName).setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs)); + + // create a snapshot + final NumShards numShards = getNumShards(indexName); + CreateSnapshotResponse snapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") + .setWaitForCompletion(true) + .setIndices(indexName) + .get(); + + assertThat(snapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotResponse.getSnapshotInfo().successfulShards(), equalTo(numShards.numPrimaries)); + assertThat(snapshotResponse.getSnapshotInfo().failedShards(), equalTo(0)); + + // delete the test index + assertAcked(client().admin().indices().prepareDelete(indexName)); + + // update the test repository + assertAcked(client().admin().cluster().preparePutRepository("test-repo") + .setType("mock") + .setSettings(Settings.builder() + .put("location", repositoryLocation) + .put(repositorySettings) + .build())); + + // attempt to restore the snapshot with the given settings + RestoreSnapshotResponse restoreResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap") + .setIndices(indexName) + .setIndexSettings(restoreIndexSettings) + .setWaitForCompletion(true) + .get(); + + // check that all shards failed during restore + assertThat(restoreResponse.getRestoreInfo().totalShards(), equalTo(numShards.numPrimaries)); + assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(0)); + + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setCustoms(true).setRoutingTable(true).get(); + + // check that there is no restore in progress + RestoreInProgress restoreInProgress = clusterStateResponse.getState().custom(RestoreInProgress.TYPE); + assertNotNull("RestoreInProgress must be not null", restoreInProgress); + assertThat("RestoreInProgress must be empty", restoreInProgress.entries(), hasSize(0)); + + // check that the shards have been created but are not assigned + assertThat(clusterStateResponse.getState().getRoutingTable().allShards(indexName), hasSize(numShards.totalNumShards)); + + // check that every primary shard is unassigned + for (ShardRouting shard : clusterStateResponse.getState().getRoutingTable().allShards(indexName)) { + if (shard.primary()) { + assertThat(shard.state(), equalTo(ShardRoutingState.UNASSIGNED)); + assertThat(shard.recoverySource().getType(), equalTo(RecoverySource.Type.SNAPSHOT)); + assertThat(shard.unassignedInfo().getLastAllocationStatus(), equalTo(UnassignedInfo.AllocationStatus.DECIDERS_NO)); + checkUnassignedInfo.accept(shard.unassignedInfo()); + } + } + + // update the test repository in order to make it work + assertAcked(client().admin().cluster().preparePutRepository("test-repo") + .setType("fs") + .setSettings(Settings.builder().put("location", repositoryLocation))); + + // execute action to eventually fix the situation + fixUpAction.run(); + + // delete the index and restore again + assertAcked(client().admin().indices().prepareDelete(indexName)); + + restoreResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get(); + assertThat(restoreResponse.getRestoreInfo().totalShards(), equalTo(numShards.numPrimaries)); + assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(numShards.numPrimaries)); + + // Wait for the shards to be assigned + ensureGreen(indexName); + refresh(indexName); + + assertThat(client().prepareSearch(indexName).setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs)); + } + public void testDeletionOfFailingToRecoverIndexShouldStopRestore() throws Exception { Path repositoryLocation = randomRepoPath(); Client client = client();