mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-24 17:09:48 +00:00
We shouldn't be using potentially changing versions of the cluster state when answering a snapshot status API call by calling `SnapshotService#currentSnapshots` multiple times (each time using `ClusterService#state` under the hood) but instead pass down the state from the transport action. Having these API behave more in a more deterministic way will make it easier to use them once parallel repository operations are introduced.
This commit is contained in:
parent
9085024e1d
commit
2eeea21d84
@ -25,6 +25,7 @@ import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
@ -87,9 +88,10 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
||||
final ActionListener<GetSnapshotsResponse> listener) {
|
||||
try {
|
||||
final String repository = request.repository();
|
||||
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
|
||||
final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
|
||||
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
|
||||
for (SnapshotInfo snapshotInfo : snapshotsService.currentSnapshots(repository)) {
|
||||
for (SnapshotInfo snapshotInfo : SnapshotsService.currentSnapshots(snapshotsInProgress, repository)) {
|
||||
SnapshotId snapshotId = snapshotInfo.snapshotId();
|
||||
allSnapshotIds.put(snapshotId.getName(), snapshotId);
|
||||
currentSnapshots.add(snapshotInfo);
|
||||
@ -134,7 +136,8 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
||||
|
||||
final List<SnapshotInfo> snapshotInfos;
|
||||
if (request.verbose()) {
|
||||
snapshotInfos = snapshotsService.snapshots(repository, new ArrayList<>(toResolve), request.ignoreUnavailable());
|
||||
snapshotInfos = snapshotsService.snapshots(
|
||||
snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable());
|
||||
} else {
|
||||
if (repositoryData != null) {
|
||||
// want non-current snapshots as well, which are found in the repository data
|
||||
|
@ -34,6 +34,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
@ -99,10 +100,11 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
||||
protected void masterOperation(final SnapshotsStatusRequest request,
|
||||
final ClusterState state,
|
||||
final ActionListener<SnapshotsStatusResponse> listener) throws Exception {
|
||||
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
|
||||
List<SnapshotsInProgress.Entry> currentSnapshots =
|
||||
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
|
||||
SnapshotsService.currentSnapshots(snapshotsInProgress, request.repository(), Arrays.asList(request.snapshots()));
|
||||
if (currentSnapshots.isEmpty()) {
|
||||
buildResponse(request, currentSnapshots, null, listener);
|
||||
buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -126,18 +128,17 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
||||
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
|
||||
ActionListener.wrap(nodeSnapshotStatuses -> threadPool.generic().execute(
|
||||
ActionRunnable.wrap(listener,
|
||||
l -> buildResponse(
|
||||
request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
|
||||
nodeSnapshotStatuses, l))
|
||||
l -> buildResponse(snapshotsInProgress, request, currentSnapshots, nodeSnapshotStatuses, l))
|
||||
), listener::onFailure));
|
||||
} else {
|
||||
// We don't have any in-progress shards, just return current stats
|
||||
buildResponse(request, currentSnapshots, null, listener);
|
||||
buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void buildResponse(SnapshotsStatusRequest request, List<SnapshotsInProgress.Entry> currentSnapshotEntries,
|
||||
private void buildResponse(@Nullable SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
|
||||
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
|
||||
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
|
||||
ActionListener<SnapshotsStatusResponse> listener) {
|
||||
// First process snapshot that are currently processed
|
||||
@ -199,14 +200,15 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
||||
// Now add snapshots on disk that are not currently running
|
||||
final String repositoryName = request.repository();
|
||||
if (Strings.hasText(repositoryName) && request.snapshots() != null && request.snapshots().length > 0) {
|
||||
loadRepositoryData(request, builder, currentSnapshotNames, repositoryName, listener);
|
||||
loadRepositoryData(snapshotsInProgress, request, builder, currentSnapshotNames, repositoryName, listener);
|
||||
} else {
|
||||
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
|
||||
}
|
||||
}
|
||||
|
||||
private void loadRepositoryData(SnapshotsStatusRequest request, List<SnapshotStatus> builder, Set<String> currentSnapshotNames,
|
||||
String repositoryName, ActionListener<SnapshotsStatusResponse> listener) {
|
||||
private void loadRepositoryData(@Nullable SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
|
||||
List<SnapshotStatus> builder, Set<String> currentSnapshotNames, String repositoryName,
|
||||
ActionListener<SnapshotsStatusResponse> listener) {
|
||||
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
|
||||
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
|
||||
snapshotsService.getRepositoryData(repositoryName, repositoryDataListener);
|
||||
@ -231,7 +233,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
||||
throw new SnapshotMissingException(repositoryName, snapshotName);
|
||||
}
|
||||
}
|
||||
SnapshotInfo snapshotInfo = snapshotsService.snapshot(repositoryName, snapshotId);
|
||||
SnapshotInfo snapshotInfo = snapshotsService.snapshot(snapshotsInProgress, repositoryName, snapshotId);
|
||||
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
|
||||
if (snapshotInfo.state().completed()) {
|
||||
Map<ShardId, IndexShardSnapshotStatus> shardStatuses =
|
||||
|
@ -180,13 +180,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
/**
|
||||
* Retrieves snapshot from repository
|
||||
*
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotId snapshot id
|
||||
* @param snapshotsInProgress snapshots in progress in the cluster state
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotId snapshot id
|
||||
* @return snapshot
|
||||
* @throws SnapshotMissingException if snapshot is not found
|
||||
*/
|
||||
public SnapshotInfo snapshot(final String repositoryName, final SnapshotId snapshotId) {
|
||||
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, Collections.singletonList(snapshotId.getName()));
|
||||
public SnapshotInfo snapshot(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName, SnapshotId snapshotId) {
|
||||
List<SnapshotsInProgress.Entry> entries =
|
||||
currentSnapshots(snapshotsInProgress, repositoryName, Collections.singletonList(snapshotId.getName()));
|
||||
if (!entries.isEmpty()) {
|
||||
return inProgressSnapshot(entries.iterator().next());
|
||||
}
|
||||
@ -196,18 +198,20 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
/**
|
||||
* Returns a list of snapshots from repository sorted by snapshot creation date
|
||||
*
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotIds snapshots for which to fetch snapshot information
|
||||
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
|
||||
* if false, they will throw an error
|
||||
* @param snapshotsInProgress snapshots in progress in the cluster state
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotIds snapshots for which to fetch snapshot information
|
||||
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
|
||||
* if false, they will throw an error
|
||||
* @return list of snapshots
|
||||
*/
|
||||
public List<SnapshotInfo> snapshots(final String repositoryName, final List<SnapshotId> snapshotIds, final boolean ignoreUnavailable) {
|
||||
public List<SnapshotInfo> snapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName,
|
||||
List<SnapshotId> snapshotIds, boolean ignoreUnavailable) {
|
||||
final Set<SnapshotInfo> snapshotSet = new HashSet<>();
|
||||
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
|
||||
// first, look at the snapshots in progress
|
||||
final List<SnapshotsInProgress.Entry> entries =
|
||||
currentSnapshots(repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
|
||||
final List<SnapshotsInProgress.Entry> entries = currentSnapshots(
|
||||
snapshotsInProgress, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
|
||||
for (SnapshotsInProgress.Entry entry : entries) {
|
||||
snapshotSet.add(inProgressSnapshot(entry));
|
||||
snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId());
|
||||
@ -236,12 +240,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
/**
|
||||
* Returns a list of currently running snapshots from repository sorted by snapshot creation date
|
||||
*
|
||||
* @param snapshotsInProgress snapshots in progress in the cluster state
|
||||
* @param repositoryName repository name
|
||||
* @return list of snapshots
|
||||
*/
|
||||
public List<SnapshotInfo> currentSnapshots(final String repositoryName) {
|
||||
public static List<SnapshotInfo> currentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName) {
|
||||
List<SnapshotInfo> snapshotList = new ArrayList<>();
|
||||
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, Collections.emptyList());
|
||||
List<SnapshotsInProgress.Entry> entries = currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
|
||||
for (SnapshotsInProgress.Entry entry : entries) {
|
||||
snapshotList.add(inProgressSnapshot(entry));
|
||||
}
|
||||
@ -687,12 +692,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
* This method is executed on master node
|
||||
* </p>
|
||||
*
|
||||
* @param repository repository id
|
||||
* @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
|
||||
* @param snapshotsInProgress snapshots in progress in the cluster state
|
||||
* @param repository repository id
|
||||
* @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
|
||||
* @return list of metadata for currently running snapshots
|
||||
*/
|
||||
public List<SnapshotsInProgress.Entry> currentSnapshots(final String repository, final List<String> snapshots) {
|
||||
SnapshotsInProgress snapshotsInProgress = clusterService.state().custom(SnapshotsInProgress.TYPE);
|
||||
public static List<SnapshotsInProgress.Entry> currentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repository,
|
||||
List<String> snapshots) {
|
||||
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
@ -1236,7 +1242,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
|
||||
long repoGenId = repositoryData.getGenId();
|
||||
if (matchedEntry.isPresent() == false) {
|
||||
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream()
|
||||
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(
|
||||
clusterService.state().custom(SnapshotsInProgress.TYPE), repositoryName, Collections.emptyList()).stream()
|
||||
.filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
|
||||
if (matchedInProgress.isPresent()) {
|
||||
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
|
||||
|
Loading…
x
Reference in New Issue
Block a user