* Move Snapshot Status Related Method to Appropriate Places Lots of things living in `SnapshotsService` for no reason other than that `SnapshotsService` provides the `RepositoriesService`. Cleaning this up to directly use `RepositoriesService` in the relevant transport actions and by that shortening the already very complex `SnapshotsService`.
This commit is contained in:
parent
4000138105
commit
48048646e7
|
@ -19,6 +19,9 @@
|
|||
|
||||
package org.elasticsearch.action.admin.cluster.snapshots.get;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
|
@ -30,11 +33,15 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.snapshots.SnapshotException;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotMissingException;
|
||||
|
@ -52,19 +59,24 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
|
||||
/**
|
||||
* Transport Action for get snapshots operation
|
||||
*/
|
||||
public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSnapshotsRequest, GetSnapshotsResponse> {
|
||||
private final SnapshotsService snapshotsService;
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(TransportGetSnapshotsAction.class);
|
||||
|
||||
private final RepositoriesService repositoriesService;
|
||||
|
||||
@Inject
|
||||
public TransportGetSnapshotsAction(TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, SnapshotsService snapshotsService, ActionFilters actionFilters,
|
||||
ThreadPool threadPool, RepositoriesService repositoriesService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(GetSnapshotsAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
GetSnapshotsRequest::new, indexNameExpressionResolver);
|
||||
this.snapshotsService = snapshotsService;
|
||||
this.repositoriesService = repositoriesService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -91,7 +103,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|||
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
|
||||
final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
|
||||
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
|
||||
for (SnapshotInfo snapshotInfo : SnapshotsService.currentSnapshots(snapshotsInProgress, repository)) {
|
||||
for (SnapshotInfo snapshotInfo : sortedCurrentSnapshots(snapshotsInProgress, repository)) {
|
||||
SnapshotId snapshotId = snapshotInfo.snapshotId();
|
||||
allSnapshotIds.put(snapshotId.getName(), snapshotId);
|
||||
currentSnapshots.add(snapshotInfo);
|
||||
|
@ -99,7 +111,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|||
|
||||
final RepositoryData repositoryData;
|
||||
if (isCurrentSnapshotsOnly(request.snapshots()) == false) {
|
||||
repositoryData = PlainActionFuture.get(fut -> snapshotsService.getRepositoryData(repository, fut));
|
||||
repositoryData = PlainActionFuture.get(fut -> repositoriesService.getRepositoryData(repository, fut));
|
||||
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
|
||||
allSnapshotIds.put(snapshotId.getName(), snapshotId);
|
||||
}
|
||||
|
@ -136,7 +148,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|||
|
||||
final List<SnapshotInfo> snapshotInfos;
|
||||
if (request.verbose()) {
|
||||
snapshotInfos = snapshotsService.snapshots(
|
||||
snapshotInfos = snapshots(
|
||||
snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable());
|
||||
} else {
|
||||
if (repositoryData != null) {
|
||||
|
@ -154,6 +166,66 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of currently running snapshots from repository sorted by snapshot creation date
|
||||
*
|
||||
* @param snapshotsInProgress snapshots in progress in the cluster state
|
||||
* @param repositoryName repository name
|
||||
* @return list of snapshots
|
||||
*/
|
||||
private static List<SnapshotInfo> sortedCurrentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName) {
|
||||
List<SnapshotInfo> snapshotList = new ArrayList<>();
|
||||
List<SnapshotsInProgress.Entry> entries =
|
||||
SnapshotsService.currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
|
||||
for (SnapshotsInProgress.Entry entry : entries) {
|
||||
snapshotList.add(new SnapshotInfo(entry));
|
||||
}
|
||||
CollectionUtil.timSort(snapshotList);
|
||||
return unmodifiableList(snapshotList);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of snapshots from repository sorted by snapshot creation date
|
||||
*
|
||||
* @param snapshotsInProgress snapshots in progress in the cluster state
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotIds snapshots for which to fetch snapshot information
|
||||
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
|
||||
* if false, they will throw an error
|
||||
* @return list of snapshots
|
||||
*/
|
||||
private List<SnapshotInfo> snapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName,
|
||||
List<SnapshotId> snapshotIds, boolean ignoreUnavailable) {
|
||||
final Set<SnapshotInfo> snapshotSet = new HashSet<>();
|
||||
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
|
||||
// first, look at the snapshots in progress
|
||||
final List<SnapshotsInProgress.Entry> entries = SnapshotsService.currentSnapshots(
|
||||
snapshotsInProgress, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
|
||||
for (SnapshotsInProgress.Entry entry : entries) {
|
||||
snapshotSet.add(new SnapshotInfo(entry));
|
||||
snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId());
|
||||
}
|
||||
// then, look in the repository
|
||||
final Repository repository = repositoriesService.repository(repositoryName);
|
||||
for (SnapshotId snapshotId : snapshotIdsToIterate) {
|
||||
try {
|
||||
snapshotSet.add(repository.getSnapshotInfo(snapshotId));
|
||||
} catch (Exception ex) {
|
||||
if (ignoreUnavailable) {
|
||||
logger.warn(() -> new ParameterizedMessage("failed to get snapshot [{}]", snapshotId), ex);
|
||||
} else {
|
||||
if (ex instanceof SnapshotException) {
|
||||
throw ex;
|
||||
}
|
||||
throw new SnapshotException(repositoryName, snapshotId, "Snapshot could not be read", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
final ArrayList<SnapshotInfo> snapshotList = new ArrayList<>(snapshotSet);
|
||||
CollectionUtil.timSort(snapshotList);
|
||||
return unmodifiableList(snapshotList);
|
||||
}
|
||||
|
||||
private boolean isAllSnapshots(String[] snapshots) {
|
||||
return (snapshots.length == 0) || (snapshots.length == 1 && GetSnapshotsRequest.ALL_SNAPSHOTS.equalsIgnoreCase(snapshots[0]));
|
||||
}
|
||||
|
@ -186,6 +258,6 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|||
snapshotInfos.add(new SnapshotInfo(snapshotId, indices, repositoryData.getSnapshotState(snapshotId)));
|
||||
}
|
||||
CollectionUtil.timSort(snapshotInfos);
|
||||
return Collections.unmodifiableList(snapshotInfos);
|
||||
return unmodifiableList(snapshotInfos);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -41,11 +42,17 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotMissingException;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
import org.elasticsearch.snapshots.SnapshotShardsService;
|
||||
import org.elasticsearch.snapshots.SnapshotState;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -62,22 +69,24 @@ import java.util.Set;
|
|||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
|
||||
public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<SnapshotsStatusRequest, SnapshotsStatusResponse> {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(TransportSnapshotsStatusAction.class);
|
||||
|
||||
private final SnapshotsService snapshotsService;
|
||||
private final RepositoriesService repositoriesService;
|
||||
|
||||
private final TransportNodesSnapshotsStatus transportNodesSnapshotsStatus;
|
||||
|
||||
@Inject
|
||||
public TransportSnapshotsStatusAction(TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, SnapshotsService snapshotsService,
|
||||
ThreadPool threadPool, RepositoriesService repositoriesService,
|
||||
TransportNodesSnapshotsStatus transportNodesSnapshotsStatus,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(SnapshotsStatusAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
SnapshotsStatusRequest::new, indexNameExpressionResolver);
|
||||
this.snapshotsService = snapshotsService;
|
||||
this.repositoriesService = repositoriesService;
|
||||
this.transportNodesSnapshotsStatus = transportNodesSnapshotsStatus;
|
||||
}
|
||||
|
||||
|
@ -211,7 +220,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
ActionListener<SnapshotsStatusResponse> listener) {
|
||||
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
|
||||
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
|
||||
snapshotsService.getRepositoryData(repositoryName, repositoryDataListener);
|
||||
repositoriesService.getRepositoryData(repositoryName, repositoryDataListener);
|
||||
repositoryDataListener.whenComplete(repositoryData -> {
|
||||
final Map<String, SnapshotId> matchedSnapshotIds = repositoryData.getSnapshotIds().stream()
|
||||
.filter(s -> requestedSnapshotNames.contains(s.getName()))
|
||||
|
@ -233,11 +242,10 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
throw new SnapshotMissingException(repositoryName, snapshotName);
|
||||
}
|
||||
}
|
||||
SnapshotInfo snapshotInfo = snapshotsService.snapshot(snapshotsInProgress, repositoryName, snapshotId);
|
||||
SnapshotInfo snapshotInfo = snapshot(snapshotsInProgress, repositoryName, snapshotId);
|
||||
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
|
||||
if (snapshotInfo.state().completed()) {
|
||||
Map<ShardId, IndexShardSnapshotStatus> shardStatuses =
|
||||
snapshotsService.snapshotShards(repositoryName, repositoryData, snapshotInfo);
|
||||
Map<ShardId, IndexShardSnapshotStatus> shardStatuses = snapshotShards(repositoryName, repositoryData, snapshotInfo);
|
||||
for (Map.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatuses.entrySet()) {
|
||||
IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardStatus.getValue().asCopy();
|
||||
shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), lastSnapshotStatus));
|
||||
|
@ -271,4 +279,82 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
}, listener::onFailure);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves snapshot from repository
|
||||
*
|
||||
* @param snapshotsInProgress snapshots in progress in the cluster state
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotId snapshot id
|
||||
* @return snapshot
|
||||
* @throws SnapshotMissingException if snapshot is not found
|
||||
*/
|
||||
private SnapshotInfo snapshot(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName, SnapshotId snapshotId) {
|
||||
List<SnapshotsInProgress.Entry> entries =
|
||||
SnapshotsService.currentSnapshots(snapshotsInProgress, repositoryName, Collections.singletonList(snapshotId.getName()));
|
||||
if (!entries.isEmpty()) {
|
||||
return new SnapshotInfo(entries.iterator().next());
|
||||
}
|
||||
return repositoriesService.repository(repositoryName).getSnapshotInfo(snapshotId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns status of shards currently finished snapshots
|
||||
* <p>
|
||||
* This method is executed on master node and it's complimentary to the
|
||||
* {@link SnapshotShardsService#currentSnapshotShards(Snapshot)} because it
|
||||
* returns similar information but for already finished snapshots.
|
||||
* </p>
|
||||
*
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotInfo snapshot info
|
||||
* @return map of shard id to snapshot status
|
||||
*/
|
||||
private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(final String repositoryName,
|
||||
final RepositoryData repositoryData,
|
||||
final SnapshotInfo snapshotInfo) throws IOException {
|
||||
final Repository repository = repositoriesService.repository(repositoryName);
|
||||
final Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
|
||||
for (String index : snapshotInfo.indices()) {
|
||||
IndexId indexId = repositoryData.resolveIndexId(index);
|
||||
IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), indexId);
|
||||
if (indexMetadata != null) {
|
||||
int numberOfShards = indexMetadata.getNumberOfShards();
|
||||
for (int i = 0; i < numberOfShards; i++) {
|
||||
ShardId shardId = new ShardId(indexMetadata.getIndex(), i);
|
||||
SnapshotShardFailure shardFailure = findShardFailure(snapshotInfo.shardFailures(), shardId);
|
||||
if (shardFailure != null) {
|
||||
shardStatus.put(shardId, IndexShardSnapshotStatus.newFailed(shardFailure.reason()));
|
||||
} else {
|
||||
final IndexShardSnapshotStatus shardSnapshotStatus;
|
||||
if (snapshotInfo.state() == SnapshotState.FAILED) {
|
||||
// If the snapshot failed, but the shard's snapshot does
|
||||
// not have an exception, it means that partial snapshots
|
||||
// were disabled and in this case, the shard snapshot will
|
||||
// *not* have any metadata, so attempting to read the shard
|
||||
// snapshot status will throw an exception. Instead, we create
|
||||
// a status for the shard to indicate that the shard snapshot
|
||||
// could not be taken due to partial being set to false.
|
||||
shardSnapshotStatus = IndexShardSnapshotStatus.newFailed("skipped");
|
||||
} else {
|
||||
shardSnapshotStatus = repository.getShardSnapshotStatus(
|
||||
snapshotInfo.snapshotId(),
|
||||
indexId,
|
||||
shardId);
|
||||
}
|
||||
shardStatus.put(shardId, shardSnapshotStatus);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return unmodifiableMap(shardStatus);
|
||||
}
|
||||
|
||||
private static SnapshotShardFailure findShardFailure(List<SnapshotShardFailure> shardFailures, ShardId shardId) {
|
||||
for (SnapshotShardFailure shardFailure : shardFailures) {
|
||||
if (shardId.getIndexName().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) {
|
||||
return shardFailure;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,10 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
|||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateApplier;
|
||||
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
|
||||
import org.elasticsearch.cluster.RestoreInProgress;
|
||||
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
|
||||
|
@ -42,8 +46,6 @@ import org.elasticsearch.common.regex.Regex;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.snapshots.RestoreService;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -357,6 +359,22 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the {@link RepositoryData} for the given repository.
|
||||
*
|
||||
* @param repositoryName repository name
|
||||
* @param listener listener to pass {@link RepositoryData} to
|
||||
*/
|
||||
public void getRepositoryData(final String repositoryName, final ActionListener<RepositoryData> listener) {
|
||||
try {
|
||||
Repository repository = repository(repositoryName);
|
||||
assert repository != null; // should only be called once we've validated the repository exists
|
||||
repository.getRepositoryData(listener);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns registered repository
|
||||
* <p>
|
||||
|
@ -445,11 +463,55 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
|
|||
}
|
||||
|
||||
private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
|
||||
if (SnapshotsService.isRepositoryInUse(clusterState, repository) || RestoreService.isRepositoryInUse(clusterState, repository)) {
|
||||
if (isRepositoryInUse(clusterState, repository)) {
|
||||
throw new IllegalStateException("trying to modify or unregister repository that is currently used ");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a repository is currently in use by one of the snapshots
|
||||
*
|
||||
* @param clusterState cluster state
|
||||
* @param repository repository id
|
||||
* @return true if repository is currently in use by one of the running snapshots
|
||||
*/
|
||||
private static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
|
||||
SnapshotsInProgress snapshots = clusterState.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots != null) {
|
||||
for (SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
|
||||
if (repository.equals(snapshot.snapshot().getRepository())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
SnapshotDeletionsInProgress deletionsInProgress = clusterState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null) {
|
||||
for (SnapshotDeletionsInProgress.Entry entry : deletionsInProgress.getEntries()) {
|
||||
if (entry.getSnapshot().getRepository().equals(repository)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
final RepositoryCleanupInProgress repositoryCleanupInProgress = clusterState.custom(RepositoryCleanupInProgress.TYPE);
|
||||
if (repositoryCleanupInProgress != null) {
|
||||
for (RepositoryCleanupInProgress.Entry entry : repositoryCleanupInProgress.entries()) {
|
||||
if (entry.repository().equals(repository)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
RestoreInProgress restoreInProgress = clusterState.custom(RestoreInProgress.TYPE);
|
||||
if (restoreInProgress != null) {
|
||||
for (RestoreInProgress.Entry entry: restoreInProgress) {
|
||||
if (repository.equals(entry.snapshot().getRepository())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
|
||||
|
|
|
@ -900,23 +900,4 @@ public class RestoreService implements ClusterStateApplier {
|
|||
logger.warn("Failed to update restore state ", t);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a repository is currently in use by one of the snapshots
|
||||
*
|
||||
* @param clusterState cluster state
|
||||
* @param repository repository id
|
||||
* @return true if repository is currently in use by one of the running snapshots
|
||||
*/
|
||||
public static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
|
||||
RestoreInProgress restoreInProgress = clusterState.custom(RestoreInProgress.TYPE);
|
||||
if (restoreInProgress != null) {
|
||||
for (RestoreInProgress.Entry entry: restoreInProgress) {
|
||||
if (repository.equals(entry.snapshot().getRepository())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchParseException;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -33,6 +34,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -44,6 +46,7 @@ import java.util.Comparator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Information about a snapshot
|
||||
|
@ -252,10 +255,10 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
|
|||
Collections.emptyList(), null, null);
|
||||
}
|
||||
|
||||
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, long startTime, Boolean includeGlobalState,
|
||||
Map<String, Object> userMetadata) {
|
||||
this(snapshotId, indices, SnapshotState.IN_PROGRESS, null, Version.CURRENT, startTime, 0L,
|
||||
0, 0, Collections.emptyList(), includeGlobalState, userMetadata);
|
||||
public SnapshotInfo(SnapshotsInProgress.Entry entry) {
|
||||
this(entry.snapshot().getSnapshotId(),
|
||||
entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), SnapshotState.IN_PROGRESS, null, Version.CURRENT,
|
||||
entry.startTime(), 0L, 0, 0, Collections.emptyList(), entry.includeGlobalState(), entry.userMetadata());
|
||||
}
|
||||
|
||||
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, long startTime, String reason, long endTime,
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
|
@ -67,7 +66,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
|
@ -77,7 +75,6 @@ import org.elasticsearch.repositories.RepositoryMissingException;
|
|||
import org.elasticsearch.repositories.ShardGenerations;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -96,7 +93,6 @@ import java.util.stream.StreamSupport;
|
|||
|
||||
import static java.util.Collections.emptySet;
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
|
||||
|
||||
/**
|
||||
|
@ -146,99 +142,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the {@link RepositoryData} for the given repository.
|
||||
*
|
||||
* @param repositoryName repository name
|
||||
* @param listener listener to pass {@link RepositoryData} to
|
||||
*/
|
||||
public void getRepositoryData(final String repositoryName, final ActionListener<RepositoryData> listener) {
|
||||
try {
|
||||
Repository repository = repositoriesService.repository(repositoryName);
|
||||
assert repository != null; // should only be called once we've validated the repository exists
|
||||
repository.getRepositoryData(listener);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves snapshot from repository
|
||||
*
|
||||
* @param snapshotsInProgress snapshots in progress in the cluster state
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotId snapshot id
|
||||
* @return snapshot
|
||||
* @throws SnapshotMissingException if snapshot is not found
|
||||
*/
|
||||
public SnapshotInfo snapshot(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName, SnapshotId snapshotId) {
|
||||
List<SnapshotsInProgress.Entry> entries =
|
||||
currentSnapshots(snapshotsInProgress, repositoryName, Collections.singletonList(snapshotId.getName()));
|
||||
if (!entries.isEmpty()) {
|
||||
return inProgressSnapshot(entries.iterator().next());
|
||||
}
|
||||
return repositoriesService.repository(repositoryName).getSnapshotInfo(snapshotId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of snapshots from repository sorted by snapshot creation date
|
||||
*
|
||||
* @param snapshotsInProgress snapshots in progress in the cluster state
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotIds snapshots for which to fetch snapshot information
|
||||
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
|
||||
* if false, they will throw an error
|
||||
* @return list of snapshots
|
||||
*/
|
||||
public List<SnapshotInfo> snapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName,
|
||||
List<SnapshotId> snapshotIds, boolean ignoreUnavailable) {
|
||||
final Set<SnapshotInfo> snapshotSet = new HashSet<>();
|
||||
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
|
||||
// first, look at the snapshots in progress
|
||||
final List<SnapshotsInProgress.Entry> entries = currentSnapshots(
|
||||
snapshotsInProgress, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
|
||||
for (SnapshotsInProgress.Entry entry : entries) {
|
||||
snapshotSet.add(inProgressSnapshot(entry));
|
||||
snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId());
|
||||
}
|
||||
// then, look in the repository
|
||||
final Repository repository = repositoriesService.repository(repositoryName);
|
||||
for (SnapshotId snapshotId : snapshotIdsToIterate) {
|
||||
try {
|
||||
snapshotSet.add(repository.getSnapshotInfo(snapshotId));
|
||||
} catch (Exception ex) {
|
||||
if (ignoreUnavailable) {
|
||||
logger.warn(() -> new ParameterizedMessage("failed to get snapshot [{}]", snapshotId), ex);
|
||||
} else {
|
||||
if (ex instanceof SnapshotException) {
|
||||
throw ex;
|
||||
}
|
||||
throw new SnapshotException(repositoryName, snapshotId, "Snapshot could not be read", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
final ArrayList<SnapshotInfo> snapshotList = new ArrayList<>(snapshotSet);
|
||||
CollectionUtil.timSort(snapshotList);
|
||||
return unmodifiableList(snapshotList);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of currently running snapshots from repository sorted by snapshot creation date
|
||||
*
|
||||
* @param snapshotsInProgress snapshots in progress in the cluster state
|
||||
* @param repositoryName repository name
|
||||
* @return list of snapshots
|
||||
*/
|
||||
public static List<SnapshotInfo> currentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName) {
|
||||
List<SnapshotInfo> snapshotList = new ArrayList<>();
|
||||
List<SnapshotsInProgress.Entry> entries = currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
|
||||
for (SnapshotsInProgress.Entry entry : entries) {
|
||||
snapshotList.add(inProgressSnapshot(entry));
|
||||
}
|
||||
CollectionUtil.timSort(snapshotList);
|
||||
return unmodifiableList(snapshotList);
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} but invokes its callback on completion of
|
||||
* the snapshot.
|
||||
|
@ -636,12 +539,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
return metadata;
|
||||
}
|
||||
|
||||
private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
|
||||
return new SnapshotInfo(entry.snapshot().getSnapshotId(),
|
||||
entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
|
||||
entry.startTime(), entry.includeGlobalState(), entry.userMetadata());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns status of the currently running snapshots
|
||||
* <p>
|
||||
|
@ -698,67 +595,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
return unmodifiableList(builder);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns status of shards currently finished snapshots
|
||||
* <p>
|
||||
* This method is executed on master node and it's complimentary to the
|
||||
* {@link SnapshotShardsService#currentSnapshotShards(Snapshot)} because it
|
||||
* returns similar information but for already finished snapshots.
|
||||
* </p>
|
||||
*
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotInfo snapshot info
|
||||
* @return map of shard id to snapshot status
|
||||
*/
|
||||
public Map<ShardId, IndexShardSnapshotStatus> snapshotShards(final String repositoryName,
|
||||
final RepositoryData repositoryData,
|
||||
final SnapshotInfo snapshotInfo) throws IOException {
|
||||
final Repository repository = repositoriesService.repository(repositoryName);
|
||||
final Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
|
||||
for (String index : snapshotInfo.indices()) {
|
||||
IndexId indexId = repositoryData.resolveIndexId(index);
|
||||
IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), indexId);
|
||||
if (indexMetadata != null) {
|
||||
int numberOfShards = indexMetadata.getNumberOfShards();
|
||||
for (int i = 0; i < numberOfShards; i++) {
|
||||
ShardId shardId = new ShardId(indexMetadata.getIndex(), i);
|
||||
SnapshotShardFailure shardFailure = findShardFailure(snapshotInfo.shardFailures(), shardId);
|
||||
if (shardFailure != null) {
|
||||
shardStatus.put(shardId, IndexShardSnapshotStatus.newFailed(shardFailure.reason()));
|
||||
} else {
|
||||
final IndexShardSnapshotStatus shardSnapshotStatus;
|
||||
if (snapshotInfo.state() == SnapshotState.FAILED) {
|
||||
// If the snapshot failed, but the shard's snapshot does
|
||||
// not have an exception, it means that partial snapshots
|
||||
// were disabled and in this case, the shard snapshot will
|
||||
// *not* have any metadata, so attempting to read the shard
|
||||
// snapshot status will throw an exception. Instead, we create
|
||||
// a status for the shard to indicate that the shard snapshot
|
||||
// could not be taken due to partial being set to false.
|
||||
shardSnapshotStatus = IndexShardSnapshotStatus.newFailed("skipped");
|
||||
} else {
|
||||
shardSnapshotStatus = repository.getShardSnapshotStatus(
|
||||
snapshotInfo.snapshotId(),
|
||||
indexId,
|
||||
shardId);
|
||||
}
|
||||
shardStatus.put(shardId, shardSnapshotStatus);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return unmodifiableMap(shardStatus);
|
||||
}
|
||||
|
||||
private static SnapshotShardFailure findShardFailure(List<SnapshotShardFailure> shardFailures, ShardId shardId) {
|
||||
for (SnapshotShardFailure shardFailure : shardFailures) {
|
||||
if (shardId.getIndexName().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) {
|
||||
return shardFailure;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyClusterState(ClusterChangedEvent event) {
|
||||
try {
|
||||
|
@ -1457,41 +1293,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
return repositoryMetaVersion.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a repository is currently in use by one of the snapshots
|
||||
*
|
||||
* @param clusterState cluster state
|
||||
* @param repository repository id
|
||||
* @return true if repository is currently in use by one of the running snapshots
|
||||
*/
|
||||
public static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
|
||||
SnapshotsInProgress snapshots = clusterState.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots != null) {
|
||||
for (SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
|
||||
if (repository.equals(snapshot.snapshot().getRepository())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
SnapshotDeletionsInProgress deletionsInProgress = clusterState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null) {
|
||||
for (SnapshotDeletionsInProgress.Entry entry : deletionsInProgress.getEntries()) {
|
||||
if (entry.getSnapshot().getRepository().equals(repository)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
final RepositoryCleanupInProgress repositoryCleanupInProgress = clusterState.custom(RepositoryCleanupInProgress.TYPE);
|
||||
if (repositoryCleanupInProgress != null) {
|
||||
for (RepositoryCleanupInProgress.Entry entry : repositoryCleanupInProgress.entries()) {
|
||||
if (entry.repository().equals(repository)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes snapshot from repository
|
||||
*
|
||||
|
|
Loading…
Reference in New Issue