diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index 070db6c5248..3ef12dfff0e 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -22,19 +22,27 @@ package org.elasticsearch.action.admin.cluster.snapshots.restore; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; + /** * Transport action for restore snapshot operation */ @@ -78,28 +86,44 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction() { + restoreService.restoreSnapshot(restoreRequest, new ActionListener() { @Override - public void onResponse(RestoreInfo restoreInfo) { - if (restoreInfo == null && request.waitForCompletion()) { - restoreService.addListener(new ActionListener() { + public void onResponse(RestoreCompletionResponse restoreCompletionResponse) { + if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { + final Snapshot snapshot = restoreCompletionResponse.getSnapshot(); + + ClusterStateListener clusterStateListener = new ClusterStateListener() { @Override - public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) { - final Snapshot snapshot = restoreCompletionResponse.getSnapshot(); - if (snapshot.getRepository().equals(request.repository()) && - snapshot.getSnapshotId().getName().equals(request.snapshot())) { - listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); - restoreService.removeListener(this); + public void clusterChanged(ClusterChangedEvent changedEvent) { + final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), snapshot); + final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), snapshot); + if (prevEntry == null) { + // When there is a master failure after a restore has been started, this listener might not be registered + // on the current master and as such it might miss some intermediary cluster states due to batching. + // Clean up listener in that case and acknowledge completion of restore operation to client. + clusterService.remove(this); + listener.onResponse(new RestoreSnapshotResponse(null)); + } else if (newEntry == null) { + clusterService.remove(this); + ImmutableOpenMap shards = prevEntry.shards(); + assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state(); + assert RestoreService.completed(shards) : "expected all restore entries to be completed"; + RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(), + prevEntry.indices(), + shards.size(), + shards.size() - RestoreService.failedShards(shards)); + RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri); + logger.debug("restore of [{}] completed", snapshot); + listener.onResponse(response); + } else { + // restore not completed yet, wait for next cluster state update } } + }; - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + clusterService.addLast(clusterStateListener); } else { - listener.onResponse(new RestoreSnapshotResponse(restoreInfo)); + listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index 22553dd9929..aa4fc847309 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -23,20 +23,22 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.delete.DeleteIndexClusterStateUpdateRequest; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; +import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotsService; -import java.util.Arrays; -import java.util.Collection; import java.util.Set; import static java.util.stream.Collectors.toSet; @@ -73,7 +75,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { @Override public ClusterState execute(final ClusterState currentState) { - return deleteIndices(currentState, Arrays.asList(request.indices())); + return deleteIndices(currentState, Sets.newHashSet(request.indices())); } }); } @@ -81,7 +83,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { /** * Delete some indices from the cluster state. */ - public ClusterState deleteIndices(ClusterState currentState, Collection indices) { + public ClusterState deleteIndices(ClusterState currentState, Set indices) { final MetaData meta = currentState.metaData(); final Set metaDatas = indices.stream().map(i -> meta.getIndexSafe(i)).collect(toSet()); // Check if index deletion conflicts with any running snapshots @@ -107,11 +109,25 @@ public class MetaDataDeleteIndexService extends AbstractComponent { MetaData newMetaData = metaDataBuilder.build(); ClusterBlocks blocks = clusterBlocksBuilder.build(); + + // update snapshot restore entries + ImmutableOpenMap customs = currentState.getCustoms(); + final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); + if (restoreInProgress != null) { + RestoreInProgress updatedRestoreInProgress = RestoreService.updateRestoreStateWithDeletedIndices(restoreInProgress, indices); + if (updatedRestoreInProgress != restoreInProgress) { + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(customs); + builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress); + customs = builder.build(); + } + } + return allocationService.reroute( ClusterState.builder(currentState) .routingTable(routingTableBuilder.build()) .metaData(newMetaData) .blocks(blocks) + .customs(customs) .build(), "deleted indices [" + indices + "]"); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 323adf78046..b3eaa517934 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -34,6 +35,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -94,17 +96,24 @@ public class AllocationService extends AbstractComponent { } protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason) { - return buildResultAndLogHealthChange(oldState, allocation, reason, new RoutingExplanations()); - } - - protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason, - RoutingExplanations explanations) { RoutingTable oldRoutingTable = oldState.routingTable(); RoutingNodes newRoutingNodes = allocation.routingNodes(); final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(oldRoutingTable.version(), newRoutingNodes).build(); MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges(newRoutingTable); assert newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata - final ClusterState newState = ClusterState.builder(oldState).routingTable(newRoutingTable).metaData(newMetaData).build(); + final ClusterState.Builder newStateBuilder = ClusterState.builder(oldState) + .routingTable(newRoutingTable) + .metaData(newMetaData); + final RestoreInProgress restoreInProgress = allocation.custom(RestoreInProgress.TYPE); + if (restoreInProgress != null) { + RestoreInProgress updatedRestoreInProgress = allocation.updateRestoreInfoWithRoutingChanges(restoreInProgress); + if (updatedRestoreInProgress != restoreInProgress) { + ImmutableOpenMap.Builder customsBuilder = ImmutableOpenMap.builder(allocation.getCustoms()); + customsBuilder.put(RestoreInProgress.TYPE, updatedRestoreInProgress); + newStateBuilder.customs(customsBuilder.build()); + } + } + final ClusterState newState = newStateBuilder.build(); logClusterHealthStateChange( new ClusterStateHealth(oldState), new ClusterStateHealth(newState), diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 8429493b0e7..886b42f57d6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingChangesObserver; @@ -30,6 +31,8 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.RestoreService.RestoreInProgressUpdater; import java.util.HashMap; import java.util.HashSet; @@ -76,8 +79,9 @@ public class RoutingAllocation { private final IndexMetaDataUpdater indexMetaDataUpdater = new IndexMetaDataUpdater(); private final RoutingNodesChangedObserver nodesChangedObserver = new RoutingNodesChangedObserver(); + private final RestoreInProgressUpdater restoreInProgressUpdater = new RestoreInProgressUpdater(); private final RoutingChangesObserver routingChangesObserver = new RoutingChangesObserver.DelegatingRoutingChangesObserver( - nodesChangedObserver, indexMetaDataUpdater + nodesChangedObserver, indexMetaDataUpdater, restoreInProgressUpdater ); @@ -154,6 +158,10 @@ public class RoutingAllocation { return (T)customs.get(key); } + public ImmutableOpenMap getCustoms() { + return customs; + } + /** * Get explanations of current routing * @return explanation of routing @@ -234,6 +242,13 @@ public class RoutingAllocation { return indexMetaDataUpdater.applyChanges(metaData, newRoutingTable); } + /** + * Returns updated {@link RestoreInProgress} based on the changes that were made to the routing nodes + */ + public RestoreInProgress updateRestoreInfoWithRoutingChanges(RestoreInProgress restoreInProgress) { + return restoreInProgressUpdater.applyChanges(restoreInProgress); + } + /** * Returns true iff changes were made to the routing nodes */ diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index bbbcfc96d57..b55074f592d 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -570,8 +570,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple /** * Finds the routing source node for peer recovery, return null if its not found. Note, this method expects the shard - * routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to - * check if its needed or not. + * routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to check if its needed or not. */ private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) { @@ -610,29 +609,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple @Override public void onRecoveryDone(RecoveryState state) { - if (state.getRecoverySource().getType() == Type.SNAPSHOT) { - SnapshotRecoverySource snapshotRecoverySource = (SnapshotRecoverySource) state.getRecoverySource(); - restoreService.indexShardRestoreCompleted(snapshotRecoverySource.snapshot(), shardRouting.shardId()); - } shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); } @Override public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { - if (state.getRecoverySource().getType() == Type.SNAPSHOT) { - try { - if (Lucene.isCorruptionException(e.getCause())) { - SnapshotRecoverySource snapshotRecoverySource = (SnapshotRecoverySource) state.getRecoverySource(); - restoreService.failRestore(snapshotRecoverySource.snapshot(), shardRouting.shardId()); - } - } catch (Exception inner) { - e.addSuppressed(inner); - } finally { - handleRecoveryFailure(shardRouting, sendShardFailure, e); - } - } else { - handleRecoveryFailure(shardRouting, sendShardFailure, e); - } + handleRecoveryFailure(shardRouting, sendShardFailure, e); } } diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 5e30a3b52b3..ed6317fcb30 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.Version; @@ -30,6 +31,9 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus; @@ -41,41 +45,32 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.metadata.RepositoriesMetaData; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; +import org.elasticsearch.cluster.routing.RoutingChangesObserver; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -83,12 +78,10 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; import static java.util.Collections.unmodifiableSet; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; @@ -117,14 +110,12 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet; * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking * at the {@link ShardRouting#recoverySource()} property. *

- * At the end of the successful restore process {@code IndexShardSnapshotAndRestoreService} calls {@link #indexShardRestoreCompleted(Snapshot, ShardId)}, - * which updates {@link RestoreInProgress} in cluster state or removes it when all shards are completed. In case of + * At the end of the successful restore process {@code RestoreService} calls {@link #cleanupRestoreState(ClusterChangedEvent)}, + * which removes {@link RestoreInProgress} when all shards are completed. In case of * restore failure a normal recovery fail-over process kicks in. */ public class RestoreService extends AbstractComponent implements ClusterStateListener { - public static final String UPDATE_RESTORE_ACTION_NAME = "internal:cluster/snapshot/update_restore"; - private static final Set UNMODIFIABLE_SETTINGS = unmodifiableSet(newHashSet( SETTING_NUMBER_OF_SHARDS, SETTING_VERSION_CREATED, @@ -148,33 +139,29 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis private final RepositoriesService repositoriesService; - private final TransportService transportService; - private final AllocationService allocationService; private final MetaDataCreateIndexService createIndexService; private final MetaDataIndexUpgradeService metaDataIndexUpgradeService; - private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); - - private final BlockingQueue updatedSnapshotStateQueue = ConcurrentCollections.newBlockingQueue(); private final ClusterSettings clusterSettings; + private final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor; + @Inject - public RestoreService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService, + public RestoreService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, AllocationService allocationService, MetaDataCreateIndexService createIndexService, MetaDataIndexUpgradeService metaDataIndexUpgradeService, ClusterSettings clusterSettings) { super(settings); this.clusterService = clusterService; this.repositoriesService = repositoriesService; - this.transportService = transportService; this.allocationService = allocationService; this.createIndexService = createIndexService; this.metaDataIndexUpgradeService = metaDataIndexUpgradeService; - transportService.registerRequestHandler(UPDATE_RESTORE_ACTION_NAME, UpdateIndexShardRestoreStatusRequest::new, ThreadPool.Names.SAME, new UpdateRestoreStateRequestHandler()); clusterService.add(this); this.clusterSettings = clusterSettings; + this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(logger); } /** @@ -183,7 +170,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis * @param request restore request * @param listener restore listener */ - public void restoreSnapshot(final RestoreRequest request, final ActionListener listener) { + public void restoreSnapshot(final RestoreRequest request, final ActionListener listener) { try { // Read snapshot info and metadata from the repository Repository repository = repositoriesService.repository(request.repositoryName); @@ -314,7 +301,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } shards = shardsBuilder.build(); - RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshot, RestoreInProgress.State.INIT, Collections.unmodifiableList(new ArrayList<>(renamedIndices.keySet())), shards); + RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshot, overallState(RestoreInProgress.State.INIT, shards), Collections.unmodifiableList(new ArrayList<>(renamedIndices.keySet())), shards); builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restoreEntry)); } else { shards = ImmutableOpenMap.of(); @@ -469,7 +456,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(restoreInfo); + listener.onResponse(new RestoreCompletionResponse(snapshot, restoreInfo)); } }); @@ -480,19 +467,33 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } - /** - * This method is used by {@link IndexShard} to notify - * {@code RestoreService} about shard restore completion. - * - * @param snapshot snapshot - * @param shardId shard id - */ - public void indexShardRestoreCompleted(Snapshot snapshot, ShardId shardId) { - logger.trace("[{}] successfully restored shard [{}]", snapshot, shardId); - UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshot, shardId, - new ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(), RestoreInProgress.State.SUCCESS)); - transportService.sendRequest(clusterService.state().nodes().getMasterNode(), - UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set deletedIndices) { + boolean changesMade = false; + final List entries = new ArrayList<>(); + for (RestoreInProgress.Entry entry : oldRestore.entries()) { + ImmutableOpenMap.Builder shardsBuilder = null; + for (ObjectObjectCursor cursor : entry.shards()) { + ShardId shardId = cursor.key; + if (deletedIndices.contains(shardId.getIndex())) { + changesMade = true; + if (shardsBuilder == null) { + shardsBuilder = ImmutableOpenMap.builder(entry.shards()); + } + shardsBuilder.put(shardId, new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE, "index was deleted")); + } + } + if (shardsBuilder != null) { + ImmutableOpenMap shards = shardsBuilder.build(); + entries.add(new RestoreInProgress.Entry(entry.snapshot(), overallState(RestoreInProgress.State.STARTED, shards), entry.indices(), shards)); + } else { + entries.add(entry); + } + } + if (changesMade) { + return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()])); + } else { + return oldRestore; + } } public static final class RestoreCompletionResponse { @@ -513,168 +514,201 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } - /** - * Updates shard restore record in the cluster state. - * - * @param request update shard status request - */ - private void updateRestoreStateOnMaster(final UpdateIndexShardRestoreStatusRequest request) { - logger.trace("received updated snapshot restore state [{}]", request); - updatedSnapshotStateQueue.add(request); + public static class RestoreInProgressUpdater extends RoutingChangesObserver.AbstractRoutingChangesObserver { + private final Map shardChanges = new HashMap<>(); - clusterService.submitStateUpdateTask("update snapshot state", new ClusterStateUpdateTask() { - private final List drainedRequests = new ArrayList<>(); - private Map>> batchedRestoreInfo = null; - - @Override - public ClusterState execute(ClusterState currentState) { - - if (request.processed) { - return currentState; + @Override + public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) { + // mark snapshot as completed + if (initializingShard.primary()) { + RecoverySource recoverySource = initializingShard.recoverySource(); + if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) { + Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot(); + changes(snapshot).startedShards.put(initializingShard.shardId(), + new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS)); } + } + } - updatedSnapshotStateQueue.drainTo(drainedRequests); - - final int batchSize = drainedRequests.size(); - - // nothing to process (a previous event has processed it already) - if (batchSize == 0) { - return currentState; + @Override + public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) { + if (failedShard.primary() && failedShard.initializing()) { + RecoverySource recoverySource = failedShard.recoverySource(); + if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) { + Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot(); + // mark restore entry for this shard as failed when it's due to a file corruption. There is no need wait on retries + // to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed, + // however, we only want to acknowledge the restore operation once it has been successfully restored on another node. + if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) { + changes(snapshot).failedShards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(), + RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage())); + } } + } + } - final RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE); - if (restore != null) { - int changedCount = 0; - final List entries = new ArrayList<>(); - for (RestoreInProgress.Entry entry : restore.entries()) { - ImmutableOpenMap.Builder shardsBuilder = null; + @Override + public void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard) { + // if we force an empty primary, we should also fail the restore entry + if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT && + initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) { + Snapshot snapshot = ((SnapshotRecoverySource) unassignedShard.recoverySource()).snapshot(); + changes(snapshot).failedShards.put(unassignedShard.shardId(), new ShardRestoreStatus(null, + RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource())); + } + } - for (int i = 0; i < batchSize; i++) { - final UpdateIndexShardRestoreStatusRequest updateSnapshotState = drainedRequests.get(i); - updateSnapshotState.processed = true; + /** + * Helper method that creates update entry for the given shard id if such an entry does not exist yet. + */ + private Updates changes(Snapshot snapshot) { + return shardChanges.computeIfAbsent(snapshot, k -> new Updates()); + } - if (entry.snapshot().equals(updateSnapshotState.snapshot())) { - logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state()); - if (shardsBuilder == null) { - shardsBuilder = ImmutableOpenMap.builder(entry.shards()); - } - shardsBuilder.put(updateSnapshotState.shardId(), updateSnapshotState.status()); - changedCount++; - } + private static class Updates { + private Map failedShards = new HashMap<>(); + private Map startedShards = new HashMap<>(); + } + + public RestoreInProgress applyChanges(RestoreInProgress oldRestore) { + if (shardChanges.isEmpty() == false) { + final List entries = new ArrayList<>(); + for (RestoreInProgress.Entry entry : oldRestore.entries()) { + Snapshot snapshot = entry.snapshot(); + Updates updates = shardChanges.get(snapshot); + assert Sets.haveEmptyIntersection(updates.startedShards.keySet(), updates.failedShards.keySet()); + if (updates.startedShards.isEmpty() == false || updates.failedShards.isEmpty() == false) { + ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(entry.shards()); + for (Map.Entry startedShardEntry : updates.startedShards.entrySet()) { + shardsBuilder.put(startedShardEntry.getKey(), startedShardEntry.getValue()); } - - if (shardsBuilder != null) { - ImmutableOpenMap shards = shardsBuilder.build(); - if (!completed(shards)) { - entries.add(new RestoreInProgress.Entry(entry.snapshot(), RestoreInProgress.State.STARTED, entry.indices(), shards)); - } else { - logger.info("restore [{}] is done", entry.snapshot()); - if (batchedRestoreInfo == null) { - batchedRestoreInfo = new HashMap<>(); - } - assert !batchedRestoreInfo.containsKey(entry.snapshot()); - batchedRestoreInfo.put(entry.snapshot(), - new Tuple<>( - new RestoreInfo(entry.snapshot().getSnapshotId().getName(), - entry.indices(), - shards.size(), - shards.size() - failedShards(shards)), - shards)); - } - } else { - entries.add(entry); + for (Map.Entry failedShardEntry : updates.failedShards.entrySet()) { + shardsBuilder.put(failedShardEntry.getKey(), failedShardEntry.getValue()); } - } - - if (changedCount > 0) { - logger.trace("changed cluster state triggered by {} snapshot restore state updates", changedCount); - - final RestoreInProgress updatedRestore = new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()])); - return ClusterState.builder(currentState).putCustom(RestoreInProgress.TYPE, updatedRestore).build(); + ImmutableOpenMap shards = shardsBuilder.build(); + RestoreInProgress.State newState = overallState(RestoreInProgress.State.STARTED, shards); + entries.add(new RestoreInProgress.Entry(entry.snapshot(), newState, entry.indices(), shards)); + } else { + entries.add(entry); } } - return currentState; + return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()])); + } else { + return oldRestore; } + } - @Override - public void onFailure(String source, @Nullable Exception e) { - for (UpdateIndexShardRestoreStatusRequest request : drainedRequests) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]", request.snapshot(), request.shardId(), request.status()), e); - } - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (batchedRestoreInfo != null) { - for (final Entry>> entry : batchedRestoreInfo.entrySet()) { - final Snapshot snapshot = entry.getKey(); - final RestoreInfo restoreInfo = entry.getValue().v1(); - final ImmutableOpenMap shards = entry.getValue().v2(); - RoutingTable routingTable = newState.getRoutingTable(); - final List waitForStarted = new ArrayList<>(); - for (ObjectObjectCursor shard : shards) { - if (shard.value.state() == RestoreInProgress.State.SUCCESS ) { - ShardId shardId = shard.key; - ShardRouting shardRouting = findPrimaryShard(routingTable, shardId); - if (shardRouting != null && !shardRouting.active()) { - logger.trace("[{}][{}] waiting for the shard to start", snapshot, shardId); - waitForStarted.add(shardId); - } - } - } - if (waitForStarted.isEmpty()) { - notifyListeners(snapshot, restoreInfo); - } else { - clusterService.addLast(new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent event) { - if (event.routingTableChanged()) { - RoutingTable routingTable = event.state().getRoutingTable(); - for (Iterator iterator = waitForStarted.iterator(); iterator.hasNext();) { - ShardId shardId = iterator.next(); - ShardRouting shardRouting = findPrimaryShard(routingTable, shardId); - // Shard disappeared (index deleted) or became active - if (shardRouting == null || shardRouting.active()) { - iterator.remove(); - logger.trace("[{}][{}] shard disappeared or started - removing", snapshot, shardId); - } - } - } - if (waitForStarted.isEmpty()) { - notifyListeners(snapshot, restoreInfo); - clusterService.remove(this); - } - } - }); - } - } - } - } - - private ShardRouting findPrimaryShard(RoutingTable routingTable, ShardId shardId) { - IndexRoutingTable indexRoutingTable = routingTable.index(shardId.getIndex()); - if (indexRoutingTable != null) { - IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId.id()); - if (indexShardRoutingTable != null) { - return indexShardRoutingTable.primaryShard(); - } - } - return null; - } - - private void notifyListeners(Snapshot snapshot, RestoreInfo restoreInfo) { - for (ActionListener listener : listeners) { - try { - listener.onResponse(new RestoreCompletionResponse(snapshot, restoreInfo)); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("failed to update snapshot status for [{}]", listener), e); - } - } - } - }); } - private boolean completed(ImmutableOpenMap shards) { + public static RestoreInProgress.Entry restoreInProgress(ClusterState state, Snapshot snapshot) { + final RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE); + if (restoreInProgress != null) { + for (RestoreInProgress.Entry e : restoreInProgress.entries()) { + if (e.snapshot().equals(snapshot)) { + return e; + } + } + } + return null; + } + + static class CleanRestoreStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { + + static class Task { + final Snapshot snapshot; + + Task(Snapshot snapshot) { + this.snapshot = snapshot; + } + + @Override + public String toString() { + return "clean restore state for restoring snapshot " + snapshot; + } + } + + private final Logger logger; + + public CleanRestoreStateTaskExecutor(Logger logger) { + this.logger = logger; + } + + @Override + public BatchResult execute(final ClusterState currentState, final List tasks) throws Exception { + final BatchResult.Builder resultBuilder = BatchResult.builder().successes(tasks); + Set completedSnapshots = tasks.stream().map(e -> e.snapshot).collect(Collectors.toSet()); + final List entries = new ArrayList<>(); + final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); + boolean changed = false; + if (restoreInProgress != null) { + for (RestoreInProgress.Entry entry : restoreInProgress.entries()) { + if (completedSnapshots.contains(entry.snapshot()) == false) { + entries.add(entry); + } else { + changed = true; + } + } + } + if (changed == false) { + return resultBuilder.build(currentState); + } + RestoreInProgress updatedRestoreInProgress = new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()])); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(currentState.getCustoms()); + builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress); + ImmutableOpenMap customs = builder.build(); + return resultBuilder.build(ClusterState.builder(currentState).customs(customs).build()); + } + + @Override + public void onFailure(final String source, final Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); + } + + @Override + public void onNoLongerMaster(String source) { + logger.debug("no longer master while processing restore state update [{}]", source); + } + + } + + private void cleanupRestoreState(ClusterChangedEvent event) { + ClusterState state = event.state(); + + RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE); + if (restoreInProgress != null) { + for (RestoreInProgress.Entry entry : restoreInProgress.entries()) { + if (entry.state().completed()) { + assert completed(entry.shards()) : "state says completed but restore entries are not"; + clusterService.submitStateUpdateTask( + "clean up snapshot restore state", + new CleanRestoreStateTaskExecutor.Task(entry.snapshot()), + ClusterStateTaskConfig.build(Priority.URGENT), + cleanRestoreStateTaskExecutor, + cleanRestoreStateTaskExecutor); + } + } + } + } + + public static RestoreInProgress.State overallState(RestoreInProgress.State nonCompletedState, + ImmutableOpenMap shards) { + boolean hasFailed = false; + for (ObjectCursor status : shards.values()) { + if (!status.value.state().completed()) { + return nonCompletedState; + } + if (status.value.state() == RestoreInProgress.State.FAILURE) { + hasFailed = true; + } + } + if (hasFailed) { + return RestoreInProgress.State.FAILURE; + } else { + return RestoreInProgress.State.SUCCESS; + } + } + + public static boolean completed(ImmutableOpenMap shards) { for (ObjectCursor status : shards.values()) { if (!status.value.state().completed()) { return false; @@ -683,7 +717,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis return true; } - private int failedShards(ImmutableOpenMap shards) { + public static int failedShards(ImmutableOpenMap shards) { int failedShards = 0; for (ObjectCursor status : shards.values()) { if (status.value.state() == RestoreInProgress.State.FAILURE) { @@ -727,53 +761,6 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } - /** - * Checks if any of the deleted indices are still recovering and fails recovery on the shards of these indices - * - * @param event cluster changed event - */ - private void processDeletedIndices(ClusterChangedEvent event) { - RestoreInProgress restore = event.state().custom(RestoreInProgress.TYPE); - if (restore == null) { - // Not restoring - nothing to do - return; - } - - if (!event.indicesDeleted().isEmpty()) { - // Some indices were deleted, let's make sure all indices that we are restoring still exist - for (RestoreInProgress.Entry entry : restore.entries()) { - List shardsToFail = null; - for (ObjectObjectCursor shard : entry.shards()) { - if (!shard.value.state().completed()) { - if (!event.state().metaData().hasIndex(shard.key.getIndex().getName())) { - if (shardsToFail == null) { - shardsToFail = new ArrayList<>(); - } - shardsToFail.add(shard.key); - } - } - } - if (shardsToFail != null) { - for (ShardId shardId : shardsToFail) { - logger.trace("[{}] failing running shard restore [{}]", entry.snapshot(), shardId); - updateRestoreStateOnMaster(new UpdateIndexShardRestoreStatusRequest(entry.snapshot(), shardId, new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE, "index was deleted"))); - } - } - } - } - } - - /** - * Fails the given snapshot restore operation for the given shard - */ - public void failRestore(Snapshot snapshot, ShardId shardId) { - logger.debug("[{}] failed to restore shard [{}]", snapshot, shardId); - UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshot, shardId, - new ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(), RestoreInProgress.State.FAILURE)); - transportService.sendRequest(clusterService.state().nodes().getMasterNode(), - UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); - } - private boolean failed(SnapshotInfo snapshot, String index) { for (SnapshotShardFailure failure : snapshot.shardFailures()) { if (index.equals(failure.index())) { @@ -810,34 +797,11 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } - /** - * Adds restore completion listener - *

- * This listener is called for each snapshot that finishes restore operation in the cluster. It's responsibility of - * the listener to decide if it's called for the appropriate snapshot or not. - * - * @param listener restore completion listener - */ - public void addListener(ActionListener listener) { - this.listeners.add(listener); - } - - /** - * Removes restore completion listener - *

- * This listener is called for each snapshot that finishes restore operation in the cluster. - * - * @param listener restore completion listener - */ - public void removeListener(ActionListener listener) { - this.listeners.remove(listener); - } - @Override public void clusterChanged(ClusterChangedEvent event) { try { if (event.localNodeMaster()) { - processDeletedIndices(event); + cleanupRestoreState(event); } } catch (Exception t) { logger.warn("Failed to update restore state ", t); @@ -1061,69 +1025,4 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } - - /** - * Internal class that is used to send notifications about finished shard restore operations to master node - */ - public static class UpdateIndexShardRestoreStatusRequest extends TransportRequest { - private Snapshot snapshot; - private ShardId shardId; - private ShardRestoreStatus status; - - volatile boolean processed; // state field, no need to serialize - - public UpdateIndexShardRestoreStatusRequest() { - - } - - private UpdateIndexShardRestoreStatusRequest(Snapshot snapshot, ShardId shardId, ShardRestoreStatus status) { - this.snapshot = snapshot; - this.shardId = shardId; - this.status = status; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - snapshot = new Snapshot(in); - shardId = ShardId.readShardId(in); - status = ShardRestoreStatus.readShardRestoreStatus(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - snapshot.writeTo(out); - shardId.writeTo(out); - status.writeTo(out); - } - - public Snapshot snapshot() { - return snapshot; - } - - public ShardId shardId() { - return shardId; - } - - public ShardRestoreStatus status() { - return status; - } - - @Override - public String toString() { - return "" + snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; - } - } - - /** - * Internal class that is used to send notifications about finished shard restore operations to master node - */ - class UpdateRestoreStateRequestHandler implements TransportRequestHandler { - @Override - public void messageReceived(UpdateIndexShardRestoreStatusRequest request, final TransportChannel channel) throws Exception { - updateRestoreStateOnMaster(request); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } } diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesServiceTests.java index a11c74d657b..8a342057dab 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesServiceTests.java @@ -33,7 +33,7 @@ import java.util.Collection; import static java.util.Collections.singletonList; import static org.hamcrest.Matchers.contains; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyCollectionOf; +import static org.mockito.Matchers.anySetOf; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -45,7 +45,7 @@ public class MetaDataIndexAliasesServiceTests extends ESTestCase { public MetaDataIndexAliasesServiceTests() { // Mock any deletes so we don't need to worry about how MetaDataDeleteIndexService does its job - when(deleteIndexService.deleteIndices(any(ClusterState.class), anyCollectionOf(Index.class))).then(i -> { + when(deleteIndexService.deleteIndices(any(ClusterState.class), anySetOf(Index.class))).then(i -> { ClusterState state = (ClusterState) i.getArguments()[0]; @SuppressWarnings("unchecked") Collection indices = (Collection) i.getArguments()[1]; diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 3e43cc83045..fb55f5bb767 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -686,6 +686,46 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> total number of simulated failures during restore: [{}]", getFailureCount("test-repo")); } + public void testDataFileCorruptionDuringRestore() throws Exception { + Path repositoryLocation = randomRepoPath(); + Client client = client(); + logger.info("--> creating repository"); + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(Settings.builder().put("location", repositoryLocation))); + + prepareCreate("test-idx").setSettings(Settings.builder().put("index.allocation.max_retries", Integer.MAX_VALUE)).get(); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().totalHits(), equalTo(100L)); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(createSnapshotResponse.getSnapshotInfo().successfulShards())); + + logger.info("--> update repository with mock version"); + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("mock").setSettings( + Settings.builder() + .put("location", repositoryLocation) + .put("random", randomAsciiOfLength(10)) + .put("use_lucene_corruption", true) + .put("random_data_file_io_exception_rate", 1.0))); + + // Test restore after index deletion + logger.info("--> delete index"); + cluster().wipeIndices("test-idx"); + logger.info("--> restore corrupt index"); + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards())); + } + public void testDeletionOfFailingToRecoverIndexShouldStopRestore() throws Exception { Path repositoryLocation = randomRepoPath(); Client client = client(); @@ -2199,32 +2239,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertFalse(snapshotListener.timedOut()); // Check that cluster state update task was called only once assertEquals(1, snapshotListener.count()); - - logger.info("--> close indices"); - client.admin().indices().prepareClose("test-idx").get(); - - BlockingClusterStateListener restoreListener = new BlockingClusterStateListener(clusterService, "restore_snapshot[", "update snapshot state", Priority.HIGH); - - try { - clusterService.addFirst(restoreListener); - logger.info("--> restore snapshot"); - ListenableActionFuture futureRestore = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute(); - - // Await until shard updates are in pending state. - assertBusyPendingTasks("update snapshot state", numberOfShards); - restoreListener.unblock(); - - RestoreSnapshotResponse restoreSnapshotResponse = futureRestore.actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(numberOfShards)); - - } finally { - clusterService.remove(restoreListener); - } - - // Check that we didn't timeout - assertFalse(restoreListener.timedOut()); - // Check that cluster state update task was called only once - assertEquals(1, restoreListener.count()); } public void testSnapshotName() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 06c4ec10af0..ca3aeb674bd 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -81,6 +82,8 @@ public class MockRepository extends FsRepository { private final double randomDataFileIOExceptionRate; + private final boolean useLuceneCorruptionException; + private final long maximumNumberOfFailures; private final long waitAfterUnblock; @@ -101,6 +104,7 @@ public class MockRepository extends FsRepository { super(overrideSettings(metadata, environment), environment); randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); + useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L); blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false); blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false); @@ -245,7 +249,11 @@ public class MockRepository extends FsRepository { if (blobName.startsWith("__")) { if (shouldFail(blobName, randomDataFileIOExceptionRate) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) { logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path()); - throw new IOException("Random IOException"); + if (useLuceneCorruptionException) { + throw new CorruptIndexException("Random corruption", "random file"); + } else { + throw new IOException("Random IOException"); + } } else if (blockOnDataFiles) { logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path()); if (blockExecution() && waitAfterUnblock > 0) {