Keep snapshot restore state and routing table in sync (#20836)
The snapshot restore state tracks information about shards being restored from a snapshot in the cluster state. For example it records if a shard has been successfully restored or if restoring it was not possible due to a corruption of the snapshot. Recording these events is usually based on changes to the shard routing table, i.e., when a shard is started after a successful restore or failed after an unsuccessful one. As of now, there were two communication channels to transmit recovery failure / success to update the routing table and the restore state. This lead to issues where a shard was failed but the restore state was not updated due to connection issues between data and master node. In some rare situations, this lead to an issue where the restore state could not be properly cleaned up anymore by the master, making it impossible to start new restore operations. The following change updates routing table and restore state in the same cluster state update so that both always stay in sync. It also eliminates the extra communication channel for restore operations and uses standard cluster state listener mechanism to update restore listener upon successful completion of a snapshot.
This commit is contained in:
parent
42a7a554b1
commit
075047065d
|
@ -22,19 +22,27 @@ package org.elasticsearch.action.admin.cluster.snapshots.restore;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.RestoreInProgress;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.snapshots.RestoreInfo;
|
||||
import org.elasticsearch.snapshots.RestoreService;
|
||||
import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
|
||||
|
||||
/**
|
||||
* Transport action for restore snapshot operation
|
||||
*/
|
||||
|
@ -78,28 +86,44 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
|
|||
request.settings(), request.masterNodeTimeout(), request.includeGlobalState(), request.partial(), request.includeAliases(),
|
||||
request.indexSettings(), request.ignoreIndexSettings(), "restore_snapshot[" + request.snapshot() + "]");
|
||||
|
||||
restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreInfo>() {
|
||||
restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreCompletionResponse>() {
|
||||
@Override
|
||||
public void onResponse(RestoreInfo restoreInfo) {
|
||||
if (restoreInfo == null && request.waitForCompletion()) {
|
||||
restoreService.addListener(new ActionListener<RestoreService.RestoreCompletionResponse>() {
|
||||
public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
|
||||
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
|
||||
final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
|
||||
|
||||
ClusterStateListener clusterStateListener = new ClusterStateListener() {
|
||||
@Override
|
||||
public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) {
|
||||
final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
|
||||
if (snapshot.getRepository().equals(request.repository()) &&
|
||||
snapshot.getSnapshotId().getName().equals(request.snapshot())) {
|
||||
listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
|
||||
restoreService.removeListener(this);
|
||||
public void clusterChanged(ClusterChangedEvent changedEvent) {
|
||||
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), snapshot);
|
||||
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), snapshot);
|
||||
if (prevEntry == null) {
|
||||
// When there is a master failure after a restore has been started, this listener might not be registered
|
||||
// on the current master and as such it might miss some intermediary cluster states due to batching.
|
||||
// Clean up listener in that case and acknowledge completion of restore operation to client.
|
||||
clusterService.remove(this);
|
||||
listener.onResponse(new RestoreSnapshotResponse(null));
|
||||
} else if (newEntry == null) {
|
||||
clusterService.remove(this);
|
||||
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
|
||||
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
|
||||
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
|
||||
RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(),
|
||||
prevEntry.indices(),
|
||||
shards.size(),
|
||||
shards.size() - RestoreService.failedShards(shards));
|
||||
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
|
||||
logger.debug("restore of [{}] completed", snapshot);
|
||||
listener.onResponse(response);
|
||||
} else {
|
||||
// restore not completed yet, wait for next cluster state update
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
clusterService.addLast(clusterStateListener);
|
||||
} else {
|
||||
listener.onResponse(new RestoreSnapshotResponse(restoreInfo));
|
||||
listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,20 +23,22 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexClusterStateUpdateRequest;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.RestoreInProgress;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.snapshots.RestoreService;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
|
@ -73,7 +75,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
|
||||
@Override
|
||||
public ClusterState execute(final ClusterState currentState) {
|
||||
return deleteIndices(currentState, Arrays.asList(request.indices()));
|
||||
return deleteIndices(currentState, Sets.newHashSet(request.indices()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -81,7 +83,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
/**
|
||||
* Delete some indices from the cluster state.
|
||||
*/
|
||||
public ClusterState deleteIndices(ClusterState currentState, Collection<Index> indices) {
|
||||
public ClusterState deleteIndices(ClusterState currentState, Set<Index> indices) {
|
||||
final MetaData meta = currentState.metaData();
|
||||
final Set<IndexMetaData> metaDatas = indices.stream().map(i -> meta.getIndexSafe(i)).collect(toSet());
|
||||
// Check if index deletion conflicts with any running snapshots
|
||||
|
@ -107,11 +109,25 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
|
||||
MetaData newMetaData = metaDataBuilder.build();
|
||||
ClusterBlocks blocks = clusterBlocksBuilder.build();
|
||||
|
||||
// update snapshot restore entries
|
||||
ImmutableOpenMap<String, ClusterState.Custom> customs = currentState.getCustoms();
|
||||
final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
|
||||
if (restoreInProgress != null) {
|
||||
RestoreInProgress updatedRestoreInProgress = RestoreService.updateRestoreStateWithDeletedIndices(restoreInProgress, indices);
|
||||
if (updatedRestoreInProgress != restoreInProgress) {
|
||||
ImmutableOpenMap.Builder<String, ClusterState.Custom> builder = ImmutableOpenMap.builder(customs);
|
||||
builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress);
|
||||
customs = builder.build();
|
||||
}
|
||||
}
|
||||
|
||||
return allocationService.reroute(
|
||||
ClusterState.builder(currentState)
|
||||
.routingTable(routingTableBuilder.build())
|
||||
.metaData(newMetaData)
|
||||
.blocks(blocks)
|
||||
.customs(customs)
|
||||
.build(),
|
||||
"deleted indices [" + indices + "]");
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation;
|
|||
|
||||
import org.elasticsearch.cluster.ClusterInfoService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.RestoreInProgress;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.health.ClusterStateHealth;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -34,6 +35,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
|||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -94,17 +96,24 @@ public class AllocationService extends AbstractComponent {
|
|||
}
|
||||
|
||||
protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason) {
|
||||
return buildResultAndLogHealthChange(oldState, allocation, reason, new RoutingExplanations());
|
||||
}
|
||||
|
||||
protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason,
|
||||
RoutingExplanations explanations) {
|
||||
RoutingTable oldRoutingTable = oldState.routingTable();
|
||||
RoutingNodes newRoutingNodes = allocation.routingNodes();
|
||||
final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(oldRoutingTable.version(), newRoutingNodes).build();
|
||||
MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges(newRoutingTable);
|
||||
assert newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata
|
||||
final ClusterState newState = ClusterState.builder(oldState).routingTable(newRoutingTable).metaData(newMetaData).build();
|
||||
final ClusterState.Builder newStateBuilder = ClusterState.builder(oldState)
|
||||
.routingTable(newRoutingTable)
|
||||
.metaData(newMetaData);
|
||||
final RestoreInProgress restoreInProgress = allocation.custom(RestoreInProgress.TYPE);
|
||||
if (restoreInProgress != null) {
|
||||
RestoreInProgress updatedRestoreInProgress = allocation.updateRestoreInfoWithRoutingChanges(restoreInProgress);
|
||||
if (updatedRestoreInProgress != restoreInProgress) {
|
||||
ImmutableOpenMap.Builder<String, ClusterState.Custom> customsBuilder = ImmutableOpenMap.builder(allocation.getCustoms());
|
||||
customsBuilder.put(RestoreInProgress.TYPE, updatedRestoreInProgress);
|
||||
newStateBuilder.customs(customsBuilder.build());
|
||||
}
|
||||
}
|
||||
final ClusterState newState = newStateBuilder.build();
|
||||
logClusterHealthStateChange(
|
||||
new ClusterStateHealth(oldState),
|
||||
new ClusterStateHealth(newState),
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation;
|
|||
|
||||
import org.elasticsearch.cluster.ClusterInfo;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.RestoreInProgress;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
|
||||
|
@ -30,6 +31,8 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
|||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.snapshots.RestoreService;
|
||||
import org.elasticsearch.snapshots.RestoreService.RestoreInProgressUpdater;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -76,8 +79,9 @@ public class RoutingAllocation {
|
|||
|
||||
private final IndexMetaDataUpdater indexMetaDataUpdater = new IndexMetaDataUpdater();
|
||||
private final RoutingNodesChangedObserver nodesChangedObserver = new RoutingNodesChangedObserver();
|
||||
private final RestoreInProgressUpdater restoreInProgressUpdater = new RestoreInProgressUpdater();
|
||||
private final RoutingChangesObserver routingChangesObserver = new RoutingChangesObserver.DelegatingRoutingChangesObserver(
|
||||
nodesChangedObserver, indexMetaDataUpdater
|
||||
nodesChangedObserver, indexMetaDataUpdater, restoreInProgressUpdater
|
||||
);
|
||||
|
||||
|
||||
|
@ -154,6 +158,10 @@ public class RoutingAllocation {
|
|||
return (T)customs.get(key);
|
||||
}
|
||||
|
||||
public ImmutableOpenMap<String, ClusterState.Custom> getCustoms() {
|
||||
return customs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get explanations of current routing
|
||||
* @return explanation of routing
|
||||
|
@ -234,6 +242,13 @@ public class RoutingAllocation {
|
|||
return indexMetaDataUpdater.applyChanges(metaData, newRoutingTable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns updated {@link RestoreInProgress} based on the changes that were made to the routing nodes
|
||||
*/
|
||||
public RestoreInProgress updateRestoreInfoWithRoutingChanges(RestoreInProgress restoreInProgress) {
|
||||
return restoreInProgressUpdater.applyChanges(restoreInProgress);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true iff changes were made to the routing nodes
|
||||
*/
|
||||
|
|
|
@ -570,8 +570,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|||
|
||||
/**
|
||||
* Finds the routing source node for peer recovery, return null if its not found. Note, this method expects the shard
|
||||
* routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to
|
||||
* check if its needed or not.
|
||||
* routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to check if its needed or not.
|
||||
*/
|
||||
private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes,
|
||||
ShardRouting shardRouting) {
|
||||
|
@ -610,29 +609,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|||
|
||||
@Override
|
||||
public void onRecoveryDone(RecoveryState state) {
|
||||
if (state.getRecoverySource().getType() == Type.SNAPSHOT) {
|
||||
SnapshotRecoverySource snapshotRecoverySource = (SnapshotRecoverySource) state.getRecoverySource();
|
||||
restoreService.indexShardRestoreCompleted(snapshotRecoverySource.snapshot(), shardRouting.shardId());
|
||||
}
|
||||
shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
|
||||
if (state.getRecoverySource().getType() == Type.SNAPSHOT) {
|
||||
try {
|
||||
if (Lucene.isCorruptionException(e.getCause())) {
|
||||
SnapshotRecoverySource snapshotRecoverySource = (SnapshotRecoverySource) state.getRecoverySource();
|
||||
restoreService.failRestore(snapshotRecoverySource.snapshot(), shardRouting.shardId());
|
||||
}
|
||||
} catch (Exception inner) {
|
||||
e.addSuppressed(inner);
|
||||
} finally {
|
||||
handleRecoveryFailure(shardRouting, sendShardFailure, e);
|
||||
}
|
||||
} else {
|
||||
handleRecoveryFailure(shardRouting, sendShardFailure, e);
|
||||
}
|
||||
handleRecoveryFailure(shardRouting, sendShardFailure, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import com.carrotsearch.hppc.IntHashSet;
|
|||
import com.carrotsearch.hppc.IntSet;
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.Version;
|
||||
|
@ -30,6 +31,9 @@ import org.elasticsearch.action.support.IndicesOptions;
|
|||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.RestoreInProgress;
|
||||
import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus;
|
||||
|
@ -41,41 +45,32 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
|
||||
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
|
||||
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -83,12 +78,10 @@ import java.util.HashSet;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.unmodifiableSet;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
|
||||
|
@ -117,14 +110,12 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet;
|
|||
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
|
||||
* at the {@link ShardRouting#recoverySource()} property.
|
||||
* <p>
|
||||
* At the end of the successful restore process {@code IndexShardSnapshotAndRestoreService} calls {@link #indexShardRestoreCompleted(Snapshot, ShardId)},
|
||||
* which updates {@link RestoreInProgress} in cluster state or removes it when all shards are completed. In case of
|
||||
* At the end of the successful restore process {@code RestoreService} calls {@link #cleanupRestoreState(ClusterChangedEvent)},
|
||||
* which removes {@link RestoreInProgress} when all shards are completed. In case of
|
||||
* restore failure a normal recovery fail-over process kicks in.
|
||||
*/
|
||||
public class RestoreService extends AbstractComponent implements ClusterStateListener {
|
||||
|
||||
public static final String UPDATE_RESTORE_ACTION_NAME = "internal:cluster/snapshot/update_restore";
|
||||
|
||||
private static final Set<String> UNMODIFIABLE_SETTINGS = unmodifiableSet(newHashSet(
|
||||
SETTING_NUMBER_OF_SHARDS,
|
||||
SETTING_VERSION_CREATED,
|
||||
|
@ -148,33 +139,29 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
|
||||
private final RepositoriesService repositoriesService;
|
||||
|
||||
private final TransportService transportService;
|
||||
|
||||
private final AllocationService allocationService;
|
||||
|
||||
private final MetaDataCreateIndexService createIndexService;
|
||||
|
||||
private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
|
||||
|
||||
private final CopyOnWriteArrayList<ActionListener<RestoreCompletionResponse>> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
private final BlockingQueue<UpdateIndexShardRestoreStatusRequest> updatedSnapshotStateQueue = ConcurrentCollections.newBlockingQueue();
|
||||
private final ClusterSettings clusterSettings;
|
||||
|
||||
private final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor;
|
||||
|
||||
@Inject
|
||||
public RestoreService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService,
|
||||
public RestoreService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService,
|
||||
AllocationService allocationService, MetaDataCreateIndexService createIndexService,
|
||||
MetaDataIndexUpgradeService metaDataIndexUpgradeService, ClusterSettings clusterSettings) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.repositoriesService = repositoriesService;
|
||||
this.transportService = transportService;
|
||||
this.allocationService = allocationService;
|
||||
this.createIndexService = createIndexService;
|
||||
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
|
||||
transportService.registerRequestHandler(UPDATE_RESTORE_ACTION_NAME, UpdateIndexShardRestoreStatusRequest::new, ThreadPool.Names.SAME, new UpdateRestoreStateRequestHandler());
|
||||
clusterService.add(this);
|
||||
this.clusterSettings = clusterSettings;
|
||||
this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(logger);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -183,7 +170,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
* @param request restore request
|
||||
* @param listener restore listener
|
||||
*/
|
||||
public void restoreSnapshot(final RestoreRequest request, final ActionListener<RestoreInfo> listener) {
|
||||
public void restoreSnapshot(final RestoreRequest request, final ActionListener<RestoreCompletionResponse> listener) {
|
||||
try {
|
||||
// Read snapshot info and metadata from the repository
|
||||
Repository repository = repositoriesService.repository(request.repositoryName);
|
||||
|
@ -314,7 +301,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
}
|
||||
|
||||
shards = shardsBuilder.build();
|
||||
RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshot, RestoreInProgress.State.INIT, Collections.unmodifiableList(new ArrayList<>(renamedIndices.keySet())), shards);
|
||||
RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshot, overallState(RestoreInProgress.State.INIT, shards), Collections.unmodifiableList(new ArrayList<>(renamedIndices.keySet())), shards);
|
||||
builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restoreEntry));
|
||||
} else {
|
||||
shards = ImmutableOpenMap.of();
|
||||
|
@ -469,7 +456,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(restoreInfo);
|
||||
listener.onResponse(new RestoreCompletionResponse(snapshot, restoreInfo));
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -480,19 +467,33 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used by {@link IndexShard} to notify
|
||||
* {@code RestoreService} about shard restore completion.
|
||||
*
|
||||
* @param snapshot snapshot
|
||||
* @param shardId shard id
|
||||
*/
|
||||
public void indexShardRestoreCompleted(Snapshot snapshot, ShardId shardId) {
|
||||
logger.trace("[{}] successfully restored shard [{}]", snapshot, shardId);
|
||||
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshot, shardId,
|
||||
new ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(), RestoreInProgress.State.SUCCESS));
|
||||
transportService.sendRequest(clusterService.state().nodes().getMasterNode(),
|
||||
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||
public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {
|
||||
boolean changesMade = false;
|
||||
final List<RestoreInProgress.Entry> entries = new ArrayList<>();
|
||||
for (RestoreInProgress.Entry entry : oldRestore.entries()) {
|
||||
ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = null;
|
||||
for (ObjectObjectCursor<ShardId, ShardRestoreStatus> cursor : entry.shards()) {
|
||||
ShardId shardId = cursor.key;
|
||||
if (deletedIndices.contains(shardId.getIndex())) {
|
||||
changesMade = true;
|
||||
if (shardsBuilder == null) {
|
||||
shardsBuilder = ImmutableOpenMap.builder(entry.shards());
|
||||
}
|
||||
shardsBuilder.put(shardId, new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE, "index was deleted"));
|
||||
}
|
||||
}
|
||||
if (shardsBuilder != null) {
|
||||
ImmutableOpenMap<ShardId, ShardRestoreStatus> shards = shardsBuilder.build();
|
||||
entries.add(new RestoreInProgress.Entry(entry.snapshot(), overallState(RestoreInProgress.State.STARTED, shards), entry.indices(), shards));
|
||||
} else {
|
||||
entries.add(entry);
|
||||
}
|
||||
}
|
||||
if (changesMade) {
|
||||
return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()]));
|
||||
} else {
|
||||
return oldRestore;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RestoreCompletionResponse {
|
||||
|
@ -513,168 +514,201 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates shard restore record in the cluster state.
|
||||
*
|
||||
* @param request update shard status request
|
||||
*/
|
||||
private void updateRestoreStateOnMaster(final UpdateIndexShardRestoreStatusRequest request) {
|
||||
logger.trace("received updated snapshot restore state [{}]", request);
|
||||
updatedSnapshotStateQueue.add(request);
|
||||
public static class RestoreInProgressUpdater extends RoutingChangesObserver.AbstractRoutingChangesObserver {
|
||||
private final Map<Snapshot, Updates> shardChanges = new HashMap<>();
|
||||
|
||||
clusterService.submitStateUpdateTask("update snapshot state", new ClusterStateUpdateTask() {
|
||||
private final List<UpdateIndexShardRestoreStatusRequest> drainedRequests = new ArrayList<>();
|
||||
private Map<Snapshot, Tuple<RestoreInfo, ImmutableOpenMap<ShardId, ShardRestoreStatus>>> batchedRestoreInfo = null;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
|
||||
if (request.processed) {
|
||||
return currentState;
|
||||
@Override
|
||||
public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
|
||||
// mark snapshot as completed
|
||||
if (initializingShard.primary()) {
|
||||
RecoverySource recoverySource = initializingShard.recoverySource();
|
||||
if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
|
||||
Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot();
|
||||
changes(snapshot).startedShards.put(initializingShard.shardId(),
|
||||
new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
updatedSnapshotStateQueue.drainTo(drainedRequests);
|
||||
|
||||
final int batchSize = drainedRequests.size();
|
||||
|
||||
// nothing to process (a previous event has processed it already)
|
||||
if (batchSize == 0) {
|
||||
return currentState;
|
||||
@Override
|
||||
public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
|
||||
if (failedShard.primary() && failedShard.initializing()) {
|
||||
RecoverySource recoverySource = failedShard.recoverySource();
|
||||
if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
|
||||
Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot();
|
||||
// mark restore entry for this shard as failed when it's due to a file corruption. There is no need wait on retries
|
||||
// to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed,
|
||||
// however, we only want to acknowledge the restore operation once it has been successfully restored on another node.
|
||||
if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) {
|
||||
changes(snapshot).failedShards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(),
|
||||
RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
|
||||
if (restore != null) {
|
||||
int changedCount = 0;
|
||||
final List<RestoreInProgress.Entry> entries = new ArrayList<>();
|
||||
for (RestoreInProgress.Entry entry : restore.entries()) {
|
||||
ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = null;
|
||||
@Override
|
||||
public void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard) {
|
||||
// if we force an empty primary, we should also fail the restore entry
|
||||
if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT &&
|
||||
initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) {
|
||||
Snapshot snapshot = ((SnapshotRecoverySource) unassignedShard.recoverySource()).snapshot();
|
||||
changes(snapshot).failedShards.put(unassignedShard.shardId(), new ShardRestoreStatus(null,
|
||||
RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource()));
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < batchSize; i++) {
|
||||
final UpdateIndexShardRestoreStatusRequest updateSnapshotState = drainedRequests.get(i);
|
||||
updateSnapshotState.processed = true;
|
||||
/**
|
||||
* Helper method that creates update entry for the given shard id if such an entry does not exist yet.
|
||||
*/
|
||||
private Updates changes(Snapshot snapshot) {
|
||||
return shardChanges.computeIfAbsent(snapshot, k -> new Updates());
|
||||
}
|
||||
|
||||
if (entry.snapshot().equals(updateSnapshotState.snapshot())) {
|
||||
logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state());
|
||||
if (shardsBuilder == null) {
|
||||
shardsBuilder = ImmutableOpenMap.builder(entry.shards());
|
||||
}
|
||||
shardsBuilder.put(updateSnapshotState.shardId(), updateSnapshotState.status());
|
||||
changedCount++;
|
||||
}
|
||||
private static class Updates {
|
||||
private Map<ShardId, ShardRestoreStatus> failedShards = new HashMap<>();
|
||||
private Map<ShardId, ShardRestoreStatus> startedShards = new HashMap<>();
|
||||
}
|
||||
|
||||
public RestoreInProgress applyChanges(RestoreInProgress oldRestore) {
|
||||
if (shardChanges.isEmpty() == false) {
|
||||
final List<RestoreInProgress.Entry> entries = new ArrayList<>();
|
||||
for (RestoreInProgress.Entry entry : oldRestore.entries()) {
|
||||
Snapshot snapshot = entry.snapshot();
|
||||
Updates updates = shardChanges.get(snapshot);
|
||||
assert Sets.haveEmptyIntersection(updates.startedShards.keySet(), updates.failedShards.keySet());
|
||||
if (updates.startedShards.isEmpty() == false || updates.failedShards.isEmpty() == false) {
|
||||
ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder(entry.shards());
|
||||
for (Map.Entry<ShardId, ShardRestoreStatus> startedShardEntry : updates.startedShards.entrySet()) {
|
||||
shardsBuilder.put(startedShardEntry.getKey(), startedShardEntry.getValue());
|
||||
}
|
||||
|
||||
if (shardsBuilder != null) {
|
||||
ImmutableOpenMap<ShardId, ShardRestoreStatus> shards = shardsBuilder.build();
|
||||
if (!completed(shards)) {
|
||||
entries.add(new RestoreInProgress.Entry(entry.snapshot(), RestoreInProgress.State.STARTED, entry.indices(), shards));
|
||||
} else {
|
||||
logger.info("restore [{}] is done", entry.snapshot());
|
||||
if (batchedRestoreInfo == null) {
|
||||
batchedRestoreInfo = new HashMap<>();
|
||||
}
|
||||
assert !batchedRestoreInfo.containsKey(entry.snapshot());
|
||||
batchedRestoreInfo.put(entry.snapshot(),
|
||||
new Tuple<>(
|
||||
new RestoreInfo(entry.snapshot().getSnapshotId().getName(),
|
||||
entry.indices(),
|
||||
shards.size(),
|
||||
shards.size() - failedShards(shards)),
|
||||
shards));
|
||||
}
|
||||
} else {
|
||||
entries.add(entry);
|
||||
for (Map.Entry<ShardId, ShardRestoreStatus> failedShardEntry : updates.failedShards.entrySet()) {
|
||||
shardsBuilder.put(failedShardEntry.getKey(), failedShardEntry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
if (changedCount > 0) {
|
||||
logger.trace("changed cluster state triggered by {} snapshot restore state updates", changedCount);
|
||||
|
||||
final RestoreInProgress updatedRestore = new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()]));
|
||||
return ClusterState.builder(currentState).putCustom(RestoreInProgress.TYPE, updatedRestore).build();
|
||||
ImmutableOpenMap<ShardId, ShardRestoreStatus> shards = shardsBuilder.build();
|
||||
RestoreInProgress.State newState = overallState(RestoreInProgress.State.STARTED, shards);
|
||||
entries.add(new RestoreInProgress.Entry(entry.snapshot(), newState, entry.indices(), shards));
|
||||
} else {
|
||||
entries.add(entry);
|
||||
}
|
||||
}
|
||||
return currentState;
|
||||
return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()]));
|
||||
} else {
|
||||
return oldRestore;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, @Nullable Exception e) {
|
||||
for (UpdateIndexShardRestoreStatusRequest request : drainedRequests) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]", request.snapshot(), request.shardId(), request.status()), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (batchedRestoreInfo != null) {
|
||||
for (final Entry<Snapshot, Tuple<RestoreInfo, ImmutableOpenMap<ShardId, ShardRestoreStatus>>> entry : batchedRestoreInfo.entrySet()) {
|
||||
final Snapshot snapshot = entry.getKey();
|
||||
final RestoreInfo restoreInfo = entry.getValue().v1();
|
||||
final ImmutableOpenMap<ShardId, ShardRestoreStatus> shards = entry.getValue().v2();
|
||||
RoutingTable routingTable = newState.getRoutingTable();
|
||||
final List<ShardId> waitForStarted = new ArrayList<>();
|
||||
for (ObjectObjectCursor<ShardId, ShardRestoreStatus> shard : shards) {
|
||||
if (shard.value.state() == RestoreInProgress.State.SUCCESS ) {
|
||||
ShardId shardId = shard.key;
|
||||
ShardRouting shardRouting = findPrimaryShard(routingTable, shardId);
|
||||
if (shardRouting != null && !shardRouting.active()) {
|
||||
logger.trace("[{}][{}] waiting for the shard to start", snapshot, shardId);
|
||||
waitForStarted.add(shardId);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (waitForStarted.isEmpty()) {
|
||||
notifyListeners(snapshot, restoreInfo);
|
||||
} else {
|
||||
clusterService.addLast(new ClusterStateListener() {
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.routingTableChanged()) {
|
||||
RoutingTable routingTable = event.state().getRoutingTable();
|
||||
for (Iterator<ShardId> iterator = waitForStarted.iterator(); iterator.hasNext();) {
|
||||
ShardId shardId = iterator.next();
|
||||
ShardRouting shardRouting = findPrimaryShard(routingTable, shardId);
|
||||
// Shard disappeared (index deleted) or became active
|
||||
if (shardRouting == null || shardRouting.active()) {
|
||||
iterator.remove();
|
||||
logger.trace("[{}][{}] shard disappeared or started - removing", snapshot, shardId);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (waitForStarted.isEmpty()) {
|
||||
notifyListeners(snapshot, restoreInfo);
|
||||
clusterService.remove(this);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ShardRouting findPrimaryShard(RoutingTable routingTable, ShardId shardId) {
|
||||
IndexRoutingTable indexRoutingTable = routingTable.index(shardId.getIndex());
|
||||
if (indexRoutingTable != null) {
|
||||
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId.id());
|
||||
if (indexShardRoutingTable != null) {
|
||||
return indexShardRoutingTable.primaryShard();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void notifyListeners(Snapshot snapshot, RestoreInfo restoreInfo) {
|
||||
for (ActionListener<RestoreCompletionResponse> listener : listeners) {
|
||||
try {
|
||||
listener.onResponse(new RestoreCompletionResponse(snapshot, restoreInfo));
|
||||
} catch (Exception e) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to update snapshot status for [{}]", listener), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private boolean completed(ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
|
||||
public static RestoreInProgress.Entry restoreInProgress(ClusterState state, Snapshot snapshot) {
|
||||
final RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE);
|
||||
if (restoreInProgress != null) {
|
||||
for (RestoreInProgress.Entry e : restoreInProgress.entries()) {
|
||||
if (e.snapshot().equals(snapshot)) {
|
||||
return e;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
static class CleanRestoreStateTaskExecutor implements ClusterStateTaskExecutor<CleanRestoreStateTaskExecutor.Task>, ClusterStateTaskListener {
|
||||
|
||||
static class Task {
|
||||
final Snapshot snapshot;
|
||||
|
||||
Task(Snapshot snapshot) {
|
||||
this.snapshot = snapshot;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "clean restore state for restoring snapshot " + snapshot;
|
||||
}
|
||||
}
|
||||
|
||||
private final Logger logger;
|
||||
|
||||
public CleanRestoreStateTaskExecutor(Logger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BatchResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
|
||||
final BatchResult.Builder<Task> resultBuilder = BatchResult.<Task>builder().successes(tasks);
|
||||
Set<Snapshot> completedSnapshots = tasks.stream().map(e -> e.snapshot).collect(Collectors.toSet());
|
||||
final List<RestoreInProgress.Entry> entries = new ArrayList<>();
|
||||
final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
|
||||
boolean changed = false;
|
||||
if (restoreInProgress != null) {
|
||||
for (RestoreInProgress.Entry entry : restoreInProgress.entries()) {
|
||||
if (completedSnapshots.contains(entry.snapshot()) == false) {
|
||||
entries.add(entry);
|
||||
} else {
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (changed == false) {
|
||||
return resultBuilder.build(currentState);
|
||||
}
|
||||
RestoreInProgress updatedRestoreInProgress = new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()]));
|
||||
ImmutableOpenMap.Builder<String, ClusterState.Custom> builder = ImmutableOpenMap.builder(currentState.getCustoms());
|
||||
builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress);
|
||||
ImmutableOpenMap<String, ClusterState.Custom> customs = builder.build();
|
||||
return resultBuilder.build(ClusterState.builder(currentState).customs(customs).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final String source, final Exception e) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoLongerMaster(String source) {
|
||||
logger.debug("no longer master while processing restore state update [{}]", source);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void cleanupRestoreState(ClusterChangedEvent event) {
|
||||
ClusterState state = event.state();
|
||||
|
||||
RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE);
|
||||
if (restoreInProgress != null) {
|
||||
for (RestoreInProgress.Entry entry : restoreInProgress.entries()) {
|
||||
if (entry.state().completed()) {
|
||||
assert completed(entry.shards()) : "state says completed but restore entries are not";
|
||||
clusterService.submitStateUpdateTask(
|
||||
"clean up snapshot restore state",
|
||||
new CleanRestoreStateTaskExecutor.Task(entry.snapshot()),
|
||||
ClusterStateTaskConfig.build(Priority.URGENT),
|
||||
cleanRestoreStateTaskExecutor,
|
||||
cleanRestoreStateTaskExecutor);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static RestoreInProgress.State overallState(RestoreInProgress.State nonCompletedState,
|
||||
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
|
||||
boolean hasFailed = false;
|
||||
for (ObjectCursor<RestoreInProgress.ShardRestoreStatus> status : shards.values()) {
|
||||
if (!status.value.state().completed()) {
|
||||
return nonCompletedState;
|
||||
}
|
||||
if (status.value.state() == RestoreInProgress.State.FAILURE) {
|
||||
hasFailed = true;
|
||||
}
|
||||
}
|
||||
if (hasFailed) {
|
||||
return RestoreInProgress.State.FAILURE;
|
||||
} else {
|
||||
return RestoreInProgress.State.SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean completed(ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
|
||||
for (ObjectCursor<RestoreInProgress.ShardRestoreStatus> status : shards.values()) {
|
||||
if (!status.value.state().completed()) {
|
||||
return false;
|
||||
|
@ -683,7 +717,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
return true;
|
||||
}
|
||||
|
||||
private int failedShards(ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
|
||||
public static int failedShards(ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
|
||||
int failedShards = 0;
|
||||
for (ObjectCursor<RestoreInProgress.ShardRestoreStatus> status : shards.values()) {
|
||||
if (status.value.state() == RestoreInProgress.State.FAILURE) {
|
||||
|
@ -727,53 +761,6 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if any of the deleted indices are still recovering and fails recovery on the shards of these indices
|
||||
*
|
||||
* @param event cluster changed event
|
||||
*/
|
||||
private void processDeletedIndices(ClusterChangedEvent event) {
|
||||
RestoreInProgress restore = event.state().custom(RestoreInProgress.TYPE);
|
||||
if (restore == null) {
|
||||
// Not restoring - nothing to do
|
||||
return;
|
||||
}
|
||||
|
||||
if (!event.indicesDeleted().isEmpty()) {
|
||||
// Some indices were deleted, let's make sure all indices that we are restoring still exist
|
||||
for (RestoreInProgress.Entry entry : restore.entries()) {
|
||||
List<ShardId> shardsToFail = null;
|
||||
for (ObjectObjectCursor<ShardId, ShardRestoreStatus> shard : entry.shards()) {
|
||||
if (!shard.value.state().completed()) {
|
||||
if (!event.state().metaData().hasIndex(shard.key.getIndex().getName())) {
|
||||
if (shardsToFail == null) {
|
||||
shardsToFail = new ArrayList<>();
|
||||
}
|
||||
shardsToFail.add(shard.key);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (shardsToFail != null) {
|
||||
for (ShardId shardId : shardsToFail) {
|
||||
logger.trace("[{}] failing running shard restore [{}]", entry.snapshot(), shardId);
|
||||
updateRestoreStateOnMaster(new UpdateIndexShardRestoreStatusRequest(entry.snapshot(), shardId, new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE, "index was deleted")));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fails the given snapshot restore operation for the given shard
|
||||
*/
|
||||
public void failRestore(Snapshot snapshot, ShardId shardId) {
|
||||
logger.debug("[{}] failed to restore shard [{}]", snapshot, shardId);
|
||||
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshot, shardId,
|
||||
new ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(), RestoreInProgress.State.FAILURE));
|
||||
transportService.sendRequest(clusterService.state().nodes().getMasterNode(),
|
||||
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||
}
|
||||
|
||||
private boolean failed(SnapshotInfo snapshot, String index) {
|
||||
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
|
||||
if (index.equals(failure.index())) {
|
||||
|
@ -810,34 +797,11 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds restore completion listener
|
||||
* <p>
|
||||
* This listener is called for each snapshot that finishes restore operation in the cluster. It's responsibility of
|
||||
* the listener to decide if it's called for the appropriate snapshot or not.
|
||||
*
|
||||
* @param listener restore completion listener
|
||||
*/
|
||||
public void addListener(ActionListener<RestoreCompletionResponse> listener) {
|
||||
this.listeners.add(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes restore completion listener
|
||||
* <p>
|
||||
* This listener is called for each snapshot that finishes restore operation in the cluster.
|
||||
*
|
||||
* @param listener restore completion listener
|
||||
*/
|
||||
public void removeListener(ActionListener<RestoreCompletionResponse> listener) {
|
||||
this.listeners.remove(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
try {
|
||||
if (event.localNodeMaster()) {
|
||||
processDeletedIndices(event);
|
||||
cleanupRestoreState(event);
|
||||
}
|
||||
} catch (Exception t) {
|
||||
logger.warn("Failed to update restore state ", t);
|
||||
|
@ -1061,69 +1025,4 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal class that is used to send notifications about finished shard restore operations to master node
|
||||
*/
|
||||
public static class UpdateIndexShardRestoreStatusRequest extends TransportRequest {
|
||||
private Snapshot snapshot;
|
||||
private ShardId shardId;
|
||||
private ShardRestoreStatus status;
|
||||
|
||||
volatile boolean processed; // state field, no need to serialize
|
||||
|
||||
public UpdateIndexShardRestoreStatusRequest() {
|
||||
|
||||
}
|
||||
|
||||
private UpdateIndexShardRestoreStatusRequest(Snapshot snapshot, ShardId shardId, ShardRestoreStatus status) {
|
||||
this.snapshot = snapshot;
|
||||
this.shardId = shardId;
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
snapshot = new Snapshot(in);
|
||||
shardId = ShardId.readShardId(in);
|
||||
status = ShardRestoreStatus.readShardRestoreStatus(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
snapshot.writeTo(out);
|
||||
shardId.writeTo(out);
|
||||
status.writeTo(out);
|
||||
}
|
||||
|
||||
public Snapshot snapshot() {
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
public ShardId shardId() {
|
||||
return shardId;
|
||||
}
|
||||
|
||||
public ShardRestoreStatus status() {
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "" + snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal class that is used to send notifications about finished shard restore operations to master node
|
||||
*/
|
||||
class UpdateRestoreStateRequestHandler implements TransportRequestHandler<UpdateIndexShardRestoreStatusRequest> {
|
||||
@Override
|
||||
public void messageReceived(UpdateIndexShardRestoreStatusRequest request, final TransportChannel channel) throws Exception {
|
||||
updateRestoreStateOnMaster(request);
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ import java.util.Collection;
|
|||
import static java.util.Collections.singletonList;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyCollectionOf;
|
||||
import static org.mockito.Matchers.anySetOf;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -45,7 +45,7 @@ public class MetaDataIndexAliasesServiceTests extends ESTestCase {
|
|||
|
||||
public MetaDataIndexAliasesServiceTests() {
|
||||
// Mock any deletes so we don't need to worry about how MetaDataDeleteIndexService does its job
|
||||
when(deleteIndexService.deleteIndices(any(ClusterState.class), anyCollectionOf(Index.class))).then(i -> {
|
||||
when(deleteIndexService.deleteIndices(any(ClusterState.class), anySetOf(Index.class))).then(i -> {
|
||||
ClusterState state = (ClusterState) i.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
Collection<Index> indices = (Collection<Index>) i.getArguments()[1];
|
||||
|
|
|
@ -686,6 +686,46 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
logger.info("--> total number of simulated failures during restore: [{}]", getFailureCount("test-repo"));
|
||||
}
|
||||
|
||||
public void testDataFileCorruptionDuringRestore() throws Exception {
|
||||
Path repositoryLocation = randomRepoPath();
|
||||
Client client = client();
|
||||
logger.info("--> creating repository");
|
||||
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs").setSettings(Settings.builder().put("location", repositoryLocation)));
|
||||
|
||||
prepareCreate("test-idx").setSettings(Settings.builder().put("index.allocation.max_retries", Integer.MAX_VALUE)).get();
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> indexing some data");
|
||||
for (int i = 0; i < 100; i++) {
|
||||
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
|
||||
}
|
||||
refresh();
|
||||
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().totalHits(), equalTo(100L));
|
||||
|
||||
logger.info("--> snapshot");
|
||||
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(createSnapshotResponse.getSnapshotInfo().successfulShards()));
|
||||
|
||||
logger.info("--> update repository with mock version");
|
||||
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("mock").setSettings(
|
||||
Settings.builder()
|
||||
.put("location", repositoryLocation)
|
||||
.put("random", randomAsciiOfLength(10))
|
||||
.put("use_lucene_corruption", true)
|
||||
.put("random_data_file_io_exception_rate", 1.0)));
|
||||
|
||||
// Test restore after index deletion
|
||||
logger.info("--> delete index");
|
||||
cluster().wipeIndices("test-idx");
|
||||
logger.info("--> restore corrupt index");
|
||||
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards()));
|
||||
}
|
||||
|
||||
public void testDeletionOfFailingToRecoverIndexShouldStopRestore() throws Exception {
|
||||
Path repositoryLocation = randomRepoPath();
|
||||
Client client = client();
|
||||
|
@ -2199,32 +2239,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
assertFalse(snapshotListener.timedOut());
|
||||
// Check that cluster state update task was called only once
|
||||
assertEquals(1, snapshotListener.count());
|
||||
|
||||
logger.info("--> close indices");
|
||||
client.admin().indices().prepareClose("test-idx").get();
|
||||
|
||||
BlockingClusterStateListener restoreListener = new BlockingClusterStateListener(clusterService, "restore_snapshot[", "update snapshot state", Priority.HIGH);
|
||||
|
||||
try {
|
||||
clusterService.addFirst(restoreListener);
|
||||
logger.info("--> restore snapshot");
|
||||
ListenableActionFuture<RestoreSnapshotResponse> futureRestore = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute();
|
||||
|
||||
// Await until shard updates are in pending state.
|
||||
assertBusyPendingTasks("update snapshot state", numberOfShards);
|
||||
restoreListener.unblock();
|
||||
|
||||
RestoreSnapshotResponse restoreSnapshotResponse = futureRestore.actionGet();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(numberOfShards));
|
||||
|
||||
} finally {
|
||||
clusterService.remove(restoreListener);
|
||||
}
|
||||
|
||||
// Check that we didn't timeout
|
||||
assertFalse(restoreListener.timedOut());
|
||||
// Check that cluster state update task was called only once
|
||||
assertEquals(1, restoreListener.count());
|
||||
}
|
||||
|
||||
public void testSnapshotName() throws Exception {
|
||||
|
|
|
@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
|
@ -81,6 +82,8 @@ public class MockRepository extends FsRepository {
|
|||
|
||||
private final double randomDataFileIOExceptionRate;
|
||||
|
||||
private final boolean useLuceneCorruptionException;
|
||||
|
||||
private final long maximumNumberOfFailures;
|
||||
|
||||
private final long waitAfterUnblock;
|
||||
|
@ -101,6 +104,7 @@ public class MockRepository extends FsRepository {
|
|||
super(overrideSettings(metadata, environment), environment);
|
||||
randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0);
|
||||
randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
|
||||
useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false);
|
||||
maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L);
|
||||
blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false);
|
||||
blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false);
|
||||
|
@ -245,7 +249,11 @@ public class MockRepository extends FsRepository {
|
|||
if (blobName.startsWith("__")) {
|
||||
if (shouldFail(blobName, randomDataFileIOExceptionRate) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) {
|
||||
logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path());
|
||||
throw new IOException("Random IOException");
|
||||
if (useLuceneCorruptionException) {
|
||||
throw new CorruptIndexException("Random corruption", "random file");
|
||||
} else {
|
||||
throw new IOException("Random IOException");
|
||||
}
|
||||
} else if (blockOnDataFiles) {
|
||||
logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
|
||||
if (blockExecution() && waitAfterUnblock > 0) {
|
||||
|
|
Loading…
Reference in New Issue