* Make SnapshotsService#getRepositoryData Async (#49322) Follow up to #49299 removing the blocking step for the snapshot status APIs as well.
This commit is contained in:
parent
20558cf61c
commit
1cde4a6364
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.cluster.snapshots.get;
|
|||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
|
@ -96,7 +97,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|||
|
||||
final RepositoryData repositoryData;
|
||||
if (isCurrentSnapshotsOnly(request.snapshots()) == false) {
|
||||
repositoryData = snapshotsService.getRepositoryData(repository);
|
||||
repositoryData = PlainActionFuture.get(fut -> snapshotsService.getRepositoryData(repository, fut));
|
||||
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
|
||||
allSnapshotIds.put(snapshotId.getName(), snapshotId);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.StepListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -97,7 +98,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
List<SnapshotsInProgress.Entry> currentSnapshots =
|
||||
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
|
||||
if (currentSnapshots.isEmpty()) {
|
||||
listener.onResponse(buildResponse(request, currentSnapshots, null));
|
||||
buildResponse(request, currentSnapshots, null, listener);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -119,20 +120,22 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
transportNodesSnapshotsStatus.execute(
|
||||
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY))
|
||||
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
|
||||
ActionListener.wrap(
|
||||
nodeSnapshotStatuses -> threadPool.executor(ThreadPool.Names.GENERIC).execute(
|
||||
ActionRunnable.supply(listener, () -> buildResponse(request, snapshotsService.currentSnapshots(
|
||||
request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses))), listener::onFailure));
|
||||
ActionListener.wrap(nodeSnapshotStatuses -> threadPool.generic().execute(
|
||||
ActionRunnable.wrap(listener,
|
||||
l -> buildResponse(
|
||||
request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
|
||||
nodeSnapshotStatuses, l))
|
||||
), listener::onFailure));
|
||||
} else {
|
||||
// We don't have any in-progress shards, just return current stats
|
||||
listener.onResponse(buildResponse(request, currentSnapshots, null));
|
||||
buildResponse(request, currentSnapshots, null, listener);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, List<SnapshotsInProgress.Entry> currentSnapshotEntries,
|
||||
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses)
|
||||
throws IOException {
|
||||
private void buildResponse(SnapshotsStatusRequest request, List<SnapshotsInProgress.Entry> currentSnapshotEntries,
|
||||
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
|
||||
ActionListener<SnapshotsStatusResponse> listener) {
|
||||
// First process snapshot that are currently processed
|
||||
List<SnapshotStatus> builder = new ArrayList<>();
|
||||
Set<String> currentSnapshotNames = new HashSet<>();
|
||||
|
@ -192,8 +195,18 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
// Now add snapshots on disk that are not currently running
|
||||
final String repositoryName = request.repository();
|
||||
if (Strings.hasText(repositoryName) && request.snapshots() != null && request.snapshots().length > 0) {
|
||||
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
|
||||
final RepositoryData repositoryData = snapshotsService.getRepositoryData(repositoryName);
|
||||
loadRepositoryData(request, builder, currentSnapshotNames, repositoryName, listener);
|
||||
} else {
|
||||
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
|
||||
}
|
||||
}
|
||||
|
||||
private void loadRepositoryData(SnapshotsStatusRequest request, List<SnapshotStatus> builder, Set<String> currentSnapshotNames,
|
||||
String repositoryName, ActionListener<SnapshotsStatusResponse> listener) {
|
||||
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
|
||||
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
|
||||
snapshotsService.getRepositoryData(repositoryName, repositoryDataListener);
|
||||
repositoryDataListener.whenComplete(repositoryData -> {
|
||||
final Map<String, SnapshotId> matchedSnapshotIds = repositoryData.getSnapshotIds().stream()
|
||||
.filter(s -> requestedSnapshotNames.contains(s.getName()))
|
||||
.collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
|
||||
|
@ -248,9 +261,8 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new SnapshotsStatusResponse(Collections.unmodifiableList(builder));
|
||||
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
|
||||
}, listener::onFailure);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.StepListener;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateApplier;
|
||||
|
@ -163,12 +162,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
* Gets the {@link RepositoryData} for the given repository.
|
||||
*
|
||||
* @param repositoryName repository name
|
||||
* @return repository data
|
||||
* @param listener listener to pass {@link RepositoryData} to
|
||||
*/
|
||||
public RepositoryData getRepositoryData(final String repositoryName) {
|
||||
Repository repository = repositoriesService.repository(repositoryName);
|
||||
assert repository != null; // should only be called once we've validated the repository exists
|
||||
return PlainActionFuture.get(repository::getRepositoryData);
|
||||
public void getRepositoryData(final String repositoryName, final ActionListener<RepositoryData> listener) {
|
||||
try {
|
||||
Repository repository = repositoriesService.repository(repositoryName);
|
||||
assert repository != null; // should only be called once we've validated the repository exists
|
||||
repository.getRepositoryData(listener);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue