From 15c85b29fd9ad8f3b16495a381e2ff1c41a83bcd Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 1 Jul 2020 12:19:29 +0200 Subject: [PATCH] Account for recovery throttling when restoring snapshot (#58658) (#58811) Restoring from a snapshot (which is a particular form of recovery) does not currently take recovery throttling into account (i.e. the `indices.recovery.max_bytes_per_sec` setting). While restores are subject to their own throttling (repository setting `max_restore_bytes_per_sec`), this repository setting does not allow for values to be configured differently on a per-node basis. As restores are very similar in nature to peer recoveries (streaming bytes to the node), it makes sense to configure throttling in a single place. The `max_restore_bytes_per_sec` setting is also changed to default to unlimited now, whereas previously it was set to `40mb`, which is the current default of `indices.recovery.max_bytes_per_sec`). This means that no behavioral change will be observed by clusters where the recovery and restore settings were not adapted. Relates https://github.com/elastic/elasticsearch/issues/57023 Co-authored-by: James Rodewig --- .../repository-shared-settings.asciidoc | 5 +- .../modules/indices/recovery.asciidoc | 5 +- docs/reference/release-notes/7.9.asciidoc | 10 +++ .../apis/put-repo-api.asciidoc | 5 +- .../register-repository.asciidoc | 2 +- .../repository/url/URLRepositoryPlugin.java | 5 +- .../repositories/url/URLRepository.java | 6 +- .../repositories/url/URLRepositoryTests.java | 5 +- .../repositories/azure/AzureRepository.java | 7 +- .../azure/AzureRepositoryPlugin.java | 6 +- .../azure/AzureRepositorySettingsTests.java | 5 +- .../gcs/GoogleCloudStoragePlugin.java | 6 +- .../gcs/GoogleCloudStorageRepository.java | 6 +- ...eCloudStorageBlobStoreRepositoryTests.java | 5 +- .../repositories/hdfs/HdfsPlugin.java | 6 +- .../repositories/hdfs/HdfsRepository.java | 7 +- .../repositories/s3/S3Repository.java | 6 +- .../repositories/s3/S3RepositoryPlugin.java | 11 ++-- .../s3/RepositoryCredentialsTests.java | 6 +- .../s3/S3BlobStoreRepositoryTests.java | 5 +- .../repositories/s3/S3RepositoryTests.java | 5 +- ...etadataLoadingDuringSnapshotRestoreIT.java | 10 +-- .../RepositoryFilterUserMetadataIT.java | 5 +- .../SharedClusterSnapshotRestoreIT.java | 66 ++++++++++++++++++- .../blobstore/RateLimitingInputStream.java | 20 +++--- .../java/org/elasticsearch/node/Node.java | 6 +- .../plugins/RepositoryPlugin.java | 5 +- .../repositories/RepositoriesModule.java | 13 ++-- .../blobstore/BlobStoreRepository.java | 19 ++++-- .../repositories/fs/FsRepository.java | 5 +- .../repositories/RepositoriesModuleTests.java | 33 ++++++---- .../BlobStoreRepositoryRestoreTests.java | 5 +- .../blobstore/BlobStoreRepositoryTests.java | 7 +- .../repositories/fs/FsRepositoryTests.java | 5 +- .../snapshots/SnapshotResiliencyTests.java | 9 ++- .../MockEventuallyConsistentRepository.java | 4 +- ...ckEventuallyConsistentRepositoryTests.java | 18 +++-- .../snapshots/mockstore/MockRepository.java | 10 +-- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 3 +- .../snapshots/SourceOnlySnapshotIT.java | 3 +- .../elasticsearch/xpack/core/XPackPlugin.java | 3 +- .../SourceOnlySnapshotShardTests.java | 5 +- .../core/LocalStateCompositeXPackPlugin.java | 13 ++-- .../SearchableSnapshotDirectoryTests.java | 5 +- .../store/cache/CachePreWarmingTests.java | 5 +- 45 files changed, 289 insertions(+), 112 deletions(-) diff --git a/docs/plugins/repository-shared-settings.asciidoc b/docs/plugins/repository-shared-settings.asciidoc index ca9345e0ffc..13c2716c52d 100644 --- a/docs/plugins/repository-shared-settings.asciidoc +++ b/docs/plugins/repository-shared-settings.asciidoc @@ -1,6 +1,7 @@ `max_restore_bytes_per_sec`:: - Throttles per node restore rate. Defaults to `40mb` per second. + Throttles per node restore rate. Defaults to unlimited. + Note that restores are also throttled through {ref}/recovery.html[recovery settings]. `max_snapshot_bytes_per_sec`:: @@ -8,4 +9,4 @@ `readonly`:: - Makes repository read-only. Defaults to `false`. \ No newline at end of file + Makes repository read-only. Defaults to `false`. diff --git a/docs/reference/modules/indices/recovery.asciidoc b/docs/reference/modules/indices/recovery.asciidoc index 92e6c730c6d..361c9ef1f09 100644 --- a/docs/reference/modules/indices/recovery.asciidoc +++ b/docs/reference/modules/indices/recovery.asciidoc @@ -13,11 +13,12 @@ You can view a list of in-progress and completed recoveries using the <>. [float] -==== Peer recovery settings +==== Recovery settings `indices.recovery.max_bytes_per_sec`:: (<>) Limits total inbound and outbound -recovery traffic for each node. Defaults to `40mb`. +recovery traffic for each node. Applies to both peer recoveries as well +as snapshot recoveries (i.e., restores from a snapshot). Defaults to `40mb`. + This limit applies to each node separately. If multiple nodes in a cluster perform recoveries at the same time, the cluster's total recovery traffic may diff --git a/docs/reference/release-notes/7.9.asciidoc b/docs/reference/release-notes/7.9.asciidoc index 8a4a4e74327..90722089440 100644 --- a/docs/reference/release-notes/7.9.asciidoc +++ b/docs/reference/release-notes/7.9.asciidoc @@ -12,3 +12,13 @@ Script Cache:: Field capabilities API:: * Constant_keyword fields are now described by their family type `keyword` instead of `constant_keyword` {es-pull}58483[#58483] (issue: {es-issue}53175[#53175]) + +Snapshot restore throttling:: +* Restoring from a snapshot (which is a particular form of recovery) is now + properly taking recovery throttling into account (i.e. the + `indices.recovery.max_bytes_per_sec` setting). + The `max_restore_bytes_per_sec` setting is also now defaulting to + unlimited, whereas previously it was set to `40mb`, which is the + default that's used for `indices.recovery.max_bytes_per_sec`. This means + that no behavioral change will be observed by clusters where the recovery + and restore settings had not been adapted from the defaults. {es-pull}58658[#58658] diff --git a/docs/reference/snapshot-restore/apis/put-repo-api.asciidoc b/docs/reference/snapshot-restore/apis/put-repo-api.asciidoc index d9169a7cf69..afab92b5a92 100644 --- a/docs/reference/snapshot-restore/apis/put-repo-api.asciidoc +++ b/docs/reference/snapshot-restore/apis/put-repo-api.asciidoc @@ -176,7 +176,8 @@ nodes in the cluster. `max_restore_bytes_per_sec`:: (Optional, <>) -Maximum snapshot restore rate per node. Defaults to `40mb` per second. +Maximum snapshot restore rate per node. Defaults to unlimited. Note +that restores are also throttled through <>. `max_snapshot_bytes_per_sec`:: (Optional, <>) @@ -249,4 +250,4 @@ data nodes in the cluster. If `false`, this verification is skipped. Defaults to `true`. + You can manually perform this verification using the -<>. \ No newline at end of file +<>. diff --git a/docs/reference/snapshot-restore/register-repository.asciidoc b/docs/reference/snapshot-restore/register-repository.asciidoc index e5df7ae1fd9..6dc2690abd8 100644 --- a/docs/reference/snapshot-restore/register-repository.asciidoc +++ b/docs/reference/snapshot-restore/register-repository.asciidoc @@ -155,7 +155,7 @@ The following settings are supported: `compress`:: Turns on compression of the snapshot files. Compression is applied only to metadata files (index mapping and settings). Data files are not compressed. Defaults to `true`. `chunk_size`:: Big files can be broken down into chunks during snapshotting if needed. Specify the chunk size as a value and unit, for example: `1GB`, `10MB`, `5KB`, `500B`. Defaults to `null` (unlimited chunk size). -`max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to `40mb` per second. +`max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to unlimited. Note that restores are also throttled through <>. `max_snapshot_bytes_per_sec`:: Throttles per node snapshot rate. Defaults to `40mb` per second. `readonly`:: Makes repository read-only. Defaults to `false`. diff --git a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java index 4c3fa615937..ff43a923c27 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; @@ -46,8 +47,8 @@ public class URLRepositoryPlugin extends Plugin implements RepositoryPlugin { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.singletonMap(URLRepository.TYPE, - metadata -> new URLRepository(metadata, env, namedXContentRegistry, clusterService)); + metadata -> new URLRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings)); } } diff --git a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java index 56aa2315fe6..61171f3e03d 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.util.URIPattern; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; @@ -83,8 +84,9 @@ public class URLRepository extends BlobStoreRepository { * Constructs a read-only URL-based repository */ public URLRepository(RepositoryMetadata metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) { - super(metadata, false, namedXContentRegistry, clusterService); + NamedXContentRegistry namedXContentRegistry, ClusterService clusterService, + RecoverySettings recoverySettings) { + super(metadata, false, namedXContentRegistry, clusterService, recoverySettings); if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) { throw new RepositoryException(metadata.name(), "missing url"); diff --git a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java index 2ffe0bd042d..1426bc8b893 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java @@ -20,10 +20,12 @@ package org.elasticsearch.repositories.url; import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; @@ -40,7 +42,8 @@ public class URLRepositoryTests extends ESTestCase { private URLRepository createRepository(Settings baseSettings, RepositoryMetadata repositoryMetadata) { return new URLRepository(repositoryMetadata, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList()), BlobStoreTestUtil.mockClusterService()) { + new NamedXContentRegistry(Collections.emptyList()), BlobStoreTestUtil.mockClusterService(), + new RecoverySettings(baseSettings, new ClusterSettings(baseSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index 9384b72eb6c..4107cba65c3 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import java.util.Locale; @@ -81,8 +82,10 @@ public class AzureRepository extends BlobStoreRepository { final RepositoryMetadata metadata, final NamedXContentRegistry namedXContentRegistry, final AzureStorageService storageService, - final ClusterService clusterService) { - super(metadata, Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService); + final ClusterService clusterService, + final RecoverySettings recoverySettings) { + super(metadata, Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, + recoverySettings); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.storageService = storageService; diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index d98a7c3cbd7..6f6c29ba07a 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; @@ -60,9 +61,10 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.singletonMap(AzureRepository.TYPE, - (metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, clusterService)); + (metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, clusterService, + recoverySettings)); } @Override diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java index 259fb6ee8f1..1f2f2ae9b8a 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java @@ -21,11 +21,13 @@ package org.elasticsearch.repositories.azure; import com.microsoft.azure.storage.LocationMode; import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; @@ -42,7 +44,8 @@ public class AzureRepositorySettingsTests extends ESTestCase { .put(settings) .build(); final AzureRepository azureRepository = new AzureRepository(new RepositoryMetadata("foo", "azure", internalSettings), - NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), BlobStoreTestUtil.mockClusterService()); + NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), BlobStoreTestUtil.mockClusterService(), + new RecoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); assertThat(azureRepository.getBlobStore(), is(nullValue())); return azureRepository; } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index c4a065270be..0c878d0fe3f 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; @@ -52,9 +53,10 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.singletonMap(GoogleCloudStorageRepository.TYPE, - metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, clusterService)); + metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, clusterService, + recoverySettings)); } @Override diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 1f27f04948a..7e033c47f20 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; @@ -68,8 +69,9 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { final RepositoryMetadata metadata, final NamedXContentRegistry namedXContentRegistry, final GoogleCloudStorageService storageService, - final ClusterService clusterService) { - super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService); + final ClusterService clusterService, + final RecoverySettings recoverySettings) { + super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings); this.storageService = storageService; String basePath = BASE_PATH.get(metadata.settings()); diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index f965114d958..848c73815d4 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -258,9 +259,9 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe @Override public Map getRepositories(Environment env, NamedXContentRegistry registry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.singletonMap(GoogleCloudStorageRepository.TYPE, - metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, clusterService) { + metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, clusterService, recoverySettings) { @Override protected GoogleCloudStorageBlobStore createBlobStore() { return new GoogleCloudStorageBlobStore( diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java index 1d93aed9cc3..bc118d5b109 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; @@ -112,7 +113,8 @@ public final class HdfsPlugin extends Plugin implements RepositoryPlugin { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { - return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, clusterService)); + ClusterService clusterService, RecoverySettings recoverySettings) { + return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, clusterService, + recoverySettings)); } } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index 9b41c20030b..4a6dcb4a3db 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; @@ -71,8 +72,10 @@ public final class HdfsRepository extends BlobStoreRepository { final RepositoryMetadata metadata, final Environment environment, final NamedXContentRegistry namedXContentRegistry, - final ClusterService clusterService) { - super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService); + final ClusterService clusterService, + final RecoverySettings recoverySettings) { + super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService, + recoverySettings); this.environment = environment; this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 9fdaec59a34..48cfc93a2f2 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; @@ -215,8 +216,9 @@ class S3Repository extends BlobStoreRepository { final RepositoryMetadata metadata, final NamedXContentRegistry namedXContentRegistry, final S3Service service, - final ClusterService clusterService) { - super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService); + final ClusterService clusterService, + final RecoverySettings recoverySettings) { + super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings); this.service = service; this.repositoryMetadata = metadata; diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 581aabfd650..178b0fcd529 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; @@ -79,14 +80,16 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo protected S3Repository createRepository( final RepositoryMetadata metadata, final NamedXContentRegistry registry, - final ClusterService clusterService) { - return new S3Repository(metadata, registry, service, clusterService); + final ClusterService clusterService, + final RecoverySettings recoverySettings) { + return new S3Repository(metadata, registry, service, clusterService, recoverySettings); } @Override public Map getRepositories(final Environment env, final NamedXContentRegistry registry, - final ClusterService clusterService) { - return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, clusterService)); + final ClusterService clusterService, final RecoverySettings recoverySettings) { + return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, clusterService, + recoverySettings)); } @Override diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index a4402558c92..a171c1e6364 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.RepositoriesService; @@ -268,8 +269,9 @@ public class RepositoryCredentialsTests extends ESSingleNodeTestCase { @Override protected S3Repository createRepository(RepositoryMetadata metadata, - NamedXContentRegistry registry, ClusterService clusterService) { - return new S3Repository(metadata, registry, service, clusterService) { + NamedXContentRegistry registry, ClusterService clusterService, + RecoverySettings recoverySettings) { + return new S3Repository(metadata, registry, service, clusterService, recoverySettings) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 82896c8d66d..5ac4428c416 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryData; @@ -201,8 +202,8 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes @Override protected S3Repository createRepository(RepositoryMetadata metadata, NamedXContentRegistry registry, - ClusterService clusterService) { - return new S3Repository(metadata, registry, service, clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { + return new S3Repository(metadata, registry, service, clusterService, recoverySettings) { @Override public BlobStore blobStore() { diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index ea8500c0738..ba274a9955d 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -21,10 +21,12 @@ package org.elasticsearch.repositories.s3; import com.amazonaws.services.s3.AbstractAmazonS3; import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; @@ -119,7 +121,8 @@ public class S3RepositoryTests extends ESTestCase { } private S3Repository createS3Repo(RepositoryMetadata metadata) { - return new S3Repository(metadata, NamedXContentRegistry.EMPTY, new DummyS3Service(), BlobStoreTestUtil.mockClusterService()) { + return new S3Repository(metadata, NamedXContentRegistry.EMPTY, new DummyS3Service(), BlobStoreTestUtil.mockClusterService(), + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java index 67d115ed30f..c59cae2e827 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; @@ -185,8 +186,9 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte public CountingMockRepository(final RepositoryMetadata metadata, final Environment environment, - final NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) { - super(metadata, environment, namedXContentRegistry, clusterService); + final NamedXContentRegistry namedXContentRegistry, ClusterService clusterService, + RecoverySettings recoverySettings) { + super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); } @Override @@ -209,9 +211,9 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.singletonMap(TYPE, - metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, clusterService)); + metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings)); } } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java index da016df56bf..174e5bd464c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.IndexId; @@ -79,9 +80,9 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.singletonMap("mock_meta_filtering", metadata -> - new FsRepository(metadata, env, namedXContentRegistry, clusterService) { + new FsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) { // Storing the initially expected metadata value here to verify that #filterUserMetadata is only called once on the // initial master node starting the snapshot diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 5282a0134bf..443d24f3187 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -125,6 +125,7 @@ import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTI import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.index.shard.IndexShardTests.getEngineFromShard; +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing; @@ -1910,12 +1911,14 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas Path repositoryLocation = randomRepoPath(); boolean throttleSnapshot = randomBoolean(); boolean throttleRestore = randomBoolean(); + boolean throttleRestoreViaRecoverySettings = throttleRestore && randomBoolean(); assertAcked(client.admin().cluster().preparePutRepository("test-repo") .setType("fs").setSettings(Settings.builder() .put("location", repositoryLocation) .put("compress", randomBoolean()) .put("chunk_size", randomIntBetween(1000, 10000), ByteSizeUnit.BYTES) - .put("max_restore_bytes_per_sec", throttleRestore ? "10k" : "0") + .put("max_restore_bytes_per_sec", + throttleRestore && (throttleRestoreViaRecoverySettings == false) ? "10k" : "0") .put("max_snapshot_bytes_per_sec", throttleSnapshot ? "10k" : "0"))); createIndex("test-idx"); @@ -1939,6 +1942,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas cluster().wipeIndices("test-idx"); logger.info("--> restore index"); + if (throttleRestoreViaRecoverySettings) { + client.admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "10k").build()).get(); + } RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap") .setWaitForCompletion(true).execute().actionGet(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); @@ -1962,6 +1969,63 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } else { assertThat(restorePause, equalTo(0L)); } + client.admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .putNull(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()).build()).get(); + } + + public void testDynamicRestoreThrottling() throws Exception { + Client client = client(); + + logger.info("--> creating repository"); + Path repositoryLocation = randomRepoPath(); + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(Settings.builder() + .put("location", repositoryLocation) + .put("compress", randomBoolean()) + .put("chunk_size", 100, ByteSizeUnit.BYTES))); + + createIndex("test-idx"); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx", "_doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); + + logger.info("--> snapshot"); + client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") + .setWaitForCompletion(true).setIndices("test-idx").get(); + + logger.info("--> delete index"); + cluster().wipeIndices("test-idx"); + + logger.info("--> restore index"); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "100b").build()).get(); + ActionFuture restoreSnapshotResponse = client.admin().cluster() + .prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute(); + + // check if throttling is active + assertBusy(() -> { + long restorePause = 0L; + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + restorePause += repositoriesService.repository("test-repo").getRestoreThrottleTimeInNanos(); + } + assertThat(restorePause, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(1, 5)).nanos())); + assertFalse(restoreSnapshotResponse.isDone()); + }, 30, TimeUnit.SECONDS); + + // run at full speed again + client.admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .putNull(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()).build()).get(); + + // check that restore now completes quickly (i.e. within 10 seconds) + assertBusy(() -> assertTrue(restoreSnapshotResponse.isDone())); + + assertThat(restoreSnapshotResponse.get().getRestoreInfo().totalShards(), greaterThan(0)); + assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); } public void testSnapshotStatus() throws Exception { diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java index 61371616db9..b1bcc056e6f 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java @@ -24,13 +24,14 @@ import org.apache.lucene.store.RateLimiter; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.function.Supplier; /** * Rate limiting wrapper for InputStream */ public class RateLimitingInputStream extends FilterInputStream { - private final RateLimiter rateLimiter; + private final Supplier rateLimiterSupplier; private final Listener listener; @@ -40,19 +41,22 @@ public class RateLimitingInputStream extends FilterInputStream { void onPause(long nanos); } - public RateLimitingInputStream(InputStream delegate, RateLimiter rateLimiter, Listener listener) { + public RateLimitingInputStream(InputStream delegate, Supplier rateLimiterSupplier, Listener listener) { super(delegate); - this.rateLimiter = rateLimiter; + this.rateLimiterSupplier = rateLimiterSupplier; this.listener = listener; } private void maybePause(int bytes) throws IOException { bytesSinceLastRateLimit += bytes; - if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) { - long pause = rateLimiter.pause(bytesSinceLastRateLimit); - bytesSinceLastRateLimit = 0; - if (pause > 0) { - listener.onPause(pause); + final RateLimiter rateLimiter = rateLimiterSupplier.get(); + if (rateLimiter != null) { + if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) { + long pause = rateLimiter.pause(bytesSinceLastRateLimit); + bytesSinceLastRateLimit = 0; + if (pause > 0) { + listener.onPause(pause); + } } } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 6b0dc7fb59b..d45be0d99e8 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -546,9 +546,10 @@ public class Node implements Closeable { SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final HttpServerTransport httpServerTransport = newHttpTransport(networkModule); - + final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment, - pluginsService.filterPlugins(RepositoryPlugin.class), transportService, clusterService, threadPool, xContentRegistry); + pluginsService.filterPlugins(RepositoryPlugin.class), transportService, clusterService, threadPool, xContentRegistry, + recoverySettings); RepositoriesService repositoryService = repositoriesModule.getRepositoryService(); repositoriesServiceReference.set(repositoryService); SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService, @@ -633,7 +634,6 @@ public class Node implements Closeable { b.bind(GatewayMetaState.class).toInstance(gatewayMetaState); b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery()); { - RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings); b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); diff --git a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java index 7891363adb0..4f15a352c1d 100644 --- a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java @@ -22,6 +22,7 @@ package org.elasticsearch.plugins; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.Repository; import java.util.Collections; @@ -41,7 +42,7 @@ public interface RepositoryPlugin { * the value is a factory to construct the {@link Repository} interface. */ default Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.emptyMap(); } @@ -55,7 +56,7 @@ public interface RepositoryPlugin { * the value is a factory to construct the {@link Repository} interface. */ default Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.emptyMap(); } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index f87aab460fc..cb9a5917a2a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.threadpool.ThreadPool; @@ -41,12 +42,15 @@ public final class RepositoriesModule { private final RepositoriesService repositoriesService; public RepositoriesModule(Environment env, List repoPlugins, TransportService transportService, - ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry) { + ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry, + RecoverySettings recoverySettings) { Map factories = new HashMap<>(); - factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, clusterService)); + factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, + recoverySettings)); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, clusterService); + Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, clusterService, + recoverySettings); for (Map.Entry entry : newRepoTypes.entrySet()) { if (factories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Repository type [" + entry.getKey() + "] is already registered"); @@ -56,7 +60,8 @@ public final class RepositoriesModule { Map internalFactories = new HashMap<>(); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, clusterService); + Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, clusterService, + recoverySettings); for (Map.Entry entry : newRepoTypes.entrySet()) { if (internalFactories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered"); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index ec9740b0d5e..1181ec4474b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -97,6 +97,7 @@ import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; @@ -137,6 +138,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -252,6 +254,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final ClusterService clusterService; + private final RecoverySettings recoverySettings; + /** * Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for * {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart @@ -289,14 +293,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final RepositoryMetadata metadata, final boolean compress, final NamedXContentRegistry namedXContentRegistry, - final ClusterService clusterService) { + final ClusterService clusterService, + final RecoverySettings recoverySettings) { this.compress = compress; this.metadata = metadata; this.namedXContentRegistry = namedXContentRegistry; this.threadPool = clusterService.getClusterApplierService().threadPool(); this.clusterService = clusterService; + this.recoverySettings = recoverySettings; snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); - restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); + restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO); readOnly = metadata.settings().getAsBoolean("readonly", false); cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, @@ -1993,16 +1999,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp }); } - private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) { - return rateLimiter == null ? stream : new RateLimitingInputStream(stream, rateLimiter, metric::inc); + private static InputStream maybeRateLimit(InputStream stream, Supplier rateLimiterSupplier, CounterMetric metric) { + return new RateLimitingInputStream(stream, rateLimiterSupplier, metric::inc); } public InputStream maybeRateLimitRestores(InputStream stream) { - return maybeRateLimit(stream, restoreRateLimiter, restoreRateLimitingTimeInNanos); + return maybeRateLimit(maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos), + recoverySettings::rateLimiter, restoreRateLimitingTimeInNanos); } public InputStream maybeRateLimitSnapshots(InputStream stream) { - return maybeRateLimit(stream, snapshotRateLimiter, snapshotRateLimitingTimeInNanos); + return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java index ba2987c4149..95c3f0a185c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; @@ -75,8 +76,8 @@ public class FsRepository extends BlobStoreRepository { * Constructs a shared file system repository. */ public FsRepository(RepositoryMetadata metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { - super(metadata, calculateCompress(metadata, environment), namedXContentRegistry, clusterService); + ClusterService clusterService, RecoverySettings recoverySettings) { + super(metadata, calculateCompress(metadata, environment), namedXContentRegistry, clusterService, recoverySettings); this.environment = environment; String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings()); if (location.isEmpty()) { diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java index 1fad6f62773..2c3542df6ce 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -45,6 +46,7 @@ public class RepositoriesModuleTests extends ESTestCase { private Repository.Factory factory; private ThreadPool threadPool; private ClusterService clusterService; + private RecoverySettings recoverySettings; @Override public void setUp() throws Exception { @@ -53,6 +55,7 @@ public class RepositoriesModuleTests extends ESTestCase { contentRegistry = mock(NamedXContentRegistry.class); threadPool = mock(ThreadPool.class); clusterService = mock(ClusterService.class); + recoverySettings = mock(RecoverySettings.class); plugin1 = mock(RepositoryPlugin.class); plugin2 = mock(RepositoryPlugin.class); factory = mock(Repository.Factory.class); @@ -62,46 +65,52 @@ public class RepositoriesModuleTests extends ESTestCase { } public void testCanRegisterTwoRepositoriesWithDifferentTypes() { - when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type2", factory)); + when(plugin1.getRepositories(environment, contentRegistry, clusterService, recoverySettings)) + .thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, clusterService, recoverySettings)) + .thenReturn(Collections.singletonMap("type2", factory)); // Would throw new RepositoriesModule( - environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), threadPool, contentRegistry); + environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), threadPool, contentRegistry, + recoverySettings); } public void testCannotRegisterTwoRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getRepositories(environment, contentRegistry, clusterService, recoverySettings)) + .thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, clusterService, recoverySettings)) + .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService, - threadPool, contentRegistry)); + threadPool, contentRegistry, recoverySettings)); assertEquals("Repository type [type1] is already registered", ex.getMessage()); } public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() { - when(plugin1.getInternalRepositories(environment, contentRegistry, clusterService)) + when(plugin1.getInternalRepositories(environment, contentRegistry, clusterService, recoverySettings)) .thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService)) + when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService, recoverySettings)) .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService, - threadPool, contentRegistry)); + threadPool, contentRegistry, recoverySettings)); assertEquals("Internal repository type [type1] is already registered", ex.getMessage()); } public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService)) + when(plugin1.getRepositories(environment, contentRegistry, clusterService, recoverySettings)) + .thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService, recoverySettings)) .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService, threadPool, - contentRegistry)); + contentRegistry, recoverySettings)); assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage()); } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index e7943134bd8..fc749f28f81 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; @@ -44,6 +45,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -198,7 +200,8 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); - final FsRepository repository = new FsRepository(repositoryMetadata, createEnvironment(), xContentRegistry(), clusterService) { + final FsRepository repository = new FsRepository(repositoryMetadata, createEnvironment(), xContentRegistry(), clusterService, + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index fec8b6c1535..ba142f6cafe 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.IndexId; @@ -74,9 +75,9 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.singletonMap(REPO_TYPE, - (metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService) { + (metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we access blobStore on test/main threads @@ -233,7 +234,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { Environment useCompressEnvironment = new Environment(useCompressSettings, node().getEnvironment().configFile()); - new FsRepository(metadata, useCompressEnvironment, null, BlobStoreTestUtil.mockClusterService()); + new FsRepository(metadata, useCompressEnvironment, null, BlobStoreTestUtil.mockClusterService(), null); assertWarnings("[repositories.fs.compress] setting was deprecated in Elasticsearch and will be removed in a future release!" + " See the breaking changes documentation for the next major version."); diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 5293d93742f..c32efc3ec8c 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -52,6 +53,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; @@ -92,7 +94,8 @@ public class FsRepositoryTests extends ESTestCase { int numDocs = indexDocs(directory); RepositoryMetadata metadata = new RepositoryMetadata("test", "fs", settings); FsRepository repository = new FsRepository(metadata, new Environment(settings, null), NamedXContentRegistry.EMPTY, - BlobStoreTestUtil.mockClusterService()); + BlobStoreTestUtil.mockClusterService(), new RecoverySettings(settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); repository.start(); final Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_INDEX_UUID, "myindexUUID").build(); IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("myindex", indexSettings); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 9cf0c8ef5e1..6a1f0d36484 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1303,6 +1303,8 @@ public class SnapshotResiliencyTests extends ESTestCase { private final ClusterService clusterService; + private final RecoverySettings recoverySettings; + private final NodeConnectionsService nodeConnectionsService; private final RepositoriesService repositoriesService; @@ -1350,6 +1352,7 @@ public class SnapshotResiliencyTests extends ESTestCase { // don't do anything, and don't block } }); + recoverySettings = new RecoverySettings(settings, clusterSettings); mockTransport = new DisruptableMockTransport(node, logger) { @Override protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) { @@ -1587,7 +1590,8 @@ public class SnapshotResiliencyTests extends ESTestCase { private Repository.Factory getRepoFactory(Environment environment) { // Run half the tests with the eventually consistent repository if (blobStoreContext == null) { - return metadata -> new FsRepository(metadata, environment, xContentRegistry(), clusterService) { + return metadata -> new FsRepository(metadata, environment, xContentRegistry(), clusterService, + recoverySettings) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo in the test thread @@ -1595,7 +1599,8 @@ public class SnapshotResiliencyTests extends ESTestCase { }; } else { return metadata -> - new MockEventuallyConsistentRepository(metadata, xContentRegistry(), clusterService, blobStoreContext, random()); + new MockEventuallyConsistentRepository(metadata, xContentRegistry(), clusterService, recoverySettings, + blobStoreContext, random()); } } public void restart() { diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 03900cd292c..5d1e9591ae3 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESTestCase; @@ -76,9 +77,10 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository { final RepositoryMetadata metadata, final NamedXContentRegistry namedXContentRegistry, final ClusterService clusterService, + final RecoverySettings recoverySettings, final Context context, final Random random) { - super(metadata, false, namedXContentRegistry, clusterService); + super(metadata, false, namedXContentRegistry, clusterService, recoverySettings); this.context = context; this.namedXContentRegistry = namedXContentRegistry; this.random = random; diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index d0c18d24bcd..d7866a3df9f 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -26,7 +26,9 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; @@ -48,11 +50,14 @@ import static org.hamcrest.Matchers.startsWith; public class MockEventuallyConsistentRepositoryTests extends ESTestCase { + private final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + public void testReadAfterWriteConsistently() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), recoverySettings, blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -72,7 +77,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), recoverySettings, blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -88,7 +93,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), recoverySettings, blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -106,7 +111,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), recoverySettings, blobStoreContext, random())) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -123,7 +128,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), recoverySettings, blobStoreContext, random())) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0")); @@ -142,7 +147,8 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { final RepositoryMetadata metadata = new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY); final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(metadata); try (BlobStoreRepository repository = - new MockEventuallyConsistentRepository(metadata, xContentRegistry(), clusterService, blobStoreContext, random())) { + new MockEventuallyConsistentRepository(metadata, xContentRegistry(), clusterService, recoverySettings, + blobStoreContext, random())) { clusterService.addStateApplier(event -> repository.updateState(event.state())); // Apply state once to initialize repo properly like RepositoriesService would repository.updateState(clusterService.state()); diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 51fa9877e52..4921c113e8c 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.fs.FsRepository; @@ -72,9 +73,9 @@ public class MockRepository extends FsRepository { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.singletonMap("mock", (metadata) -> - new MockRepository(metadata, env, namedXContentRegistry, clusterService)); + new MockRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings)); } @Override @@ -117,8 +118,9 @@ public class MockRepository extends FsRepository { private volatile boolean blocked = false; public MockRepository(RepositoryMetadata metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) { - super(overrideSettings(metadata, environment), environment, namedXContentRegistry, clusterService); + NamedXContentRegistry namedXContentRegistry, ClusterService clusterService, + RecoverySettings recoverySettings) { + super(overrideSettings(metadata, environment), environment, namedXContentRegistry, clusterService, recoverySettings); randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 7224ebf3197..197eb5c2fd4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -32,6 +32,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTasksExecutor; @@ -341,7 +342,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E @Override public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), clusterService.getClusterApplierService().threadPool()); diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java index a1fe12468a2..d9060e6076e 100644 --- a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; @@ -71,7 +72,7 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase { public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index a732523b6bc..ef0e80faf74 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -39,6 +39,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.LicensesMetadata; import org.elasticsearch.license.Licensing; @@ -370,7 +371,7 @@ public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin, @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 427ba7d3b9b..91140d57f2e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.internal.io.IOUtils; @@ -58,6 +59,7 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; @@ -359,7 +361,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); - final Repository repository = new FsRepository(repositoryMetadata, createEnvironment(), xContentRegistry(), clusterService); + final Repository repository = new FsRepository(repositoryMetadata, createEnvironment(), xContentRegistry(), clusterService, + new RecoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); clusterService.addStateApplier(e -> repository.updateState(e.state())); // Apply state once to initialize repo properly like RepositoriesService would repository.updateState(clusterService.state()); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index a423ebc30ca..15c79b8a24e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -44,6 +44,7 @@ import org.elasticsearch.index.analysis.TokenizerFactory; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.indices.analysis.AnalysisModule; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.ingest.Processor; import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.XPackLicenseState; @@ -422,21 +423,21 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { HashMap repositories = - new HashMap<>(super.getRepositories(env, namedXContentRegistry, clusterService)); + new HashMap<>(super.getRepositories(env, namedXContentRegistry, clusterService, recoverySettings)); filterPlugins(RepositoryPlugin.class).forEach( - r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, clusterService))); + r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, clusterService, recoverySettings))); return repositories; } @Override public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ClusterService clusterService) { + ClusterService clusterService, RecoverySettings recoverySettings) { HashMap internalRepositories = - new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, clusterService)); + new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, clusterService, recoverySettings)); filterPlugins(RepositoryPlugin.class).forEach(r -> - internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, clusterService))); + internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, clusterService, recoverySettings))); return internalRepositories; } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index b18010ef2d9..53b3b79ee5a 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -50,6 +50,7 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -65,6 +66,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; @@ -509,7 +511,8 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase { null ), NamedXContentRegistry.EMPTY, - BlobStoreTestUtil.mockClusterService(repositoryMetadata) + BlobStoreTestUtil.mockClusterService(repositoryMetadata), + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) ) { @Override diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachePreWarmingTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachePreWarmingTests.java index 000f20bd52a..a2528a91da9 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachePreWarmingTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachePreWarmingTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.blobstore.support.FilterBlobContainer; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -41,6 +42,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.F import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; @@ -163,7 +165,8 @@ public class CachePreWarmingTests extends ESTestCase { null ), NamedXContentRegistry.EMPTY, - BlobStoreTestUtil.mockClusterService(repositoryMetadata) + BlobStoreTestUtil.mockClusterService(repositoryMetadata), + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) ) { @Override