diff --git a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java index a28413d213a..6e88f0e0deb 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java @@ -26,6 +26,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.url.URLRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; @@ -44,7 +45,9 @@ public class URLRepositoryPlugin extends Plugin implements RepositoryPlugin { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap(URLRepository.TYPE, metadata -> new URLRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap(URLRepository.TYPE, + metadata -> new URLRepository(metadata, env, namedXContentRegistry, threadPool)); } } diff --git a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java index d314ce912ef..29582f1f871 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.net.MalformedURLException; import java.net.URISyntaxException; @@ -82,8 +83,8 @@ public class URLRepository extends BlobStoreRepository { * Constructs a read-only URL-based repository */ public URLRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) { - super(metadata, environment.settings(), false, namedXContentRegistry); + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(metadata, environment.settings(), false, namedXContentRegistry, threadPool); if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) { throw new RepositoryException(metadata.name(), "missing url"); diff --git a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java index 2de4c132673..96a82ee0b9d 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; @@ -34,12 +35,13 @@ import java.util.Collections; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; +import static org.mockito.Mockito.mock; public class URLRepositoryTests extends ESTestCase { private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) { return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList())) { + new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index 078e0e698aa..5345fb13f6d 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -38,6 +38,7 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotCreationException; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.ThreadPool; import java.net.URISyntaxException; import java.util.List; @@ -86,8 +87,8 @@ public class AzureRepository extends BlobStoreRepository { private final boolean readonly; public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, - AzureStorageService storageService) { - super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry); + AzureStorageService storageService, ThreadPool threadPool) { + super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.environment = environment; this.storageService = storageService; diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index c6e8335bd5a..ab48cf1314e 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -28,6 +28,8 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -47,9 +49,10 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap(AzureRepository.TYPE, - (metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService)); + (metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService, threadPool)); } @Override diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java index 43891a8e9d5..71f16b1413a 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -42,7 +43,8 @@ public class AzureRepositorySettingsTests extends ESTestCase { .put(settings) .build(); final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings), - TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class)); + TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), + mock(ThreadPool.class)); assertThat(azureRepository.getBlobStore(), is(nullValue())); return azureRepository; } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 3186d2547a3..8e46b305a33 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -27,6 +27,8 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -49,9 +51,10 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap(GoogleCloudStorageRepository.TYPE, - (metadata) -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService)); + metadata -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService, threadPool)); } @Override diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 3192691d843..8e39cb4b5f1 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -25,13 +25,13 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.util.function.Function; @@ -59,7 +59,6 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { byteSizeSetting("chunk_size", MAX_CHUNK_SIZE, MIN_CHUNK_SIZE, MAX_CHUNK_SIZE, Property.NodeScope, Property.Dynamic); static final Setting CLIENT_NAME = new Setting<>("client", "default", Function.identity()); - private final Settings settings; private final GoogleCloudStorageService storageService; private final BlobPath basePath; private final ByteSizeValue chunkSize; @@ -68,9 +67,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, - GoogleCloudStorageService storageService) { - super(metadata, environment.settings(), getSetting(COMPRESS, metadata), namedXContentRegistry); - this.settings = environment.settings(); + GoogleCloudStorageService storageService, ThreadPool threadPool) { + super(metadata, environment.settings(), getSetting(COMPRESS, metadata), namedXContentRegistry, threadPool); this.storageService = storageService; String basePath = BASE_PATH.get(metadata.settings()); diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java index c0b3d805bcc..a6dc5fe7db1 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java @@ -36,6 +36,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; public final class HdfsPlugin extends Plugin implements RepositoryPlugin { @@ -110,7 +111,8 @@ public final class HdfsPlugin extends Plugin implements RepositoryPlugin { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, threadPool)); } } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index bba1b0031c8..d51a48cac0e 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.UncheckedIOException; @@ -67,8 +68,8 @@ public final class HdfsRepository extends BlobStoreRepository { private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB); public HdfsRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) { - super(metadata, environment.settings(), metadata.settings().getAsBoolean("compress", false), namedXContentRegistry); + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(metadata, environment.settings(), metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, threadPool); this.environment = environment; this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 189d57f8332..e8d8c6d27ad 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.util.function.Function; @@ -171,8 +172,8 @@ class S3Repository extends BlobStoreRepository { S3Repository(final RepositoryMetaData metadata, final Settings settings, final NamedXContentRegistry namedXContentRegistry, - final S3Service service) { - super(metadata, settings, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry); + final S3Service service, final ThreadPool threadPool) { + super(metadata, settings, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool); this.service = service; this.repositoryMetaData = metadata; diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index a2f9da5f846..bb044771e60 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -30,6 +30,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.security.AccessController; @@ -77,13 +78,15 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo // proxy method for testing protected S3Repository createRepository(final RepositoryMetaData metadata, final Settings settings, - final NamedXContentRegistry registry) { - return new S3Repository(metadata, settings, registry, service); + final NamedXContentRegistry registry, final ThreadPool threadPool) { + return new S3Repository(metadata, settings, registry, service, threadPool); } @Override - public Map getRepositories(final Environment env, final NamedXContentRegistry registry) { - return Collections.singletonMap(S3Repository.TYPE, (metadata) -> createRepository(metadata, env.settings(), registry)); + public Map getRepositories(final Environment env, final NamedXContentRegistry registry, + final ThreadPool threadPool) { + return Collections.singletonMap(S3Repository.TYPE, + metadata -> createRepository(metadata, env.settings(), registry, threadPool)); } @Override diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index ca5893b57b2..89cc35ccf0c 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -30,12 +30,14 @@ import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.security.AccessController; import java.security.PrivilegedAction; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; @SuppressForbidden(reason = "test fixture requires System.setProperty") public class RepositoryCredentialsTests extends ESTestCase { @@ -61,9 +63,9 @@ public class RepositoryCredentialsTests extends ESTestCase { } static final class ProxyS3Service extends S3Service { - + private static final Logger logger = LogManager.getLogger(ProxyS3Service.class); - + @Override AmazonS3 buildClient(final S3ClientSettings clientSettings) { final AmazonS3 client = super.buildClient(clientSettings); @@ -77,8 +79,9 @@ public class RepositoryCredentialsTests extends ESTestCase { } @Override - protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry) { - return new S3Repository(metadata, settings, registry, service){ + protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry, + ThreadPool threadPool) { + return new S3Repository(metadata, settings, registry, service, threadPool){ @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads @@ -106,7 +109,7 @@ public class RepositoryCredentialsTests extends ESTestCase { .put(S3Repository.ACCESS_KEY_SETTING.getKey(), "insecure_aws_key") .put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret").build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings); - S3Repository s3repo = createAndStartRepository(metadata, s3Plugin); + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class)); AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials(); assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key")); @@ -129,7 +132,7 @@ public class RepositoryCredentialsTests extends ESTestCase { .put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret") .build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(Settings.EMPTY); - S3Repository s3repo = createAndStartRepository(metadata, s3Plugin); + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class)); AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials(); assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key")); @@ -144,8 +147,8 @@ public class RepositoryCredentialsTests extends ESTestCase { + " See the breaking changes documentation for the next major version."); } - private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin) { - final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY); + private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin, ThreadPool threadPool) { + final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, threadPool); repository.start(); return repository; } @@ -168,7 +171,7 @@ public class RepositoryCredentialsTests extends ESTestCase { } final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", builder.build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings); - S3Repository s3repo = createAndStartRepository(metadata, s3Plugin)) { + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class))) { try (AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials .getCredentials(); diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 739452dc178..61c0328e516 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -114,14 +115,15 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa } @Override - public Map getRepositories(final Environment env, final NamedXContentRegistry registry) { + public Map getRepositories(final Environment env, final NamedXContentRegistry registry, + final ThreadPool threadPool) { return Collections.singletonMap(S3Repository.TYPE, - (metadata) -> new S3Repository(metadata, env.settings(), registry, new S3Service() { + metadata -> new S3Repository(metadata, env.settings(), registry, new S3Service() { @Override AmazonS3 buildClient(S3ClientSettings clientSettings) { return new MockAmazonS3(blobs, bucket, serverSideEncryption, cannedACL, storageClass); } - })); + }, threadPool)); } } diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index 36fa8b684bb..af04c420408 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.util.Map; @@ -35,6 +36,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; public class S3RepositoryTests extends ESTestCase { @@ -118,7 +120,7 @@ public class S3RepositoryTests extends ESTestCase { } private S3Repository createS3Repo(RepositoryMetaData metadata) { - return new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()) { + return new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service(), mock(ThreadPool.class)) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java index c552019e07f..73f9a0742a7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java @@ -49,7 +49,9 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + default Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.emptyMap(); } @@ -52,7 +54,8 @@ public interface RepositoryPlugin { * The key of the returned {@link Map} is the type name of the repository and * the value is a factory to construct the {@link Repository} interface. */ - default Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + default Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.emptyMap(); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 4e8e9b6c7f5..afc38bda86c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories; import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -84,8 +85,8 @@ public class FilterRepository implements Repository { } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { - in.deleteSnapshot(snapshotId, repositoryStateId); + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + in.deleteSnapshot(snapshotId, repositoryStateId, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index 90e3c94dfb3..5ea853b0b55 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -47,10 +47,10 @@ public class RepositoriesModule extends AbstractModule { public RepositoriesModule(Environment env, List repoPlugins, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry) { Map factories = new HashMap<>(); - factories.put(FsRepository.TYPE, (metadata) -> new FsRepository(metadata, env, namedXContentRegistry)); + factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, threadPool)); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry); + Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, threadPool); for (Map.Entry entry : newRepoTypes.entrySet()) { if (factories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Repository type [" + entry.getKey() + "] is already registered"); @@ -60,7 +60,7 @@ public class RepositoriesModule extends AbstractModule { Map internalFactories = new HashMap<>(); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry); + Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, threadPool); for (Map.Entry entry : newRepoTypes.entrySet()) { if (internalFactories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered"); diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 1ca6f5e1485..20f7c42cb21 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories; import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -140,8 +141,9 @@ public interface Repository extends LifecycleComponent { * * @param snapshotId snapshot id * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began + * @param listener completion listener */ - void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId); + void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener); /** * Returns snapshot throttle time in nanoseconds diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java index eb9dc41236d..dc9f8092e3f 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java @@ -100,11 +100,11 @@ public abstract class BlobStoreFormat { /** * Checks obj in the blob container */ - public boolean exists(BlobContainer blobContainer, String name) throws IOException { + public boolean exists(BlobContainer blobContainer, String name) { return blobContainer.blobExists(blobName(name)); } - protected String blobName(String name) { + public String blobName(String name) { return String.format(Locale.ROOT, blobNameFormat, name); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index f11f4cec929..fdaeb17c201 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -31,8 +31,10 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -103,9 +105,12 @@ import java.io.InputStream; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -163,6 +168,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp protected final NamedXContentRegistry namedXContentRegistry; + private final ThreadPool threadPool; + private static final int BUFFER_SIZE = 4096; private static final String SNAPSHOT_PREFIX = "snap-"; @@ -225,17 +232,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp /** * Constructs new BlobStoreRepository - * - * @param metadata The metadata for this repository including name and settings - * @param settings Settings for the node this repository object is created on - * @param compress true if metadata and snapshot files should be compressed + * @param metadata The metadata for this repository including name and settings + * @param settings Settings for the node this repository object is created on + * @param threadPool Threadpool to run long running repository manipulations on asynchronously */ protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, boolean compress, - NamedXContentRegistry namedXContentRegistry) { + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { this.settings = settings; this.compress = compress; this.metadata = metadata; this.namedXContentRegistry = namedXContentRegistry; + this.threadPool = threadPool; snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); readOnly = metadata.settings().getAsBoolean("readonly", false); @@ -405,108 +412,98 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { if (isReadOnly()) { - throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"); - } - - final RepositoryData repositoryData = getRepositoryData(); - SnapshotInfo snapshot = null; - try { - snapshot = getSnapshotInfo(snapshotId); - } catch (SnapshotMissingException ex) { - throw ex; - } catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) { - logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex); - } - - try { + listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository")); + } else { + SnapshotInfo snapshot = null; + try { + snapshot = getSnapshotInfo(snapshotId); + } catch (SnapshotMissingException ex) { + listener.onFailure(ex); + return; + } catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) { + logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex); + } // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots - final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); - writeIndexGen(updatedRepositoryData, repositoryStateId); + final RepositoryData repositoryData; + final RepositoryData updatedRepositoryData; + try { + repositoryData = getRepositoryData(); + updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); + writeIndexGen(updatedRepositoryData, repositoryStateId); + } catch (Exception ex) { + listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); + return; + } + final SnapshotInfo finalSnapshotInfo = snapshot; + final Collection unreferencedIndices = Sets.newHashSet(repositoryData.getIndices().values()); + unreferencedIndices.removeAll(updatedRepositoryData.getIndices().values()); + try { + blobContainer().deleteBlobsIgnoringIfNotExists( + Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID()))); + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e); + } + deleteIndices( + Optional.ofNullable(finalSnapshotInfo) + .map(info -> info.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList())) + .orElse(Collections.emptyList()), + snapshotId, + ActionListener.map(listener, v -> { + try { + blobStore().blobContainer(basePath().add("indices")).deleteBlobsIgnoringIfNotExists( + unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList())); + } catch (IOException e) { + logger.warn(() -> + new ParameterizedMessage( + "[{}] indices {} are no longer part of any snapshots in the repository, " + + "but failed to clean up their index folders.", metadata.name(), unreferencedIndices), e); + } + return null; + }) + ); + } + } - // delete the snapshot file - deleteSnapshotBlobIgnoringErrors(snapshot, snapshotId.getUUID()); - // delete the global metadata file - deleteGlobalMetaDataBlobIgnoringErrors(snapshot, snapshotId.getUUID()); - - // Now delete all indices - if (snapshot != null) { - final List indices = snapshot.indices(); - for (String index : indices) { - final IndexId indexId = repositoryData.resolveIndexId(index); + private void deleteIndices(List indices, SnapshotId snapshotId, ActionListener listener) { + if (indices.isEmpty()) { + listener.onResponse(null); + return; + } + final ActionListener groupedListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size()); + for (IndexId indexId: indices) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(groupedListener) { + @Override + protected void doRun() { IndexMetaData indexMetaData = null; try { indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); - } catch (ElasticsearchParseException | IOException ex) { + } catch (Exception ex) { logger.warn(() -> - new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); + new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex); } - - deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); - + deleteIndexMetaDataBlobIgnoringErrors(snapshotId, indexId); if (indexMetaData != null) { for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { try { - delete(snapshotId, indexId, new ShardId(indexMetaData.getIndex(), shardId)); + final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId); + new Context(snapshotId, indexId, sid, sid).delete(); } catch (SnapshotException ex) { final int finalShardId = shardId; logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", - snapshotId, index, finalShardId), ex); + snapshotId, indexId.getName(), finalShardId), ex); } } } + groupedListener.onResponse(null); } - } - - // cleanup indices that are no longer part of the repository - final Collection indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values()); - indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values()); - final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices")); - try { - indicesBlobContainer.deleteBlobsIgnoringIfNotExists( - indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList())); - } catch (IOException ioe) { - // a different IOException occurred while trying to delete - will just log the issue for now - logger.warn(() -> - new ParameterizedMessage( - "[{}] indices {} are no longer part of any snapshots in the repository, " + - "but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe); - } - } catch (IOException | ResourceNotFoundException ex) { - throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex); + }); } } - private void deleteSnapshotBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) { - try { - snapshotFormat.delete(blobContainer(), blobId); - } catch (IOException e) { - if (snapshotInfo != null) { - logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete snapshot file [{}]", - snapshotInfo.snapshotId(), blobId), e); - } else { - logger.warn(() -> new ParameterizedMessage("Unable to delete snapshot file [{}]", blobId), e); - } - } - } - - private void deleteGlobalMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) { - try { - globalMetaDataFormat.delete(blobContainer(), blobId); - } catch (IOException e) { - if (snapshotInfo != null) { - logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata file [{}]", - snapshotInfo.snapshotId(), blobId), e); - } else { - logger.warn(() -> new ParameterizedMessage("Unable to delete global metadata file [{}]", blobId), e); - } - } - } - - private void deleteIndexMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final IndexId indexId) { - final SnapshotId snapshotId = snapshotInfo.snapshotId(); + private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) { BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId())); try { indexMetaDataFormat.delete(indexMetaDataBlobContainer, snapshotId.getUUID()); @@ -904,17 +901,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - /** - * Delete shard snapshot - * - * @param snapshotId snapshot id - * @param shardId shard id - */ - private void delete(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { - Context context = new Context(snapshotId, indexId, shardId, shardId); - context.delete(); - } - @Override public String toString() { return "BlobStoreRepository[" + diff --git a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java index a47ced0496d..5d30de8d6d8 100644 --- a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; import java.util.function.Function; @@ -73,9 +74,9 @@ public class FsRepository extends BlobStoreRepository { /** * Constructs a shared file system repository. */ - public FsRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) { - super(metadata, environment.settings(), calculateCompress(metadata, environment), namedXContentRegistry); + public FsRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + super(metadata, environment.settings(), calculateCompress(metadata, environment), namedXContentRegistry, threadPool); this.environment = environment; String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings()); if (location.isEmpty()) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 7ba53cb5d1e..d21f27c4f19 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -1308,15 +1309,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began */ private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(listener) { + @Override + protected void doRun() { Repository repository = repositoriesService.repository(snapshot.getRepository()); - repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId); - logger.info("snapshot [{}] deleted", snapshot); - - removeSnapshotDeletionFromClusterState(snapshot, null, listener); - } catch (Exception ex) { - removeSnapshotDeletionFromClusterState(snapshot, ex, listener); + repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, ActionListener.wrap(v -> { + logger.info("snapshot [{}] deleted", snapshot); + removeSnapshotDeletionFromClusterState(snapshot, null, listener); + }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, listener) + )); } }); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java index 96a9670d162..cd31ce121b2 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java @@ -43,12 +43,14 @@ public class RepositoriesModuleTests extends ESTestCase { private RepositoryPlugin plugin1; private RepositoryPlugin plugin2; private Repository.Factory factory; + private ThreadPool threadPool; @Override public void setUp() throws Exception { super.setUp(); environment = mock(Environment.class); contentRegistry = mock(NamedXContentRegistry.class); + threadPool = mock(ThreadPool.class); plugin1 = mock(RepositoryPlugin.class); plugin2 = mock(RepositoryPlugin.class); factory = mock(Repository.Factory.class); @@ -58,43 +60,46 @@ public class RepositoriesModuleTests extends ESTestCase { } public void testCanRegisterTwoRepositoriesWithDifferentTypes() { - when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type2", factory)); + when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type2", factory)); // Would throw - new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry); + new RepositoriesModule( + environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), threadPool, contentRegistry); } public void testCannotRegisterTwoRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry)); + threadPool, contentRegistry)); assertEquals("Repository type [type1] is already registered", ex.getMessage()); } public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() { - when(plugin1.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getInternalRepositories(environment, contentRegistry, threadPool)) + .thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool)) + .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry)); + threadPool, contentRegistry)); assertEquals("Internal repository type [type1] is already registered", ex.getMessage()); } public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool)) + .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry)); + threadPool, contentRegistry)); assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage()); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index c02ab0d1856..981004f48ef 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.repositories; import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -149,8 +150,8 @@ public class RepositoriesServiceTests extends ESTestCase { } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { - + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + listener.onResponse(null); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 1b59f558db5..a904879321d 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -180,7 +180,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase { private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry()) { + final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index a09560c54ce..2abd623c496 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; @@ -67,12 +68,13 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { } // the reason for this plug-in is to drop any assertSnapshotOrGenericThread as mostly all access in this test goes from test threads - public static class FsLikeRepoPlugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin { + public static class FsLikeRepoPlugin extends Plugin implements RepositoryPlugin { @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap(REPO_TYPE, - (metadata) -> new FsRepository(metadata, env, namedXContentRegistry) { + (metadata) -> new FsRepository(metadata, env, namedXContentRegistry, threadPool) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we access blobStore on test/main threads @@ -260,7 +262,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { Environment useCompressEnvironment = new Environment(useCompressSettings, node().getEnvironment().configFile()); - new FsRepository(metaData, useCompressEnvironment, null); + new FsRepository(metaData, useCompressEnvironment, null, null); assertWarnings("[repositories.fs.compress] setting was deprecated in Elasticsearch and will be removed in a future release!" + " See the breaking changes documentation for the next major version."); diff --git a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java index 13b74df4e3d..040f12c9566 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java @@ -35,6 +35,7 @@ import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; @@ -187,8 +188,8 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte public CountingMockRepository(final RepositoryMetaData metadata, final Environment environment, - final NamedXContentRegistry namedXContentRegistry) throws IOException { - super(metadata, environment, namedXContentRegistry); + final NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(metadata, environment, namedXContentRegistry, threadPool); } @Override @@ -207,8 +208,10 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte /** A plugin that uses CountingMockRepository as implementation of the Repository **/ public static class CountingMockRepositoryPlugin extends MockRepository.Plugin { @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap("coutingmock", (metadata) -> new CountingMockRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap("coutingmock", + metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, threadPool)); } } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index a30ac9bda53..29bf9e0493e 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -889,7 +889,7 @@ public class SnapshotResiliencyTests extends ESTestCase { repositoriesService = new RepositoriesService( settings, clusterService, transportService, Collections.singletonMap(FsRepository.TYPE, metaData -> { - final Repository repository = new FsRepository(metaData, environment, xContentRegistry()) { + final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo in the test thread diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 8a49324757f..9ce111e1d30 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -42,6 +42,7 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; @@ -69,8 +70,9 @@ public class MockRepository extends FsRepository { @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env, namedXContentRegistry, threadPool)); } @Override @@ -113,8 +115,8 @@ public class MockRepository extends FsRepository { private volatile boolean blocked = false; public MockRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) throws IOException { - super(overrideSettings(metadata, environment), environment, namedXContentRegistry); + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(overrideSettings(metadata, environment), environment, namedXContentRegistry, threadPool); randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 11bdfb7bcc7..bc60b4c1946 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.shard; import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -103,7 +104,8 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + listener.onResponse(null); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 7fa7f37f4b7..3eda554a84b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -132,7 +132,6 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final CcrLicenseChecker ccrLicenseChecker; private final SetOnce restoreSourceService = new SetOnce<>(); private final SetOnce ccrSettings = new SetOnce<>(); - private final SetOnce threadPool = new SetOnce<>(); private Client client; private final boolean transportClientMode; @@ -177,7 +176,6 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings()); this.ccrSettings.set(ccrSettings); - this.threadPool.set(threadPool); CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings); this.restoreSourceService.set(restoreSourceService); return Arrays.asList( @@ -326,9 +324,10 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E } @Override - public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { Repository.Factory repositoryFactory = - (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get()); + (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool); return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 0b445a3eb01..5a0472339c1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -260,7 +260,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index b57b648b765..2038b35b4e6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -357,7 +357,8 @@ public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin, } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java index 00b199eef44..81be978d331 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.io.IOException; @@ -72,7 +73,8 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase { public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin { @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index ec1f002d05b..6a37e8265c0 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -328,7 +328,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { private Repository createRepository() throws IOException { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry()); + return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool); } private static void runAsSnapshot(ThreadPool pool, Runnable runnable) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 9b5414a6f83..3f8b279e501 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -394,17 +394,20 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - HashMap repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry)); - filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry))); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + HashMap repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry, threadPool)); + filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, threadPool))); return repositories; } @Override - public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - HashMap internalRepositories = new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry)); + public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + HashMap internalRepositories = + new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, threadPool)); filterPlugins(RepositoryPlugin.class).forEach(r -> - internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry))); + internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, threadPool))); return internalRepositories; }