Using optimistic locking, add the ability to run a repository state update task with a consistent view of the current repository data. Allows for a follow-up to remove the snapshot INIT state.
This commit is contained in:
parent
31e32aa420
commit
e01b999ef0
|
@ -35,7 +35,6 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transport action for delete snapshot operation
|
* Transport action for delete snapshot operation
|
||||||
|
@ -71,7 +70,6 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction<Del
|
||||||
@Override
|
@Override
|
||||||
protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state,
|
protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state,
|
||||||
final ActionListener<AcknowledgedResponse> listener) {
|
final ActionListener<AcknowledgedResponse> listener) {
|
||||||
snapshotsService.deleteSnapshots(request.repository(), Arrays.asList(request.snapshots()),
|
snapshotsService.deleteSnapshots(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true)));
|
||||||
ActionListener.map(listener, v -> new AcknowledgedResponse(true)));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexCommit;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||||
import org.elasticsearch.cluster.metadata.Metadata;
|
import org.elasticsearch.cluster.metadata.Metadata;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||||
|
@ -42,6 +43,7 @@ import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
public class FilterRepository implements Repository {
|
public class FilterRepository implements Repository {
|
||||||
|
@ -151,6 +153,12 @@ public class FilterRepository implements Repository {
|
||||||
in.updateState(state);
|
in.updateState(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
|
||||||
|
Consumer<Exception> onFailure) {
|
||||||
|
in.executeConsistentStateUpdate(createUpdateTask, source, onFailure);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Lifecycle.State lifecycleState() {
|
public Lifecycle.State lifecycleState() {
|
||||||
return in.lifecycleState();
|
return in.lifecycleState();
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexCommit;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||||
import org.elasticsearch.cluster.metadata.Metadata;
|
import org.elasticsearch.cluster.metadata.Metadata;
|
||||||
|
@ -43,6 +44,7 @@ import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -269,6 +271,21 @@ public interface Repository extends LifecycleComponent {
|
||||||
*/
|
*/
|
||||||
void updateState(ClusterState state);
|
void updateState(ClusterState state);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a cluster state update with a consistent view of the current {@link RepositoryData}. The {@link ClusterState} passed to the
|
||||||
|
* task generated through {@code createUpdateTask} is guaranteed to point at the same state for this repository as the did the state
|
||||||
|
* at the time the {@code RepositoryData} was loaded.
|
||||||
|
* This allows for operations on the repository that need a consistent view of both the cluster state and the repository contents at
|
||||||
|
* one point in time like for example, checking if a snapshot is in the repository before adding the delete operation for it to the
|
||||||
|
* cluster state.
|
||||||
|
*
|
||||||
|
* @param createUpdateTask function to supply cluster state update task
|
||||||
|
* @param source the source of the cluster state update task
|
||||||
|
* @param onFailure error handler invoked on failure to get a consistent view of the current {@link RepositoryData}
|
||||||
|
*/
|
||||||
|
void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
|
||||||
|
Consumer<Exception> onFailure);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()}
|
* Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()}
|
||||||
* during snapshot initialization.
|
* during snapshot initialization.
|
||||||
|
|
|
@ -76,6 +76,7 @@ import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
|
@ -337,6 +338,67 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
|
||||||
|
Consumer<Exception> onFailure) {
|
||||||
|
threadPool.generic().execute(new AbstractRunnable() {
|
||||||
|
@Override
|
||||||
|
protected void doRun() {
|
||||||
|
final RepositoryMetadata repositoryMetadataStart = metadata;
|
||||||
|
getRepositoryData(ActionListener.wrap(repositoryData -> {
|
||||||
|
final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
|
||||||
|
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {
|
||||||
|
|
||||||
|
private boolean executedTask = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||||
|
// Comparing the full metadata here on purpose instead of simply comparing the safe generation.
|
||||||
|
// If the safe generation has changed, then we have to reload repository data and start over.
|
||||||
|
// If the pending generation has changed we are in the midst of a write operation and might pick up the
|
||||||
|
// updated repository data and state on the retry. We don't want to wait for the write to finish though
|
||||||
|
// because it could fail for any number of reasons so we just retry instead of waiting on the cluster state
|
||||||
|
// to change in any form.
|
||||||
|
if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) {
|
||||||
|
executedTask = true;
|
||||||
|
return updateTask.execute(currentState);
|
||||||
|
}
|
||||||
|
return currentState;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(String source, Exception e) {
|
||||||
|
if (executedTask) {
|
||||||
|
updateTask.onFailure(source, e);
|
||||||
|
} else {
|
||||||
|
onFailure.accept(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
|
if (executedTask) {
|
||||||
|
updateTask.clusterStateProcessed(source, oldState, newState);
|
||||||
|
} else {
|
||||||
|
executeConsistentStateUpdate(createUpdateTask, source, onFailure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimeValue timeout() {
|
||||||
|
return updateTask.timeout();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}, onFailure));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
onFailure.accept(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Inspects all cluster state elements that contain a hint about what the current repository generation is and updates
|
// Inspects all cluster state elements that contain a hint about what the current repository generation is and updates
|
||||||
// #latestKnownRepoGen if a newer than currently known generation is found
|
// #latestKnownRepoGen if a newer than currently known generation is found
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRunnable;
|
import org.elasticsearch.action.ActionRunnable;
|
||||||
import org.elasticsearch.action.StepListener;
|
import org.elasticsearch.action.StepListener;
|
||||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
|
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
|
||||||
|
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateApplier;
|
import org.elasticsearch.cluster.ClusterStateApplier;
|
||||||
|
@ -1000,14 +1001,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||||
* If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from
|
* If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from
|
||||||
* the repository.
|
* the repository.
|
||||||
* If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the
|
* If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the
|
||||||
* given names in the repository and deletes them by invoking {@link #deleteCompletedSnapshots}.
|
* given names in the repository and deletes them.
|
||||||
*
|
*
|
||||||
* @param repositoryName repositoryName
|
* @param request delete snapshot request
|
||||||
* @param snapshotNames snapshotNames
|
|
||||||
* @param listener listener
|
* @param listener listener
|
||||||
*/
|
*/
|
||||||
public void deleteSnapshots(final String repositoryName, final Collection<String> snapshotNames, final ActionListener<Void> listener) {
|
public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener<Void> listener) {
|
||||||
logger.info("deleting snapshots {} from repository [{}]", snapshotNames, repositoryName);
|
|
||||||
|
final String[] snapshotNames = request.snapshots();
|
||||||
|
final String repositoryName = request.repository();
|
||||||
|
logger.info(() -> new ParameterizedMessage("deleting snapshots [{}] from repository [{}]",
|
||||||
|
Strings.arrayToCommaDelimitedString(snapshotNames), repositoryName));
|
||||||
|
|
||||||
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) {
|
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) {
|
||||||
|
|
||||||
|
@ -1017,15 +1021,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
if (snapshotNames.size() > 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) {
|
if (snapshotNames.length > 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) {
|
||||||
throw new IllegalArgumentException("Deleting multiple snapshots in a single request is only supported in version [ "
|
throw new IllegalArgumentException("Deleting multiple snapshots in a single request is only supported in version [ "
|
||||||
+ MULTI_DELETE_VERSION + "] but cluster contained node of version [" + currentState.nodes().getMinNodeVersion()
|
+ MULTI_DELETE_VERSION + "] but cluster contained node of version [" + currentState.nodes().getMinNodeVersion()
|
||||||
+ "]");
|
+ "]");
|
||||||
}
|
}
|
||||||
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||||
final SnapshotsInProgress.Entry snapshotEntry;
|
final SnapshotsInProgress.Entry snapshotEntry;
|
||||||
if (snapshotNames.size() == 1) {
|
if (snapshotNames.length == 1) {
|
||||||
final String snapshotName = snapshotNames.iterator().next();
|
final String snapshotName = snapshotNames[0];
|
||||||
if (Regex.isSimpleMatchPattern(snapshotName)) {
|
if (Regex.isSimpleMatchPattern(snapshotName)) {
|
||||||
snapshotEntry = null;
|
snapshotEntry = null;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1101,18 +1105,23 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||||
@Override
|
@Override
|
||||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
if (runningSnapshot == null) {
|
if (runningSnapshot == null) {
|
||||||
threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
|
try {
|
||||||
repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData ->
|
repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData ->
|
||||||
deleteCompletedSnapshots(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName),
|
createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName,
|
||||||
repositoryName, repositoryData.getGenId(), Priority.NORMAL, l), l::onFailure))));
|
repositoryData.getGenId(), request.masterNodeTimeout(), Priority.NORMAL, listener),
|
||||||
|
"delete completed snapshots", listener::onFailure);
|
||||||
|
} catch (RepositoryMissingException e) {
|
||||||
|
listener.onFailure(e);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
|
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
|
||||||
addListener(runningSnapshot, ActionListener.wrap(
|
addListener(runningSnapshot, ActionListener.wrap(
|
||||||
result -> {
|
result -> {
|
||||||
logger.debug("deleted snapshot completed - deleting files");
|
logger.debug("deleted snapshot completed - deleting files");
|
||||||
deleteCompletedSnapshots(Collections.singletonList(result.v2().snapshotId()), repositoryName,
|
clusterService.submitStateUpdateTask("delete snapshot",
|
||||||
result.v1().getGenId(), Priority.IMMEDIATE, listener);
|
createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName,
|
||||||
|
result.v1().getGenId(), null, Priority.IMMEDIATE, listener));
|
||||||
},
|
},
|
||||||
e -> {
|
e -> {
|
||||||
if (abortedDuringInit) {
|
if (abortedDuringInit) {
|
||||||
|
@ -1133,10 +1142,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||||
}
|
}
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimeValue timeout() {
|
||||||
|
return request.masterNodeTimeout();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<SnapshotId> matchingSnapshotIds(RepositoryData repositoryData, Collection<String> snapshotsOrPatterns,
|
private static List<SnapshotId> matchingSnapshotIds(RepositoryData repositoryData, String[] snapshotsOrPatterns,
|
||||||
String repositoryName) {
|
String repositoryName) {
|
||||||
final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds().stream().collect(
|
final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds().stream().collect(
|
||||||
Collectors.toMap(SnapshotId::getName, Function.identity()));
|
Collectors.toMap(SnapshotId::getName, Function.identity()));
|
||||||
|
@ -1178,23 +1192,33 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||||
return snapshotEntry;
|
return snapshotEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private ClusterStateUpdateTask createDeleteStateUpdate(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId,
|
||||||
* Deletes snapshots that are assumed to be in the repository and not tracked as in-progress in the cluster state.
|
@Nullable TimeValue timeout, Priority priority, ActionListener<Void> listener) {
|
||||||
*
|
// Short circuit to noop state update if there isn't anything to delete
|
||||||
* @param snapshotIds Snapshots to delete
|
|
||||||
* @param repoName Repository name
|
|
||||||
* @param repositoryStateId Repository generation to base the delete on
|
|
||||||
* @param listener Listener to complete when done
|
|
||||||
*/
|
|
||||||
private void deleteCompletedSnapshots(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId, Priority priority,
|
|
||||||
ActionListener<Void> listener) {
|
|
||||||
if (snapshotIds.isEmpty()) {
|
if (snapshotIds.isEmpty()) {
|
||||||
listener.onResponse(null);
|
return new ClusterStateUpdateTask() {
|
||||||
return;
|
@Override
|
||||||
|
public ClusterState execute(ClusterState currentState) {
|
||||||
|
return currentState;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(String source, Exception e) {
|
||||||
|
listener.onFailure(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
|
listener.onResponse(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimeValue timeout() {
|
||||||
|
return timeout;
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
logger.debug("deleting snapshots {} assuming repository generation [{}] and with priority [{}]", snapshotIds, repositoryStateId,
|
return new ClusterStateUpdateTask(priority) {
|
||||||
priority);
|
|
||||||
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) {
|
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||||
|
@ -1250,7 +1274,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
deleteSnapshotsFromRepository(repoName, snapshotIds, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
|
deleteSnapshotsFromRepository(repoName, snapshotIds, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
|
||||||
}
|
}
|
||||||
});
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
|
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||||
import org.elasticsearch.cluster.metadata.Metadata;
|
import org.elasticsearch.cluster.metadata.Metadata;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||||
|
@ -53,6 +54,7 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -230,6 +232,11 @@ public class RepositoriesServiceTests extends ESTestCase {
|
||||||
public void updateState(final ClusterState state) {
|
public void updateState(final ClusterState state) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
|
||||||
|
Consumer<Exception> onFailure) {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Lifecycle.State lifecycleState() {
|
public Lifecycle.State lifecycleState() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -190,6 +190,7 @@ import org.junit.Before;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
@ -203,6 +204,7 @@ import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
@ -221,6 +223,7 @@ import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.iterableWithSize;
|
import static org.hamcrest.Matchers.iterableWithSize;
|
||||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||||
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
public class SnapshotResiliencyTests extends ESTestCase {
|
public class SnapshotResiliencyTests extends ESTestCase {
|
||||||
|
@ -739,6 +742,63 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||||
assertEquals(0, snapshotInfo.failedShards());
|
assertEquals(0, snapshotInfo.failedShards());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testConcurrentDeletes() {
|
||||||
|
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
|
||||||
|
|
||||||
|
String repoName = "repo";
|
||||||
|
String snapshotName = "snapshot";
|
||||||
|
final String index = "test";
|
||||||
|
final int shards = randomIntBetween(1, 10);
|
||||||
|
|
||||||
|
TestClusterNodes.TestClusterNode masterNode =
|
||||||
|
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
|
||||||
|
|
||||||
|
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
|
||||||
|
|
||||||
|
continueOrDie(createRepoAndIndex(repoName, index, shards),
|
||||||
|
createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
|
||||||
|
.setWaitForCompletion(true).execute(createSnapshotResponseStepListener));
|
||||||
|
|
||||||
|
final Collection<StepListener<Boolean>> deleteSnapshotStepListeners = Arrays.asList(new StepListener<>(), new StepListener<>());
|
||||||
|
|
||||||
|
final AtomicInteger successfulDeletes = new AtomicInteger(0);
|
||||||
|
|
||||||
|
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
|
||||||
|
for (StepListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
|
||||||
|
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(
|
||||||
|
ActionListener.wrap(
|
||||||
|
resp -> deleteListener.onResponse(true),
|
||||||
|
e -> {
|
||||||
|
final Throwable unwrapped = ExceptionsHelper.unwrap(
|
||||||
|
e, ConcurrentSnapshotExecutionException.class, SnapshotMissingException.class);
|
||||||
|
assertThat(unwrapped, notNullValue());
|
||||||
|
deleteListener.onResponse(false);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for (StepListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
|
||||||
|
continueOrDie(deleteListener, deleted -> {
|
||||||
|
if (deleted) {
|
||||||
|
successfulDeletes.incrementAndGet();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
deterministicTaskQueue.runAllRunnableTasks();
|
||||||
|
|
||||||
|
SnapshotDeletionsInProgress deletionsInProgress = masterNode.clusterService.state().custom(SnapshotDeletionsInProgress.TYPE);
|
||||||
|
assertFalse(deletionsInProgress.hasDeletionsInProgress());
|
||||||
|
final Repository repository = masterNode.repositoriesService.repository(repoName);
|
||||||
|
final RepositoryData repositoryData = getRepositoryData(repository);
|
||||||
|
Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
|
||||||
|
// We end up with no snapshots since at least one of the deletes worked out
|
||||||
|
assertThat(snapshotIds, empty());
|
||||||
|
assertThat(successfulDeletes.get(), either(is(1)).or(is(2)));
|
||||||
|
// We did one snapshot and one delete so we went two steps from the empty generation (-1) to 1
|
||||||
|
assertThat(repositoryData.getGenId(), is(1L));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently
|
* Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently
|
||||||
* deleting a snapshot.
|
* deleting a snapshot.
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexCommit;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||||
import org.elasticsearch.cluster.metadata.Metadata;
|
import org.elasticsearch.cluster.metadata.Metadata;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||||
|
@ -44,6 +45,7 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static java.util.Collections.emptyList;
|
import static java.util.Collections.emptyList;
|
||||||
|
@ -157,4 +159,10 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
|
||||||
@Override
|
@Override
|
||||||
public void updateState(final ClusterState state) {
|
public void updateState(final ClusterState state) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
|
||||||
|
Consumer<Exception> onFailure) {
|
||||||
|
throw new UnsupportedOperationException("Unsupported for restore-only repository");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.action.support.ThreadedActionListener;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||||
import org.elasticsearch.cluster.metadata.MappingMetadata;
|
import org.elasticsearch.cluster.metadata.MappingMetadata;
|
||||||
import org.elasticsearch.cluster.metadata.Metadata;
|
import org.elasticsearch.cluster.metadata.Metadata;
|
||||||
|
@ -93,6 +94,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.LongConsumer;
|
import java.util.function.LongConsumer;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
@ -441,6 +443,12 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
public void updateState(ClusterState state) {
|
public void updateState(ClusterState state) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
|
||||||
|
Consumer<Exception> onFailure) {
|
||||||
|
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
|
||||||
|
}
|
||||||
|
|
||||||
private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
|
private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
|
||||||
Client followerClient, Index followerIndex) {
|
Client followerClient, Index followerIndex) {
|
||||||
final PlainActionFuture<IndexMetadata> indexMetadataFuture = new PlainActionFuture<>();
|
final PlainActionFuture<IndexMetadata> indexMetadataFuture = new PlainActionFuture<>();
|
||||||
|
|
Loading…
Reference in New Issue