diff --git a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java index c1128fd683a..98b8c0a1945 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories.url; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.url.URLBlobStore; @@ -31,7 +32,6 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import java.io.IOException; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; @@ -71,33 +71,44 @@ public class URLRepository extends BlobStoreRepository { private final Environment environment; - private final URLBlobStore blobStore; - private final BlobPath basePath; + private final URL url; + /** * Constructs a read-only URL-based repository */ public URLRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) throws IOException { + NamedXContentRegistry namedXContentRegistry) { super(metadata, environment.settings(), namedXContentRegistry); if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(settings) == false) { throw new RepositoryException(metadata.name(), "missing url"); } + this.environment = environment; supportedProtocols = SUPPORTED_PROTOCOLS_SETTING.get(settings); urlWhiteList = ALLOWED_URLS_SETTING.get(settings).toArray(new URIPattern[]{}); - this.environment = environment; - - URL url = URL_SETTING.exists(metadata.settings()) ? URL_SETTING.get(metadata.settings()) : REPOSITORIES_URL_SETTING.get(settings); - URL normalizedURL = checkURL(url); - blobStore = new URLBlobStore(settings, normalizedURL); basePath = BlobPath.cleanPath(); + url = URL_SETTING.exists(metadata.settings()) + ? URL_SETTING.get(metadata.settings()) : REPOSITORIES_URL_SETTING.get(settings); } @Override - protected BlobStore blobStore() { - return blobStore; + protected BlobStore createBlobStore() { + URL normalizedURL = checkURL(url); + return new URLBlobStore(settings, normalizedURL); + } + + // only use for testing + @Override + protected BlobContainer blobContainer() { + return super.blobContainer(); + } + + // only use for testing + @Override + protected BlobStore getBlobStore() { + return super.getBlobStore(); } @Override diff --git a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java index 1af4c1eaba9..2de4c132673 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java @@ -31,8 +31,22 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Collections; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; + public class URLRepositoryTests extends ESTestCase { + private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) { + return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), + new NamedXContentRegistry(Collections.emptyList())) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo manually on test/main threads + } + }; + } + public void testWhiteListingRepoURL() throws IOException { String repoPath = createTempDir().resolve("repository").toUri().toURL().toString(); Settings baseSettings = Settings.builder() @@ -41,8 +55,12 @@ public class URLRepositoryTests extends ESTestCase { .put(URLRepository.REPOSITORIES_URL_SETTING.getKey(), repoPath) .build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData("url", URLRepository.TYPE, baseSettings); - new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList())); + final URLRepository repository = createRepository(baseSettings, repositoryMetaData); + repository.start(); + + assertThat("blob store has to be lazy initialized", repository.getBlobStore(), is(nullValue())); + repository.blobContainer(); + assertThat("blobContainer has to initialize blob store", repository.getBlobStore(), not(nullValue())); } public void testIfNotWhiteListedMustSetRepoURL() throws IOException { @@ -52,9 +70,10 @@ public class URLRepositoryTests extends ESTestCase { .put(URLRepository.REPOSITORIES_URL_SETTING.getKey(), repoPath) .build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData("url", URLRepository.TYPE, baseSettings); + final URLRepository repository = createRepository(baseSettings, repositoryMetaData); + repository.start(); try { - new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList())); + repository.blobContainer(); fail("RepositoryException should have been thrown."); } catch (RepositoryException e) { String msg = "[url] file url [" + repoPath @@ -73,13 +92,33 @@ public class URLRepositoryTests extends ESTestCase { .put(URLRepository.SUPPORTED_PROTOCOLS_SETTING.getKey(), "http,https") .build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData("url", URLRepository.TYPE, baseSettings); + final URLRepository repository = createRepository(baseSettings, repositoryMetaData); + repository.start(); try { - new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList())); + repository.blobContainer(); fail("RepositoryException should have been thrown."); } catch (RepositoryException e) { assertEquals("[url] unsupported url protocol [file] from URL [" + repoPath +"]", e.getMessage()); } } + public void testNonNormalizedUrl() throws IOException { + Settings baseSettings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .put(URLRepository.ALLOWED_URLS_SETTING.getKey(), "file:/tmp/") + .put(URLRepository.REPOSITORIES_URL_SETTING.getKey(), "file:/var/" ) + .build(); + RepositoryMetaData repositoryMetaData = new RepositoryMetaData("url", URLRepository.TYPE, baseSettings); + final URLRepository repository = createRepository(baseSettings, repositoryMetaData); + repository.start(); + try { + repository.blobContainer(); + fail("RepositoryException should have been thrown."); + } catch (RepositoryException e) { + assertEquals("[url] file url [file:/var/] doesn't match any of the locations " + + "specified by path.repo or repositories.url.allowed_urls", + e.getMessage()); + } + } + } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index 47b398a4c2f..0797c78af33 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -38,7 +38,6 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotCreationException; import org.elasticsearch.snapshots.SnapshotId; -import java.io.IOException; import java.net.URISyntaxException; import java.util.List; import java.util.Locale; @@ -78,25 +77,21 @@ public class AzureRepository extends BlobStoreRepository { public static final Setting READONLY_SETTING = Setting.boolSetting("readonly", false, Property.NodeScope); } - private final AzureBlobStore blobStore; private final BlobPath basePath; private final ByteSizeValue chunkSize; private final boolean compress; + private final Environment environment; + private final AzureStorageService storageService; private final boolean readonly; public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, - AzureStorageService storageService) throws IOException, URISyntaxException, StorageException { + AzureStorageService storageService) { super(metadata, environment.settings(), namedXContentRegistry); - this.blobStore = new AzureBlobStore(metadata, environment.settings(), storageService); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.compress = Repository.COMPRESS_SETTING.get(metadata.settings()); - // If the user explicitly did not define a readonly value, we set it by ourselves depending on the location mode setting. - // For secondary_only setting, the repository should be read only - if (Repository.READONLY_SETTING.exists(metadata.settings())) { - this.readonly = Repository.READONLY_SETTING.get(metadata.settings()); - } else { - this.readonly = this.blobStore.getLocationMode() == LocationMode.SECONDARY_ONLY; - } + this.environment = environment; + this.storageService = storageService; + final String basePath = Strings.trimLeadingCharacter(Repository.BASE_PATH_SETTING.get(metadata.settings()), '/'); if (Strings.hasLength(basePath)) { // Remove starting / if any @@ -108,15 +103,33 @@ public class AzureRepository extends BlobStoreRepository { } else { this.basePath = BlobPath.cleanPath(); } - logger.debug((org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage( - "using container [{}], chunk_size [{}], compress [{}], base_path [{}]", blobStore, chunkSize, compress, basePath)); + + // If the user explicitly did not define a readonly value, we set it by ourselves depending on the location mode setting. + // For secondary_only setting, the repository should be read only + final LocationMode locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings()); + if (Repository.READONLY_SETTING.exists(metadata.settings())) { + this.readonly = Repository.READONLY_SETTING.get(metadata.settings()); + } else { + this.readonly = locationMode == LocationMode.SECONDARY_ONLY; + } + } + + // only use for testing + @Override + protected BlobStore getBlobStore() { + return super.getBlobStore(); } /** * {@inheritDoc} */ @Override - protected BlobStore blobStore() { + protected AzureBlobStore createBlobStore() throws URISyntaxException, StorageException { + final AzureBlobStore blobStore = new AzureBlobStore(metadata, environment.settings(), storageService); + + logger.debug((org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage( + "using container [{}], chunk_size [{}], compress [{}], base_path [{}]", + blobStore, chunkSize, compress, basePath)); return blobStore; } @@ -144,6 +157,7 @@ public class AzureRepository extends BlobStoreRepository { @Override public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetadata) { try { + final AzureBlobStore blobStore = (AzureBlobStore) blobStore(); if (blobStore.containerExist() == false) { throw new IllegalArgumentException("The bucket [" + blobStore + "] does not exist. Please create it before " + " creating an azure snapshot repository backed by it."); diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java index 639905042cf..b4b71577cbc 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.repositories.azure; import com.microsoft.azure.storage.LocationMode; -import com.microsoft.azure.storage.StorageException; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -30,76 +29,76 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.test.ESTestCase; -import java.io.IOException; -import java.net.URISyntaxException; - import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; public class AzureRepositorySettingsTests extends ESTestCase { - private AzureRepository azureRepository(Settings settings) throws StorageException, IOException, URISyntaxException { + private AzureRepository azureRepository(Settings settings) { Settings internalSettings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) .putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths()) .put(settings) .build(); - return new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings), + final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings), TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class)); + assertThat(azureRepository.getBlobStore(), is(nullValue())); + return azureRepository; } - public void testReadonlyDefault() throws StorageException, IOException, URISyntaxException { + public void testReadonlyDefault() { assertThat(azureRepository(Settings.EMPTY).isReadOnly(), is(false)); } - public void testReadonlyDefaultAndReadonlyOn() throws StorageException, IOException, URISyntaxException { + public void testReadonlyDefaultAndReadonlyOn() { assertThat(azureRepository(Settings.builder() .put("readonly", true) .build()).isReadOnly(), is(true)); } - public void testReadonlyWithPrimaryOnly() throws StorageException, IOException, URISyntaxException { + public void testReadonlyWithPrimaryOnly() { assertThat(azureRepository(Settings.builder() .put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.PRIMARY_ONLY.name()) .build()).isReadOnly(), is(false)); } - public void testReadonlyWithPrimaryOnlyAndReadonlyOn() throws StorageException, IOException, URISyntaxException { + public void testReadonlyWithPrimaryOnlyAndReadonlyOn() { assertThat(azureRepository(Settings.builder() .put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.PRIMARY_ONLY.name()) .put("readonly", true) .build()).isReadOnly(), is(true)); } - public void testReadonlyWithSecondaryOnlyAndReadonlyOn() throws StorageException, IOException, URISyntaxException { + public void testReadonlyWithSecondaryOnlyAndReadonlyOn() { assertThat(azureRepository(Settings.builder() .put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.SECONDARY_ONLY.name()) .put("readonly", true) .build()).isReadOnly(), is(true)); } - public void testReadonlyWithSecondaryOnlyAndReadonlyOff() throws StorageException, IOException, URISyntaxException { + public void testReadonlyWithSecondaryOnlyAndReadonlyOff() { assertThat(azureRepository(Settings.builder() .put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.SECONDARY_ONLY.name()) .put("readonly", false) .build()).isReadOnly(), is(false)); } - public void testReadonlyWithPrimaryAndSecondaryOnlyAndReadonlyOn() throws StorageException, IOException, URISyntaxException { + public void testReadonlyWithPrimaryAndSecondaryOnlyAndReadonlyOn() { assertThat(azureRepository(Settings.builder() .put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.PRIMARY_THEN_SECONDARY.name()) .put("readonly", true) .build()).isReadOnly(), is(true)); } - public void testReadonlyWithPrimaryAndSecondaryOnlyAndReadonlyOff() throws StorageException, IOException, URISyntaxException { + public void testReadonlyWithPrimaryAndSecondaryOnlyAndReadonlyOff() { assertThat(azureRepository(Settings.builder() .put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.PRIMARY_THEN_SECONDARY.name()) .put("readonly", false) .build()).isReadOnly(), is(false)); } - public void testChunkSize() throws StorageException, IOException, URISyntaxException { + public void testChunkSize() { // default chunk size AzureRepository azureRepository = azureRepository(Settings.EMPTY); assertEquals(AzureStorageService.MAX_CHUNK_SIZE, azureRepository.chunkSize()); diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 83d48eeda20..fe6c8889bd2 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -22,7 +22,6 @@ package org.elasticsearch.repositories.gcs; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -56,18 +55,19 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { byteSizeSetting("chunk_size", MAX_CHUNK_SIZE, MIN_CHUNK_SIZE, MAX_CHUNK_SIZE, Property.NodeScope, Property.Dynamic); static final Setting CLIENT_NAME = new Setting<>("client", "default", Function.identity()); - private final ByteSizeValue chunkSize; - private final boolean compress; + private final GoogleCloudStorageService storageService; private final BlobPath basePath; - private final GoogleCloudStorageBlobStore blobStore; + private final boolean compress; + private final ByteSizeValue chunkSize; + private final String bucket; + private final String clientName; GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, - GoogleCloudStorageService storageService) throws Exception { + GoogleCloudStorageService storageService) { super(metadata, environment.settings(), namedXContentRegistry); + this.storageService = storageService; - String bucket = getSetting(BUCKET, metadata); - String clientName = CLIENT_NAME.get(metadata.settings()); String basePath = BASE_PATH.get(metadata.settings()); if (Strings.hasLength(basePath)) { BlobPath path = new BlobPath(); @@ -81,16 +81,14 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { this.compress = getSetting(COMPRESS, metadata); this.chunkSize = getSetting(CHUNK_SIZE, metadata); - + this.bucket = getSetting(BUCKET, metadata); + this.clientName = CLIENT_NAME.get(metadata.settings()); logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, compress); - - this.blobStore = new GoogleCloudStorageBlobStore(settings, bucket, clientName, storageService); } - @Override - protected BlobStore blobStore() { - return blobStore; + protected GoogleCloudStorageBlobStore createBlobStore() { + return new GoogleCloudStorageBlobStore(settings, bucket, clientName, storageService); } @Override diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 3692b26f2bb..6d5c1bbf853 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; import org.junit.AfterClass; @@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.instanceOf; public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase { @@ -49,9 +51,10 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos } @Override - protected void createTestRepository(String name) { + protected void createTestRepository(String name, boolean verify) { assertAcked(client().admin().cluster().preparePutRepository(name) .setType(GoogleCloudStorageRepository.TYPE) + .setVerify(verify) .setSettings(Settings.builder() .put("bucket", BUCKET) .put("base_path", GoogleCloudStorageBlobStoreRepositoryTests.class.getSimpleName()) @@ -59,6 +62,11 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); } + @Override + protected void afterCreationCheck(Repository repository) { + assertThat(repository, instanceOf(GoogleCloudStorageRepository.class)); + } + @AfterClass public static void wipeRepository() { blobs.clear(); diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index 5ef1c7d18d6..97285f9cecb 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -42,7 +42,6 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -61,29 +60,26 @@ public final class HdfsRepository extends BlobStoreRepository { private final ByteSizeValue chunkSize; private final boolean compress; private final BlobPath basePath = BlobPath.cleanPath(); - - private HdfsBlobStore blobStore; + private final URI uri; + private final String pathSetting; // buffer size passed to HDFS read/write methods // TODO: why 100KB? private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB); public HdfsRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) throws IOException { + NamedXContentRegistry namedXContentRegistry) { super(metadata, environment.settings(), namedXContentRegistry); this.environment = environment; this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null); this.compress = metadata.settings().getAsBoolean("compress", false); - } - @Override - protected void doStart() { String uriSetting = getMetadata().settings().get("uri"); if (Strings.hasText(uriSetting) == false) { throw new IllegalArgumentException("No 'uri' defined for hdfs snapshot/restore"); } - URI uri = URI.create(uriSetting); + uri = URI.create(uriSetting); if ("hdfs".equalsIgnoreCase(uri.getScheme()) == false) { throw new IllegalArgumentException(String.format(Locale.ROOT, "Invalid scheme [%s] specified in uri [%s]; only 'hdfs' uri allowed for hdfs snapshot/restore", uri.getScheme(), uriSetting)); @@ -93,16 +89,11 @@ public final class HdfsRepository extends BlobStoreRepository { "Use 'path' option to specify a path [%s], not the uri [%s] for hdfs snapshot/restore", uri.getPath(), uriSetting)); } - String pathSetting = getMetadata().settings().get("path"); + pathSetting = getMetadata().settings().get("path"); // get configuration if (pathSetting == null) { throw new IllegalArgumentException("No 'path' defined for hdfs snapshot/restore"); } - - // initialize our blobstore using elevated privileges. - SpecialPermission.check(); - blobStore = AccessController.doPrivileged((PrivilegedAction) () -> createBlobstore(uri, pathSetting, getMetadata().settings())); - super.doStart(); } private HdfsBlobStore createBlobstore(URI uri, String path, Settings repositorySettings) { @@ -229,7 +220,12 @@ public final class HdfsRepository extends BlobStoreRepository { } @Override - protected BlobStore blobStore() { + protected HdfsBlobStore createBlobStore() { + // initialize our blobstore using elevated privileges. + SpecialPermission.check(); + final HdfsBlobStore blobStore = + AccessController.doPrivileged((PrivilegedAction) + () -> createBlobstore(uri, pathSetting, getMetadata().settings())); return blobStore; } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index f6f949aa4d0..ec60536f135 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -35,7 +35,6 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import java.io.IOException; import java.util.Map; import java.util.function.Function; @@ -144,30 +143,43 @@ class S3Repository extends BlobStoreRepository { */ static final Setting BASE_PATH_SETTING = Setting.simpleString("base_path"); - private final S3BlobStore blobStore; + private final S3Service service; - private final BlobPath basePath; + private final String bucket; + + private final ByteSizeValue bufferSize; private final ByteSizeValue chunkSize; private final boolean compress; + private final BlobPath basePath; + + private final boolean serverSideEncryption; + + private final String storageClass; + + private final String cannedACL; + + private final String clientName; + /** * Constructs an s3 backed repository */ S3Repository(final RepositoryMetaData metadata, final Settings settings, final NamedXContentRegistry namedXContentRegistry, - final S3Service service) throws IOException { + final S3Service service) { super(metadata, settings, namedXContentRegistry); + this.service = service; - final String bucket = BUCKET_SETTING.get(metadata.settings()); + // Parse and validate the user's S3 Storage Class setting + this.bucket = BUCKET_SETTING.get(metadata.settings()); if (bucket == null) { throw new RepositoryException(metadata.name(), "No bucket defined for s3 repository"); } - final boolean serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(metadata.settings()); - final ByteSizeValue bufferSize = BUFFER_SIZE_SETTING.get(metadata.settings()); + this.bufferSize = BUFFER_SIZE_SETTING.get(metadata.settings()); this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings()); this.compress = COMPRESS_SETTING.get(metadata.settings()); @@ -177,33 +189,44 @@ class S3Repository extends BlobStoreRepository { ") can't be lower than " + BUFFER_SIZE_SETTING.getKey() + " (" + bufferSize + ")."); } - // Parse and validate the user's S3 Storage Class setting - final String storageClass = STORAGE_CLASS_SETTING.get(metadata.settings()); - final String cannedACL = CANNED_ACL_SETTING.get(metadata.settings()); - final String clientName = CLIENT_NAME.get(metadata.settings()); - - logger.debug("using bucket [{}], chunk_size [{}], server_side_encryption [{}], " + - "buffer_size [{}], cannedACL [{}], storageClass [{}]", - bucket, chunkSize, serverSideEncryption, bufferSize, cannedACL, storageClass); - - // deprecated behavior: override client credentials from the cluster state - // (repository settings) - if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) { - overrideCredentialsFromClusterState(service); - } - blobStore = new S3BlobStore(settings, service, clientName, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass); - final String basePath = BASE_PATH_SETTING.get(metadata.settings()); if (Strings.hasLength(basePath)) { this.basePath = new BlobPath().add(basePath); } else { this.basePath = BlobPath.cleanPath(); } + + this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(metadata.settings()); + + this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings()); + this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings()); + this.clientName = CLIENT_NAME.get(metadata.settings()); + + logger.debug("using bucket [{}], chunk_size [{}], server_side_encryption [{}], " + + "buffer_size [{}], cannedACL [{}], storageClass [{}]", + bucket, chunkSize, serverSideEncryption, bufferSize, cannedACL, storageClass); + + // (repository settings) + if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) { + overrideCredentialsFromClusterState(service); + } } + @Override + protected S3BlobStore createBlobStore() { + return new S3BlobStore(settings, service, clientName, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass); + } + + // only use for testing @Override protected BlobStore blobStore() { - return blobStore; + return super.blobStore(); + } + + // only use for testing + @Override + protected BlobStore getBlobStore() { + return super.getBlobStore(); } @Override diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 79a5187059f..da3219f2aef 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -61,7 +61,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo }); } - private final S3Service service; + protected final S3Service service; public S3RepositoryPlugin(final Settings settings) { this(settings, new S3Service(settings)); @@ -77,7 +77,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo // proxy method for testing protected S3Repository createRepository(final RepositoryMetaData metadata, final Settings settings, - final NamedXContentRegistry registry) throws IOException { + final NamedXContentRegistry registry) { return new S3Repository(metadata, settings, registry, service); } diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index 744a27dc48e..7eb603b4b78 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -80,6 +80,16 @@ public class RepositoryCredentialsTests extends ESTestCase { ProxyS3RepositoryPlugin(Settings settings) { super(settings, new ProxyS3Service(settings)); } + + @Override + protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry) { + return new S3Repository(metadata, settings, registry, service){ + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo manually on test/main threads + } + }; + } } public void testRepositoryCredentialsOverrideSecureCredentials() throws IOException { @@ -102,8 +112,8 @@ public class RepositoryCredentialsTests extends ESTestCase { .put(S3Repository.ACCESS_KEY_SETTING.getKey(), "insecure_aws_key") .put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret").build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings); - S3Repository s3repo = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY); - AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin); + AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials(); assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key")); assertThat(credentials.getAWSSecretKey(), is("insecure_aws_secret")); @@ -125,8 +135,8 @@ public class RepositoryCredentialsTests extends ESTestCase { .put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret") .build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(Settings.EMPTY); - S3Repository s3repo = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY); - AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin); + AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials(); assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key")); assertThat(credentials.getAWSSecretKey(), is("insecure_aws_secret")); @@ -140,6 +150,12 @@ public class RepositoryCredentialsTests extends ESTestCase { + " See the breaking changes documentation for the next major version."); } + private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin) { + final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY); + repository.start(); + return repository; + } + public void testReinitSecureCredentials() throws IOException { final String clientName = randomFrom("default", "some_client"); // initial client node settings @@ -156,7 +172,7 @@ public class RepositoryCredentialsTests extends ESTestCase { } final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", builder.build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings); - S3Repository s3repo = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY)) { + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin)) { try (AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials .getCredentials(); diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index b061e8e45ed..51fc48dfb59 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.mockito.Mockito.mock; @@ -84,8 +85,11 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa } @Override - protected void createTestRepository(final String name) { - assertAcked(client().admin().cluster().preparePutRepository(name).setType(S3Repository.TYPE).setSettings(Settings.builder() + protected void createTestRepository(final String name, boolean verify) { + assertAcked(client().admin().cluster().preparePutRepository(name) + .setType(S3Repository.TYPE) + .setVerify(verify) + .setSettings(Settings.builder() .put(S3Repository.BUCKET_SETTING.getKey(), bucket) .put(S3Repository.CLIENT_NAME.getKey(), client) .put(S3Repository.BUFFER_SIZE_SETTING.getKey(), bufferSize) @@ -96,6 +100,11 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa .put(S3Repository.SECRET_KEY_SETTING.getKey(), "not_used_but_this_is_a_secret"))); } + @Override + protected void afterCreationCheck(Repository repository) { + assertThat(repository, instanceOf(S3Repository.class)); + } + @Override protected Collection> nodePlugins() { return Collections.singletonList(TestS3RepositoryPlugin.class); @@ -125,7 +134,7 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa public void testInsecureRepositoryCredentials() throws Exception { final String repositoryName = "testInsecureRepositoryCredentials"; - createTestRepository(repositoryName); + createAndCheckTestRepository(repositoryName); final NodeClient nodeClient = internalCluster().getInstance(NodeClient.class); final RestGetRepositoriesAction getRepoAction = new RestGetRepositoriesAction(Settings.EMPTY, mock(RestController.class), internalCluster().getInstance(SettingsFilter.class)); diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index 14f53ae5d33..dcc46661bef 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -29,11 +29,13 @@ import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; -import java.io.IOException; import java.util.Collections; import java.util.Map; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; public class S3RepositoryTests extends ESTestCase { @@ -70,27 +72,27 @@ public class S3RepositoryTests extends ESTestCase { } } - public void testInvalidChunkBufferSizeSettings() throws IOException { + public void testInvalidChunkBufferSizeSettings() { // chunk < buffer should fail final Settings s1 = bufferAndChunkSettings(10, 5); final Exception e1 = expectThrows(RepositoryException.class, - () -> new S3Repository(getRepositoryMetaData(s1), Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service())); + () -> createS3Repo(getRepositoryMetaData(s1))); assertThat(e1.getMessage(), containsString("chunk_size (5mb) can't be lower than buffer_size (10mb)")); // chunk > buffer should pass final Settings s2 = bufferAndChunkSettings(5, 10); - new S3Repository(getRepositoryMetaData(s2), Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()).close(); + createS3Repo(getRepositoryMetaData(s2)).close(); // chunk = buffer should pass final Settings s3 = bufferAndChunkSettings(5, 5); - new S3Repository(getRepositoryMetaData(s3), Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()).close(); + createS3Repo(getRepositoryMetaData(s3)).close(); // buffer < 5mb should fail final Settings s4 = bufferAndChunkSettings(4, 10); final IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, - () -> new S3Repository(getRepositoryMetaData(s4), Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()) + () -> createS3Repo(getRepositoryMetaData(s4)) .close()); assertThat(e2.getMessage(), containsString("failed to parse value [4mb] for setting [buffer_size], must be >= [5mb]")); final Settings s5 = bufferAndChunkSettings(5, 6000000); final IllegalArgumentException e3 = expectThrows(IllegalArgumentException.class, - () -> new S3Repository(getRepositoryMetaData(s5), Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()) + () -> createS3Repo(getRepositoryMetaData(s5)) .close()); assertThat(e3.getMessage(), containsString("failed to parse value [6000000mb] for setting [chunk_size], must be <= [5tb]")); } @@ -106,20 +108,32 @@ public class S3RepositoryTests extends ESTestCase { return new RepositoryMetaData("dummy-repo", "mock", Settings.builder().put(settings).build()); } - public void testBasePathSetting() throws IOException { + public void testBasePathSetting() { final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", Settings.builder() .put(S3Repository.BASE_PATH_SETTING.getKey(), "foo/bar").build()); - try (S3Repository s3repo = new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service())) { + try (S3Repository s3repo = createS3Repo(metadata)) { assertEquals("foo/bar/", s3repo.basePath().buildAsString()); } } - public void testDefaultBufferSize() throws IOException { + public void testDefaultBufferSize() { final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", Settings.EMPTY); - try (S3Repository s3repo = new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service())) { - final long defaultBufferSize = ((S3BlobStore) s3repo.blobStore()).bufferSizeInBytes(); + try (S3Repository s3repo = createS3Repo(metadata)) { + assertThat(s3repo.getBlobStore(), is(nullValue())); + s3repo.start(); + final long defaultBufferSize = ((S3BlobStore)s3repo.blobStore()).bufferSizeInBytes(); + assertThat(s3repo.getBlobStore(), not(nullValue())); assertThat(defaultBufferSize, Matchers.lessThanOrEqualTo(100L * 1024 * 1024)); assertThat(defaultBufferSize, Matchers.greaterThanOrEqualTo(5L * 1024 * 1024)); } } + + private S3Repository createS3Repo(RepositoryMetaData metadata) { + return new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo manually on test/main threads + } + }; + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index d5b2a6413e9..c6cbaa50cdf 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -58,16 +59,20 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta private final ClusterService clusterService; + private final ThreadPool threadPool; + private final VerifyNodeRepositoryAction verifyAction; private volatile Map repositories = Collections.emptyMap(); @Inject public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService, - Map typesRegistry) { + Map typesRegistry, + ThreadPool threadPool) { super(settings); this.typesRegistry = typesRegistry; this.clusterService = clusterService; + this.threadPool = threadPool; // Doesn't make sense to maintain repositories on non-master and non-data nodes // Nothing happens there anyway if (DiscoveryNode.isDataNode(settings) || DiscoveryNode.isMasterNode(settings)) { @@ -208,39 +213,51 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta public void verifyRepository(final String repositoryName, final ActionListener listener) { final Repository repository = repository(repositoryName); try { - final String verificationToken = repository.startVerification(); - if (verificationToken != null) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { - verifyAction.verify(repositoryName, verificationToken, new ActionListener() { - @Override - public void onResponse(VerifyResponse verifyResponse) { - try { - repository.endVerification(verificationToken); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to finish repository verification", repositoryName), e); - listener.onFailure(e); - return; - } - listener.onResponse(verifyResponse); - } + final String verificationToken = repository.startVerification(); + if (verificationToken != null) { + try { + verifyAction.verify(repositoryName, verificationToken, new ActionListener() { + @Override + public void onResponse(VerifyResponse verifyResponse) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + try { + repository.endVerification(verificationToken); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage( + "[{}] failed to finish repository verification", repositoryName), e); + listener.onFailure(e); + return; + } + listener.onResponse(verifyResponse); + }); + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } catch (Exception e) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + try { + repository.endVerification(verificationToken); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn(() -> new ParameterizedMessage( + "[{}] failed to finish repository verification", repositoryName), inner); + } + listener.onFailure(e); + }); } - }); - } catch (Exception e) { - try { - repository.endVerification(verificationToken); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage("[{}] failed to finish repository verification", repositoryName), inner); + } else { + listener.onResponse(new VerifyResponse(new DiscoveryNode[0], new VerificationFailure[0])); } + } catch (Exception e) { listener.onFailure(e); } - } else { - listener.onResponse(new VerifyResponse(new DiscoveryNode[0], new VerificationFailure[0])); - } + }); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java index 380ae974080..fbaf369912e 100644 --- a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java @@ -61,7 +61,7 @@ public class VerifyNodeRepositoryAction extends AbstractComponent { this.transportService = transportService; this.clusterService = clusterService; this.repositoriesService = repositoriesService; - transportService.registerRequestHandler(ACTION_NAME, VerifyNodeRepositoryRequest::new, ThreadPool.Names.SAME, new VerifyNodeRepositoryRequestHandler()); + transportService.registerRequestHandler(ACTION_NAME, VerifyNodeRepositoryRequest::new, ThreadPool.Names.SNAPSHOT, new VerifyNodeRepositoryRequestHandler()); } public void verify(String repository, String verificationToken, final ActionListener listener) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 86131fe468d..22743e38839 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -34,6 +34,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -102,6 +103,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.threadpool.ThreadPool; import java.io.FilterInputStream; import java.io.IOException; @@ -126,8 +128,8 @@ import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSna /** * BlobStore - based implementation of Snapshot Repository *

- * This repository works with any {@link BlobStore} implementation. The blobStore should be initialized in the derived - * class before {@link #doStart()} is called. + * This repository works with any {@link BlobStore} implementation. The blobStore could be (and preferred) lazy initialized in + * {@link #createBlobStore()}. *

* BlobStoreRepository maintains the following structure in the blob store *

@@ -169,8 +171,6 @@ import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSna
  */
 public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
 
-    private BlobContainer snapshotsBlobContainer;
-
     protected final RepositoryMetaData metadata;
 
     protected final NamedXContentRegistry namedXContentRegistry;
@@ -225,6 +225,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private final ChecksumBlobStoreFormat indexShardSnapshotsFormat;
 
+    private final Object lock = new Object();
+
+    private final SetOnce blobContainer = new SetOnce<>();
+
+    private final SetOnce blobStore = new SetOnce<>();
+
     /**
      * Constructs new BlobStoreRepository
      *
@@ -251,7 +257,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     @Override
     protected void doStart() {
-        this.snapshotsBlobContainer = blobStore().blobContainer(basePath());
         globalMetaDataFormat = new ChecksumBlobStoreFormat<>(METADATA_CODEC, METADATA_NAME_FORMAT,
             MetaData::fromXContent, namedXContentRegistry, isCompress());
         indexMetaDataFormat = new ChecksumBlobStoreFormat<>(INDEX_METADATA_CODEC, METADATA_NAME_FORMAT,
@@ -265,17 +270,82 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     @Override
     protected void doClose() {
-        try {
-            blobStore().close();
-        } catch (Exception t) {
-            logger.warn("cannot close blob store", t);
+        BlobStore store;
+        // to close blobStore if blobStore initialization is started during close
+        synchronized (lock) {
+            store = blobStore.get();
+        }
+        if (store != null) {
+            try {
+                store.close();
+            } catch (Exception t) {
+                logger.warn("cannot close blob store", t);
+            }
         }
     }
 
+    // package private, only use for testing
+    BlobContainer getBlobContainer() {
+        return blobContainer.get();
+    }
+
+    // for test purposes only
+    protected BlobStore getBlobStore() {
+        return blobStore.get();
+    }
+
     /**
-     * Returns the BlobStore to read and write data.
+     * maintains single lazy instance of {@link BlobContainer}
      */
-    protected abstract BlobStore blobStore();
+    protected BlobContainer blobContainer() {
+        assertSnapshotOrGenericThread();
+
+        BlobContainer blobContainer = this.blobContainer.get();
+        if (blobContainer == null) {
+           synchronized (lock) {
+               blobContainer = this.blobContainer.get();
+               if (blobContainer == null) {
+                   blobContainer = blobStore().blobContainer(basePath());
+                   this.blobContainer.set(blobContainer);
+               }
+           }
+        }
+
+        return blobContainer;
+    }
+
+    /**
+     * maintains single lazy instance of {@link BlobStore}
+     */
+    protected BlobStore blobStore() {
+        assertSnapshotOrGenericThread();
+
+        BlobStore store = blobStore.get();
+        if (store == null) {
+            synchronized (lock) {
+                store = blobStore.get();
+                if (store == null) {
+                    if (lifecycle.started() == false) {
+                        throw new RepositoryException(metadata.name(), "repository is not in started state");
+                    }
+                    try {
+                        store = createBlobStore();
+                    } catch (RepositoryException e) {
+                        throw e;
+                    } catch (Exception e) {
+                        throw new RepositoryException(metadata.name(), "cannot create blob store" , e);
+                    }
+                    blobStore.set(store);
+                }
+            }
+        }
+        return store;
+    }
+
+    /**
+     * Creates new BlobStore to read and write data.
+     */
+    protected abstract BlobStore createBlobStore() throws Exception;
 
     /**
      * Returns base path of the repository
@@ -319,12 +389,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             if (repositoryData.getAllSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
                 throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists");
             }
-            if (snapshotFormat.exists(snapshotsBlobContainer, snapshotId.getUUID())) {
+            if (snapshotFormat.exists(blobContainer(), snapshotId.getUUID())) {
                 throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists");
             }
 
             // Write Global MetaData
-            globalMetaDataFormat.write(clusterMetaData, snapshotsBlobContainer, snapshotId.getUUID());
+            globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID());
 
             // write the index metadata for each index in the snapshot
             for (IndexId index : indices) {
@@ -421,7 +491,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private void deleteSnapshotBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) {
         try {
-            snapshotFormat.delete(snapshotsBlobContainer, blobId);
+            snapshotFormat.delete(blobContainer(), blobId);
         } catch (IOException e) {
             if (snapshotInfo != null) {
                 logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete snapshot file [{}]",
@@ -434,7 +504,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private void deleteGlobalMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) {
         try {
-            globalMetaDataFormat.delete(snapshotsBlobContainer, blobId);
+            globalMetaDataFormat.delete(blobContainer(), blobId);
         } catch (IOException e) {
             if (snapshotInfo != null) {
                 logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata file [{}]",
@@ -472,7 +542,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
             includeGlobalState);
         try {
-            snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, snapshotId.getUUID());
+            snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID());
             final RepositoryData repositoryData = getRepositoryData();
             writeIndexGen(repositoryData.addSnapshot(snapshotId, blobStoreSnapshot.state(), indices), repositoryStateId);
         } catch (FileAlreadyExistsException ex) {
@@ -490,7 +560,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     @Override
     public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
         try {
-            return snapshotFormat.read(snapshotsBlobContainer, snapshotId.getUUID());
+            return snapshotFormat.read(blobContainer(), snapshotId.getUUID());
         } catch (NoSuchFileException ex) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
         } catch (IOException | NotXContentException ex) {
@@ -501,7 +571,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     @Override
     public MetaData getSnapshotGlobalMetaData(final SnapshotId snapshotId) {
         try {
-            return globalMetaDataFormat.read(snapshotsBlobContainer, snapshotId.getUUID());
+            return globalMetaDataFormat.read(blobContainer(), snapshotId.getUUID());
         } catch (NoSuchFileException ex) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
         } catch (IOException ex) {
@@ -543,11 +613,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         return restoreRateLimitingTimeInNanos.count();
     }
 
+    protected void assertSnapshotOrGenericThread() {
+        assert Thread.currentThread().getName().contains(ThreadPool.Names.SNAPSHOT)
+            || Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC) :
+            "Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";
+    }
+
     @Override
     public String startVerification() {
         try {
             if (isReadOnly()) {
-                // It's readonly - so there is not much we can do here to verify it
+                // TODO: add repository verification for read-only repositories
+
+                // It's readonly - so there is not much we can do here to verify it apart try to create blobStore()
+                // and check that is is accessible on the master
+                blobStore();
                 return null;
             } else {
                 String seed = UUIDs.randomBase64UUID();
@@ -584,7 +664,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final String snapshotsIndexBlobName = INDEX_FILE_PREFIX + Long.toString(indexGen);
 
             RepositoryData repositoryData;
-            try (InputStream blob = snapshotsBlobContainer.readBlob(snapshotsIndexBlobName)) {
+            try (InputStream blob = blobContainer().readBlob(snapshotsIndexBlobName)) {
                 BytesStreamOutput out = new BytesStreamOutput();
                 Streams.copy(blob, out);
                 // EMPTY is safe here because RepositoryData#fromXContent calls namedObject
@@ -598,7 +678,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             }
 
             // now load the incompatible snapshot ids, if they exist
-            try (InputStream blob = snapshotsBlobContainer.readBlob(INCOMPATIBLE_SNAPSHOTS_BLOB)) {
+            try (InputStream blob = blobContainer().readBlob(INCOMPATIBLE_SNAPSHOTS_BLOB)) {
                 BytesStreamOutput out = new BytesStreamOutput();
                 Streams.copy(blob, out);
                 try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY,
@@ -636,11 +716,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         return readOnly;
     }
 
-    // package private, only use for testing
-    BlobContainer blobContainer() {
-        return snapshotsBlobContainer;
-    }
-
     protected void writeIndexGen(final RepositoryData repositoryData, final long repositoryStateId) throws IOException {
         assert isReadOnly() == false; // can not write to a read only repository
         final long currentGen = latestIndexBlobId();
@@ -668,7 +743,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         // delete the N-2 index file if it exists, keep the previous one around as a backup
         if (isReadOnly() == false && newGen - 2 >= 0) {
             final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
-            snapshotsBlobContainer.deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
+            blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
         }
 
         // write the current generation to the index-latest file
@@ -736,7 +811,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     // package private for testing
     long readSnapshotIndexLatestBlob() throws IOException {
-        try (InputStream blob = snapshotsBlobContainer.readBlob(INDEX_LATEST_BLOB)) {
+        try (InputStream blob = blobContainer().readBlob(INDEX_LATEST_BLOB)) {
             BytesStreamOutput out = new BytesStreamOutput();
             Streams.copy(blob, out);
             return Numbers.bytesToLong(out.bytes().toBytesRef());
@@ -744,7 +819,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     private long listBlobsToGetLatestIndexId() throws IOException {
-        Map blobs = snapshotsBlobContainer.listBlobsByPrefix(INDEX_FILE_PREFIX);
+        Map blobs = blobContainer().listBlobsByPrefix(INDEX_FILE_PREFIX);
         long latest = RepositoryData.EMPTY_REPO_GEN;
         if (blobs.isEmpty()) {
             // no snapshot index blobs have been written yet
@@ -766,7 +841,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private void writeAtomic(final String blobName, final BytesReference bytesRef, boolean failIfAlreadyExists) throws IOException {
         try (InputStream stream = bytesRef.streamInput()) {
-            snapshotsBlobContainer.writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists);
+            blobContainer().writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists);
         }
     }
 
@@ -806,6 +881,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     @Override
     public void verify(String seed, DiscoveryNode localNode) {
+        assertSnapshotOrGenericThread();
         BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
         if (testBlobContainer.blobExists("master.dat")) {
             try  {
diff --git a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java
index 4d4ab60feef..643ff2bc93d 100644
--- a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java
@@ -31,7 +31,6 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 
-import java.io.IOException;
 import java.nio.file.Path;
 import java.util.function.Function;
 
@@ -61,8 +60,7 @@ public class FsRepository extends BlobStoreRepository {
     public static final Setting COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
     public static final Setting REPOSITORIES_COMPRESS_SETTING =
         Setting.boolSetting("repositories.fs.compress", false, Property.NodeScope);
-
-    private final FsBlobStore blobStore;
+    private final Environment environment;
 
     private ByteSizeValue chunkSize;
 
@@ -74,37 +72,45 @@ public class FsRepository extends BlobStoreRepository {
      * Constructs a shared file system repository.
      */
     public FsRepository(RepositoryMetaData metadata, Environment environment,
-                        NamedXContentRegistry namedXContentRegistry) throws IOException {
+                        NamedXContentRegistry namedXContentRegistry) {
         super(metadata, environment.settings(), namedXContentRegistry);
+        this.environment = environment;
         String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
         if (location.isEmpty()) {
-            logger.warn("the repository location is missing, it should point to a shared file system location that is available on all master and data nodes");
+            logger.warn("the repository location is missing, it should point to a shared file system location"
+                + " that is available on all master and data nodes");
             throw new RepositoryException(metadata.name(), "missing location");
         }
         Path locationFile = environment.resolveRepoFile(location);
         if (locationFile == null) {
             if (environment.repoFiles().length > 0) {
-                logger.warn("The specified location [{}] doesn't start with any repository paths specified by the path.repo setting: [{}] ", location, environment.repoFiles());
-                throw new RepositoryException(metadata.name(), "location [" + location + "] doesn't match any of the locations specified by path.repo");
+                logger.warn("The specified location [{}] doesn't start with any "
+                    + "repository paths specified by the path.repo setting: [{}] ", location, environment.repoFiles());
+                throw new RepositoryException(metadata.name(), "location [" + location
+                    + "] doesn't match any of the locations specified by path.repo");
             } else {
-                logger.warn("The specified location [{}] should start with a repository path specified by the path.repo setting, but the path.repo setting was not set on this node", location);
-                throw new RepositoryException(metadata.name(), "location [" + location + "] doesn't match any of the locations specified by path.repo because this setting is empty");
+                logger.warn("The specified location [{}] should start with a repository path specified by"
+                    + " the path.repo setting, but the path.repo setting was not set on this node", location);
+                throw new RepositoryException(metadata.name(), "location [" + location
+                    + "] doesn't match any of the locations specified by path.repo because this setting is empty");
             }
         }
 
-        blobStore = new FsBlobStore(settings, locationFile);
         if (CHUNK_SIZE_SETTING.exists(metadata.settings())) {
             this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings());
         } else {
             this.chunkSize = REPOSITORIES_CHUNK_SIZE_SETTING.get(settings);
         }
-        this.compress = COMPRESS_SETTING.exists(metadata.settings()) ? COMPRESS_SETTING.get(metadata.settings()) : REPOSITORIES_COMPRESS_SETTING.get(settings);
+        this.compress = COMPRESS_SETTING.exists(metadata.settings())
+            ? COMPRESS_SETTING.get(metadata.settings()) : REPOSITORIES_COMPRESS_SETTING.get(settings);
         this.basePath = BlobPath.cleanPath();
     }
 
     @Override
-    protected BlobStore blobStore() {
-        return blobStore;
+    protected BlobStore createBlobStore() throws Exception {
+        final String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
+        final Path locationFile = environment.resolveRepoFile(location);
+        return new FsBlobStore(settings, locationFile);
     }
 
     @Override
diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
index cc971ed1b04..3a5302bcec2 100644
--- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
@@ -406,7 +406,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
             Collections.emptySet());
         final ClusterService clusterService = mock(ClusterService.class);
         final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService,
-            transportService, null);
+            transportService, null, threadPool);
         final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(settings, threadPool,
             transportService, null, clusterService);
         final ShardStateAction shardStateAction = mock(ShardStateAction.class);
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
index 7a1d3a89420..0eae9a14200 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
@@ -173,10 +173,17 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
     }
 
     /** Create a {@link Repository} with a random name **/
-    private Repository createRepository() throws IOException {
+    private Repository createRepository() {
         Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
         RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
-        return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry());
+        final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry()) {
+            @Override
+            protected void assertSnapshotOrGenericThread() {
+                // eliminate thread name check as we create repo manually
+            }
+        };
+        repository.start();
+        return repository;
     }
 
     /** Create a {@link Environment} with random path.home and path.repo **/
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
index 7e4d5cc54a9..1abdb97f174 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -24,10 +24,16 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.RepositoryPlugin;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
+import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -37,18 +43,42 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
 
 /**
  * Tests for the {@link BlobStoreRepository} and its subclasses.
  */
 public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
 
+    static final String REPO_TYPE = "fsLike";
+
+    protected Collection> getPlugins() {
+        return Arrays.asList(FsLikeRepoPlugin.class);
+    }
+
+    // the reason for this plug-in is to drop any assertSnapshotOrGenericThread as mostly all access in this test goes from test threads
+    public static class FsLikeRepoPlugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
+
+        @Override
+        public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
+            return Collections.singletonMap(REPO_TYPE,
+                (metadata) -> new FsRepository(metadata, env, namedXContentRegistry) {
+                    @Override
+                    protected void assertSnapshotOrGenericThread() {
+                        // eliminate thread name check as we access blobStore on test/main threads
+                    }
+                });
+        }
+    }
+
     public void testRetrieveSnapshots() throws Exception {
         final Client client = client();
         final Path location = ESIntegTestCase.randomRepoPath(node().settings());
@@ -57,7 +87,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
         logger.info("-->  creating repository");
         PutRepositoryResponse putRepositoryResponse =
             client.admin().cluster().preparePutRepository(repositoryName)
-                                    .setType("fs")
+                                    .setType(REPO_TYPE)
                                     .setSettings(Settings.builder().put(node().settings()).put("location", location))
                                     .get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@@ -209,7 +239,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
 
         PutRepositoryResponse putRepositoryResponse =
             client.admin().cluster().preparePutRepository(repositoryName)
-                                    .setType("fs")
+                                    .setType(REPO_TYPE)
                                     .setSettings(Settings.builder().put(node().settings()).put("location", location))
                                     .get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@@ -217,6 +247,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
         final RepositoriesService repositoriesService = getInstanceFromNode(RepositoriesService.class);
         @SuppressWarnings("unchecked") final BlobStoreRepository repository =
             (BlobStoreRepository) repositoriesService.repository(repositoryName);
+        assertThat("getBlobContainer has to be lazy initialized", repository.getBlobContainer(), nullValue());
         return repository;
     }
 
diff --git a/server/src/test/java/org/elasticsearch/snapshots/FsBlobStoreRepositoryIT.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsBlobStoreRepositoryIT.java
similarity index 79%
rename from server/src/test/java/org/elasticsearch/snapshots/FsBlobStoreRepositoryIT.java
rename to server/src/test/java/org/elasticsearch/repositories/fs/FsBlobStoreRepositoryIT.java
index 792b1bdbddd..1ed42cb2474 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/FsBlobStoreRepositoryIT.java
+++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsBlobStoreRepositoryIT.java
@@ -16,22 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.elasticsearch.snapshots;
+package org.elasticsearch.repositories.fs;
 
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.instanceOf;
 
 public class FsBlobStoreRepositoryIT extends ESBlobStoreRepositoryIntegTestCase {
     @Override
-    protected void createTestRepository(String name) {
+    protected void createTestRepository(String name, boolean verify) {
         assertAcked(client().admin().cluster().preparePutRepository(name)
+            .setVerify(verify)
             .setType("fs").setSettings(Settings.builder()
                 .put("location", randomRepoPath())
                 .put("compress", randomBoolean())
                 .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
+    }
 
+    @Override
+    protected void afterCreationCheck(Repository repository) {
+        assertThat(repository, instanceOf(FsRepository.class));
     }
 }
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
index c9ca1637b1a..d2954a4c128 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.snapshots;
 
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionFuture;
@@ -93,6 +94,7 @@ import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.StoredScriptsIT;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.nio.channels.SeekableByteChannel;
@@ -1262,7 +1264,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
         Repository repository = service.repository("test-repo");
 
-        final Map indexIds = repository.getRepositoryData().getIndices();
+        final Map indexIds = getRepositoryData(repository).getIndices();
         final Path indicesPath = repo.resolve("indices");
 
         logger.info("--> delete index metadata and shard metadata");
@@ -1739,6 +1741,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
 
         logger.info("--> trying to create a repository with different name");
         assertAcked(client.admin().cluster().preparePutRepository("test-repo-2")
+                .setVerify(false) // do not do verification itself as snapshot threads could be fully blocked
                 .setType("fs").setSettings(Settings.builder().put("location", repositoryLocation.resolve("test"))));
 
         logger.info("--> unblocking blocked node");
@@ -2563,7 +2566,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
 
         logger.info("--> emulate an orphan snapshot");
         RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
-        final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();
+        final RepositoryData repositoryData = getRepositoryData(repositoriesService.repository(repositoryName));
         final IndexId indexId = repositoryData.resolveIndexId(idxName);
 
         clusterService.submitStateUpdateTask("orphan snapshot test", new ClusterStateUpdateTask() {
@@ -2784,7 +2787,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
         Repository repository = service.repository("test-repo");
 
-        final Map indexIds = repository.getRepositoryData().getIndices();
+        final RepositoryData repositoryData = getRepositoryData(repository);
+        final Map indexIds = repositoryData.getIndices();
         assertThat(indexIds.size(), equalTo(nbIndices));
 
         // Choose a random index from the snapshot
@@ -3445,6 +3449,19 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         }
     }
 
+    private RepositoryData getRepositoryData(Repository repository) throws InterruptedException {
+        ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
+        final SetOnce repositoryData = new SetOnce<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
+            repositoryData.set(repository.getRepositoryData());
+            latch.countDown();
+        });
+
+        latch.await();
+        return repositoryData.get();
+    }
+
     private void verifySnapshotInfo(final GetSnapshotsResponse response, final Map> indicesPerSnapshot) {
         for (SnapshotInfo snapshotInfo : response.getSnapshots()) {
             final List expected = snapshotInfo.indices();
diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index d05a10905d8..75a86831bc5 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -92,8 +92,6 @@ public class MockRepository extends FsRepository {
 
     private final long waitAfterUnblock;
 
-    private final MockBlobStore mockBlobStore;
-
     private final String randomPrefix;
 
     private volatile boolean blockOnInitialization;
@@ -128,7 +126,6 @@ public class MockRepository extends FsRepository {
         waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
         allowAtomicOperations = metadata.settings().getAsBoolean("allow_atomic_operations", true);
         logger.info("starting mock repository with random prefix {}", randomPrefix);
-        mockBlobStore = new MockBlobStore(super.blobStore());
     }
 
     @Override
@@ -163,8 +160,8 @@ public class MockRepository extends FsRepository {
     }
 
     @Override
-    protected BlobStore blobStore() {
-        return mockBlobStore;
+    protected BlobStore createBlobStore() throws Exception {
+        return new MockBlobStore(super.createBlobStore());
     }
 
     public synchronized void unblock() {
@@ -195,7 +192,7 @@ public class MockRepository extends FsRepository {
     }
 
     private synchronized boolean blockExecution() {
-        logger.debug("Blocking execution");
+        logger.debug("[{}] Blocking execution", metadata.name());
         boolean wasBlocked = false;
         try {
             while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile ||
@@ -207,7 +204,7 @@ public class MockRepository extends FsRepository {
         } catch (InterruptedException ex) {
             Thread.currentThread().interrupt();
         }
-        logger.debug("Unblocking execution");
+        logger.debug("[{}] Unblocking execution", metadata.name());
         return wasBlocked;
     }
 
@@ -285,7 +282,7 @@ public class MockRepository extends FsRepository {
             }
 
             private void blockExecutionAndMaybeWait(final String blobName) {
-                logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
+                logger.info("[{}] blocking I/O operation for file [{}] at path [{}]", metadata.name(), blobName, path());
                 if (blockExecution() && waitAfterUnblock > 0) {
                     try {
                         // Delay operation after unblocking
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
index bf9c8193234..439728bac9e 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.repositories.blobstore;
 
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
@@ -27,34 +28,61 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.snapshots.SnapshotMissingException;
 import org.elasticsearch.snapshots.SnapshotRestoreException;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 /**
  * Basic integration tests for blob-based repository validation.
  */
 public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase {
 
-    protected abstract void createTestRepository(String name);
+    protected abstract void createTestRepository(String name, boolean verify);
+
+    protected void afterCreationCheck(Repository repository) {
+
+    }
+
+    protected void createAndCheckTestRepository(String name) {
+        final boolean verify = randomBoolean();
+        createTestRepository(name, verify);
+
+        final Iterable repositoriesServices =
+            internalCluster().getDataOrMasterNodeInstances(RepositoriesService.class);
+
+        for (RepositoriesService repositoriesService : repositoriesServices) {
+            final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(name);
+
+            afterCreationCheck(repository);
+            assertThat("blob store has to be lazy initialized",
+                repository.getBlobStore(), verify ? is(notNullValue()) : is(nullValue()));
+        }
+
+    }
 
     public void testSnapshotAndRestore() throws Exception {
         final String repoName = randomAsciiName();
         logger.info("-->  creating repository {}", repoName);
-        createTestRepository(repoName);
+        createAndCheckTestRepository(repoName);
         int indexCount = randomIntBetween(1, 5);
         int[] docCounts = new int[indexCount];
         String[] indexNames = generateRandomNames(indexCount);
@@ -125,7 +153,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
     public void testMultipleSnapshotAndRollback() throws Exception {
         String repoName = randomAsciiName();
         logger.info("-->  creating repository {}", repoName);
-        createTestRepository(repoName);
+        createAndCheckTestRepository(repoName);
         int iterationCount = randomIntBetween(2, 5);
         int[] docCounts = new int[iterationCount];
         String indexName = randomAsciiName();
@@ -177,12 +205,12 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
         }
     }
 
-    public void testIndicesDeletedFromRepository() {
+    public void testIndicesDeletedFromRepository() throws Exception {
         Client client = client();
 
         logger.info("-->  creating repository");
         final String repoName = "test-repo";
-        createTestRepository(repoName);
+        createAndCheckTestRepository(repoName);
 
         createIndex("test-idx-1", "test-idx-2", "test-idx-3");
         ensureGreen();
@@ -219,12 +247,22 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
 
         logger.info("--> verify index folder deleted from blob container");
         RepositoriesService repositoriesSvc = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
+        ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
         @SuppressWarnings("unchecked") BlobStoreRepository repository = (BlobStoreRepository) repositoriesSvc.repository(repoName);
-        BlobContainer indicesBlobContainer = repository.blobStore().blobContainer(repository.basePath().add("indices"));
-        RepositoryData repositoryData = repository.getRepositoryData();
-        for (IndexId indexId : repositoryData.getIndices().values()) {
+
+        final SetOnce indicesBlobContainer = new SetOnce<>();
+        final SetOnce repositoryData = new SetOnce<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
+            indicesBlobContainer.set(repository.blobStore().blobContainer(repository.basePath().add("indices")));
+            repositoryData.set(repository.getRepositoryData());
+            latch.countDown();
+        });
+
+        latch.await();
+        for (IndexId indexId : repositoryData.get().getIndices().values()) {
             if (indexId.getName().equals("test-idx-3")) {
-                assertFalse(indicesBlobContainer.blobExists(indexId.getId())); // deleted index
+                assertFalse(indicesBlobContainer.get().blobExists(indexId.getId())); // deleted index
             }
         }
     }