Add CoolDown Period to S3 Repository (#51074) (#51213)

Add cool down period after snapshot finalization and delete to prevent eventually consistent AWS S3 from corrupting shard level metadata as long as the repository is using the old format metadata on the shard level.
This commit is contained in:
Armin Braun 2020-01-20 12:18:16 +01:00 committed by GitHub
parent 424b4ed4ea
commit c5f1a90159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 168 additions and 0 deletions

View File

@ -21,6 +21,9 @@ package org.elasticsearch.repositories.s3;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@ -32,11 +35,23 @@ import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
/**
@ -142,6 +157,23 @@ class S3Repository extends BlobStoreRepository {
static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());
/**
* Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the
* backwards compatible snapshot format from before
* {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}).
* This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when
* doing repository operations in rapid succession on a repository in the old metadata format.
* This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository
* becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than
* {@link org.elasticsearch.Version#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new
* format and disable the cooldown period.
*/
static final Setting<TimeValue> COOLDOWN_PERIOD = Setting.timeSetting(
"cooldown_period",
new TimeValue(3, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Setting.Property.Dynamic);
/**
* Specifies the path within bucket to repository data. Defaults to root directory.
*/
@ -165,6 +197,12 @@ class S3Repository extends BlobStoreRepository {
private final RepositoryMetaData repositoryMetaData;
/**
* Time period to delay repository operations by after finalizing or deleting a snapshot.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private final TimeValue coolDown;
/**
* Constructs an s3 backed repository
*/
@ -211,6 +249,8 @@ class S3Repository extends BlobStoreRepository {
+ "store these in named clients and the elasticsearch keystore for secure settings.");
}
coolDown = COOLDOWN_PERIOD.get(metadata.settings());
logger.debug(
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]",
bucket,
@ -221,6 +261,70 @@ class S3Repository extends BlobStoreRepository {
storageClass);
}
/**
* Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be
* closed concurrently.
*/
private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();
@Override
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
MetaData clusterMetaData, Map<String, Object> userMetadata, boolean writeShardGens,
ActionListener<SnapshotInfo> listener) {
if (writeShardGens == false) {
listener = delayedListener(listener);
}
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener);
}
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
if (writeShardGens == false) {
listener = delayedListener(listener);
}
super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
}
/**
* Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
});
return new ActionListener<T>() {
@Override
public void onResponse(T response) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)),
coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
@Override
public void onFailure(Exception e) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
};
}
private void logCooldownInfo() {
logger.info("Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" +
" and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " +
"repository corruption. To get rid of this message and move to the new repository metadata format, either remove " +
"all snapshots older than version [{}] from the repository or create a new repository at an empty location.",
coolDown, metadata.name(), SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
}
@Override
protected S3BlobStore createBlobStore() {
return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, repositoryMetaData);
@ -241,4 +345,14 @@ class S3Repository extends BlobStoreRepository {
protected ByteSizeValue chunkSize() {
return chunkSize;
}
@Override
protected void doClose() {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
if (cancellable != null) {
logger.debug("Repository [{}] closed during cool-down period", metadata.name());
cancellable.cancel();
}
super.doClose();
}
}

View File

@ -22,30 +22,48 @@ import com.amazonaws.http.AmazonHttpClient;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.s3.S3HttpHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(5L);
@Override
protected String repositoryType() {
return S3Repository.TYPE;
@ -82,6 +100,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret");
return Settings.builder()
.put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time
.put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl())
// Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
.put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true)
@ -92,6 +111,41 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
.build();
}
public void testEnforcedCooldownPeriod() throws IOException {
final String repoName = createRepository(randomName(), Settings.builder().put(repositorySettings())
.put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build());
final SnapshotId fakeOldSnapshot = client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-old")
.setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId();
final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
final RepositoryData repositoryData =
PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f)));
final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
final BytesReference serialized =
BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false));
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
try (InputStream stream = serialized.streamInput()) {
repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true);
}
})));
final String newSnapshotName = "snapshot-new";
final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));
final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));
final long beforeFastDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
}
/**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/