From 075047065d5dd692d0a74f6ca70e796880f84df1 Mon Sep 17 00:00:00 2001
From: Yannick Welsch
Date: Wed, 12 Oct 2016 09:06:37 +0200
Subject: [PATCH] Keep snapshot restore state and routing table in sync (#20836)

The snapshot restore state tracks information about shards being restored from a
snapshot in the cluster state. For example, it records whether a shard has been
successfully restored or whether restoring it was not possible due to a corruption
of the snapshot. Recording these events is usually based on changes to the shard
routing table, i.e., when a shard is started after a successful restore or failed
after an unsuccessful one.

Until now, there were two communication channels to transmit recovery failure or
success for updating the routing table and the restore state. This led to issues
where a shard was failed but the restore state was not updated because of
connection issues between the data and master nodes. In some rare situations, the
restore state could then no longer be properly cleaned up by the master, making it
impossible to start new restore operations.

This change updates the routing table and the restore state in the same cluster
state update so that both always stay in sync. It also eliminates the extra
communication channel for restore operations and uses the standard cluster state
listener mechanism to notify restore listeners upon completion of a snapshot
restore.
---
 .../TransportRestoreSnapshotAction.java | 56 +-
 .../metadata/MetaDataDeleteIndexService.java | 24 +-
 .../routing/allocation/AllocationService.java | 21 +-
 .../routing/allocation/RoutingAllocation.java | 17 +-
 .../cluster/IndicesClusterStateService.java | 22 +-
 .../snapshots/RestoreService.java | 559 +++++++-----------
 .../MetaDataIndexAliasesServiceTests.java | 4 +-
 .../SharedClusterSnapshotRestoreIT.java | 66 ++-
 .../snapshots/mockstore/MockRepository.java | 10 +-
 9 files changed, 373 insertions(+), 406 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java
index 070db6c5248..3ef12dfff0e 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java
@@ -22,19 +22,27 @@ package org.elasticsearch.action.admin.cluster.snapshots.restore;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.RestoreService;
+import
org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; + /** * Transport action for restore snapshot operation */ @@ -78,28 +86,44 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction() { + restoreService.restoreSnapshot(restoreRequest, new ActionListener() { @Override - public void onResponse(RestoreInfo restoreInfo) { - if (restoreInfo == null && request.waitForCompletion()) { - restoreService.addListener(new ActionListener() { + public void onResponse(RestoreCompletionResponse restoreCompletionResponse) { + if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { + final Snapshot snapshot = restoreCompletionResponse.getSnapshot(); + + ClusterStateListener clusterStateListener = new ClusterStateListener() { @Override - public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) { - final Snapshot snapshot = restoreCompletionResponse.getSnapshot(); - if (snapshot.getRepository().equals(request.repository()) && - snapshot.getSnapshotId().getName().equals(request.snapshot())) { - listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); - restoreService.removeListener(this); + public void clusterChanged(ClusterChangedEvent changedEvent) { + final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), snapshot); + final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), snapshot); + if (prevEntry == null) { + // When there is a master failure after a restore has been started, this listener might not be registered + // on the current master and as such it might miss some intermediary cluster states due to batching. + // Clean up listener in that case and acknowledge completion of restore operation to client. 
+ clusterService.remove(this); + listener.onResponse(new RestoreSnapshotResponse(null)); + } else if (newEntry == null) { + clusterService.remove(this); + ImmutableOpenMap shards = prevEntry.shards(); + assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state(); + assert RestoreService.completed(shards) : "expected all restore entries to be completed"; + RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(), + prevEntry.indices(), + shards.size(), + shards.size() - RestoreService.failedShards(shards)); + RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri); + logger.debug("restore of [{}] completed", snapshot); + listener.onResponse(response); + } else { + // restore not completed yet, wait for next cluster state update } } + }; - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + clusterService.addLast(clusterStateListener); } else { - listener.onResponse(new RestoreSnapshotResponse(restoreInfo)); + listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index 22553dd9929..aa4fc847309 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -23,20 +23,22 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.delete.DeleteIndexClusterStateUpdateRequest; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; +import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotsService; -import java.util.Arrays; -import java.util.Collection; import java.util.Set; import static java.util.stream.Collectors.toSet; @@ -73,7 +75,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { @Override public ClusterState execute(final ClusterState currentState) { - return deleteIndices(currentState, Arrays.asList(request.indices())); + return deleteIndices(currentState, Sets.newHashSet(request.indices())); } }); } @@ -81,7 +83,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { /** * Delete some indices from the cluster state. 
*/ - public ClusterState deleteIndices(ClusterState currentState, Collection indices) { + public ClusterState deleteIndices(ClusterState currentState, Set indices) { final MetaData meta = currentState.metaData(); final Set metaDatas = indices.stream().map(i -> meta.getIndexSafe(i)).collect(toSet()); // Check if index deletion conflicts with any running snapshots @@ -107,11 +109,25 @@ public class MetaDataDeleteIndexService extends AbstractComponent { MetaData newMetaData = metaDataBuilder.build(); ClusterBlocks blocks = clusterBlocksBuilder.build(); + + // update snapshot restore entries + ImmutableOpenMap customs = currentState.getCustoms(); + final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); + if (restoreInProgress != null) { + RestoreInProgress updatedRestoreInProgress = RestoreService.updateRestoreStateWithDeletedIndices(restoreInProgress, indices); + if (updatedRestoreInProgress != restoreInProgress) { + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(customs); + builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress); + customs = builder.build(); + } + } + return allocationService.reroute( ClusterState.builder(currentState) .routingTable(routingTableBuilder.build()) .metaData(newMetaData) .blocks(blocks) + .customs(customs) .build(), "deleted indices [" + indices + "]"); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 323adf78046..b3eaa517934 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -34,6 +35,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -94,17 +96,24 @@ public class AllocationService extends AbstractComponent { } protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason) { - return buildResultAndLogHealthChange(oldState, allocation, reason, new RoutingExplanations()); - } - - protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason, - RoutingExplanations explanations) { RoutingTable oldRoutingTable = oldState.routingTable(); RoutingNodes newRoutingNodes = allocation.routingNodes(); final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(oldRoutingTable.version(), newRoutingNodes).build(); MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges(newRoutingTable); assert 
newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata - final ClusterState newState = ClusterState.builder(oldState).routingTable(newRoutingTable).metaData(newMetaData).build(); + final ClusterState.Builder newStateBuilder = ClusterState.builder(oldState) + .routingTable(newRoutingTable) + .metaData(newMetaData); + final RestoreInProgress restoreInProgress = allocation.custom(RestoreInProgress.TYPE); + if (restoreInProgress != null) { + RestoreInProgress updatedRestoreInProgress = allocation.updateRestoreInfoWithRoutingChanges(restoreInProgress); + if (updatedRestoreInProgress != restoreInProgress) { + ImmutableOpenMap.Builder customsBuilder = ImmutableOpenMap.builder(allocation.getCustoms()); + customsBuilder.put(RestoreInProgress.TYPE, updatedRestoreInProgress); + newStateBuilder.customs(customsBuilder.build()); + } + } + final ClusterState newState = newStateBuilder.build(); logClusterHealthStateChange( new ClusterStateHealth(oldState), new ClusterStateHealth(newState), diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 8429493b0e7..886b42f57d6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingChangesObserver; @@ -30,6 +31,8 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.RestoreService.RestoreInProgressUpdater; import java.util.HashMap; import java.util.HashSet; @@ -76,8 +79,9 @@ public class RoutingAllocation { private final IndexMetaDataUpdater indexMetaDataUpdater = new IndexMetaDataUpdater(); private final RoutingNodesChangedObserver nodesChangedObserver = new RoutingNodesChangedObserver(); + private final RestoreInProgressUpdater restoreInProgressUpdater = new RestoreInProgressUpdater(); private final RoutingChangesObserver routingChangesObserver = new RoutingChangesObserver.DelegatingRoutingChangesObserver( - nodesChangedObserver, indexMetaDataUpdater + nodesChangedObserver, indexMetaDataUpdater, restoreInProgressUpdater ); @@ -154,6 +158,10 @@ public class RoutingAllocation { return (T)customs.get(key); } + public ImmutableOpenMap getCustoms() { + return customs; + } + /** * Get explanations of current routing * @return explanation of routing @@ -234,6 +242,13 @@ public class RoutingAllocation { return indexMetaDataUpdater.applyChanges(metaData, newRoutingTable); } + /** + * Returns updated {@link RestoreInProgress} based on the changes that were made to the routing nodes + */ + public RestoreInProgress updateRestoreInfoWithRoutingChanges(RestoreInProgress restoreInProgress) { + return restoreInProgressUpdater.applyChanges(restoreInProgress); + } + /** * Returns true iff changes 
were made to the routing nodes */ diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index bbbcfc96d57..b55074f592d 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -570,8 +570,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple /** * Finds the routing source node for peer recovery, return null if its not found. Note, this method expects the shard - * routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to - * check if its needed or not. + * routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to check if its needed or not. */ private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) { @@ -610,29 +609,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple @Override public void onRecoveryDone(RecoveryState state) { - if (state.getRecoverySource().getType() == Type.SNAPSHOT) { - SnapshotRecoverySource snapshotRecoverySource = (SnapshotRecoverySource) state.getRecoverySource(); - restoreService.indexShardRestoreCompleted(snapshotRecoverySource.snapshot(), shardRouting.shardId()); - } shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); } @Override public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { - if (state.getRecoverySource().getType() == Type.SNAPSHOT) { - try { - if (Lucene.isCorruptionException(e.getCause())) { - SnapshotRecoverySource snapshotRecoverySource = (SnapshotRecoverySource) state.getRecoverySource(); - restoreService.failRestore(snapshotRecoverySource.snapshot(), shardRouting.shardId()); - } - } catch (Exception inner) { - e.addSuppressed(inner); - } finally { - handleRecoveryFailure(shardRouting, sendShardFailure, e); - } - } else { - handleRecoveryFailure(shardRouting, sendShardFailure, e); - } + handleRecoveryFailure(shardRouting, sendShardFailure, e); } } diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 5e30a3b52b3..ed6317fcb30 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.Version; @@ -30,6 +31,9 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RestoreInProgress; import 
org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus; @@ -41,41 +45,32 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.metadata.RepositoriesMetaData; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; +import org.elasticsearch.cluster.routing.RoutingChangesObserver; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -83,12 +78,10 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; import static java.util.Collections.unmodifiableSet; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; @@ -117,14 +110,12 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet; * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking * at the {@link ShardRouting#recoverySource()} property. *

- * At the end of the successful restore process {@code IndexShardSnapshotAndRestoreService} calls {@link #indexShardRestoreCompleted(Snapshot, ShardId)}, - * which updates {@link RestoreInProgress} in cluster state or removes it when all shards are completed. In case of + * At the end of the successful restore process {@code RestoreService} calls {@link #cleanupRestoreState(ClusterChangedEvent)}, + * which removes {@link RestoreInProgress} when all shards are completed. In case of * restore failure a normal recovery fail-over process kicks in. */ public class RestoreService extends AbstractComponent implements ClusterStateListener { - public static final String UPDATE_RESTORE_ACTION_NAME = "internal:cluster/snapshot/update_restore"; - private static final Set UNMODIFIABLE_SETTINGS = unmodifiableSet(newHashSet( SETTING_NUMBER_OF_SHARDS, SETTING_VERSION_CREATED, @@ -148,33 +139,29 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis private final RepositoriesService repositoriesService; - private final TransportService transportService; - private final AllocationService allocationService; private final MetaDataCreateIndexService createIndexService; private final MetaDataIndexUpgradeService metaDataIndexUpgradeService; - private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); - - private final BlockingQueue updatedSnapshotStateQueue = ConcurrentCollections.newBlockingQueue(); private final ClusterSettings clusterSettings; + private final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor; + @Inject - public RestoreService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService, + public RestoreService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, AllocationService allocationService, MetaDataCreateIndexService createIndexService, MetaDataIndexUpgradeService metaDataIndexUpgradeService, ClusterSettings clusterSettings) { super(settings); this.clusterService = clusterService; this.repositoriesService = repositoriesService; - this.transportService = transportService; this.allocationService = allocationService; this.createIndexService = createIndexService; this.metaDataIndexUpgradeService = metaDataIndexUpgradeService; - transportService.registerRequestHandler(UPDATE_RESTORE_ACTION_NAME, UpdateIndexShardRestoreStatusRequest::new, ThreadPool.Names.SAME, new UpdateRestoreStateRequestHandler()); clusterService.add(this); this.clusterSettings = clusterSettings; + this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(logger); } /** @@ -183,7 +170,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis * @param request restore request * @param listener restore listener */ - public void restoreSnapshot(final RestoreRequest request, final ActionListener listener) { + public void restoreSnapshot(final RestoreRequest request, final ActionListener listener) { try { // Read snapshot info and metadata from the repository Repository repository = repositoriesService.repository(request.repositoryName); @@ -314,7 +301,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } shards = shardsBuilder.build(); - RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshot, RestoreInProgress.State.INIT, Collections.unmodifiableList(new ArrayList<>(renamedIndices.keySet())), shards); + RestoreInProgress.Entry restoreEntry = new 
RestoreInProgress.Entry(snapshot, overallState(RestoreInProgress.State.INIT, shards), Collections.unmodifiableList(new ArrayList<>(renamedIndices.keySet())), shards); builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restoreEntry)); } else { shards = ImmutableOpenMap.of(); @@ -469,7 +456,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(restoreInfo); + listener.onResponse(new RestoreCompletionResponse(snapshot, restoreInfo)); } }); @@ -480,19 +467,33 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } - /** - * This method is used by {@link IndexShard} to notify - * {@code RestoreService} about shard restore completion. - * - * @param snapshot snapshot - * @param shardId shard id - */ - public void indexShardRestoreCompleted(Snapshot snapshot, ShardId shardId) { - logger.trace("[{}] successfully restored shard [{}]", snapshot, shardId); - UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshot, shardId, - new ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(), RestoreInProgress.State.SUCCESS)); - transportService.sendRequest(clusterService.state().nodes().getMasterNode(), - UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set deletedIndices) { + boolean changesMade = false; + final List entries = new ArrayList<>(); + for (RestoreInProgress.Entry entry : oldRestore.entries()) { + ImmutableOpenMap.Builder shardsBuilder = null; + for (ObjectObjectCursor cursor : entry.shards()) { + ShardId shardId = cursor.key; + if (deletedIndices.contains(shardId.getIndex())) { + changesMade = true; + if (shardsBuilder == null) { + shardsBuilder = ImmutableOpenMap.builder(entry.shards()); + } + shardsBuilder.put(shardId, new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE, "index was deleted")); + } + } + if (shardsBuilder != null) { + ImmutableOpenMap shards = shardsBuilder.build(); + entries.add(new RestoreInProgress.Entry(entry.snapshot(), overallState(RestoreInProgress.State.STARTED, shards), entry.indices(), shards)); + } else { + entries.add(entry); + } + } + if (changesMade) { + return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()])); + } else { + return oldRestore; + } } public static final class RestoreCompletionResponse { @@ -513,168 +514,201 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } - /** - * Updates shard restore record in the cluster state. 
- * - * @param request update shard status request - */ - private void updateRestoreStateOnMaster(final UpdateIndexShardRestoreStatusRequest request) { - logger.trace("received updated snapshot restore state [{}]", request); - updatedSnapshotStateQueue.add(request); + public static class RestoreInProgressUpdater extends RoutingChangesObserver.AbstractRoutingChangesObserver { + private final Map shardChanges = new HashMap<>(); - clusterService.submitStateUpdateTask("update snapshot state", new ClusterStateUpdateTask() { - private final List drainedRequests = new ArrayList<>(); - private Map>> batchedRestoreInfo = null; - - @Override - public ClusterState execute(ClusterState currentState) { - - if (request.processed) { - return currentState; + @Override + public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) { + // mark snapshot as completed + if (initializingShard.primary()) { + RecoverySource recoverySource = initializingShard.recoverySource(); + if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) { + Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot(); + changes(snapshot).startedShards.put(initializingShard.shardId(), + new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS)); } + } + } - updatedSnapshotStateQueue.drainTo(drainedRequests); - - final int batchSize = drainedRequests.size(); - - // nothing to process (a previous event has processed it already) - if (batchSize == 0) { - return currentState; + @Override + public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) { + if (failedShard.primary() && failedShard.initializing()) { + RecoverySource recoverySource = failedShard.recoverySource(); + if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) { + Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot(); + // mark restore entry for this shard as failed when it's due to a file corruption. There is no need wait on retries + // to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed, + // however, we only want to acknowledge the restore operation once it has been successfully restored on another node. 
+ if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) { + changes(snapshot).failedShards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(), + RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage())); + } } + } + } - final RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE); - if (restore != null) { - int changedCount = 0; - final List entries = new ArrayList<>(); - for (RestoreInProgress.Entry entry : restore.entries()) { - ImmutableOpenMap.Builder shardsBuilder = null; + @Override + public void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard) { + // if we force an empty primary, we should also fail the restore entry + if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT && + initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) { + Snapshot snapshot = ((SnapshotRecoverySource) unassignedShard.recoverySource()).snapshot(); + changes(snapshot).failedShards.put(unassignedShard.shardId(), new ShardRestoreStatus(null, + RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource())); + } + } - for (int i = 0; i < batchSize; i++) { - final UpdateIndexShardRestoreStatusRequest updateSnapshotState = drainedRequests.get(i); - updateSnapshotState.processed = true; + /** + * Helper method that creates update entry for the given shard id if such an entry does not exist yet. + */ + private Updates changes(Snapshot snapshot) { + return shardChanges.computeIfAbsent(snapshot, k -> new Updates()); + } - if (entry.snapshot().equals(updateSnapshotState.snapshot())) { - logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state()); - if (shardsBuilder == null) { - shardsBuilder = ImmutableOpenMap.builder(entry.shards()); - } - shardsBuilder.put(updateSnapshotState.shardId(), updateSnapshotState.status()); - changedCount++; - } + private static class Updates { + private Map failedShards = new HashMap<>(); + private Map startedShards = new HashMap<>(); + } + + public RestoreInProgress applyChanges(RestoreInProgress oldRestore) { + if (shardChanges.isEmpty() == false) { + final List entries = new ArrayList<>(); + for (RestoreInProgress.Entry entry : oldRestore.entries()) { + Snapshot snapshot = entry.snapshot(); + Updates updates = shardChanges.get(snapshot); + assert Sets.haveEmptyIntersection(updates.startedShards.keySet(), updates.failedShards.keySet()); + if (updates.startedShards.isEmpty() == false || updates.failedShards.isEmpty() == false) { + ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(entry.shards()); + for (Map.Entry startedShardEntry : updates.startedShards.entrySet()) { + shardsBuilder.put(startedShardEntry.getKey(), startedShardEntry.getValue()); } - - if (shardsBuilder != null) { - ImmutableOpenMap shards = shardsBuilder.build(); - if (!completed(shards)) { - entries.add(new RestoreInProgress.Entry(entry.snapshot(), RestoreInProgress.State.STARTED, entry.indices(), shards)); - } else { - logger.info("restore [{}] is done", entry.snapshot()); - if (batchedRestoreInfo == null) { - batchedRestoreInfo = new HashMap<>(); - } - assert !batchedRestoreInfo.containsKey(entry.snapshot()); - batchedRestoreInfo.put(entry.snapshot(), - new Tuple<>( - new RestoreInfo(entry.snapshot().getSnapshotId().getName(), - 
entry.indices(), - shards.size(), - shards.size() - failedShards(shards)), - shards)); - } - } else { - entries.add(entry); + for (Map.Entry failedShardEntry : updates.failedShards.entrySet()) { + shardsBuilder.put(failedShardEntry.getKey(), failedShardEntry.getValue()); } - } - - if (changedCount > 0) { - logger.trace("changed cluster state triggered by {} snapshot restore state updates", changedCount); - - final RestoreInProgress updatedRestore = new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()])); - return ClusterState.builder(currentState).putCustom(RestoreInProgress.TYPE, updatedRestore).build(); + ImmutableOpenMap shards = shardsBuilder.build(); + RestoreInProgress.State newState = overallState(RestoreInProgress.State.STARTED, shards); + entries.add(new RestoreInProgress.Entry(entry.snapshot(), newState, entry.indices(), shards)); + } else { + entries.add(entry); } } - return currentState; + return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()])); + } else { + return oldRestore; } + } - @Override - public void onFailure(String source, @Nullable Exception e) { - for (UpdateIndexShardRestoreStatusRequest request : drainedRequests) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]", request.snapshot(), request.shardId(), request.status()), e); - } - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (batchedRestoreInfo != null) { - for (final Entry>> entry : batchedRestoreInfo.entrySet()) { - final Snapshot snapshot = entry.getKey(); - final RestoreInfo restoreInfo = entry.getValue().v1(); - final ImmutableOpenMap shards = entry.getValue().v2(); - RoutingTable routingTable = newState.getRoutingTable(); - final List waitForStarted = new ArrayList<>(); - for (ObjectObjectCursor shard : shards) { - if (shard.value.state() == RestoreInProgress.State.SUCCESS ) { - ShardId shardId = shard.key; - ShardRouting shardRouting = findPrimaryShard(routingTable, shardId); - if (shardRouting != null && !shardRouting.active()) { - logger.trace("[{}][{}] waiting for the shard to start", snapshot, shardId); - waitForStarted.add(shardId); - } - } - } - if (waitForStarted.isEmpty()) { - notifyListeners(snapshot, restoreInfo); - } else { - clusterService.addLast(new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent event) { - if (event.routingTableChanged()) { - RoutingTable routingTable = event.state().getRoutingTable(); - for (Iterator iterator = waitForStarted.iterator(); iterator.hasNext();) { - ShardId shardId = iterator.next(); - ShardRouting shardRouting = findPrimaryShard(routingTable, shardId); - // Shard disappeared (index deleted) or became active - if (shardRouting == null || shardRouting.active()) { - iterator.remove(); - logger.trace("[{}][{}] shard disappeared or started - removing", snapshot, shardId); - } - } - } - if (waitForStarted.isEmpty()) { - notifyListeners(snapshot, restoreInfo); - clusterService.remove(this); - } - } - }); - } - } - } - } - - private ShardRouting findPrimaryShard(RoutingTable routingTable, ShardId shardId) { - IndexRoutingTable indexRoutingTable = routingTable.index(shardId.getIndex()); - if (indexRoutingTable != null) { - IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId.id()); - if (indexShardRoutingTable != null) { - return indexShardRoutingTable.primaryShard(); - } - } - return null; - } - - private void 
notifyListeners(Snapshot snapshot, RestoreInfo restoreInfo) { - for (ActionListener listener : listeners) { - try { - listener.onResponse(new RestoreCompletionResponse(snapshot, restoreInfo)); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("failed to update snapshot status for [{}]", listener), e); - } - } - } - }); } - private boolean completed(ImmutableOpenMap shards) { + public static RestoreInProgress.Entry restoreInProgress(ClusterState state, Snapshot snapshot) { + final RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE); + if (restoreInProgress != null) { + for (RestoreInProgress.Entry e : restoreInProgress.entries()) { + if (e.snapshot().equals(snapshot)) { + return e; + } + } + } + return null; + } + + static class CleanRestoreStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { + + static class Task { + final Snapshot snapshot; + + Task(Snapshot snapshot) { + this.snapshot = snapshot; + } + + @Override + public String toString() { + return "clean restore state for restoring snapshot " + snapshot; + } + } + + private final Logger logger; + + public CleanRestoreStateTaskExecutor(Logger logger) { + this.logger = logger; + } + + @Override + public BatchResult execute(final ClusterState currentState, final List tasks) throws Exception { + final BatchResult.Builder resultBuilder = BatchResult.builder().successes(tasks); + Set completedSnapshots = tasks.stream().map(e -> e.snapshot).collect(Collectors.toSet()); + final List entries = new ArrayList<>(); + final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); + boolean changed = false; + if (restoreInProgress != null) { + for (RestoreInProgress.Entry entry : restoreInProgress.entries()) { + if (completedSnapshots.contains(entry.snapshot()) == false) { + entries.add(entry); + } else { + changed = true; + } + } + } + if (changed == false) { + return resultBuilder.build(currentState); + } + RestoreInProgress updatedRestoreInProgress = new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()])); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(currentState.getCustoms()); + builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress); + ImmutableOpenMap customs = builder.build(); + return resultBuilder.build(ClusterState.builder(currentState).customs(customs).build()); + } + + @Override + public void onFailure(final String source, final Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); + } + + @Override + public void onNoLongerMaster(String source) { + logger.debug("no longer master while processing restore state update [{}]", source); + } + + } + + private void cleanupRestoreState(ClusterChangedEvent event) { + ClusterState state = event.state(); + + RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE); + if (restoreInProgress != null) { + for (RestoreInProgress.Entry entry : restoreInProgress.entries()) { + if (entry.state().completed()) { + assert completed(entry.shards()) : "state says completed but restore entries are not"; + clusterService.submitStateUpdateTask( + "clean up snapshot restore state", + new CleanRestoreStateTaskExecutor.Task(entry.snapshot()), + ClusterStateTaskConfig.build(Priority.URGENT), + cleanRestoreStateTaskExecutor, + cleanRestoreStateTaskExecutor); + } + } + } + } + + public static RestoreInProgress.State overallState(RestoreInProgress.State nonCompletedState, + 
ImmutableOpenMap shards) { + boolean hasFailed = false; + for (ObjectCursor status : shards.values()) { + if (!status.value.state().completed()) { + return nonCompletedState; + } + if (status.value.state() == RestoreInProgress.State.FAILURE) { + hasFailed = true; + } + } + if (hasFailed) { + return RestoreInProgress.State.FAILURE; + } else { + return RestoreInProgress.State.SUCCESS; + } + } + + public static boolean completed(ImmutableOpenMap shards) { for (ObjectCursor status : shards.values()) { if (!status.value.state().completed()) { return false; @@ -683,7 +717,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis return true; } - private int failedShards(ImmutableOpenMap shards) { + public static int failedShards(ImmutableOpenMap shards) { int failedShards = 0; for (ObjectCursor status : shards.values()) { if (status.value.state() == RestoreInProgress.State.FAILURE) { @@ -727,53 +761,6 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } - /** - * Checks if any of the deleted indices are still recovering and fails recovery on the shards of these indices - * - * @param event cluster changed event - */ - private void processDeletedIndices(ClusterChangedEvent event) { - RestoreInProgress restore = event.state().custom(RestoreInProgress.TYPE); - if (restore == null) { - // Not restoring - nothing to do - return; - } - - if (!event.indicesDeleted().isEmpty()) { - // Some indices were deleted, let's make sure all indices that we are restoring still exist - for (RestoreInProgress.Entry entry : restore.entries()) { - List shardsToFail = null; - for (ObjectObjectCursor shard : entry.shards()) { - if (!shard.value.state().completed()) { - if (!event.state().metaData().hasIndex(shard.key.getIndex().getName())) { - if (shardsToFail == null) { - shardsToFail = new ArrayList<>(); - } - shardsToFail.add(shard.key); - } - } - } - if (shardsToFail != null) { - for (ShardId shardId : shardsToFail) { - logger.trace("[{}] failing running shard restore [{}]", entry.snapshot(), shardId); - updateRestoreStateOnMaster(new UpdateIndexShardRestoreStatusRequest(entry.snapshot(), shardId, new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE, "index was deleted"))); - } - } - } - } - } - - /** - * Fails the given snapshot restore operation for the given shard - */ - public void failRestore(Snapshot snapshot, ShardId shardId) { - logger.debug("[{}] failed to restore shard [{}]", snapshot, shardId); - UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshot, shardId, - new ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(), RestoreInProgress.State.FAILURE)); - transportService.sendRequest(clusterService.state().nodes().getMasterNode(), - UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); - } - private boolean failed(SnapshotInfo snapshot, String index) { for (SnapshotShardFailure failure : snapshot.shardFailures()) { if (index.equals(failure.index())) { @@ -810,34 +797,11 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } - /** - * Adds restore completion listener - *

- * This listener is called for each snapshot that finishes restore operation in the cluster. It's responsibility of - * the listener to decide if it's called for the appropriate snapshot or not. - * - * @param listener restore completion listener - */ - public void addListener(ActionListener listener) { - this.listeners.add(listener); - } - - /** - * Removes restore completion listener - *

- * This listener is called for each snapshot that finishes restore operation in the cluster. - * - * @param listener restore completion listener - */ - public void removeListener(ActionListener listener) { - this.listeners.remove(listener); - } - @Override public void clusterChanged(ClusterChangedEvent event) { try { if (event.localNodeMaster()) { - processDeletedIndices(event); + cleanupRestoreState(event); } } catch (Exception t) { logger.warn("Failed to update restore state ", t); @@ -1061,69 +1025,4 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } - - /** - * Internal class that is used to send notifications about finished shard restore operations to master node - */ - public static class UpdateIndexShardRestoreStatusRequest extends TransportRequest { - private Snapshot snapshot; - private ShardId shardId; - private ShardRestoreStatus status; - - volatile boolean processed; // state field, no need to serialize - - public UpdateIndexShardRestoreStatusRequest() { - - } - - private UpdateIndexShardRestoreStatusRequest(Snapshot snapshot, ShardId shardId, ShardRestoreStatus status) { - this.snapshot = snapshot; - this.shardId = shardId; - this.status = status; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - snapshot = new Snapshot(in); - shardId = ShardId.readShardId(in); - status = ShardRestoreStatus.readShardRestoreStatus(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - snapshot.writeTo(out); - shardId.writeTo(out); - status.writeTo(out); - } - - public Snapshot snapshot() { - return snapshot; - } - - public ShardId shardId() { - return shardId; - } - - public ShardRestoreStatus status() { - return status; - } - - @Override - public String toString() { - return "" + snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; - } - } - - /** - * Internal class that is used to send notifications about finished shard restore operations to master node - */ - class UpdateRestoreStateRequestHandler implements TransportRequestHandler { - @Override - public void messageReceived(UpdateIndexShardRestoreStatusRequest request, final TransportChannel channel) throws Exception { - updateRestoreStateOnMaster(request); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } } diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesServiceTests.java index a11c74d657b..8a342057dab 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesServiceTests.java @@ -33,7 +33,7 @@ import java.util.Collection; import static java.util.Collections.singletonList; import static org.hamcrest.Matchers.contains; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyCollectionOf; +import static org.mockito.Matchers.anySetOf; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -45,7 +45,7 @@ public class MetaDataIndexAliasesServiceTests extends ESTestCase { public MetaDataIndexAliasesServiceTests() { // Mock any deletes so we don't need to worry about how MetaDataDeleteIndexService does its job - when(deleteIndexService.deleteIndices(any(ClusterState.class), anyCollectionOf(Index.class))).then(i -> { + 
when(deleteIndexService.deleteIndices(any(ClusterState.class), anySetOf(Index.class))).then(i -> { ClusterState state = (ClusterState) i.getArguments()[0]; @SuppressWarnings("unchecked") Collection indices = (Collection) i.getArguments()[1]; diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 3e43cc83045..fb55f5bb767 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -686,6 +686,46 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> total number of simulated failures during restore: [{}]", getFailureCount("test-repo")); } + public void testDataFileCorruptionDuringRestore() throws Exception { + Path repositoryLocation = randomRepoPath(); + Client client = client(); + logger.info("--> creating repository"); + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(Settings.builder().put("location", repositoryLocation))); + + prepareCreate("test-idx").setSettings(Settings.builder().put("index.allocation.max_retries", Integer.MAX_VALUE)).get(); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().totalHits(), equalTo(100L)); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(createSnapshotResponse.getSnapshotInfo().successfulShards())); + + logger.info("--> update repository with mock version"); + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("mock").setSettings( + Settings.builder() + .put("location", repositoryLocation) + .put("random", randomAsciiOfLength(10)) + .put("use_lucene_corruption", true) + .put("random_data_file_io_exception_rate", 1.0))); + + // Test restore after index deletion + logger.info("--> delete index"); + cluster().wipeIndices("test-idx"); + logger.info("--> restore corrupt index"); + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards())); + } + public void testDeletionOfFailingToRecoverIndexShouldStopRestore() throws Exception { Path repositoryLocation = randomRepoPath(); Client client = client(); @@ -2199,32 +2239,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertFalse(snapshotListener.timedOut()); // Check that cluster state update task was called only once assertEquals(1, snapshotListener.count()); - - logger.info("--> close indices"); - client.admin().indices().prepareClose("test-idx").get(); - - BlockingClusterStateListener restoreListener = new 
BlockingClusterStateListener(clusterService, "restore_snapshot[", "update snapshot state", Priority.HIGH); - - try { - clusterService.addFirst(restoreListener); - logger.info("--> restore snapshot"); - ListenableActionFuture futureRestore = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute(); - - // Await until shard updates are in pending state. - assertBusyPendingTasks("update snapshot state", numberOfShards); - restoreListener.unblock(); - - RestoreSnapshotResponse restoreSnapshotResponse = futureRestore.actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(numberOfShards)); - - } finally { - clusterService.remove(restoreListener); - } - - // Check that we didn't timeout - assertFalse(restoreListener.timedOut()); - // Check that cluster state update task was called only once - assertEquals(1, restoreListener.count()); } public void testSnapshotName() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 06c4ec10af0..ca3aeb674bd 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -81,6 +82,8 @@ public class MockRepository extends FsRepository { private final double randomDataFileIOExceptionRate; + private final boolean useLuceneCorruptionException; + private final long maximumNumberOfFailures; private final long waitAfterUnblock; @@ -101,6 +104,7 @@ public class MockRepository extends FsRepository { super(overrideSettings(metadata, environment), environment); randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); + useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L); blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false); blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false); @@ -245,7 +249,11 @@ public class MockRepository extends FsRepository { if (blobName.startsWith("__")) { if (shouldFail(blobName, randomDataFileIOExceptionRate) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) { logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path()); - throw new IOException("Random IOException"); + if (useLuceneCorruptionException) { + throw new CorruptIndexException("Random corruption", "random file"); + } else { + throw new IOException("Random IOException"); + } } else if (blockOnDataFiles) { logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path()); if (blockExecution() && waitAfterUnblock > 0) {
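
For readers following the new completion-notification flow, the listener registered in TransportRestoreSnapshotAction above can be condensed into the following sketch. This is an illustration only, not part of the patch: the RestoreCompletionWatcher helper and its callback parameter are hypothetical names introduced here, while clusterService.addLast/remove, RestoreService.restoreInProgress, and RestoreInProgress.Entry are the APIs used in the diff above.

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.Snapshot;

import java.util.function.Consumer;

/**
 * Illustrative sketch (not part of the patch): wait for a snapshot restore to finish by
 * observing cluster state updates instead of a dedicated transport channel.
 */
final class RestoreCompletionWatcher {

    /**
     * Registers a cluster state listener that is removed once the RestoreInProgress entry for
     * the given snapshot is gone. With this change, the entry of a completed restore is removed
     * by the master in the same cluster state stream that started or failed the restored shards,
     * so observing the entry's removal is sufficient to detect completion.
     */
    static void whenRestoreCompletes(ClusterService clusterService, Snapshot snapshot,
                                     Consumer<RestoreInProgress.Entry> callback) {
        clusterService.addLast(new ClusterStateListener() {
            @Override
            public void clusterChanged(ClusterChangedEvent event) {
                RestoreInProgress.Entry previous = RestoreService.restoreInProgress(event.previousState(), snapshot);
                RestoreInProgress.Entry current = RestoreService.restoreInProgress(event.state(), snapshot);
                if (previous == null || current == null) {
                    // either the restore entry disappeared (restore completed) or this listener was
                    // registered on a new master that never saw the entry; stop listening either way
                    clusterService.remove(this);
                    callback.accept(previous); // previous may be null after a master failover
                }
                // otherwise the restore is still running; wait for the next cluster state update
            }
        });
    }
}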