Account for recovery throttling when restoring snapshot (#58658) (#58811)

Restoring from a snapshot (which is a particular form of recovery) does not currently take recovery throttling into account
(i.e. the `indices.recovery.max_bytes_per_sec` setting). While restores are subject to their own throttling (repository
setting `max_restore_bytes_per_sec`), this repository setting does not allow for values to be configured differently on a
per-node basis. As restores are very similar in nature to peer recoveries (streaming bytes to the node), it makes sense to
configure throttling in a single place.

The `max_restore_bytes_per_sec` setting is also changed to default to unlimited, whereas previously it defaulted to
`40mb`, which is the current default of `indices.recovery.max_bytes_per_sec`. This means that no behavioral change
will be observed by clusters where the recovery and restore settings were not adapted.
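
As an illustration, a minimal sketch (in the transport-client style used by this PR's integration tests) of capping and then uncapping restore traffic through the node-level recovery setting; the `client` handle and the `10mb` value are assumptions made for the example:

```java
// Assumes an org.elasticsearch.client.Client named `client` and
// org.elasticsearch.common.settings.Settings are in scope.

// Dynamically cap restore (and peer recovery) traffic on every node.
client.admin().cluster().prepareUpdateSettings()
    .setTransientSettings(Settings.builder()
        .put("indices.recovery.max_bytes_per_sec", "10mb").build())
    .get();

// Remove the cap again once the restore has completed.
client.admin().cluster().prepareUpdateSettings()
    .setTransientSettings(Settings.builder()
        .putNull("indices.recovery.max_bytes_per_sec").build())
    .get();
```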

Relates https://github.com/elastic/elasticsearch/issues/57023

Co-authored-by: James Rodewig <james.rodewig@elastic.co>
Yannick Welsch 2020-07-01 12:19:29 +02:00 committed by GitHub
parent 7d34fa9b67
commit 15c85b29fd
45 changed files with 289 additions and 112 deletions


@ -1,6 +1,7 @@
`max_restore_bytes_per_sec`::
Throttles per node restore rate. Defaults to `40mb` per second.
Throttles per node restore rate. Defaults to unlimited.
Note that restores are also throttled through {ref}/recovery.html[recovery settings].
`max_snapshot_bytes_per_sec`::
@ -8,4 +9,4 @@
`readonly`::
Makes repository read-only. Defaults to `false`.


@ -13,11 +13,12 @@ You can view a list of in-progress and completed recoveries using the
<<cat-recovery, cat recovery API>>.
[float]
==== Peer recovery settings
==== Recovery settings
`indices.recovery.max_bytes_per_sec`::
(<<cluster-update-settings,Dynamic>>) Limits total inbound and outbound
recovery traffic for each node. Defaults to `40mb`.
recovery traffic for each node. Applies to both peer recoveries and
snapshot recoveries (i.e., restores from a snapshot). Defaults to `40mb`.
+
This limit applies to each node separately. If multiple nodes in a cluster
perform recoveries at the same time, the cluster's total recovery traffic may


@ -12,3 +12,13 @@ Script Cache::
Field capabilities API::
* Constant_keyword fields are now described by their family type `keyword` instead of `constant_keyword` {es-pull}58483[#58483] (issue: {es-issue}53175[#53175])
Snapshot restore throttling::
* Restoring from a snapshot (which is a particular form of recovery) now
properly takes recovery throttling into account (i.e. the
`indices.recovery.max_bytes_per_sec` setting).
The `max_restore_bytes_per_sec` setting also now defaults to
unlimited, whereas previously it defaulted to `40mb`, which is the
default used for `indices.recovery.max_bytes_per_sec`. This means
that no behavioral change will be observed by clusters where the recovery
and restore settings had not been adapted from the defaults. {es-pull}58658[#58658]


@ -176,7 +176,8 @@ nodes in the cluster.
`max_restore_bytes_per_sec`::
(Optional, <<byte-units,byte value>>)
Maximum snapshot restore rate per node. Defaults to `40mb` per second.
Maximum snapshot restore rate per node. Defaults to unlimited. Note
that restores are also throttled through <<recovery,recovery settings>>.
`max_snapshot_bytes_per_sec`::
(Optional, <<byte-units,byte value>>)
@ -249,4 +250,4 @@ data nodes in the cluster. If `false`, this verification is skipped. Defaults to
`true`.
+
You can manually perform this verification using the
<<snapshots-repository-verification,verify snapshot repository API>>.


@ -155,7 +155,7 @@ The following settings are supported:
`compress`:: Turns on compression of the snapshot files. Compression is applied only to metadata files (index mapping and settings). Data files are not compressed. Defaults to `true`.
`chunk_size`:: Big files can be broken down into chunks during snapshotting if needed. Specify the chunk size as a value and
unit, for example: `1GB`, `10MB`, `5KB`, `500B`. Defaults to `null` (unlimited chunk size).
`max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to `40mb` per second.
`max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to unlimited. Note that restores are also throttled through <<recovery,recovery settings>>.
`max_snapshot_bytes_per_sec`:: Throttles per node snapshot rate. Defaults to `40mb` per second.
`readonly`:: Makes repository read-only. Defaults to `false`.
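
For reference, a hedged sketch of registering a file system repository with these settings through the Java API used elsewhere in this PR's tests; the repository name and location are invented for the example, and `client`/`Settings` are assumed to be in scope:

```java
// Register a shared file system repository. Leaving max_restore_bytes_per_sec
// unset now means restores are unlimited at the repository level and are only
// throttled by the node-level indices.recovery.max_bytes_per_sec setting.
client.admin().cluster().preparePutRepository("my-backup")    // repository name is an assumption
    .setType("fs")
    .setSettings(Settings.builder()
        .put("location", "/mnt/backups/my-backup")            // path is an assumption
        .put("compress", true)
        .put("max_snapshot_bytes_per_sec", "40mb"))
    .get();
```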


@ -23,6 +23,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
@ -46,8 +47,8 @@ public class URLRepositoryPlugin extends Plugin implements RepositoryPlugin {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap(URLRepository.TYPE,
metadata -> new URLRepository(metadata, env, namedXContentRegistry, clusterService));
metadata -> new URLRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings));
}
}


@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.util.URIPattern;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@ -83,8 +84,9 @@ public class URLRepository extends BlobStoreRepository {
* Constructs a read-only URL-based repository
*/
public URLRepository(RepositoryMetadata metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) {
super(metadata, false, namedXContentRegistry, clusterService);
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService,
RecoverySettings recoverySettings) {
super(metadata, false, namedXContentRegistry, clusterService, recoverySettings);
if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
throw new RepositoryException(metadata.name(), "missing url");


@ -20,10 +20,12 @@
package org.elasticsearch.repositories.url;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESTestCase;
@ -40,7 +42,8 @@ public class URLRepositoryTests extends ESTestCase {
private URLRepository createRepository(Settings baseSettings, RepositoryMetadata repositoryMetadata) {
return new URLRepository(repositoryMetadata, TestEnvironment.newEnvironment(baseSettings),
new NamedXContentRegistry(Collections.emptyList()), BlobStoreTestUtil.mockClusterService()) {
new NamedXContentRegistry(Collections.emptyList()), BlobStoreTestUtil.mockClusterService(),
new RecoverySettings(baseSettings, new ClusterSettings(baseSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads


@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.util.Locale;
@ -81,8 +82,10 @@ public class AzureRepository extends BlobStoreRepository {
final RepositoryMetadata metadata,
final NamedXContentRegistry namedXContentRegistry,
final AzureStorageService storageService,
final ClusterService clusterService) {
super(metadata, Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService);
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
super(metadata, Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService,
recoverySettings);
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.storageService = storageService;


@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
@ -60,9 +61,10 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap(AzureRepository.TYPE,
(metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, clusterService));
(metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, clusterService,
recoverySettings));
}
@Override


@ -21,11 +21,13 @@ package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.LocationMode;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESTestCase;
@ -42,7 +44,8 @@ public class AzureRepositorySettingsTests extends ESTestCase {
.put(settings)
.build();
final AzureRepository azureRepository = new AzureRepository(new RepositoryMetadata("foo", "azure", internalSettings),
NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), BlobStoreTestUtil.mockClusterService());
NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), BlobStoreTestUtil.mockClusterService(),
new RecoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
assertThat(azureRepository.getBlobStore(), is(nullValue()));
return azureRepository;
}


@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
@ -52,9 +53,10 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, clusterService));
metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, clusterService,
recoverySettings));
}
@Override


@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@ -68,8 +69,9 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
final RepositoryMetadata metadata,
final NamedXContentRegistry namedXContentRegistry,
final GoogleCloudStorageService storageService,
final ClusterService clusterService) {
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService);
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings);
this.storageService = storageService;
String basePath = BASE_PATH.get(metadata.settings());


@ -48,6 +48,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@ -258,9 +259,9 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry registry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, clusterService) {
metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, clusterService, recoverySettings) {
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(


@ -34,6 +34,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
@ -112,7 +113,8 @@ public final class HdfsPlugin extends Plugin implements RepositoryPlugin {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, clusterService));
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, clusterService,
recoverySettings));
}
}


@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
@ -71,8 +72,10 @@ public final class HdfsRepository extends BlobStoreRepository {
final RepositoryMetadata metadata,
final Environment environment,
final NamedXContentRegistry namedXContentRegistry,
final ClusterService clusterService) {
super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService);
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService,
recoverySettings);
this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);


@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
@ -215,8 +216,9 @@ class S3Repository extends BlobStoreRepository {
final RepositoryMetadata metadata,
final NamedXContentRegistry namedXContentRegistry,
final S3Service service,
final ClusterService clusterService) {
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService);
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings);
this.service = service;
this.repositoryMetadata = metadata;


@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
@ -79,14 +80,16 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
protected S3Repository createRepository(
final RepositoryMetadata metadata,
final NamedXContentRegistry registry,
final ClusterService clusterService) {
return new S3Repository(metadata, registry, service, clusterService);
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
return new S3Repository(metadata, registry, service, clusterService, recoverySettings);
}
@Override
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry,
final ClusterService clusterService) {
return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, clusterService));
final ClusterService clusterService, final RecoverySettings recoverySettings) {
return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, clusterService,
recoverySettings));
}
@Override


@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
@ -268,8 +269,9 @@ public class RepositoryCredentialsTests extends ESSingleNodeTestCase {
@Override
protected S3Repository createRepository(RepositoryMetadata metadata,
NamedXContentRegistry registry, ClusterService clusterService) {
return new S3Repository(metadata, registry, service, clusterService) {
NamedXContentRegistry registry, ClusterService clusterService,
RecoverySettings recoverySettings) {
return new S3Repository(metadata, registry, service, clusterService, recoverySettings) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads


@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
@ -201,8 +202,8 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
@Override
protected S3Repository createRepository(RepositoryMetadata metadata, NamedXContentRegistry registry,
ClusterService clusterService) {
return new S3Repository(metadata, registry, service, clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return new S3Repository(metadata, registry, service, clusterService, recoverySettings) {
@Override
public BlobStore blobStore() {


@ -21,10 +21,12 @@ package org.elasticsearch.repositories.s3;
import com.amazonaws.services.s3.AbstractAmazonS3;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESTestCase;
@ -119,7 +121,8 @@ public class S3RepositoryTests extends ESTestCase {
}
private S3Repository createS3Repo(RepositoryMetadata metadata) {
return new S3Repository(metadata, NamedXContentRegistry.EMPTY, new DummyS3Service(), BlobStoreTestUtil.mockClusterService()) {
return new S3Repository(metadata, NamedXContentRegistry.EMPTY, new DummyS3Service(), BlobStoreTestUtil.mockClusterService(),
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads


@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
@ -185,8 +186,9 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
public CountingMockRepository(final RepositoryMetadata metadata,
final Environment environment,
final NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) {
super(metadata, environment, namedXContentRegistry, clusterService);
final NamedXContentRegistry namedXContentRegistry, ClusterService clusterService,
RecoverySettings recoverySettings) {
super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings);
}
@Override
@ -209,9 +211,9 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap(TYPE,
metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, clusterService));
metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings));
}
}
}


@ -31,6 +31,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
@ -79,9 +80,9 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap("mock_meta_filtering", metadata ->
new FsRepository(metadata, env, namedXContentRegistry, clusterService) {
new FsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) {
// Storing the initially expected metadata value here to verify that #filterUserMetadata is only called once on the
// initial master node starting the snapshot


@ -125,6 +125,7 @@ import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTI
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.shard.IndexShardTests.getEngineFromShard;
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing;
@ -1910,12 +1911,14 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Path repositoryLocation = randomRepoPath();
boolean throttleSnapshot = randomBoolean();
boolean throttleRestore = randomBoolean();
boolean throttleRestoreViaRecoverySettings = throttleRestore && randomBoolean();
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repositoryLocation)
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(1000, 10000), ByteSizeUnit.BYTES)
.put("max_restore_bytes_per_sec", throttleRestore ? "10k" : "0")
.put("max_restore_bytes_per_sec",
throttleRestore && (throttleRestoreViaRecoverySettings == false) ? "10k" : "0")
.put("max_snapshot_bytes_per_sec", throttleSnapshot ? "10k" : "0")));
createIndex("test-idx");
@ -1939,6 +1942,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
cluster().wipeIndices("test-idx");
logger.info("--> restore index");
if (throttleRestoreViaRecoverySettings) {
client.admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "10k").build()).get();
}
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
@ -1962,6 +1969,63 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
} else {
assertThat(restorePause, equalTo(0L));
}
client.admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.putNull(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()).build()).get();
}
public void testDynamicRestoreThrottling() throws Exception {
Client client = client();
logger.info("--> creating repository");
Path repositoryLocation = randomRepoPath();
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repositoryLocation)
.put("compress", randomBoolean())
.put("chunk_size", 100, ByteSizeUnit.BYTES)));
createIndex("test-idx");
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx", "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
logger.info("--> snapshot");
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).setIndices("test-idx").get();
logger.info("--> delete index");
cluster().wipeIndices("test-idx");
logger.info("--> restore index");
client.admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "100b").build()).get();
ActionFuture<RestoreSnapshotResponse> restoreSnapshotResponse = client.admin().cluster()
.prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute();
// check if throttling is active
assertBusy(() -> {
long restorePause = 0L;
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
restorePause += repositoriesService.repository("test-repo").getRestoreThrottleTimeInNanos();
}
assertThat(restorePause, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(1, 5)).nanos()));
assertFalse(restoreSnapshotResponse.isDone());
}, 30, TimeUnit.SECONDS);
// run at full speed again
client.admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.putNull(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()).build()).get();
// check that restore now completes quickly (i.e. within 10 seconds)
assertBusy(() -> assertTrue(restoreSnapshotResponse.isDone()));
assertThat(restoreSnapshotResponse.get().getRestoreInfo().totalShards(), greaterThan(0));
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
}
public void testSnapshotStatus() throws Exception {


@ -24,13 +24,14 @@ import org.apache.lucene.store.RateLimiter;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;
/**
* Rate limiting wrapper for InputStream
*/
public class RateLimitingInputStream extends FilterInputStream {
private final RateLimiter rateLimiter;
private final Supplier<RateLimiter> rateLimiterSupplier;
private final Listener listener;
@ -40,19 +41,22 @@ public class RateLimitingInputStream extends FilterInputStream {
void onPause(long nanos);
}
public RateLimitingInputStream(InputStream delegate, RateLimiter rateLimiter, Listener listener) {
public RateLimitingInputStream(InputStream delegate, Supplier<RateLimiter> rateLimiterSupplier, Listener listener) {
super(delegate);
this.rateLimiter = rateLimiter;
this.rateLimiterSupplier = rateLimiterSupplier;
this.listener = listener;
}
private void maybePause(int bytes) throws IOException {
bytesSinceLastRateLimit += bytes;
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
bytesSinceLastRateLimit = 0;
if (pause > 0) {
listener.onPause(pause);
final RateLimiter rateLimiter = rateLimiterSupplier.get();
if (rateLimiter != null) {
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
bytesSinceLastRateLimit = 0;
if (pause > 0) {
listener.onPause(pause);
}
}
}
}
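
The important consequence of replacing the fixed `RateLimiter` with a `Supplier<RateLimiter>` is that the limiter is re-resolved on every pause check, so a dynamically updated (or removed) limit takes effect on streams that are already open, and a `null` result simply disables throttling. A hedged sketch of that behavior, using Lucene's `SimpleRateLimiter` and an `AtomicReference` as a stand-in for the dynamic setting (the package of `RateLimitingInputStream` is taken from the Elasticsearch source tree):

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream;

public class RateLimitingSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for the dynamically updatable recovery rate limit.
        AtomicReference<RateLimiter> limiter =
            new AtomicReference<>(new RateLimiter.SimpleRateLimiter(10.0)); // 10 MB/s; value is an assumption

        InputStream throttled = new RateLimitingInputStream(
            new ByteArrayInputStream(new byte[8192]), // stand-in for a snapshot blob stream
            limiter::get,                             // re-read on every pause check; null means "no throttling"
            pauseNanos -> {});                        // listener; the repository passes a CounterMetric increment here

        throttled.read(new byte[1024]);

        limiter.set(null); // lift the limit without recreating the stream
        throttled.read(new byte[1024]);
        throttled.close();
    }
}
```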


@ -546,9 +546,10 @@ public class Node implements Closeable {
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment,
pluginsService.filterPlugins(RepositoryPlugin.class), transportService, clusterService, threadPool, xContentRegistry);
pluginsService.filterPlugins(RepositoryPlugin.class), transportService, clusterService, threadPool, xContentRegistry,
recoverySettings);
RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
repositoriesServiceReference.set(repositoryService);
SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService,
@ -633,7 +634,6 @@ public class Node implements Closeable {
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService,
indicesService, recoverySettings));


@ -22,6 +22,7 @@ package org.elasticsearch.plugins;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.Repository;
import java.util.Collections;
@ -41,7 +42,7 @@ public interface RepositoryPlugin {
* the value is a factory to construct the {@link Repository} interface.
*/
default Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.emptyMap();
}
@ -55,7 +56,7 @@ public interface RepositoryPlugin {
* the value is a factory to construct the {@link Repository} interface.
*/
default Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.emptyMap();
}
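
For plugin authors, a hedged sketch of what the widened extension point looks like; the plugin class, the `"my-fs"` type name, and the choice of `FsRepository` as the backing implementation are illustrative assumptions:

```java
import java.util.Collections;
import java.util.Map;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;

// Hypothetical plugin: the factory closes over the RecoverySettings instance and
// hands it to the repository, which forwards it to BlobStoreRepository so that
// restores respect indices.recovery.max_bytes_per_sec.
public class MyRepositoryPlugin extends Plugin implements RepositoryPlugin {

    @Override
    public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
                                                           ClusterService clusterService, RecoverySettings recoverySettings) {
        return Collections.singletonMap("my-fs",
            metadata -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings));
    }
}
```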


@ -23,6 +23,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.threadpool.ThreadPool;
@ -41,12 +42,15 @@ public final class RepositoriesModule {
private final RepositoriesService repositoriesService;
public RepositoriesModule(Environment env, List<RepositoryPlugin> repoPlugins, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry) {
ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry,
RecoverySettings recoverySettings) {
Map<String, Repository.Factory> factories = new HashMap<>();
factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, clusterService));
factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, clusterService,
recoverySettings));
for (RepositoryPlugin repoPlugin : repoPlugins) {
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, clusterService);
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, clusterService,
recoverySettings);
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
if (factories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Repository type [" + entry.getKey() + "] is already registered");
@ -56,7 +60,8 @@ public final class RepositoriesModule {
Map<String, Repository.Factory> internalFactories = new HashMap<>();
for (RepositoryPlugin repoPlugin : repoPlugins) {
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, clusterService);
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, clusterService,
recoverySettings);
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
if (internalFactories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered");


@ -97,6 +97,7 @@ import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
@ -137,6 +138,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@ -252,6 +254,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private final ClusterService clusterService;
private final RecoverySettings recoverySettings;
/**
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
* {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart
@ -289,14 +293,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final RepositoryMetadata metadata,
final boolean compress,
final NamedXContentRegistry namedXContentRegistry,
final ClusterService clusterService) {
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
this.compress = compress;
this.metadata = metadata;
this.namedXContentRegistry = namedXContentRegistry;
this.threadPool = clusterService.getClusterApplierService().threadPool();
this.clusterService = clusterService;
this.recoverySettings = recoverySettings;
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO);
readOnly = metadata.settings().getAsBoolean("readonly", false);
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
@ -1993,16 +1999,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
});
}
private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) {
return rateLimiter == null ? stream : new RateLimitingInputStream(stream, rateLimiter, metric::inc);
private static InputStream maybeRateLimit(InputStream stream, Supplier<RateLimiter> rateLimiterSupplier, CounterMetric metric) {
return new RateLimitingInputStream(stream, rateLimiterSupplier, metric::inc);
}
public InputStream maybeRateLimitRestores(InputStream stream) {
return maybeRateLimit(stream, restoreRateLimiter, restoreRateLimitingTimeInNanos);
return maybeRateLimit(maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos),
recoverySettings::rateLimiter, restoreRateLimitingTimeInNanos);
}
public InputStream maybeRateLimitSnapshots(InputStream stream) {
return maybeRateLimit(stream, snapshotRateLimiter, snapshotRateLimitingTimeInNanos);
return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos);
}
@Override
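
Design note: `maybeRateLimitRestores` now wraps the stream twice, once with the repository's own `max_restore_bytes_per_sec` limiter and once with the limiter obtained from `RecoverySettings`, so the more restrictive of the two limits effectively governs restore traffic. Because a supplier may return `null`, an unset limit makes the corresponding wrapper a no-op, and both wrappers report their pauses to the same restore throttle counter (the value the integration test above reads via `getRestoreThrottleTimeInNanos`). Snapshots keep only the repository-level limiter.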


@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@ -75,8 +76,8 @@ public class FsRepository extends BlobStoreRepository {
* Constructs a shared file system repository.
*/
public FsRepository(RepositoryMetadata metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
super(metadata, calculateCompress(metadata, environment), namedXContentRegistry, clusterService);
ClusterService clusterService, RecoverySettings recoverySettings) {
super(metadata, calculateCompress(metadata, environment), namedXContentRegistry, clusterService, recoverySettings);
this.environment = environment;
String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
if (location.isEmpty()) {


@ -23,6 +23,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@ -45,6 +46,7 @@ public class RepositoriesModuleTests extends ESTestCase {
private Repository.Factory factory;
private ThreadPool threadPool;
private ClusterService clusterService;
private RecoverySettings recoverySettings;
@Override
public void setUp() throws Exception {
@ -53,6 +55,7 @@ public class RepositoriesModuleTests extends ESTestCase {
contentRegistry = mock(NamedXContentRegistry.class);
threadPool = mock(ThreadPool.class);
clusterService = mock(ClusterService.class);
recoverySettings = mock(RecoverySettings.class);
plugin1 = mock(RepositoryPlugin.class);
plugin2 = mock(RepositoryPlugin.class);
factory = mock(Repository.Factory.class);
@ -62,46 +65,52 @@ public class RepositoriesModuleTests extends ESTestCase {
}
public void testCanRegisterTwoRepositoriesWithDifferentTypes() {
when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type2", factory));
when(plugin1.getRepositories(environment, contentRegistry, clusterService, recoverySettings))
.thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry, clusterService, recoverySettings))
.thenReturn(Collections.singletonMap("type2", factory));
// Would throw
new RepositoriesModule(
environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), threadPool, contentRegistry);
environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), threadPool, contentRegistry,
recoverySettings);
}
public void testCannotRegisterTwoRepositoriesWithSameTypes() {
when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin1.getRepositories(environment, contentRegistry, clusterService, recoverySettings))
.thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry, clusterService, recoverySettings))
.thenReturn(Collections.singletonMap("type1", factory));
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService,
threadPool, contentRegistry));
threadPool, contentRegistry, recoverySettings));
assertEquals("Repository type [type1] is already registered", ex.getMessage());
}
public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() {
when(plugin1.getInternalRepositories(environment, contentRegistry, clusterService))
when(plugin1.getInternalRepositories(environment, contentRegistry, clusterService, recoverySettings))
.thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService))
when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService, recoverySettings))
.thenReturn(Collections.singletonMap("type1", factory));
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService,
threadPool, contentRegistry));
threadPool, contentRegistry, recoverySettings));
assertEquals("Internal repository type [type1] is already registered", ex.getMessage());
}
public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() {
when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService))
when(plugin1.getRepositories(environment, contentRegistry, clusterService, recoverySettings))
.thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService, recoverySettings))
.thenReturn(Collections.singletonMap("type1", factory));
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService, threadPool,
contentRegistry));
contentRegistry, recoverySettings));
assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage());
}


@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
@ -44,6 +45,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
@ -198,7 +200,8 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata);
final FsRepository repository = new FsRepository(repositoryMetadata, createEnvironment(), xContentRegistry(), clusterService) {
final FsRepository repository = new FsRepository(repositoryMetadata, createEnvironment(), xContentRegistry(), clusterService,
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
@ -74,9 +75,9 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap(REPO_TYPE,
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService) {
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we access blobStore on test/main threads
@ -233,7 +234,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
Environment useCompressEnvironment =
new Environment(useCompressSettings, node().getEnvironment().configFile());
new FsRepository(metadata, useCompressEnvironment, null, BlobStoreTestUtil.mockClusterService());
new FsRepository(metadata, useCompressEnvironment, null, BlobStoreTestUtil.mockClusterService(), null);
assertWarnings("[repositories.fs.compress] setting was deprecated in Elasticsearch and will be removed in a future release!" +
" See the breaking changes documentation for the next major version.");


@ -44,6 +44,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -52,6 +53,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
@ -92,7 +94,8 @@ public class FsRepositoryTests extends ESTestCase {
int numDocs = indexDocs(directory);
RepositoryMetadata metadata = new RepositoryMetadata("test", "fs", settings);
FsRepository repository = new FsRepository(metadata, new Environment(settings, null), NamedXContentRegistry.EMPTY,
BlobStoreTestUtil.mockClusterService());
BlobStoreTestUtil.mockClusterService(), new RecoverySettings(settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
repository.start();
final Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_INDEX_UUID, "myindexUUID").build();
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("myindex", indexSettings);


@ -1303,6 +1303,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
private final ClusterService clusterService;
private final RecoverySettings recoverySettings;
private final NodeConnectionsService nodeConnectionsService;
private final RepositoriesService repositoriesService;
@ -1350,6 +1352,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
// don't do anything, and don't block
}
});
recoverySettings = new RecoverySettings(settings, clusterSettings);
mockTransport = new DisruptableMockTransport(node, logger) {
@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
@ -1587,7 +1590,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
private Repository.Factory getRepoFactory(Environment environment) {
// Run half the tests with the eventually consistent repository
if (blobStoreContext == null) {
return metadata -> new FsRepository(metadata, environment, xContentRegistry(), clusterService) {
return metadata -> new FsRepository(metadata, environment, xContentRegistry(), clusterService,
recoverySettings) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo in the test thread
@ -1595,7 +1599,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
};
} else {
return metadata ->
new MockEventuallyConsistentRepository(metadata, xContentRegistry(), clusterService, blobStoreContext, random());
new MockEventuallyConsistentRepository(metadata, xContentRegistry(), clusterService, recoverySettings,
blobStoreContext, random());
}
}
public void restart() {


@ -35,6 +35,7 @@ import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.ESTestCase;
@ -76,9 +77,10 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
final RepositoryMetadata metadata,
final NamedXContentRegistry namedXContentRegistry,
final ClusterService clusterService,
final RecoverySettings recoverySettings,
final Context context,
final Random random) {
super(metadata, false, namedXContentRegistry, clusterService);
super(metadata, false, namedXContentRegistry, clusterService, recoverySettings);
this.context = context;
this.namedXContentRegistry = namedXContentRegistry;
this.random = random;


@ -26,7 +26,9 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@ -48,11 +50,14 @@ import static org.hamcrest.Matchers.startsWith;
public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
private final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
public void testReadAfterWriteConsistently() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), recoverySettings, blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
@ -72,7 +77,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), recoverySettings, blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
@ -88,7 +93,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), recoverySettings, blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
@ -106,7 +111,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), recoverySettings, blobStoreContext, random())) {
repository.start();
final BlobContainer container = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
@ -123,7 +128,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), recoverySettings, blobStoreContext, random())) {
repository.start();
final BlobContainer container =
repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0"));
@ -142,7 +147,8 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
final RepositoryMetadata metadata = new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(metadata);
try (BlobStoreRepository repository =
new MockEventuallyConsistentRepository(metadata, xContentRegistry(), clusterService, blobStoreContext, random())) {
new MockEventuallyConsistentRepository(metadata, xContentRegistry(), clusterService, recoverySettings,
blobStoreContext, random())) {
clusterService.addStateApplier(event -> repository.updateState(event.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
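
The fixture above (a RecoverySettings built from Settings.EMPTY plus the built-in cluster settings) is the pattern repeated throughout these tests. A small hypothetical helper, not part of this commit, makes the two common variants explicit; the "10mb" value is arbitrary, and only the setting key comes from the production code.

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoverySettings;

// Illustrative helper, not part of this commit.
public final class RecoverySettingsTestHelper {

    private RecoverySettingsTestHelper() {}

    // Defaults only: recovery (and therefore restore) traffic is limited by the
    // 40mb default of indices.recovery.max_bytes_per_sec.
    static RecoverySettings defaultRecoverySettings() {
        return new RecoverySettings(Settings.EMPTY,
            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
    }

    // Variant with an explicit per-node limit; "10mb" is an arbitrary example value.
    static RecoverySettings throttledRecoverySettings() {
        Settings settings = Settings.builder()
            .put("indices.recovery.max_bytes_per_sec", "10mb")
            .build();
        return new RecoverySettings(settings,
            new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
    }
}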

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
@ -72,9 +73,9 @@ public class MockRepository extends FsRepository {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap("mock", (metadata) ->
new MockRepository(metadata, env, namedXContentRegistry, clusterService));
new MockRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings));
}
@Override
@ -117,8 +118,9 @@ public class MockRepository extends FsRepository {
private volatile boolean blocked = false;
public MockRepository(RepositoryMetadata metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) {
super(overrideSettings(metadata, environment), environment, namedXContentRegistry, clusterService);
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService,
RecoverySettings recoverySettings) {
super(overrideSettings(metadata, environment), environment, namedXContentRegistry, clusterService, recoverySettings);
randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0);
randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false);
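
The hunk above shows the updated RepositoryPlugin SPI: getRepositories now receives the node's RecoverySettings, and the factory passes it on to the repository it creates. A minimal sketch of a third-party plugin following the same pattern might look as follows; ExampleRepositoryPlugin and the "example" repository type are invented names, and the FsRepository constructor arguments mirror the MockRepository change above.

import java.util.Collections;
import java.util.Map;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;

// Hypothetical plugin, for illustration only: the repository factory hands the node's
// RecoverySettings to the repository constructor so restores are throttled alongside
// peer recoveries.
public class ExampleRepositoryPlugin extends Plugin implements RepositoryPlugin {

    @Override
    public Map<String, Repository.Factory> getRepositories(Environment env,
                                                            NamedXContentRegistry namedXContentRegistry,
                                                            ClusterService clusterService,
                                                            RecoverySettings recoverySettings) {
        return Collections.singletonMap("example", metadata ->
            new FsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings));
    }
}

Plugins whose repositories do not stream snapshot data themselves (for example the CcrRepository factory in the Ccr hunk below) can simply accept the new parameter and ignore it.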

View File

@ -32,6 +32,7 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksExecutor;
@ -341,7 +342,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
@Override
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
Repository.Factory repositoryFactory =
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(),
clusterService.getClusterApplierService().threadPool());

View File

@ -28,6 +28,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
@ -71,7 +72,7 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory());
}
@Override

View File

@ -39,6 +39,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.LicensesMetadata;
import org.elasticsearch.license.Licensing;
@ -370,7 +371,7 @@ public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin,
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory());
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.internal.io.IOUtils;
@ -58,6 +59,7 @@ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
@ -359,7 +361,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata);
final Repository repository = new FsRepository(repositoryMetadata, createEnvironment(), xContentRegistry(), clusterService);
final Repository repository = new FsRepository(repositoryMetadata, createEnvironment(), xContentRegistry(), clusterService,
new RecoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
clusterService.addStateApplier(e -> repository.updateState(e.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());

View File

@ -44,6 +44,7 @@ import org.elasticsearch.index.analysis.TokenizerFactory;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.XPackLicenseState;
@ -422,21 +423,21 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
HashMap<String, Repository.Factory> repositories =
new HashMap<>(super.getRepositories(env, namedXContentRegistry, clusterService));
new HashMap<>(super.getRepositories(env, namedXContentRegistry, clusterService, recoverySettings));
filterPlugins(RepositoryPlugin.class).forEach(
r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, clusterService)));
r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, clusterService, recoverySettings)));
return repositories;
}
@Override
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
ClusterService clusterService, RecoverySettings recoverySettings) {
HashMap<String, Repository.Factory> internalRepositories =
new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, clusterService));
new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, clusterService, recoverySettings));
filterPlugins(RepositoryPlugin.class).forEach(r ->
internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, clusterService)));
internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, clusterService, recoverySettings)));
return internalRepositories;
}
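
LocalStateCompositeXPackPlugin above just forwards the new argument to every wrapped plugin. A hypothetical standalone version of that delegation pattern, with invented names, could look like this.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;

// Hypothetical wrapper plugin, for illustration only: it merely forwards the new
// recoverySettings argument to every wrapped RepositoryPlugin, as the composite
// X-Pack test plugin does above.
public class ExampleCompositePlugin extends Plugin implements RepositoryPlugin {

    private final List<RepositoryPlugin> wrapped;

    public ExampleCompositePlugin(List<RepositoryPlugin> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public Map<String, Repository.Factory> getRepositories(Environment env,
                                                            NamedXContentRegistry namedXContentRegistry,
                                                            ClusterService clusterService,
                                                            RecoverySettings recoverySettings) {
        Map<String, Repository.Factory> repositories = new HashMap<>();
        for (RepositoryPlugin plugin : wrapped) {
            repositories.putAll(plugin.getRepositories(env, namedXContentRegistry, clusterService, recoverySettings));
        }
        return repositories;
    }
}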

View File

@ -50,6 +50,7 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -65,6 +66,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
@ -509,7 +511,8 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
null
),
NamedXContentRegistry.EMPTY,
BlobStoreTestUtil.mockClusterService(repositoryMetadata)
BlobStoreTestUtil.mockClusterService(repositoryMetadata),
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))
) {
@Override

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -41,6 +42,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.F
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
@ -163,7 +165,8 @@ public class CachePreWarmingTests extends ESTestCase {
null
),
NamedXContentRegistry.EMPTY,
BlobStoreTestUtil.mockClusterService(repositoryMetadata)
BlobStoreTestUtil.mockClusterService(repositoryMetadata),
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))
) {
@Override