mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-25 22:36:20 +00:00
This API call in most implementations is fairly IO heavy and slow so it is more natural to be async in the first place. Concretely though, this change is a prerequisite of #49060 since determining the repository generation from the cluster state introduces situations where this call would have to wait for other operations to finish. Doing so in a blocking manner would break `SnapshotResiliencyTests` and waste a thread. Also, this sets up the possibility to in the future make use of async IO where provided by the underlying Repository implementation. In a follow-up `SnapshotsService#getRepositoryData` will be made async as well (did not do it here, since it's another huge change to do so). Note: This change for now does not alter the threading behaviour in any way (since `Repository#getRepositoryData` isn't forking) and is purely mechanical.
This commit is contained in:
parent
cb5169ae37
commit
0acba44a2e
@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.StepListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
@ -41,6 +42,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryCleanupResult;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
@ -167,97 +169,103 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
|
||||
return;
|
||||
}
|
||||
final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
|
||||
final long repositoryStateId = repository.getRepositoryData().getGenId();
|
||||
logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
|
||||
clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
|
||||
new ClusterStateUpdateTask() {
|
||||
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
|
||||
repository.getRepositoryData(repositoryDataListener);
|
||||
repositoryDataListener.whenComplete(repositoryData -> {
|
||||
final long repositoryStateId = repositoryData.getGenId();
|
||||
logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
|
||||
clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
|
||||
new ClusterStateUpdateTask() {
|
||||
|
||||
private boolean startedCleanup = false;
|
||||
private boolean startedCleanup = false;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
|
||||
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
|
||||
throw new IllegalStateException(
|
||||
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["
|
||||
+ repositoryCleanupInProgress + "]");
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
final RepositoryCleanupInProgress repositoryCleanupInProgress =
|
||||
currentState.custom(RepositoryCleanupInProgress.TYPE);
|
||||
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
|
||||
throw new IllegalStateException(
|
||||
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["
|
||||
+ repositoryCleanupInProgress + "]");
|
||||
}
|
||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
throw new IllegalStateException("Cannot cleanup [" + repositoryName
|
||||
+ "] - a snapshot is currently being deleted in [" + deletionsInProgress + "]");
|
||||
}
|
||||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots != null && !snapshots.entries().isEmpty()) {
|
||||
throw new IllegalStateException(
|
||||
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]");
|
||||
}
|
||||
return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE,
|
||||
new RepositoryCleanupInProgress(
|
||||
RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build();
|
||||
}
|
||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
throw new IllegalStateException("Cannot cleanup [" + repositoryName
|
||||
+ "] - a snapshot is currently being deleted in [" + deletionsInProgress + "]");
|
||||
}
|
||||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots != null && !snapshots.entries().isEmpty()) {
|
||||
throw new IllegalStateException(
|
||||
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]");
|
||||
}
|
||||
return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE,
|
||||
new RepositoryCleanupInProgress(
|
||||
RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
after(e, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
startedCleanup = true;
|
||||
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
|
||||
l -> blobStoreRepository.cleanup(
|
||||
repositoryStateId,
|
||||
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
|
||||
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
|
||||
}
|
||||
|
||||
private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
|
||||
if (failure == null) {
|
||||
logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId);
|
||||
} else {
|
||||
logger.debug(() -> new ParameterizedMessage(
|
||||
"Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
after(e, null);
|
||||
}
|
||||
assert failure != null || result != null;
|
||||
if (startedCleanup == false) {
|
||||
logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
|
||||
listener.onFailure(failure);
|
||||
return;
|
||||
}
|
||||
clusterService.submitStateUpdateTask(
|
||||
"remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
|
||||
new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return removeInProgressCleanup(currentState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
if (failure != null) {
|
||||
e.addSuppressed(failure);
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
startedCleanup = true;
|
||||
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
|
||||
l -> blobStoreRepository.cleanup(
|
||||
repositoryStateId,
|
||||
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
|
||||
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
|
||||
}
|
||||
|
||||
private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
|
||||
if (failure == null) {
|
||||
logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId);
|
||||
} else {
|
||||
logger.debug(() -> new ParameterizedMessage(
|
||||
"Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
|
||||
}
|
||||
assert failure != null || result != null;
|
||||
if (startedCleanup == false) {
|
||||
logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
|
||||
listener.onFailure(failure);
|
||||
return;
|
||||
}
|
||||
clusterService.submitStateUpdateTask(
|
||||
"remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
|
||||
new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return removeInProgressCleanup(currentState);
|
||||
}
|
||||
logger.warn(() ->
|
||||
new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (failure == null) {
|
||||
logger.info("Done with repository cleanup on [{}][{}] with result [{}]",
|
||||
repositoryName, repositoryStateId, result);
|
||||
listener.onResponse(result);
|
||||
} else {
|
||||
logger.warn(() -> new ParameterizedMessage("Failed to run repository cleanup operations on [{}][{}]",
|
||||
repositoryName, repositoryStateId), failure);
|
||||
listener.onFailure(failure);
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
if (failure != null) {
|
||||
e.addSuppressed(failure);
|
||||
}
|
||||
logger.warn(() ->
|
||||
new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (failure == null) {
|
||||
logger.info("Done with repository cleanup on [{}][{}] with result [{}]",
|
||||
repositoryName, repositoryStateId, result);
|
||||
listener.onResponse(result);
|
||||
} else {
|
||||
logger.warn(() -> new ParameterizedMessage(
|
||||
"Failed to run repository cleanup operations on [{}][{}]",
|
||||
repositoryName, repositoryStateId), failure);
|
||||
listener.onFailure(failure);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}, listener::onFailure);
|
||||
}
|
||||
}
|
||||
|
@ -483,15 +483,21 @@ final class StoreRecovery {
|
||||
translogState.totalOperations(0);
|
||||
translogState.totalOperationsOnStart(0);
|
||||
indexShard.prepareForIndexRecovery();
|
||||
ShardId snapshotShardId = shardId;
|
||||
final ShardId snapshotShardId;
|
||||
final String indexName = restoreSource.index();
|
||||
if (!shardId.getIndexName().equals(indexName)) {
|
||||
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
|
||||
} else {
|
||||
snapshotShardId = shardId;
|
||||
}
|
||||
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
|
||||
assert indexShard.getEngineOrNull() == null;
|
||||
repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId,
|
||||
indexShard.recoveryState(), restoreListener);
|
||||
repository.getRepositoryData(ActionListener.wrap(
|
||||
repositoryData -> {
|
||||
final IndexId indexId = repositoryData.resolveIndexId(indexName);
|
||||
assert indexShard.getEngineOrNull() == null;
|
||||
repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId,
|
||||
indexShard.recoveryState(), restoreListener);
|
||||
}, restoreListener::onFailure
|
||||
));
|
||||
} catch (Exception e) {
|
||||
restoreListener.onFailure(e);
|
||||
}
|
||||
|
@ -68,8 +68,8 @@ public class FilterRepository implements Repository {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositoryData getRepositoryData() {
|
||||
return in.getRepositoryData();
|
||||
public void getRepositoryData(ActionListener<RepositoryData> listener) {
|
||||
in.getRepositoryData(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -105,7 +105,7 @@ public interface Repository extends LifecycleComponent {
|
||||
* and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
|
||||
* if there was an error in reading the data.
|
||||
*/
|
||||
RepositoryData getRepositoryData();
|
||||
void getRepositoryData(ActionListener<RepositoryData> listener);
|
||||
|
||||
/**
|
||||
* Starts snapshotting process
|
||||
|
@ -116,6 +116,7 @@ import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -758,11 +759,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||
// directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION
|
||||
// If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened
|
||||
// when writing the index-${N} to each shard directory.
|
||||
final Consumer<Exception> onUpdateFailure =
|
||||
e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e));
|
||||
final ActionListener<SnapshotInfo> allMetaListener = new GroupedActionListener<>(
|
||||
ActionListener.wrap(snapshotInfos -> {
|
||||
assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
|
||||
final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
|
||||
final RepositoryData existingRepositoryData = getRepositoryData();
|
||||
assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
|
||||
final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
|
||||
getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
|
||||
final RepositoryData updatedRepositoryData =
|
||||
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
|
||||
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens);
|
||||
@ -770,9 +773,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
|
||||
}
|
||||
listener.onResponse(snapshotInfo);
|
||||
},
|
||||
e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e))),
|
||||
2 + indices.size());
|
||||
}, onUpdateFailure));
|
||||
}, onUpdateFailure), 2 + indices.size());
|
||||
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
|
||||
|
||||
// We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will
|
||||
@ -937,31 +939,33 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||
protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
|
||||
|
||||
@Override
|
||||
public RepositoryData getRepositoryData() {
|
||||
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
|
||||
while (true) {
|
||||
final long generation;
|
||||
try {
|
||||
generation = latestIndexBlobId();
|
||||
} catch (IOException ioe) {
|
||||
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
|
||||
}
|
||||
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
|
||||
if (genToLoad > generation) {
|
||||
logger.info("Determined repository generation [" + generation
|
||||
+ "] from repository contents but correct generation must be at least [" + genToLoad + "]");
|
||||
}
|
||||
try {
|
||||
return getRepositoryData(genToLoad);
|
||||
} catch (RepositoryException e) {
|
||||
if (genToLoad != latestKnownRepoGen.get()) {
|
||||
logger.warn("Failed to load repository data generation [" + genToLoad +
|
||||
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
|
||||
continue;
|
||||
public void getRepositoryData(ActionListener<RepositoryData> listener) {
|
||||
ActionListener.completeWith(listener, () -> {
|
||||
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
|
||||
while (true) {
|
||||
final long generation;
|
||||
try {
|
||||
generation = latestIndexBlobId();
|
||||
} catch (IOException ioe) {
|
||||
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
|
||||
}
|
||||
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
|
||||
if (genToLoad > generation) {
|
||||
logger.info("Determined repository generation [" + generation
|
||||
+ "] from repository contents but correct generation must be at least [" + genToLoad + "]");
|
||||
}
|
||||
try {
|
||||
return getRepositoryData(genToLoad);
|
||||
} catch (RepositoryException e) {
|
||||
if (genToLoad != latestKnownRepoGen.get()) {
|
||||
logger.warn("Failed to load repository data generation [" + genToLoad +
|
||||
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
|
||||
continue;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private RepositoryData getRepositoryData(long indexGen) {
|
||||
|
@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.StepListener;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
@ -162,7 +163,7 @@ public class RestoreService implements ClusterStateApplier {
|
||||
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
|
||||
clusterService.addStateApplier(this);
|
||||
this.clusterSettings = clusterSettings;
|
||||
this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(logger);
|
||||
this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -176,366 +177,372 @@ public class RestoreService implements ClusterStateApplier {
|
||||
// Read snapshot info and metadata from the repository
|
||||
final String repositoryName = request.repository();
|
||||
Repository repository = repositoriesService.repository(repositoryName);
|
||||
final RepositoryData repositoryData = repository.getRepositoryData();
|
||||
final String snapshotName = request.snapshot();
|
||||
final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
|
||||
.filter(s -> snapshotName.equals(s.getName())).findFirst();
|
||||
if (matchingSnapshotId.isPresent() == false) {
|
||||
throw new SnapshotRestoreException(repositoryName, snapshotName, "snapshot does not exist");
|
||||
}
|
||||
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
|
||||
repository.getRepositoryData(repositoryDataListener);
|
||||
repositoryDataListener.whenComplete(repositoryData -> {
|
||||
final String snapshotName = request.snapshot();
|
||||
final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
|
||||
.filter(s -> snapshotName.equals(s.getName())).findFirst();
|
||||
if (matchingSnapshotId.isPresent() == false) {
|
||||
throw new SnapshotRestoreException(repositoryName, snapshotName, "snapshot does not exist");
|
||||
}
|
||||
|
||||
final SnapshotId snapshotId = matchingSnapshotId.get();
|
||||
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
|
||||
final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
|
||||
final SnapshotId snapshotId = matchingSnapshotId.get();
|
||||
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
|
||||
final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
|
||||
|
||||
// Make sure that we can restore from this snapshot
|
||||
validateSnapshotRestorable(repositoryName, snapshotInfo);
|
||||
// Make sure that we can restore from this snapshot
|
||||
validateSnapshotRestorable(repositoryName, snapshotInfo);
|
||||
|
||||
// Resolve the indices from the snapshot that need to be restored
|
||||
final List<String> indicesInSnapshot = filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
|
||||
// Resolve the indices from the snapshot that need to be restored
|
||||
final List<String> indicesInSnapshot = filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
|
||||
|
||||
final MetaData.Builder metaDataBuilder;
|
||||
if (request.includeGlobalState()) {
|
||||
metaDataBuilder = MetaData.builder(repository.getSnapshotGlobalMetaData(snapshotId));
|
||||
} else {
|
||||
metaDataBuilder = MetaData.builder();
|
||||
}
|
||||
final MetaData.Builder metaDataBuilder;
|
||||
if (request.includeGlobalState()) {
|
||||
metaDataBuilder = MetaData.builder(repository.getSnapshotGlobalMetaData(snapshotId));
|
||||
} else {
|
||||
metaDataBuilder = MetaData.builder();
|
||||
}
|
||||
|
||||
final List<IndexId> indexIdsInSnapshot = repositoryData.resolveIndices(indicesInSnapshot);
|
||||
for (IndexId indexId : indexIdsInSnapshot) {
|
||||
metaDataBuilder.put(repository.getSnapshotIndexMetaData(snapshotId, indexId), false);
|
||||
}
|
||||
final List<IndexId> indexIdsInSnapshot = repositoryData.resolveIndices(indicesInSnapshot);
|
||||
for (IndexId indexId : indexIdsInSnapshot) {
|
||||
metaDataBuilder.put(repository.getSnapshotIndexMetaData(snapshotId, indexId), false);
|
||||
}
|
||||
|
||||
final MetaData metaData = metaDataBuilder.build();
|
||||
final MetaData metaData = metaDataBuilder.build();
|
||||
|
||||
// Apply renaming on index names, returning a map of names where
|
||||
// the key is the renamed index and the value is the original name
|
||||
final Map<String, String> indices = renamedIndices(request, indicesInSnapshot);
|
||||
// Apply renaming on index names, returning a map of names where
|
||||
// the key is the renamed index and the value is the original name
|
||||
final Map<String, String> indices = renamedIndices(request, indicesInSnapshot);
|
||||
|
||||
// Now we can start the actual restore process by adding shards to be recovered in the cluster state
|
||||
// and updating cluster metadata (global and index) as needed
|
||||
clusterService.submitStateUpdateTask("restore_snapshot[" + snapshotName + ']', new ClusterStateUpdateTask() {
|
||||
String restoreUUID = UUIDs.randomBase64UUID();
|
||||
RestoreInfo restoreInfo = null;
|
||||
// Now we can start the actual restore process by adding shards to be recovered in the cluster state
|
||||
// and updating cluster metadata (global and index) as needed
|
||||
clusterService.submitStateUpdateTask("restore_snapshot[" + snapshotName + ']', new ClusterStateUpdateTask() {
|
||||
final String restoreUUID = UUIDs.randomBase64UUID();
|
||||
RestoreInfo restoreInfo = null;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
|
||||
if (currentState.getNodes().getMinNodeVersion().before(Version.V_7_0_0)) {
|
||||
// Check if another restore process is already running - cannot run two restore processes at the
|
||||
// same time in versions prior to 7.0
|
||||
if (restoreInProgress != null && restoreInProgress.isEmpty() == false) {
|
||||
throw new ConcurrentSnapshotExecutionException(snapshot, "Restore process is already running in this cluster");
|
||||
}
|
||||
}
|
||||
// Check if the snapshot to restore is currently being deleted
|
||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(snapshot,
|
||||
"cannot restore a snapshot while a snapshot deletion is in-progress [" +
|
||||
deletionsInProgress.getEntries().get(0).getSnapshot() + "]");
|
||||
}
|
||||
|
||||
// Updating cluster state
|
||||
ClusterState.Builder builder = ClusterState.builder(currentState);
|
||||
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
|
||||
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
|
||||
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
|
||||
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards;
|
||||
Set<String> aliases = new HashSet<>();
|
||||
|
||||
if (indices.isEmpty() == false) {
|
||||
// We have some indices to restore
|
||||
ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder();
|
||||
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
|
||||
.minimumIndexCompatibilityVersion();
|
||||
for (Map.Entry<String, String> indexEntry : indices.entrySet()) {
|
||||
String index = indexEntry.getValue();
|
||||
boolean partial = checkPartial(index);
|
||||
SnapshotRecoverySource recoverySource =
|
||||
new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index);
|
||||
String renamedIndexName = indexEntry.getKey();
|
||||
IndexMetaData snapshotIndexMetaData = metaData.index(index);
|
||||
snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData,
|
||||
request.indexSettings(), request.ignoreIndexSettings());
|
||||
try {
|
||||
snapshotIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(snapshotIndexMetaData,
|
||||
minIndexCompatibilityVersion);
|
||||
} catch (Exception ex) {
|
||||
throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index + "] because it cannot be " +
|
||||
"upgraded", ex);
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
|
||||
if (currentState.getNodes().getMinNodeVersion().before(Version.V_7_0_0)) {
|
||||
// Check if another restore process is already running - cannot run two restore processes at the
|
||||
// same time in versions prior to 7.0
|
||||
if (restoreInProgress != null && restoreInProgress.isEmpty() == false) {
|
||||
throw new ConcurrentSnapshotExecutionException(snapshot,
|
||||
"Restore process is already running in this cluster");
|
||||
}
|
||||
// Check that the index is closed or doesn't exist
|
||||
IndexMetaData currentIndexMetaData = currentState.metaData().index(renamedIndexName);
|
||||
IntSet ignoreShards = new IntHashSet();
|
||||
final Index renamedIndex;
|
||||
if (currentIndexMetaData == null) {
|
||||
// Index doesn't exist - create it and start recovery
|
||||
// Make sure that the index we are about to create has a validate name
|
||||
MetaDataCreateIndexService.validateIndexName(renamedIndexName, currentState);
|
||||
createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), false);
|
||||
IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData)
|
||||
.state(IndexMetaData.State.OPEN)
|
||||
.index(renamedIndexName);
|
||||
indexMdBuilder.settings(Settings.builder()
|
||||
.put(snapshotIndexMetaData.getSettings())
|
||||
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()));
|
||||
MetaDataCreateIndexService.checkShardLimit(snapshotIndexMetaData.getSettings(), currentState);
|
||||
if (!request.includeAliases() && !snapshotIndexMetaData.getAliases().isEmpty()) {
|
||||
// Remove all aliases - they shouldn't be restored
|
||||
indexMdBuilder.removeAllAliases();
|
||||
} else {
|
||||
for (ObjectCursor<String> alias : snapshotIndexMetaData.getAliases().keys()) {
|
||||
aliases.add(alias.value);
|
||||
}
|
||||
// Check if the snapshot to restore is currently being deleted
|
||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(snapshot,
|
||||
"cannot restore a snapshot while a snapshot deletion is in-progress [" +
|
||||
deletionsInProgress.getEntries().get(0).getSnapshot() + "]");
|
||||
}
|
||||
|
||||
// Updating cluster state
|
||||
ClusterState.Builder builder = ClusterState.builder(currentState);
|
||||
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
|
||||
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
|
||||
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
|
||||
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards;
|
||||
Set<String> aliases = new HashSet<>();
|
||||
|
||||
if (indices.isEmpty() == false) {
|
||||
// We have some indices to restore
|
||||
ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shardsBuilder =
|
||||
ImmutableOpenMap.builder();
|
||||
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
|
||||
.minimumIndexCompatibilityVersion();
|
||||
for (Map.Entry<String, String> indexEntry : indices.entrySet()) {
|
||||
String index = indexEntry.getValue();
|
||||
boolean partial = checkPartial(index);
|
||||
SnapshotRecoverySource recoverySource =
|
||||
new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index);
|
||||
String renamedIndexName = indexEntry.getKey();
|
||||
IndexMetaData snapshotIndexMetaData = metaData.index(index);
|
||||
snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData,
|
||||
request.indexSettings(), request.ignoreIndexSettings());
|
||||
try {
|
||||
snapshotIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(snapshotIndexMetaData,
|
||||
minIndexCompatibilityVersion);
|
||||
} catch (Exception ex) {
|
||||
throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index +
|
||||
"] because it cannot be upgraded", ex);
|
||||
}
|
||||
// Check that the index is closed or doesn't exist
|
||||
IndexMetaData currentIndexMetaData = currentState.metaData().index(renamedIndexName);
|
||||
IntSet ignoreShards = new IntHashSet();
|
||||
final Index renamedIndex;
|
||||
if (currentIndexMetaData == null) {
|
||||
// Index doesn't exist - create it and start recovery
|
||||
// Make sure that the index we are about to create has a validate name
|
||||
MetaDataCreateIndexService.validateIndexName(renamedIndexName, currentState);
|
||||
createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), false);
|
||||
IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData)
|
||||
.state(IndexMetaData.State.OPEN)
|
||||
.index(renamedIndexName);
|
||||
indexMdBuilder.settings(Settings.builder()
|
||||
.put(snapshotIndexMetaData.getSettings())
|
||||
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()));
|
||||
MetaDataCreateIndexService.checkShardLimit(snapshotIndexMetaData.getSettings(), currentState);
|
||||
if (!request.includeAliases() && !snapshotIndexMetaData.getAliases().isEmpty()) {
|
||||
// Remove all aliases - they shouldn't be restored
|
||||
indexMdBuilder.removeAllAliases();
|
||||
} else {
|
||||
for (ObjectCursor<String> alias : snapshotIndexMetaData.getAliases().keys()) {
|
||||
aliases.add(alias.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
IndexMetaData updatedIndexMetaData = indexMdBuilder.build();
|
||||
if (partial) {
|
||||
populateIgnoredShards(index, ignoreShards);
|
||||
}
|
||||
rtBuilder.addAsNewRestore(updatedIndexMetaData, recoverySource, ignoreShards);
|
||||
blocks.addBlocks(updatedIndexMetaData);
|
||||
mdBuilder.put(updatedIndexMetaData, true);
|
||||
renamedIndex = updatedIndexMetaData.getIndex();
|
||||
} else {
|
||||
validateExistingIndex(currentIndexMetaData, snapshotIndexMetaData, renamedIndexName, partial);
|
||||
// Index exists and it's closed - open it in metadata and start recovery
|
||||
IndexMetaData.Builder indexMdBuilder =
|
||||
IndexMetaData updatedIndexMetaData = indexMdBuilder.build();
|
||||
if (partial) {
|
||||
populateIgnoredShards(index, ignoreShards);
|
||||
}
|
||||
rtBuilder.addAsNewRestore(updatedIndexMetaData, recoverySource, ignoreShards);
|
||||
blocks.addBlocks(updatedIndexMetaData);
|
||||
mdBuilder.put(updatedIndexMetaData, true);
|
||||
renamedIndex = updatedIndexMetaData.getIndex();
|
||||
} else {
|
||||
validateExistingIndex(currentIndexMetaData, snapshotIndexMetaData, renamedIndexName, partial);
|
||||
// Index exists and it's closed - open it in metadata and start recovery
|
||||
IndexMetaData.Builder indexMdBuilder =
|
||||
IndexMetaData.builder(snapshotIndexMetaData).state(IndexMetaData.State.OPEN);
|
||||
indexMdBuilder.version(
|
||||
indexMdBuilder.version(
|
||||
Math.max(snapshotIndexMetaData.getVersion(), 1 + currentIndexMetaData.getVersion()));
|
||||
indexMdBuilder.mappingVersion(
|
||||
indexMdBuilder.mappingVersion(
|
||||
Math.max(snapshotIndexMetaData.getMappingVersion(), 1 + currentIndexMetaData.getMappingVersion()));
|
||||
indexMdBuilder.settingsVersion(
|
||||
indexMdBuilder.settingsVersion(
|
||||
Math.max(
|
||||
snapshotIndexMetaData.getSettingsVersion(),
|
||||
1 + currentIndexMetaData.getSettingsVersion()));
|
||||
indexMdBuilder.aliasesVersion(
|
||||
snapshotIndexMetaData.getSettingsVersion(),
|
||||
1 + currentIndexMetaData.getSettingsVersion()));
|
||||
indexMdBuilder.aliasesVersion(
|
||||
Math.max(snapshotIndexMetaData.getAliasesVersion(), 1 + currentIndexMetaData.getAliasesVersion()));
|
||||
|
||||
for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
|
||||
indexMdBuilder.primaryTerm(shard,
|
||||
Math.max(snapshotIndexMetaData.primaryTerm(shard), currentIndexMetaData.primaryTerm(shard)));
|
||||
}
|
||||
|
||||
if (!request.includeAliases()) {
|
||||
// Remove all snapshot aliases
|
||||
if (!snapshotIndexMetaData.getAliases().isEmpty()) {
|
||||
indexMdBuilder.removeAllAliases();
|
||||
}
|
||||
/// Add existing aliases
|
||||
for (ObjectCursor<AliasMetaData> alias : currentIndexMetaData.getAliases().values()) {
|
||||
indexMdBuilder.putAlias(alias.value);
|
||||
}
|
||||
} else {
|
||||
for (ObjectCursor<String> alias : snapshotIndexMetaData.getAliases().keys()) {
|
||||
aliases.add(alias.value);
|
||||
}
|
||||
}
|
||||
indexMdBuilder.settings(Settings.builder()
|
||||
.put(snapshotIndexMetaData.getSettings())
|
||||
.put(IndexMetaData.SETTING_INDEX_UUID,
|
||||
currentIndexMetaData.getIndexUUID()));
|
||||
IndexMetaData updatedIndexMetaData = indexMdBuilder.index(renamedIndexName).build();
|
||||
rtBuilder.addAsRestore(updatedIndexMetaData, recoverySource);
|
||||
blocks.updateBlocks(updatedIndexMetaData);
|
||||
mdBuilder.put(updatedIndexMetaData, true);
|
||||
renamedIndex = updatedIndexMetaData.getIndex();
|
||||
}
|
||||
|
||||
for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
|
||||
indexMdBuilder.primaryTerm(shard,
|
||||
Math.max(snapshotIndexMetaData.primaryTerm(shard), currentIndexMetaData.primaryTerm(shard)));
|
||||
}
|
||||
|
||||
if (!request.includeAliases()) {
|
||||
// Remove all snapshot aliases
|
||||
if (!snapshotIndexMetaData.getAliases().isEmpty()) {
|
||||
indexMdBuilder.removeAllAliases();
|
||||
}
|
||||
/// Add existing aliases
|
||||
for (ObjectCursor<AliasMetaData> alias : currentIndexMetaData.getAliases().values()) {
|
||||
indexMdBuilder.putAlias(alias.value);
|
||||
}
|
||||
} else {
|
||||
for (ObjectCursor<String> alias : snapshotIndexMetaData.getAliases().keys()) {
|
||||
aliases.add(alias.value);
|
||||
}
|
||||
}
|
||||
indexMdBuilder.settings(Settings.builder()
|
||||
.put(snapshotIndexMetaData.getSettings())
|
||||
.put(IndexMetaData.SETTING_INDEX_UUID,
|
||||
currentIndexMetaData.getIndexUUID()));
|
||||
IndexMetaData updatedIndexMetaData = indexMdBuilder.index(renamedIndexName).build();
|
||||
rtBuilder.addAsRestore(updatedIndexMetaData, recoverySource);
|
||||
blocks.updateBlocks(updatedIndexMetaData);
|
||||
mdBuilder.put(updatedIndexMetaData, true);
|
||||
renamedIndex = updatedIndexMetaData.getIndex();
|
||||
}
|
||||
|
||||
for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
|
||||
if (!ignoreShards.contains(shard)) {
|
||||
shardsBuilder.put(new ShardId(renamedIndex, shard),
|
||||
if (!ignoreShards.contains(shard)) {
|
||||
shardsBuilder.put(new ShardId(renamedIndex, shard),
|
||||
new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId()));
|
||||
} else {
|
||||
shardsBuilder.put(new ShardId(renamedIndex, shard),
|
||||
} else {
|
||||
shardsBuilder.put(new ShardId(renamedIndex, shard),
|
||||
new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(),
|
||||
RestoreInProgress.State.FAILURE));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
shards = shardsBuilder.build();
|
||||
RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(
|
||||
restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
|
||||
Collections.unmodifiableList(new ArrayList<>(indices.keySet())),
|
||||
shards
|
||||
);
|
||||
RestoreInProgress.Builder restoreInProgressBuilder;
|
||||
if (restoreInProgress != null) {
|
||||
restoreInProgressBuilder = new RestoreInProgress.Builder(restoreInProgress);
|
||||
} else {
|
||||
restoreInProgressBuilder = new RestoreInProgress.Builder();
|
||||
}
|
||||
builder.putCustom(RestoreInProgress.TYPE, restoreInProgressBuilder.add(restoreEntry).build());
|
||||
} else {
|
||||
shards = ImmutableOpenMap.of();
|
||||
}
|
||||
|
||||
checkAliasNameConflicts(indices, aliases);
|
||||
|
||||
// Restore global state if needed
|
||||
if (request.includeGlobalState()) {
|
||||
if (metaData.persistentSettings() != null) {
|
||||
Settings settings = metaData.persistentSettings();
|
||||
clusterSettings.validateUpdate(settings);
|
||||
mdBuilder.persistentSettings(settings);
|
||||
}
|
||||
if (metaData.templates() != null) {
|
||||
// TODO: Should all existing templates be deleted first?
|
||||
for (ObjectCursor<IndexTemplateMetaData> cursor : metaData.templates().values()) {
|
||||
mdBuilder.put(cursor.value);
|
||||
}
|
||||
}
|
||||
if (metaData.customs() != null) {
|
||||
for (ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
|
||||
if (!RepositoriesMetaData.TYPE.equals(cursor.key)) {
|
||||
// Don't restore repositories while we are working with them
|
||||
// TODO: Should we restore them at the end?
|
||||
mdBuilder.putCustom(cursor.key, cursor.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (completed(shards)) {
|
||||
// We don't have any indices to restore - we are done
|
||||
restoreInfo = new RestoreInfo(snapshotId.getName(),
|
||||
Collections.unmodifiableList(new ArrayList<>(indices.keySet())),
|
||||
shards.size(),
|
||||
shards.size() - failedShards(shards));
|
||||
}
|
||||
|
||||
RoutingTable rt = rtBuilder.build();
|
||||
ClusterState updatedState = builder.metaData(mdBuilder).blocks(blocks).routingTable(rt).build();
|
||||
return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]");
|
||||
}
|
||||
|
||||
private void checkAliasNameConflicts(Map<String, String> renamedIndices, Set<String> aliases) {
|
||||
for (Map.Entry<String, String> renamedIndex : renamedIndices.entrySet()) {
|
||||
if (aliases.contains(renamedIndex.getKey())) {
|
||||
throw new SnapshotRestoreException(snapshot,
|
||||
"cannot rename index [" + renamedIndex.getValue() + "] into [" + renamedIndex.getKey() + "] because of " +
|
||||
"conflict with an alias with the same name");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void populateIgnoredShards(String index, IntSet ignoreShards) {
|
||||
for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) {
|
||||
if (index.equals(failure.index())) {
|
||||
ignoreShards.add(failure.shardId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkPartial(String index) {
|
||||
// Make sure that index was fully snapshotted
|
||||
if (failed(snapshotInfo, index)) {
|
||||
if (request.partial()) {
|
||||
return true;
|
||||
} else {
|
||||
throw new SnapshotRestoreException(snapshot, "index [" + index + "] wasn't fully snapshotted - cannot " +
|
||||
"restore");
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void validateExistingIndex(IndexMetaData currentIndexMetaData, IndexMetaData snapshotIndexMetaData,
|
||||
String renamedIndex, boolean partial) {
|
||||
// Index exist - checking that it's closed
|
||||
if (currentIndexMetaData.getState() != IndexMetaData.State.CLOSE) {
|
||||
// TODO: Enable restore for open indices
|
||||
throw new SnapshotRestoreException(snapshot, "cannot restore index [" + renamedIndex + "] because an open index " +
|
||||
"with same name already exists in the cluster. Either close or delete the existing index or restore the " +
|
||||
"index under a different name by providing a rename pattern and replacement name");
|
||||
}
|
||||
// Index exist - checking if it's partial restore
|
||||
if (partial) {
|
||||
throw new SnapshotRestoreException(snapshot, "cannot restore partial index [" + renamedIndex + "] because such " +
|
||||
"index already exists");
|
||||
}
|
||||
// Make sure that the number of shards is the same. That's the only thing that we cannot change
|
||||
if (currentIndexMetaData.getNumberOfShards() != snapshotIndexMetaData.getNumberOfShards()) {
|
||||
throw new SnapshotRestoreException(snapshot,
|
||||
"cannot restore index [" + renamedIndex + "] with [" + currentIndexMetaData.getNumberOfShards() + "] shards " +
|
||||
"from a snapshot of index [" + snapshotIndexMetaData.getIndex().getName() + "] with [" +
|
||||
snapshotIndexMetaData.getNumberOfShards() + "] shards");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Optionally updates index settings in indexMetaData by removing settings listed in ignoreSettings and
|
||||
* merging them with settings in changeSettings.
|
||||
*/
|
||||
private IndexMetaData updateIndexSettings(IndexMetaData indexMetaData, Settings changeSettings, String[] ignoreSettings) {
|
||||
Settings normalizedChangeSettings = Settings.builder()
|
||||
.put(changeSettings)
|
||||
.normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX)
|
||||
.build();
|
||||
IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
|
||||
Settings settings = indexMetaData.getSettings();
|
||||
Set<String> keyFilters = new HashSet<>();
|
||||
List<String> simpleMatchPatterns = new ArrayList<>();
|
||||
for (String ignoredSetting : ignoreSettings) {
|
||||
if (!Regex.isSimpleMatchPattern(ignoredSetting)) {
|
||||
if (UNREMOVABLE_SETTINGS.contains(ignoredSetting)) {
|
||||
throw new SnapshotRestoreException(snapshot, "cannot remove setting [" + ignoredSetting + "] on restore");
|
||||
shards = shardsBuilder.build();
|
||||
RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(
|
||||
restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
|
||||
Collections.unmodifiableList(new ArrayList<>(indices.keySet())),
|
||||
shards
|
||||
);
|
||||
RestoreInProgress.Builder restoreInProgressBuilder;
|
||||
if (restoreInProgress != null) {
|
||||
restoreInProgressBuilder = new RestoreInProgress.Builder(restoreInProgress);
|
||||
} else {
|
||||
keyFilters.add(ignoredSetting);
|
||||
restoreInProgressBuilder = new RestoreInProgress.Builder();
|
||||
}
|
||||
builder.putCustom(RestoreInProgress.TYPE, restoreInProgressBuilder.add(restoreEntry).build());
|
||||
} else {
|
||||
simpleMatchPatterns.add(ignoredSetting);
|
||||
shards = ImmutableOpenMap.of();
|
||||
}
|
||||
|
||||
checkAliasNameConflicts(indices, aliases);
|
||||
|
||||
// Restore global state if needed
|
||||
if (request.includeGlobalState()) {
|
||||
if (metaData.persistentSettings() != null) {
|
||||
Settings settings = metaData.persistentSettings();
|
||||
clusterSettings.validateUpdate(settings);
|
||||
mdBuilder.persistentSettings(settings);
|
||||
}
|
||||
if (metaData.templates() != null) {
|
||||
// TODO: Should all existing templates be deleted first?
|
||||
for (ObjectCursor<IndexTemplateMetaData> cursor : metaData.templates().values()) {
|
||||
mdBuilder.put(cursor.value);
|
||||
}
|
||||
}
|
||||
if (metaData.customs() != null) {
|
||||
for (ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
|
||||
if (!RepositoriesMetaData.TYPE.equals(cursor.key)) {
|
||||
// Don't restore repositories while we are working with them
|
||||
// TODO: Should we restore them at the end?
|
||||
mdBuilder.putCustom(cursor.key, cursor.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (completed(shards)) {
|
||||
// We don't have any indices to restore - we are done
|
||||
restoreInfo = new RestoreInfo(snapshotId.getName(),
|
||||
Collections.unmodifiableList(new ArrayList<>(indices.keySet())),
|
||||
shards.size(),
|
||||
shards.size() - failedShards(shards));
|
||||
}
|
||||
|
||||
RoutingTable rt = rtBuilder.build();
|
||||
ClusterState updatedState = builder.metaData(mdBuilder).blocks(blocks).routingTable(rt).build();
|
||||
return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]");
|
||||
}
|
||||
|
||||
private void checkAliasNameConflicts(Map<String, String> renamedIndices, Set<String> aliases) {
|
||||
for (Map.Entry<String, String> renamedIndex : renamedIndices.entrySet()) {
|
||||
if (aliases.contains(renamedIndex.getKey())) {
|
||||
throw new SnapshotRestoreException(snapshot,
|
||||
"cannot rename index [" + renamedIndex.getValue() + "] into [" + renamedIndex.getKey()
|
||||
+ "] because of conflict with an alias with the same name");
|
||||
}
|
||||
}
|
||||
}
|
||||
Predicate<String> settingsFilter = k -> {
|
||||
if (UNREMOVABLE_SETTINGS.contains(k) == false) {
|
||||
for (String filterKey : keyFilters) {
|
||||
if (k.equals(filterKey)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for (String pattern : simpleMatchPatterns) {
|
||||
if (Regex.simpleMatch(pattern, k)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
private void populateIgnoredShards(String index, IntSet ignoreShards) {
|
||||
for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) {
|
||||
if (index.equals(failure.index())) {
|
||||
ignoreShards.add(failure.shardId());
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
Settings.Builder settingsBuilder = Settings.builder()
|
||||
.put(settings.filter(settingsFilter))
|
||||
.put(normalizedChangeSettings.filter(k -> {
|
||||
if (UNMODIFIABLE_SETTINGS.contains(k)) {
|
||||
throw new SnapshotRestoreException(snapshot, "cannot modify setting [" + k + "] on restore");
|
||||
} else {
|
||||
}
|
||||
|
||||
private boolean checkPartial(String index) {
|
||||
// Make sure that index was fully snapshotted
|
||||
if (failed(snapshotInfo, index)) {
|
||||
if (request.partial()) {
|
||||
return true;
|
||||
} else {
|
||||
throw new SnapshotRestoreException(snapshot, "index [" + index + "] wasn't fully snapshotted - cannot " +
|
||||
"restore");
|
||||
}
|
||||
}));
|
||||
settingsBuilder.remove(MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey());
|
||||
return builder.settings(settingsBuilder).build();
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
private void validateExistingIndex(IndexMetaData currentIndexMetaData, IndexMetaData snapshotIndexMetaData,
|
||||
String renamedIndex, boolean partial) {
|
||||
// Index exist - checking that it's closed
|
||||
if (currentIndexMetaData.getState() != IndexMetaData.State.CLOSE) {
|
||||
// TODO: Enable restore for open indices
|
||||
throw new SnapshotRestoreException(snapshot, "cannot restore index [" + renamedIndex
|
||||
+ "] because an open index " +
|
||||
"with same name already exists in the cluster. Either close or delete the existing index or restore the " +
|
||||
"index under a different name by providing a rename pattern and replacement name");
|
||||
}
|
||||
// Index exist - checking if it's partial restore
|
||||
if (partial) {
|
||||
throw new SnapshotRestoreException(snapshot, "cannot restore partial index [" + renamedIndex
|
||||
+ "] because such index already exists");
|
||||
}
|
||||
// Make sure that the number of shards is the same. That's the only thing that we cannot change
|
||||
if (currentIndexMetaData.getNumberOfShards() != snapshotIndexMetaData.getNumberOfShards()) {
|
||||
throw new SnapshotRestoreException(snapshot,
|
||||
"cannot restore index [" + renamedIndex + "] with [" + currentIndexMetaData.getNumberOfShards()
|
||||
+ "] shards from a snapshot of index [" + snapshotIndexMetaData.getIndex().getName() + "] with [" +
|
||||
snapshotIndexMetaData.getNumberOfShards() + "] shards");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
return request.masterNodeTimeout();
|
||||
}
|
||||
/**
|
||||
* Optionally updates index settings in indexMetaData by removing settings listed in ignoreSettings and
|
||||
* merging them with settings in changeSettings.
|
||||
*/
|
||||
private IndexMetaData updateIndexSettings(IndexMetaData indexMetaData, Settings changeSettings,
|
||||
String[] ignoreSettings) {
|
||||
Settings normalizedChangeSettings = Settings.builder()
|
||||
.put(changeSettings)
|
||||
.normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX)
|
||||
.build();
|
||||
IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
|
||||
Settings settings = indexMetaData.getSettings();
|
||||
Set<String> keyFilters = new HashSet<>();
|
||||
List<String> simpleMatchPatterns = new ArrayList<>();
|
||||
for (String ignoredSetting : ignoreSettings) {
|
||||
if (!Regex.isSimpleMatchPattern(ignoredSetting)) {
|
||||
if (UNREMOVABLE_SETTINGS.contains(ignoredSetting)) {
|
||||
throw new SnapshotRestoreException(
|
||||
snapshot, "cannot remove setting [" + ignoredSetting + "] on restore");
|
||||
} else {
|
||||
keyFilters.add(ignoredSetting);
|
||||
}
|
||||
} else {
|
||||
simpleMatchPatterns.add(ignoredSetting);
|
||||
}
|
||||
}
|
||||
Predicate<String> settingsFilter = k -> {
|
||||
if (UNREMOVABLE_SETTINGS.contains(k) == false) {
|
||||
for (String filterKey : keyFilters) {
|
||||
if (k.equals(filterKey)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for (String pattern : simpleMatchPatterns) {
|
||||
if (Regex.simpleMatch(pattern, k)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
Settings.Builder settingsBuilder = Settings.builder()
|
||||
.put(settings.filter(settingsFilter))
|
||||
.put(normalizedChangeSettings.filter(k -> {
|
||||
if (UNMODIFIABLE_SETTINGS.contains(k)) {
|
||||
throw new SnapshotRestoreException(snapshot, "cannot modify setting [" + k + "] on restore");
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}));
|
||||
settingsBuilder.remove(MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey());
|
||||
return builder.settings(settingsBuilder).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(new RestoreCompletionResponse(restoreUUID, snapshot, restoreInfo));
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
return request.masterNodeTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(new RestoreCompletionResponse(restoreUUID, snapshot, restoreInfo));
|
||||
}
|
||||
});
|
||||
}, listener::onFailure);
|
||||
} catch (Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot",
|
||||
request.repository() + ":" + request.snapshot()), e);
|
||||
@ -599,7 +606,8 @@ public class RestoreService implements ClusterStateApplier {
|
||||
}
|
||||
|
||||
public static class RestoreInProgressUpdater extends RoutingChangesObserver.AbstractRoutingChangesObserver {
|
||||
private final Map<String, Updates> shardChanges = new HashMap<>();
|
||||
// Map of RestoreUUID to a of changes to the shards' restore statuses
|
||||
private final Map<String, Map<ShardId, ShardRestoreStatus>> shardChanges = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
|
||||
@ -607,7 +615,7 @@ public class RestoreService implements ClusterStateApplier {
|
||||
if (initializingShard.primary()) {
|
||||
RecoverySource recoverySource = initializingShard.recoverySource();
|
||||
if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
|
||||
changes(recoverySource).shards.put(
|
||||
changes(recoverySource).put(
|
||||
initializingShard.shardId(),
|
||||
new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS));
|
||||
}
|
||||
@ -623,7 +631,7 @@ public class RestoreService implements ClusterStateApplier {
|
||||
// to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed,
|
||||
// however, we only want to acknowledge the restore operation once it has been successfully restored on another node.
|
||||
if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) {
|
||||
changes(recoverySource).shards.put(
|
||||
changes(recoverySource).put(
|
||||
failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(),
|
||||
RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage()));
|
||||
}
|
||||
@ -636,7 +644,7 @@ public class RestoreService implements ClusterStateApplier {
|
||||
// if we force an empty primary, we should also fail the restore entry
|
||||
if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT &&
|
||||
initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) {
|
||||
changes(unassignedShard.recoverySource()).shards.put(
|
||||
changes(unassignedShard.recoverySource()).put(
|
||||
unassignedShard.shardId(),
|
||||
new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE,
|
||||
"recovery source type changed from snapshot to " + initializedShard.recoverySource())
|
||||
@ -650,7 +658,7 @@ public class RestoreService implements ClusterStateApplier {
|
||||
if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
|
||||
if (newUnassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO) {
|
||||
String reason = "shard could not be allocated to any of the nodes";
|
||||
changes(recoverySource).shards.put(
|
||||
changes(recoverySource).put(
|
||||
unassignedShard.shardId(),
|
||||
new ShardRestoreStatus(unassignedShard.currentNodeId(), RestoreInProgress.State.FAILURE, reason));
|
||||
}
|
||||
@ -661,24 +669,20 @@ public class RestoreService implements ClusterStateApplier {
|
||||
* Helper method that creates update entry for the given recovery source's restore uuid
|
||||
* if such an entry does not exist yet.
|
||||
*/
|
||||
private Updates changes(RecoverySource recoverySource) {
|
||||
private Map<ShardId, ShardRestoreStatus> changes(RecoverySource recoverySource) {
|
||||
assert recoverySource.getType() == RecoverySource.Type.SNAPSHOT;
|
||||
return shardChanges.computeIfAbsent(((SnapshotRecoverySource) recoverySource).restoreUUID(), k -> new Updates());
|
||||
}
|
||||
|
||||
private static class Updates {
|
||||
private Map<ShardId, ShardRestoreStatus> shards = new HashMap<>();
|
||||
return shardChanges.computeIfAbsent(((SnapshotRecoverySource) recoverySource).restoreUUID(), k -> new HashMap<>());
|
||||
}
|
||||
|
||||
public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) {
|
||||
if (shardChanges.isEmpty() == false) {
|
||||
RestoreInProgress.Builder builder = new RestoreInProgress.Builder();
|
||||
for (RestoreInProgress.Entry entry : oldRestore) {
|
||||
Updates updates = shardChanges.get(entry.uuid());
|
||||
Map<ShardId, ShardRestoreStatus> updates = shardChanges.get(entry.uuid());
|
||||
ImmutableOpenMap<ShardId, ShardRestoreStatus> shardStates = entry.shards();
|
||||
if (updates != null && updates.shards.isEmpty() == false) {
|
||||
if (updates != null && updates.isEmpty() == false) {
|
||||
ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder(shardStates);
|
||||
for (Map.Entry<ShardId, ShardRestoreStatus> shard : updates.shards.entrySet()) {
|
||||
for (Map.Entry<ShardId, ShardRestoreStatus> shard : updates.entrySet()) {
|
||||
ShardId shardId = shard.getKey();
|
||||
ShardRestoreStatus status = shardStates.get(shardId);
|
||||
if (status == null || status.state().completed() == false) {
|
||||
@ -724,14 +728,8 @@ public class RestoreService implements ClusterStateApplier {
|
||||
}
|
||||
}
|
||||
|
||||
private final Logger logger;
|
||||
|
||||
CleanRestoreStateTaskExecutor(Logger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
|
||||
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) {
|
||||
final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
|
||||
Set<String> completedRestores = tasks.stream().map(e -> e.uuid).collect(Collectors.toSet());
|
||||
RestoreInProgress.Builder restoreInProgressBuilder = new RestoreInProgress.Builder();
|
||||
@ -786,8 +784,8 @@ public class RestoreService implements ClusterStateApplier {
|
||||
}
|
||||
}
|
||||
|
||||
public static RestoreInProgress.State overallState(RestoreInProgress.State nonCompletedState,
|
||||
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
|
||||
private static RestoreInProgress.State overallState(RestoreInProgress.State nonCompletedState,
|
||||
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
|
||||
boolean hasFailed = false;
|
||||
for (ObjectCursor<RestoreInProgress.ShardRestoreStatus> status : shards.values()) {
|
||||
if (!status.value.state().completed()) {
|
||||
@ -823,7 +821,7 @@ public class RestoreService implements ClusterStateApplier {
|
||||
return failedShards;
|
||||
}
|
||||
|
||||
private Map<String, String> renamedIndices(RestoreSnapshotRequest request, List<String> filteredIndices) {
|
||||
private static Map<String, String> renamedIndices(RestoreSnapshotRequest request, List<String> filteredIndices) {
|
||||
Map<String, String> renamedIndices = new HashMap<>();
|
||||
for (String index : filteredIndices) {
|
||||
String renamedIndex = index;
|
||||
@ -845,7 +843,7 @@ public class RestoreService implements ClusterStateApplier {
|
||||
* @param repository repository name
|
||||
* @param snapshotInfo snapshot metadata
|
||||
*/
|
||||
private void validateSnapshotRestorable(final String repository, final SnapshotInfo snapshotInfo) {
|
||||
private static void validateSnapshotRestorable(final String repository, final SnapshotInfo snapshotInfo) {
|
||||
if (!snapshotInfo.state().restorable()) {
|
||||
throw new SnapshotRestoreException(new Snapshot(repository, snapshotInfo.snapshotId()),
|
||||
"unsupported snapshot state [" + snapshotInfo.state() + "]");
|
||||
@ -857,7 +855,7 @@ public class RestoreService implements ClusterStateApplier {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean failed(SnapshotInfo snapshot, String index) {
|
||||
private static boolean failed(SnapshotInfo snapshot, String index) {
|
||||
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
|
||||
if (index.equals(failure.index())) {
|
||||
return true;
|
||||
|
@ -29,7 +29,9 @@ import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.StepListener;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateApplier;
|
||||
@ -166,7 +168,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
public RepositoryData getRepositoryData(final String repositoryName) {
|
||||
Repository repository = repositoriesService.repository(repositoryName);
|
||||
assert repository != null; // should only be called once we've validated the repository exists
|
||||
return repository.getRepositoryData();
|
||||
return PlainActionFuture.get(repository::getRepositoryData);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -266,86 +268,88 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
|
||||
validate(repositoryName, snapshotName);
|
||||
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
|
||||
final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();
|
||||
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
|
||||
repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener);
|
||||
repositoryDataListener.whenComplete(repositoryData -> {
|
||||
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
|
||||
|
||||
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
|
||||
private SnapshotsInProgress.Entry newSnapshot = null;
|
||||
|
||||
private SnapshotsInProgress.Entry newSnapshot = null;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
validate(repositoryName, snapshotName, currentState);
|
||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
|
||||
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
validate(repositoryName, snapshotName, currentState);
|
||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
|
||||
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
|
||||
}
|
||||
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
|
||||
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
|
||||
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
|
||||
}
|
||||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots == null || snapshots.entries().isEmpty()) {
|
||||
// Store newSnapshot here to be processed in clusterStateProcessed
|
||||
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
|
||||
request.indicesOptions(), request.indices()));
|
||||
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
|
||||
List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
|
||||
newSnapshot = new SnapshotsInProgress.Entry(
|
||||
new Snapshot(repositoryName, snapshotId),
|
||||
request.includeGlobalState(), request.partial(),
|
||||
State.INIT,
|
||||
snapshotIndices,
|
||||
threadPool.absoluteTimeInMillis(),
|
||||
repositoryData.getGenId(),
|
||||
null,
|
||||
request.userMetadata(),
|
||||
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
|
||||
initializingSnapshots.add(newSnapshot.snapshot());
|
||||
snapshots = new SnapshotsInProgress(newSnapshot);
|
||||
} else {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
|
||||
}
|
||||
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
|
||||
}
|
||||
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
|
||||
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
|
||||
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
|
||||
}
|
||||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots == null || snapshots.entries().isEmpty()) {
|
||||
// Store newSnapshot here to be processed in clusterStateProcessed
|
||||
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
|
||||
request.indicesOptions(), request.indices()));
|
||||
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
|
||||
List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
|
||||
newSnapshot = new SnapshotsInProgress.Entry(
|
||||
new Snapshot(repositoryName, snapshotId),
|
||||
request.includeGlobalState(), request.partial(),
|
||||
State.INIT,
|
||||
snapshotIndices,
|
||||
threadPool.absoluteTimeInMillis(),
|
||||
repositoryData.getGenId(),
|
||||
null,
|
||||
request.userMetadata(),
|
||||
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
|
||||
initializingSnapshots.add(newSnapshot.snapshot());
|
||||
snapshots = new SnapshotsInProgress(newSnapshot);
|
||||
} else {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
|
||||
}
|
||||
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
|
||||
if (newSnapshot != null) {
|
||||
initializingSnapshots.remove(newSnapshot.snapshot());
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
|
||||
if (newSnapshot != null) {
|
||||
initializingSnapshots.remove(newSnapshot.snapshot());
|
||||
}
|
||||
newSnapshot = null;
|
||||
listener.onFailure(e);
|
||||
}
|
||||
newSnapshot = null;
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
|
||||
if (newSnapshot != null) {
|
||||
final Snapshot current = newSnapshot.snapshot();
|
||||
assert initializingSnapshots.contains(current);
|
||||
beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<Snapshot>() {
|
||||
@Override
|
||||
public void onResponse(final Snapshot snapshot) {
|
||||
initializingSnapshots.remove(snapshot);
|
||||
listener.onResponse(snapshot);
|
||||
}
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
|
||||
if (newSnapshot != null) {
|
||||
final Snapshot current = newSnapshot.snapshot();
|
||||
assert initializingSnapshots.contains(current);
|
||||
beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<Snapshot>() {
|
||||
@Override
|
||||
public void onResponse(final Snapshot snapshot) {
|
||||
initializingSnapshots.remove(snapshot);
|
||||
listener.onResponse(snapshot);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
initializingSnapshots.remove(current);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
initializingSnapshots.remove(current);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
return request.masterNodeTimeout();
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
return request.masterNodeTimeout();
|
||||
}
|
||||
});
|
||||
}, listener::onFailure);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -418,111 +422,116 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository");
|
||||
}
|
||||
final String snapshotName = snapshot.snapshot().getSnapshotId().getName();
|
||||
final RepositoryData repositoryData = repository.getRepositoryData();
|
||||
// check if the snapshot name already exists in the repository
|
||||
if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
|
||||
throw new InvalidSnapshotNameException(
|
||||
repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
|
||||
}
|
||||
if (clusterState.nodes().getMinNodeVersion().onOrAfter(NO_REPO_INITIALIZE_VERSION) == false) {
|
||||
// In mixed version clusters we initialize the snapshot in the repository so that in case of a master failover to an
|
||||
// older version master node snapshot finalization (that assumes initializeSnapshot was called) produces a valid
|
||||
// snapshot.
|
||||
repository.initializeSnapshot(
|
||||
snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaDataForSnapshot(snapshot, clusterState.metaData()));
|
||||
}
|
||||
snapshotCreated = true;
|
||||
|
||||
logger.info("snapshot [{}] started", snapshot.snapshot());
|
||||
if (snapshot.indices().isEmpty()) {
|
||||
// No indices in this snapshot - we are done
|
||||
userCreateSnapshotListener.onResponse(snapshot.snapshot());
|
||||
endSnapshot(snapshot, clusterState.metaData());
|
||||
return;
|
||||
}
|
||||
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
|
||||
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
|
||||
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
|
||||
entries.add(entry);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (entry.state() == State.ABORTED) {
|
||||
entries.add(entry);
|
||||
assert entry.shards().isEmpty();
|
||||
hadAbortedInitializations = true;
|
||||
} else {
|
||||
// Replace the snapshot that was just initialized
|
||||
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(currentState, entry, repositoryData);
|
||||
if (!partial) {
|
||||
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
|
||||
currentState.metaData());
|
||||
Set<String> missing = indicesWithMissingShards.v1();
|
||||
Set<String> closed = indicesWithMissingShards.v2();
|
||||
if (missing.isEmpty() == false || closed.isEmpty() == false) {
|
||||
final StringBuilder failureMessage = new StringBuilder();
|
||||
if (missing.isEmpty() == false) {
|
||||
failureMessage.append("Indices don't have primary shards ");
|
||||
failureMessage.append(missing);
|
||||
}
|
||||
if (closed.isEmpty() == false) {
|
||||
if (failureMessage.length() > 0) {
|
||||
failureMessage.append("; ");
|
||||
}
|
||||
failureMessage.append("Indices are closed ");
|
||||
failureMessage.append(closed);
|
||||
}
|
||||
entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards));
|
||||
}
|
||||
}
|
||||
return ClusterState.builder(currentState)
|
||||
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries)))
|
||||
.build();
|
||||
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
|
||||
repository.getRepositoryData(repositoryDataListener);
|
||||
repositoryDataListener.whenComplete(repositoryData -> {
|
||||
// check if the snapshot name already exists in the repository
|
||||
if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
|
||||
throw new InvalidSnapshotNameException(
|
||||
repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot",
|
||||
snapshot.snapshot().getSnapshotId()), e);
|
||||
removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
|
||||
new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
|
||||
if (clusterState.nodes().getMinNodeVersion().onOrAfter(NO_REPO_INITIALIZE_VERSION) == false) {
|
||||
// In mixed version clusters we initialize the snapshot in the repository so that in case of a master failover to an
|
||||
// older version master node snapshot finalization (that assumes initializeSnapshot was called) produces a valid
|
||||
// snapshot.
|
||||
repository.initializeSnapshot(
|
||||
snapshot.snapshot().getSnapshotId(), snapshot.indices(),
|
||||
metaDataForSnapshot(snapshot, clusterState.metaData()));
|
||||
}
|
||||
snapshotCreated = true;
|
||||
|
||||
@Override
|
||||
public void onNoLongerMaster(String source) {
|
||||
// We are not longer a master - we shouldn't try to do any cleanup
|
||||
// The new master will take care of it
|
||||
logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId());
|
||||
userCreateSnapshotListener.onFailure(
|
||||
new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
// The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted
|
||||
// for processing. If client wants to wait for the snapshot completion, it can register snapshot
|
||||
// completion listener in this method. For the snapshot completion to work properly, the snapshot
|
||||
// should still exist when listener is registered.
|
||||
logger.info("snapshot [{}] started", snapshot.snapshot());
|
||||
if (snapshot.indices().isEmpty()) {
|
||||
// No indices in this snapshot - we are done
|
||||
userCreateSnapshotListener.onResponse(snapshot.snapshot());
|
||||
|
||||
if (hadAbortedInitializations) {
|
||||
final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
|
||||
assert snapshotsInProgress != null;
|
||||
final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot());
|
||||
assert entry != null;
|
||||
endSnapshot(entry, newState.metaData());
|
||||
}
|
||||
endSnapshot(snapshot, clusterState.metaData());
|
||||
return;
|
||||
}
|
||||
});
|
||||
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
|
||||
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
|
||||
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
|
||||
entries.add(entry);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (entry.state() == State.ABORTED) {
|
||||
entries.add(entry);
|
||||
assert entry.shards().isEmpty();
|
||||
hadAbortedInitializations = true;
|
||||
} else {
|
||||
// Replace the snapshot that was just initialized
|
||||
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(currentState, entry, repositoryData);
|
||||
if (!partial) {
|
||||
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
|
||||
currentState.metaData());
|
||||
Set<String> missing = indicesWithMissingShards.v1();
|
||||
Set<String> closed = indicesWithMissingShards.v2();
|
||||
if (missing.isEmpty() == false || closed.isEmpty() == false) {
|
||||
final StringBuilder failureMessage = new StringBuilder();
|
||||
if (missing.isEmpty() == false) {
|
||||
failureMessage.append("Indices don't have primary shards ");
|
||||
failureMessage.append(missing);
|
||||
}
|
||||
if (closed.isEmpty() == false) {
|
||||
if (failureMessage.length() > 0) {
|
||||
failureMessage.append("; ");
|
||||
}
|
||||
failureMessage.append("Indices are closed ");
|
||||
failureMessage.append(closed);
|
||||
}
|
||||
entries.add(
|
||||
new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards));
|
||||
}
|
||||
}
|
||||
return ClusterState.builder(currentState)
|
||||
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries)))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot",
|
||||
snapshot.snapshot().getSnapshotId()), e);
|
||||
removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
|
||||
new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoLongerMaster(String source) {
|
||||
// We are not longer a master - we shouldn't try to do any cleanup
|
||||
// The new master will take care of it
|
||||
logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId());
|
||||
userCreateSnapshotListener.onFailure(
|
||||
new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
// The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted
|
||||
// for processing. If client wants to wait for the snapshot completion, it can register snapshot
|
||||
// completion listener in this method. For the snapshot completion to work properly, the snapshot
|
||||
// should still exist when listener is registered.
|
||||
userCreateSnapshotListener.onResponse(snapshot.snapshot());
|
||||
|
||||
if (hadAbortedInitializations) {
|
||||
final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
|
||||
assert snapshotsInProgress != null;
|
||||
final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot());
|
||||
assert entry != null;
|
||||
endSnapshot(entry, newState.metaData());
|
||||
}
|
||||
}
|
||||
});
|
||||
}, this::onFailure);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1151,26 +1160,27 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
final boolean immediatePriority) {
|
||||
// First, look for the snapshot in the repository
|
||||
final Repository repository = repositoriesService.repository(repositoryName);
|
||||
final RepositoryData repositoryData = repository.getRepositoryData();
|
||||
Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
|
||||
.stream()
|
||||
.filter(s -> s.getName().equals(snapshotName))
|
||||
.findFirst();
|
||||
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
|
||||
long repoGenId = repositoryData.getGenId();
|
||||
if (matchedEntry.isPresent() == false) {
|
||||
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream()
|
||||
.filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
|
||||
if (matchedInProgress.isPresent()) {
|
||||
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
|
||||
// Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes
|
||||
repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L;
|
||||
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
|
||||
Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
|
||||
.stream()
|
||||
.filter(s -> s.getName().equals(snapshotName))
|
||||
.findFirst();
|
||||
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
|
||||
long repoGenId = repositoryData.getGenId();
|
||||
if (matchedEntry.isPresent() == false) {
|
||||
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream()
|
||||
.filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
|
||||
if (matchedInProgress.isPresent()) {
|
||||
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
|
||||
// Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes
|
||||
repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (matchedEntry.isPresent() == false) {
|
||||
throw new SnapshotMissingException(repositoryName, snapshotName);
|
||||
}
|
||||
deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority);
|
||||
if (matchedEntry.isPresent() == false) {
|
||||
throw new SnapshotMissingException(repositoryName, snapshotName);
|
||||
}
|
||||
deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority);
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -149,8 +149,8 @@ public class RepositoriesServiceTests extends ESTestCase {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositoryData getRepositoryData() {
|
||||
return null;
|
||||
public void getRepositoryData(ActionListener<RepositoryData> listener) {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -131,7 +131,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
||||
(BlobStoreRepository) repositoriesService.repository(repositoryName);
|
||||
final List<SnapshotId> originalSnapshots = Arrays.asList(snapshotId1, snapshotId2);
|
||||
|
||||
List<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds().stream()
|
||||
List<SnapshotId> snapshotIds = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().stream()
|
||||
.sorted((s1, s2) -> s1.getName().compareTo(s2.getName())).collect(Collectors.toList());
|
||||
assertThat(snapshotIds, equalTo(originalSnapshots));
|
||||
}
|
||||
@ -140,10 +140,10 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
||||
final BlobStoreRepository repository = setupRepo();
|
||||
|
||||
// write to and read from a index file with no entries
|
||||
assertThat(repository.getRepositoryData().getSnapshotIds().size(), equalTo(0));
|
||||
assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().size(), equalTo(0));
|
||||
final RepositoryData emptyData = RepositoryData.EMPTY;
|
||||
repository.writeIndexGen(emptyData, emptyData.getGenId(), true);
|
||||
RepositoryData repoData = repository.getRepositoryData();
|
||||
RepositoryData repoData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository);
|
||||
assertEquals(repoData, emptyData);
|
||||
assertEquals(repoData.getIndices().size(), 0);
|
||||
assertEquals(repoData.getSnapshotIds().size(), 0);
|
||||
@ -152,12 +152,12 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
||||
// write to and read from an index file with snapshots but no indices
|
||||
repoData = addRandomSnapshotsToRepoData(repoData, false);
|
||||
repository.writeIndexGen(repoData, repoData.getGenId(), true);
|
||||
assertEquals(repoData, repository.getRepositoryData());
|
||||
assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository));
|
||||
|
||||
// write to and read from a index file with random repository data
|
||||
repoData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true);
|
||||
repoData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true);
|
||||
repository.writeIndexGen(repoData, repoData.getGenId(), true);
|
||||
assertEquals(repoData, repository.getRepositoryData());
|
||||
assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository));
|
||||
}
|
||||
|
||||
public void testIndexGenerationalFiles() throws Exception {
|
||||
@ -166,22 +166,22 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
||||
// write to index generational file
|
||||
RepositoryData repositoryData = generateRandomRepoData();
|
||||
repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
|
||||
assertThat(repository.getRepositoryData(), equalTo(repositoryData));
|
||||
assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), equalTo(repositoryData));
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(0L));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L));
|
||||
|
||||
// adding more and writing to a new index generational file
|
||||
repositoryData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true);
|
||||
repositoryData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true);
|
||||
repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
|
||||
assertEquals(repository.getRepositoryData(), repositoryData);
|
||||
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(1L));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L));
|
||||
|
||||
// removing a snapshot and writing to a new index generational file
|
||||
repositoryData = repository.getRepositoryData().removeSnapshot(
|
||||
repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).removeSnapshot(
|
||||
repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY);
|
||||
repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
|
||||
assertEquals(repository.getRepositoryData(), repositoryData);
|
||||
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(2L));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L));
|
||||
}
|
||||
|
@ -18,8 +18,8 @@
|
||||
*/
|
||||
package org.elasticsearch.snapshots;
|
||||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -44,7 +44,6 @@ import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@ -93,17 +92,13 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
||||
skipRepoConsistencyCheckReason = reason;
|
||||
}
|
||||
|
||||
protected RepositoryData getRepositoryData(Repository repository) throws InterruptedException {
|
||||
protected RepositoryData getRepositoryData(Repository repository) {
|
||||
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
|
||||
final SetOnce<RepositoryData> repositoryData = new SetOnce<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final PlainActionFuture<RepositoryData> repositoryData = PlainActionFuture.newFuture();
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
|
||||
repositoryData.set(repository.getRepositoryData());
|
||||
latch.countDown();
|
||||
repository.getRepositoryData(repositoryData);
|
||||
});
|
||||
|
||||
latch.await();
|
||||
return repositoryData.get();
|
||||
return repositoryData.actionGet();
|
||||
}
|
||||
|
||||
public static long getFailureCount(String repository) {
|
||||
|
@ -363,7 +363,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||
assertThat(client.prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true));
|
||||
}
|
||||
|
||||
public void testFreshIndexUUID() throws InterruptedException {
|
||||
public void testFreshIndexUUID() {
|
||||
Client client = client();
|
||||
|
||||
logger.info("--> creating repository");
|
||||
@ -781,7 +781,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
|
||||
}
|
||||
|
||||
public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException {
|
||||
public void testSnapshotFileFailureDuringSnapshot() {
|
||||
disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
|
||||
Client client = client();
|
||||
|
||||
|
@ -75,6 +75,7 @@ import org.elasticsearch.action.support.ActionTestUtils;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
@ -156,6 +157,7 @@ import org.elasticsearch.node.ResponseCollectorService;
|
||||
import org.elasticsearch.plugins.PluginsService;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||
import org.elasticsearch.repositories.fs.FsRepository;
|
||||
@ -308,7 +310,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
||||
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
|
||||
final Repository repository = masterNode.repositoriesService.repository(repoName);
|
||||
Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
|
||||
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
|
||||
assertThat(snapshotIds, hasSize(1));
|
||||
|
||||
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
|
||||
@ -369,7 +371,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||
SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
||||
assertThat(finalSnapshotsInProgress.entries(), empty());
|
||||
final Repository repository = randomMaster.repositoriesService.repository(repoName);
|
||||
Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
|
||||
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
|
||||
assertThat(snapshotIds, hasSize(1));
|
||||
}
|
||||
|
||||
@ -408,7 +410,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
||||
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
|
||||
final Repository repository = masterNode.repositoriesService.repository(repoName);
|
||||
Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
|
||||
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
|
||||
assertThat(snapshotIds, hasSize(1));
|
||||
|
||||
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
|
||||
@ -474,7 +476,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
||||
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
|
||||
final Repository repository = masterNode.repositoriesService.repository(repoName);
|
||||
Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
|
||||
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
|
||||
assertThat(snapshotIds, hasSize(thirdSnapshotResponse == null ? 2 : 3));
|
||||
|
||||
for (SnapshotId snapshotId : snapshotIds) {
|
||||
@ -559,8 +561,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||
final SnapshotsInProgress finalSnapshotsInProgress = testClusterNodes.randomDataNodeSafe()
|
||||
.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
||||
assertThat(finalSnapshotsInProgress.entries(), empty());
|
||||
final Repository repository = masterNode.repositoriesService.repository(repoName);
|
||||
Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
|
||||
final Repository repository = testClusterNodes.randomMasterNodeSafe().repositoriesService.repository(repoName);
|
||||
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
|
||||
assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));
|
||||
}
|
||||
|
||||
@ -633,7 +635,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
||||
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
|
||||
final Repository repository = masterNode.repositoriesService.repository(repoName);
|
||||
Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
|
||||
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
|
||||
assertThat(snapshotIds, hasSize(1));
|
||||
|
||||
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
|
||||
@ -643,6 +645,14 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||
assertEquals(0, snapshotInfo.failedShards());
|
||||
}
|
||||
|
||||
private RepositoryData getRepositoryData(Repository repository) {
|
||||
final PlainActionFuture<RepositoryData> res = PlainActionFuture.newFuture();
|
||||
repository.getRepositoryData(res);
|
||||
deterministicTaskQueue.runAllRunnableTasks();
|
||||
assertTrue(res.isDone());
|
||||
return res.actionGet();
|
||||
}
|
||||
|
||||
private StepListener<CreateIndexResponse> createRepoAndIndex(TestClusterNodes.TestClusterNode masterNode, String repoName,
|
||||
String index, int shards) {
|
||||
final AdminClient adminClient = masterNode.client.admin();
|
||||
|
@ -136,7 +136,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
|
||||
.getSnapshots(new GetSnapshotsRequest("test-repo", new String[] {"test-snap"})).actionGet());
|
||||
}
|
||||
|
||||
public void testExceptionOnMissingShardLevelSnapBlob() throws IOException, InterruptedException {
|
||||
public void testExceptionOnMissingShardLevelSnapBlob() throws IOException {
|
||||
disableRepoConsistencyCheck("This test intentionally corrupts the repository");
|
||||
|
||||
logger.info("--> creating repository");
|
||||
|
@ -79,6 +79,7 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.test.DummyShardLock;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
@ -833,7 +834,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
||||
final Index index = shard.shardId().getIndex();
|
||||
final IndexId indexId = new IndexId(index.getName(), index.getUUID());
|
||||
final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(
|
||||
repository.getRepositoryData().shardGenerations().getShardGen(indexId, shard.shardId().getId()));
|
||||
ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).shardGenerations().getShardGen(
|
||||
indexId, shard.shardId().getId()));
|
||||
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
|
||||
final String shardGen;
|
||||
try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
|
||||
|
@ -85,10 +85,10 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositoryData getRepositoryData() {
|
||||
public void getRepositoryData(ActionListener<RepositoryData> listener) {
|
||||
final IndexId indexId = new IndexId(indexName, "blah");
|
||||
return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(),
|
||||
Collections.singletonMap(indexId, emptySet()), ShardGenerations.EMPTY);
|
||||
listener.onResponse(new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(),
|
||||
Collections.singletonMap(indexId, emptySet()), ShardGenerations.EMPTY));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -24,12 +24,14 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.snapshots.SnapshotMissingException;
|
||||
import org.elasticsearch.snapshots.SnapshotRestoreException;
|
||||
@ -41,7 +43,6 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
@ -58,6 +59,10 @@ import static org.hamcrest.Matchers.nullValue;
|
||||
*/
|
||||
public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase {
|
||||
|
||||
public static RepositoryData getRepositoryData(Repository repository) {
|
||||
return PlainActionFuture.get(repository::getRepositoryData);
|
||||
}
|
||||
|
||||
protected abstract String repositoryType();
|
||||
|
||||
protected Settings repositorySettings() {
|
||||
@ -256,16 +261,13 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||
BlobStoreRepository repository = (BlobStoreRepository) repositoriesSvc.repository(repoName);
|
||||
|
||||
final SetOnce<BlobContainer> indicesBlobContainer = new SetOnce<>();
|
||||
final SetOnce<RepositoryData> repositoryData = new SetOnce<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final PlainActionFuture<RepositoryData> repositoryData = PlainActionFuture.newFuture();
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
|
||||
indicesBlobContainer.set(repository.blobStore().blobContainer(repository.basePath().add("indices")));
|
||||
repositoryData.set(repository.getRepositoryData());
|
||||
latch.countDown();
|
||||
repository.getRepositoryData(repositoryData);
|
||||
});
|
||||
|
||||
latch.await();
|
||||
for (IndexId indexId : repositoryData.get().getIndices().values()) {
|
||||
for (IndexId indexId : repositoryData.actionGet().getIndices().values()) {
|
||||
if (indexId.getName().equals("test-idx-3")) {
|
||||
assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index
|
||||
}
|
||||
|
@ -225,26 +225,28 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositoryData getRepositoryData() {
|
||||
Client remoteClient = getRemoteClusterClient();
|
||||
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true)
|
||||
.get(ccrSettings.getRecoveryActionTimeout());
|
||||
MetaData remoteMetaData = response.getState().getMetaData();
|
||||
public void getRepositoryData(ActionListener<RepositoryData> listener) {
|
||||
ActionListener.completeWith(listener, () -> {
|
||||
Client remoteClient = getRemoteClusterClient();
|
||||
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true)
|
||||
.get(ccrSettings.getRecoveryActionTimeout());
|
||||
MetaData remoteMetaData = response.getState().getMetaData();
|
||||
|
||||
Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
|
||||
Map<String, SnapshotState> snapshotStates = new HashMap<>(copiedSnapshotIds.size());
|
||||
Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>(copiedSnapshotIds.size());
|
||||
Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
|
||||
Map<String, SnapshotState> snapshotStates = new HashMap<>(copiedSnapshotIds.size());
|
||||
Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>(copiedSnapshotIds.size());
|
||||
|
||||
ImmutableOpenMap<String, IndexMetaData> remoteIndices = remoteMetaData.getIndices();
|
||||
for (String indexName : remoteMetaData.getConcreteAllIndices()) {
|
||||
// Both the Snapshot name and UUID are set to _latest_
|
||||
SnapshotId snapshotId = new SnapshotId(LATEST, LATEST);
|
||||
copiedSnapshotIds.put(indexName, snapshotId);
|
||||
snapshotStates.put(indexName, SnapshotState.SUCCESS);
|
||||
Index index = remoteIndices.get(indexName).getIndex();
|
||||
indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singleton(snapshotId));
|
||||
}
|
||||
return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots, ShardGenerations.EMPTY);
|
||||
ImmutableOpenMap<String, IndexMetaData> remoteIndices = remoteMetaData.getIndices();
|
||||
for (String indexName : remoteMetaData.getConcreteAllIndices()) {
|
||||
// Both the Snapshot name and UUID are set to _latest_
|
||||
SnapshotId snapshotId = new SnapshotId(LATEST, LATEST);
|
||||
copiedSnapshotIds.put(indexName, snapshotId);
|
||||
snapshotStates.put(indexName, SnapshotState.SUCCESS);
|
||||
Index index = remoteIndices.get(indexName).getIndex();
|
||||
indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singleton(snapshotId));
|
||||
}
|
||||
return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots, ShardGenerations.EMPTY);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -61,6 +61,7 @@ import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.ShardGenerations;
|
||||
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
|
||||
import org.elasticsearch.repositories.fs.FsRepository;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.hamcrest.Matchers;
|
||||
@ -212,7 +213,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
|
||||
repository.finalizeSnapshot(snapshotId,
|
||||
ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build(),
|
||||
indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(),
|
||||
repository.getRepositoryData().getGenId(), true,
|
||||
ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(), true,
|
||||
MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(),
|
||||
true,
|
||||
finFuture);
|
||||
|
Loading…
x
Reference in New Issue
Block a user