Simplify Snapshot Create Request Handling (#37464)
* The internal create request is absolutely redundant, the only difference to the transport request is that we resolved the snapshot name when moving from the transport to the internal version * Removed it and passed the transport request into the snapshot service instead * nicer way of resolve snapshot name in callback
This commit is contained in:
parent
5e94f384c4
commit
5a5e44d1de
|
@ -73,24 +73,14 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
|
|||
@Override
|
||||
protected void masterOperation(final CreateSnapshotRequest request, ClusterState state,
|
||||
final ActionListener<CreateSnapshotResponse> listener) {
|
||||
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
|
||||
SnapshotsService.SnapshotRequest snapshotRequest =
|
||||
new SnapshotsService.SnapshotRequest(request.repository(), snapshotName, "create_snapshot [" + snapshotName + "]")
|
||||
.indices(request.indices())
|
||||
.indicesOptions(request.indicesOptions())
|
||||
.partial(request.partial())
|
||||
.settings(request.settings())
|
||||
.includeGlobalState(request.includeGlobalState())
|
||||
.masterNodeTimeout(request.masterNodeTimeout());
|
||||
snapshotsService.createSnapshot(snapshotRequest, new SnapshotsService.CreateSnapshotListener() {
|
||||
snapshotsService.createSnapshot(request, new SnapshotsService.CreateSnapshotListener() {
|
||||
@Override
|
||||
public void onResponse() {
|
||||
public void onResponse(Snapshot snapshotCreated) {
|
||||
if (request.waitForCompletion()) {
|
||||
snapshotsService.addListener(new SnapshotsService.SnapshotCompletionListener() {
|
||||
@Override
|
||||
public void onSnapshotCompletion(Snapshot snapshot, SnapshotInfo snapshotInfo) {
|
||||
if (snapshot.getRepository().equals(request.repository()) &&
|
||||
snapshot.getSnapshotId().getName().equals(snapshotName)) {
|
||||
if (snapshotCreated.equals(snapshot)) {
|
||||
listener.onResponse(new CreateSnapshotResponse(snapshotInfo));
|
||||
snapshotsService.removeListener(this);
|
||||
}
|
||||
|
@ -98,8 +88,7 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
|
|||
|
||||
@Override
|
||||
public void onSnapshotFailure(Snapshot snapshot, Exception e) {
|
||||
if (snapshot.getRepository().equals(request.repository()) &&
|
||||
snapshot.getSnapshotId().getName().equals(snapshotName)) {
|
||||
if (snapshotCreated.equals(snapshot)) {
|
||||
listener.onFailure(e);
|
||||
snapshotsService.removeListener(this);
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateApplier;
|
||||
|
@ -78,7 +78,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
@ -92,8 +91,8 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
|
|||
* <p>
|
||||
* A typical snapshot creating process looks like this:
|
||||
* <ul>
|
||||
* <li>On the master node the {@link #createSnapshot(SnapshotRequest, CreateSnapshotListener)} is called and makes sure that no snapshots
|
||||
* is currently running and registers the new snapshot in cluster state</li>
|
||||
* <li>On the master node the {@link #createSnapshot(CreateSnapshotRequest, CreateSnapshotListener)} is called and makes sure that
|
||||
* no snapshot is currently running and registers the new snapshot in cluster state</li>
|
||||
* <li>When cluster state is updated
|
||||
* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, CreateSnapshotListener)} method kicks in and initializes
|
||||
* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
|
||||
|
@ -235,20 +234,20 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
* @param request snapshot request
|
||||
* @param listener snapshot creation listener
|
||||
*/
|
||||
public void createSnapshot(final SnapshotRequest request, final CreateSnapshotListener listener) {
|
||||
final String repositoryName = request.repositoryName;
|
||||
final String snapshotName = request.snapshotName;
|
||||
public void createSnapshot(final CreateSnapshotRequest request, final CreateSnapshotListener listener) {
|
||||
final String repositoryName = request.repository();
|
||||
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
|
||||
validate(repositoryName, snapshotName);
|
||||
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
|
||||
final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();
|
||||
|
||||
clusterService.submitStateUpdateTask(request.cause(), new ClusterStateUpdateTask() {
|
||||
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
|
||||
|
||||
private SnapshotsInProgress.Entry newSnapshot = null;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
validate(request, currentState);
|
||||
validate(repositoryName, snapshotName, currentState);
|
||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
|
||||
|
@ -301,16 +300,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
/**
|
||||
* Validates snapshot request
|
||||
*
|
||||
* @param request snapshot request
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotName snapshot name
|
||||
* @param state current cluster state
|
||||
*/
|
||||
private void validate(SnapshotRequest request, ClusterState state) {
|
||||
private void validate(String repositoryName, String snapshotName, ClusterState state) {
|
||||
RepositoriesMetaData repositoriesMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE);
|
||||
final String repository = request.repositoryName;
|
||||
if (repositoriesMetaData == null || repositoriesMetaData.repository(repository) == null) {
|
||||
throw new RepositoryMissingException(repository);
|
||||
if (repositoriesMetaData == null || repositoriesMetaData.repository(repositoryName) == null) {
|
||||
throw new RepositoryMissingException(repositoryName);
|
||||
}
|
||||
validate(repository, request.snapshotName);
|
||||
validate(repositoryName, snapshotName);
|
||||
}
|
||||
|
||||
private static void validate(final String repositoryName, final String snapshotName) {
|
||||
|
@ -377,7 +376,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
logger.info("snapshot [{}] started", snapshot.snapshot());
|
||||
if (snapshot.indices().isEmpty()) {
|
||||
// No indices in this snapshot - we are done
|
||||
userCreateSnapshotListener.onResponse();
|
||||
userCreateSnapshotListener.onResponse(snapshot.snapshot());
|
||||
endSnapshot(snapshot);
|
||||
return;
|
||||
}
|
||||
|
@ -465,7 +464,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
// for processing. If client wants to wait for the snapshot completion, it can register snapshot
|
||||
// completion listener in this method. For the snapshot completion to work properly, the snapshot
|
||||
// should still exist when listener is registered.
|
||||
userCreateSnapshotListener.onResponse();
|
||||
userCreateSnapshotListener.onResponse(snapshot.snapshot());
|
||||
|
||||
// Now that snapshot completion listener is registered we can end the snapshot if needed
|
||||
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
|
||||
|
@ -1544,8 +1543,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
/**
|
||||
* Called when snapshot has successfully started
|
||||
*
|
||||
* @param snapshot snapshot that was created
|
||||
*/
|
||||
void onResponse();
|
||||
void onResponse(Snapshot snapshot);
|
||||
|
||||
/**
|
||||
* Called if a snapshot operation couldn't start
|
||||
|
@ -1575,186 +1576,4 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
void onSnapshotFailure(Snapshot snapshot, Exception e);
|
||||
}
|
||||
|
||||
/**
|
||||
* Snapshot creation request
|
||||
*/
|
||||
public static class SnapshotRequest {
|
||||
|
||||
private final String cause;
|
||||
|
||||
private final String repositoryName;
|
||||
|
||||
private final String snapshotName;
|
||||
|
||||
private String[] indices;
|
||||
|
||||
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
|
||||
|
||||
private boolean partial;
|
||||
|
||||
private Settings settings;
|
||||
|
||||
private boolean includeGlobalState;
|
||||
|
||||
private TimeValue masterNodeTimeout;
|
||||
|
||||
/**
|
||||
* Constructs new snapshot creation request
|
||||
*
|
||||
* @param repositoryName repository name
|
||||
* @param snapshotName snapshot name
|
||||
* @param cause cause for snapshot operation
|
||||
*/
|
||||
public SnapshotRequest(final String repositoryName, final String snapshotName, final String cause) {
|
||||
this.repositoryName = Objects.requireNonNull(repositoryName);
|
||||
this.snapshotName = Objects.requireNonNull(snapshotName);
|
||||
this.cause = Objects.requireNonNull(cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the list of indices to be snapshotted
|
||||
*
|
||||
* @param indices list of indices
|
||||
* @return this request
|
||||
*/
|
||||
public SnapshotRequest indices(String[] indices) {
|
||||
this.indices = indices;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets repository-specific snapshot settings
|
||||
*
|
||||
* @param settings snapshot settings
|
||||
* @return this request
|
||||
*/
|
||||
public SnapshotRequest settings(Settings settings) {
|
||||
this.settings = settings;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set to true if global state should be stored as part of the snapshot
|
||||
*
|
||||
* @param includeGlobalState true if global state should be stored as part of the snapshot
|
||||
* @return this request
|
||||
*/
|
||||
public SnapshotRequest includeGlobalState(boolean includeGlobalState) {
|
||||
this.includeGlobalState = includeGlobalState;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets master node timeout
|
||||
*
|
||||
* @param masterNodeTimeout master node timeout
|
||||
* @return this request
|
||||
*/
|
||||
public SnapshotRequest masterNodeTimeout(TimeValue masterNodeTimeout) {
|
||||
this.masterNodeTimeout = masterNodeTimeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the indices options
|
||||
*
|
||||
* @param indicesOptions indices options
|
||||
* @return this request
|
||||
*/
|
||||
public SnapshotRequest indicesOptions(IndicesOptions indicesOptions) {
|
||||
this.indicesOptions = indicesOptions;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set to true if partial snapshot should be allowed
|
||||
*
|
||||
* @param partial true if partial snapshots should be allowed
|
||||
* @return this request
|
||||
*/
|
||||
public SnapshotRequest partial(boolean partial) {
|
||||
this.partial = partial;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns cause for snapshot operation
|
||||
*
|
||||
* @return cause for snapshot operation
|
||||
*/
|
||||
public String cause() {
|
||||
return cause;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the repository name
|
||||
*/
|
||||
public String repositoryName() {
|
||||
return repositoryName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the snapshot name
|
||||
*/
|
||||
public String snapshotName() {
|
||||
return snapshotName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the list of indices to be snapshotted
|
||||
*
|
||||
* @return the list of indices
|
||||
*/
|
||||
public String[] indices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns indices options
|
||||
*
|
||||
* @return indices options
|
||||
*/
|
||||
public IndicesOptions indicesOptions() {
|
||||
return indicesOptions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns repository-specific settings for the snapshot operation
|
||||
*
|
||||
* @return repository-specific settings
|
||||
*/
|
||||
public Settings settings() {
|
||||
return settings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if global state should be stored as part of the snapshot
|
||||
*
|
||||
* @return true if global state should be stored as part of the snapshot
|
||||
*/
|
||||
public boolean includeGlobalState() {
|
||||
return includeGlobalState;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if partial snapshot should be allowed
|
||||
*
|
||||
* @return true if partial snapshot should be allowed
|
||||
*/
|
||||
public boolean partial() {
|
||||
return partial;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns master node timeout
|
||||
*
|
||||
* @return master node timeout
|
||||
*/
|
||||
public TimeValue masterNodeTimeout() {
|
||||
return masterNodeTimeout;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue