lazy snapshot repository initialization (#31606)

lazy snapshot repository initialization
This commit is contained in:
Vladimir Dolzhenko 2018-07-13 20:05:49 +02:00 committed by GitHub
parent 0edb096eb4
commit b1bf643e41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 543 additions and 220 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.repositories.url;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.url.URLBlobStore;
@ -31,7 +32,6 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
@ -71,33 +71,44 @@ public class URLRepository extends BlobStoreRepository {
private final Environment environment;
private final URLBlobStore blobStore;
private final BlobPath basePath;
private final URL url;
/**
* Constructs a read-only URL-based repository
*/
public URLRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) throws IOException {
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), namedXContentRegistry);
if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(settings) == false) {
throw new RepositoryException(metadata.name(), "missing url");
}
this.environment = environment;
supportedProtocols = SUPPORTED_PROTOCOLS_SETTING.get(settings);
urlWhiteList = ALLOWED_URLS_SETTING.get(settings).toArray(new URIPattern[]{});
this.environment = environment;
URL url = URL_SETTING.exists(metadata.settings()) ? URL_SETTING.get(metadata.settings()) : REPOSITORIES_URL_SETTING.get(settings);
URL normalizedURL = checkURL(url);
blobStore = new URLBlobStore(settings, normalizedURL);
basePath = BlobPath.cleanPath();
url = URL_SETTING.exists(metadata.settings())
? URL_SETTING.get(metadata.settings()) : REPOSITORIES_URL_SETTING.get(settings);
}
@Override
protected BlobStore blobStore() {
return blobStore;
protected BlobStore createBlobStore() {
URL normalizedURL = checkURL(url);
return new URLBlobStore(settings, normalizedURL);
}
// only use for testing
@Override
protected BlobContainer blobContainer() {
return super.blobContainer();
}
// only use for testing
@Override
protected BlobStore getBlobStore() {
return super.getBlobStore();
}
@Override

View File

@ -31,8 +31,22 @@ import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
public class URLRepositoryTests extends ESTestCase {
private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) {
return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings),
new NamedXContentRegistry(Collections.emptyList())) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
}
};
}
public void testWhiteListingRepoURL() throws IOException {
String repoPath = createTempDir().resolve("repository").toUri().toURL().toString();
Settings baseSettings = Settings.builder()
@ -41,8 +55,12 @@ public class URLRepositoryTests extends ESTestCase {
.put(URLRepository.REPOSITORIES_URL_SETTING.getKey(), repoPath)
.build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("url", URLRepository.TYPE, baseSettings);
new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings),
new NamedXContentRegistry(Collections.emptyList()));
final URLRepository repository = createRepository(baseSettings, repositoryMetaData);
repository.start();
assertThat("blob store has to be lazy initialized", repository.getBlobStore(), is(nullValue()));
repository.blobContainer();
assertThat("blobContainer has to initialize blob store", repository.getBlobStore(), not(nullValue()));
}
public void testIfNotWhiteListedMustSetRepoURL() throws IOException {
@ -52,9 +70,10 @@ public class URLRepositoryTests extends ESTestCase {
.put(URLRepository.REPOSITORIES_URL_SETTING.getKey(), repoPath)
.build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("url", URLRepository.TYPE, baseSettings);
final URLRepository repository = createRepository(baseSettings, repositoryMetaData);
repository.start();
try {
new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings),
new NamedXContentRegistry(Collections.emptyList()));
repository.blobContainer();
fail("RepositoryException should have been thrown.");
} catch (RepositoryException e) {
String msg = "[url] file url [" + repoPath
@ -73,13 +92,33 @@ public class URLRepositoryTests extends ESTestCase {
.put(URLRepository.SUPPORTED_PROTOCOLS_SETTING.getKey(), "http,https")
.build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("url", URLRepository.TYPE, baseSettings);
final URLRepository repository = createRepository(baseSettings, repositoryMetaData);
repository.start();
try {
new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings),
new NamedXContentRegistry(Collections.emptyList()));
repository.blobContainer();
fail("RepositoryException should have been thrown.");
} catch (RepositoryException e) {
assertEquals("[url] unsupported url protocol [file] from URL [" + repoPath +"]", e.getMessage());
}
}
public void testNonNormalizedUrl() throws IOException {
Settings baseSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(URLRepository.ALLOWED_URLS_SETTING.getKey(), "file:/tmp/")
.put(URLRepository.REPOSITORIES_URL_SETTING.getKey(), "file:/var/" )
.build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("url", URLRepository.TYPE, baseSettings);
final URLRepository repository = createRepository(baseSettings, repositoryMetaData);
repository.start();
try {
repository.blobContainer();
fail("RepositoryException should have been thrown.");
} catch (RepositoryException e) {
assertEquals("[url] file url [file:/var/] doesn't match any of the locations "
+ "specified by path.repo or repositories.url.allowed_urls",
e.getMessage());
}
}
}

View File

@ -38,7 +38,6 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotId;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
@ -78,25 +77,21 @@ public class AzureRepository extends BlobStoreRepository {
public static final Setting<Boolean> READONLY_SETTING = Setting.boolSetting("readonly", false, Property.NodeScope);
}
private final AzureBlobStore blobStore;
private final BlobPath basePath;
private final ByteSizeValue chunkSize;
private final boolean compress;
private final Environment environment;
private final AzureStorageService storageService;
private final boolean readonly;
public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
AzureStorageService storageService) throws IOException, URISyntaxException, StorageException {
AzureStorageService storageService) {
super(metadata, environment.settings(), namedXContentRegistry);
this.blobStore = new AzureBlobStore(metadata, environment.settings(), storageService);
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.compress = Repository.COMPRESS_SETTING.get(metadata.settings());
// If the user explicitly did not define a readonly value, we set it by ourselves depending on the location mode setting.
// For secondary_only setting, the repository should be read only
if (Repository.READONLY_SETTING.exists(metadata.settings())) {
this.readonly = Repository.READONLY_SETTING.get(metadata.settings());
} else {
this.readonly = this.blobStore.getLocationMode() == LocationMode.SECONDARY_ONLY;
}
this.environment = environment;
this.storageService = storageService;
final String basePath = Strings.trimLeadingCharacter(Repository.BASE_PATH_SETTING.get(metadata.settings()), '/');
if (Strings.hasLength(basePath)) {
// Remove starting / if any
@ -108,15 +103,33 @@ public class AzureRepository extends BlobStoreRepository {
} else {
this.basePath = BlobPath.cleanPath();
}
logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]", blobStore, chunkSize, compress, basePath));
// If the user explicitly did not define a readonly value, we set it by ourselves depending on the location mode setting.
// For secondary_only setting, the repository should be read only
final LocationMode locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
if (Repository.READONLY_SETTING.exists(metadata.settings())) {
this.readonly = Repository.READONLY_SETTING.get(metadata.settings());
} else {
this.readonly = locationMode == LocationMode.SECONDARY_ONLY;
}
}
// only use for testing
@Override
protected BlobStore getBlobStore() {
return super.getBlobStore();
}
/**
* {@inheritDoc}
*/
@Override
protected BlobStore blobStore() {
protected AzureBlobStore createBlobStore() throws URISyntaxException, StorageException {
final AzureBlobStore blobStore = new AzureBlobStore(metadata, environment.settings(), storageService);
logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
blobStore, chunkSize, compress, basePath));
return blobStore;
}
@ -144,6 +157,7 @@ public class AzureRepository extends BlobStoreRepository {
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetadata) {
try {
final AzureBlobStore blobStore = (AzureBlobStore) blobStore();
if (blobStore.containerExist() == false) {
throw new IllegalArgumentException("The bucket [" + blobStore + "] does not exist. Please create it before "
+ " creating an azure snapshot repository backed by it.");

View File

@ -20,7 +20,6 @@
package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -30,76 +29,76 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.net.URISyntaxException;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
public class AzureRepositorySettingsTests extends ESTestCase {
private AzureRepository azureRepository(Settings settings) throws StorageException, IOException, URISyntaxException {
private AzureRepository azureRepository(Settings settings) {
Settings internalSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
.putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths())
.put(settings)
.build();
return new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings),
final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings),
TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class));
assertThat(azureRepository.getBlobStore(), is(nullValue()));
return azureRepository;
}
public void testReadonlyDefault() throws StorageException, IOException, URISyntaxException {
public void testReadonlyDefault() {
assertThat(azureRepository(Settings.EMPTY).isReadOnly(), is(false));
}
public void testReadonlyDefaultAndReadonlyOn() throws StorageException, IOException, URISyntaxException {
public void testReadonlyDefaultAndReadonlyOn() {
assertThat(azureRepository(Settings.builder()
.put("readonly", true)
.build()).isReadOnly(), is(true));
}
public void testReadonlyWithPrimaryOnly() throws StorageException, IOException, URISyntaxException {
public void testReadonlyWithPrimaryOnly() {
assertThat(azureRepository(Settings.builder()
.put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.PRIMARY_ONLY.name())
.build()).isReadOnly(), is(false));
}
public void testReadonlyWithPrimaryOnlyAndReadonlyOn() throws StorageException, IOException, URISyntaxException {
public void testReadonlyWithPrimaryOnlyAndReadonlyOn() {
assertThat(azureRepository(Settings.builder()
.put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.PRIMARY_ONLY.name())
.put("readonly", true)
.build()).isReadOnly(), is(true));
}
public void testReadonlyWithSecondaryOnlyAndReadonlyOn() throws StorageException, IOException, URISyntaxException {
public void testReadonlyWithSecondaryOnlyAndReadonlyOn() {
assertThat(azureRepository(Settings.builder()
.put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.SECONDARY_ONLY.name())
.put("readonly", true)
.build()).isReadOnly(), is(true));
}
public void testReadonlyWithSecondaryOnlyAndReadonlyOff() throws StorageException, IOException, URISyntaxException {
public void testReadonlyWithSecondaryOnlyAndReadonlyOff() {
assertThat(azureRepository(Settings.builder()
.put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.SECONDARY_ONLY.name())
.put("readonly", false)
.build()).isReadOnly(), is(false));
}
public void testReadonlyWithPrimaryAndSecondaryOnlyAndReadonlyOn() throws StorageException, IOException, URISyntaxException {
public void testReadonlyWithPrimaryAndSecondaryOnlyAndReadonlyOn() {
assertThat(azureRepository(Settings.builder()
.put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.PRIMARY_THEN_SECONDARY.name())
.put("readonly", true)
.build()).isReadOnly(), is(true));
}
public void testReadonlyWithPrimaryAndSecondaryOnlyAndReadonlyOff() throws StorageException, IOException, URISyntaxException {
public void testReadonlyWithPrimaryAndSecondaryOnlyAndReadonlyOff() {
assertThat(azureRepository(Settings.builder()
.put(AzureRepository.Repository.LOCATION_MODE_SETTING.getKey(), LocationMode.PRIMARY_THEN_SECONDARY.name())
.put("readonly", false)
.build()).isReadOnly(), is(false));
}
public void testChunkSize() throws StorageException, IOException, URISyntaxException {
public void testChunkSize() {
// default chunk size
AzureRepository azureRepository = azureRepository(Settings.EMPTY);
assertEquals(AzureStorageService.MAX_CHUNK_SIZE, azureRepository.chunkSize());

View File

@ -22,7 +22,6 @@ package org.elasticsearch.repositories.gcs;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -56,18 +55,19 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
byteSizeSetting("chunk_size", MAX_CHUNK_SIZE, MIN_CHUNK_SIZE, MAX_CHUNK_SIZE, Property.NodeScope, Property.Dynamic);
static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());
private final ByteSizeValue chunkSize;
private final boolean compress;
private final GoogleCloudStorageService storageService;
private final BlobPath basePath;
private final GoogleCloudStorageBlobStore blobStore;
private final boolean compress;
private final ByteSizeValue chunkSize;
private final String bucket;
private final String clientName;
GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry,
GoogleCloudStorageService storageService) throws Exception {
GoogleCloudStorageService storageService) {
super(metadata, environment.settings(), namedXContentRegistry);
this.storageService = storageService;
String bucket = getSetting(BUCKET, metadata);
String clientName = CLIENT_NAME.get(metadata.settings());
String basePath = BASE_PATH.get(metadata.settings());
if (Strings.hasLength(basePath)) {
BlobPath path = new BlobPath();
@ -81,16 +81,14 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
this.compress = getSetting(COMPRESS, metadata);
this.chunkSize = getSetting(CHUNK_SIZE, metadata);
this.bucket = getSetting(BUCKET, metadata);
this.clientName = CLIENT_NAME.get(metadata.settings());
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, compress);
this.blobStore = new GoogleCloudStorageBlobStore(settings, bucket, clientName, storageService);
}
@Override
protected BlobStore blobStore() {
return blobStore;
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(settings, bucket, clientName, storageService);
}
@Override

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
import org.junit.AfterClass;
@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.instanceOf;
public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
@ -49,9 +51,10 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
}
@Override
protected void createTestRepository(String name) {
protected void createTestRepository(String name, boolean verify) {
assertAcked(client().admin().cluster().preparePutRepository(name)
.setType(GoogleCloudStorageRepository.TYPE)
.setVerify(verify)
.setSettings(Settings.builder()
.put("bucket", BUCKET)
.put("base_path", GoogleCloudStorageBlobStoreRepositoryTests.class.getSimpleName())
@ -59,6 +62,11 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
}
@Override
protected void afterCreationCheck(Repository repository) {
assertThat(repository, instanceOf(GoogleCloudStorageRepository.class));
}
@AfterClass
public static void wipeRepository() {
blobs.clear();

View File

@ -42,7 +42,6 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -61,29 +60,26 @@ public final class HdfsRepository extends BlobStoreRepository {
private final ByteSizeValue chunkSize;
private final boolean compress;
private final BlobPath basePath = BlobPath.cleanPath();
private HdfsBlobStore blobStore;
private final URI uri;
private final String pathSetting;
// buffer size passed to HDFS read/write methods
// TODO: why 100KB?
private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB);
public HdfsRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) throws IOException {
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), namedXContentRegistry);
this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
this.compress = metadata.settings().getAsBoolean("compress", false);
}
@Override
protected void doStart() {
String uriSetting = getMetadata().settings().get("uri");
if (Strings.hasText(uriSetting) == false) {
throw new IllegalArgumentException("No 'uri' defined for hdfs snapshot/restore");
}
URI uri = URI.create(uriSetting);
uri = URI.create(uriSetting);
if ("hdfs".equalsIgnoreCase(uri.getScheme()) == false) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"Invalid scheme [%s] specified in uri [%s]; only 'hdfs' uri allowed for hdfs snapshot/restore", uri.getScheme(), uriSetting));
@ -93,16 +89,11 @@ public final class HdfsRepository extends BlobStoreRepository {
"Use 'path' option to specify a path [%s], not the uri [%s] for hdfs snapshot/restore", uri.getPath(), uriSetting));
}
String pathSetting = getMetadata().settings().get("path");
pathSetting = getMetadata().settings().get("path");
// get configuration
if (pathSetting == null) {
throw new IllegalArgumentException("No 'path' defined for hdfs snapshot/restore");
}
// initialize our blobstore using elevated privileges.
SpecialPermission.check();
blobStore = AccessController.doPrivileged((PrivilegedAction<HdfsBlobStore>) () -> createBlobstore(uri, pathSetting, getMetadata().settings()));
super.doStart();
}
private HdfsBlobStore createBlobstore(URI uri, String path, Settings repositorySettings) {
@ -229,7 +220,12 @@ public final class HdfsRepository extends BlobStoreRepository {
}
@Override
protected BlobStore blobStore() {
protected HdfsBlobStore createBlobStore() {
// initialize our blobstore using elevated privileges.
SpecialPermission.check();
final HdfsBlobStore blobStore =
AccessController.doPrivileged((PrivilegedAction<HdfsBlobStore>)
() -> createBlobstore(uri, pathSetting, getMetadata().settings()));
return blobStore;
}

View File

@ -35,7 +35,6 @@ import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
@ -144,30 +143,43 @@ class S3Repository extends BlobStoreRepository {
*/
static final Setting<String> BASE_PATH_SETTING = Setting.simpleString("base_path");
private final S3BlobStore blobStore;
private final S3Service service;
private final BlobPath basePath;
private final String bucket;
private final ByteSizeValue bufferSize;
private final ByteSizeValue chunkSize;
private final boolean compress;
private final BlobPath basePath;
private final boolean serverSideEncryption;
private final String storageClass;
private final String cannedACL;
private final String clientName;
/**
* Constructs an s3 backed repository
*/
S3Repository(final RepositoryMetaData metadata,
final Settings settings,
final NamedXContentRegistry namedXContentRegistry,
final S3Service service) throws IOException {
final S3Service service) {
super(metadata, settings, namedXContentRegistry);
this.service = service;
final String bucket = BUCKET_SETTING.get(metadata.settings());
// Parse and validate the user's S3 Storage Class setting
this.bucket = BUCKET_SETTING.get(metadata.settings());
if (bucket == null) {
throw new RepositoryException(metadata.name(), "No bucket defined for s3 repository");
}
final boolean serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(metadata.settings());
final ByteSizeValue bufferSize = BUFFER_SIZE_SETTING.get(metadata.settings());
this.bufferSize = BUFFER_SIZE_SETTING.get(metadata.settings());
this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings());
this.compress = COMPRESS_SETTING.get(metadata.settings());
@ -177,33 +189,44 @@ class S3Repository extends BlobStoreRepository {
") can't be lower than " + BUFFER_SIZE_SETTING.getKey() + " (" + bufferSize + ").");
}
// Parse and validate the user's S3 Storage Class setting
final String storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
final String cannedACL = CANNED_ACL_SETTING.get(metadata.settings());
final String clientName = CLIENT_NAME.get(metadata.settings());
logger.debug("using bucket [{}], chunk_size [{}], server_side_encryption [{}], " +
"buffer_size [{}], cannedACL [{}], storageClass [{}]",
bucket, chunkSize, serverSideEncryption, bufferSize, cannedACL, storageClass);
// deprecated behavior: override client credentials from the cluster state
// (repository settings)
if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) {
overrideCredentialsFromClusterState(service);
}
blobStore = new S3BlobStore(settings, service, clientName, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass);
final String basePath = BASE_PATH_SETTING.get(metadata.settings());
if (Strings.hasLength(basePath)) {
this.basePath = new BlobPath().add(basePath);
} else {
this.basePath = BlobPath.cleanPath();
}
this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(metadata.settings());
this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings());
this.clientName = CLIENT_NAME.get(metadata.settings());
logger.debug("using bucket [{}], chunk_size [{}], server_side_encryption [{}], " +
"buffer_size [{}], cannedACL [{}], storageClass [{}]",
bucket, chunkSize, serverSideEncryption, bufferSize, cannedACL, storageClass);
// (repository settings)
if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) {
overrideCredentialsFromClusterState(service);
}
}
@Override
protected S3BlobStore createBlobStore() {
return new S3BlobStore(settings, service, clientName, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass);
}
// only use for testing
@Override
protected BlobStore blobStore() {
return blobStore;
return super.blobStore();
}
// only use for testing
@Override
protected BlobStore getBlobStore() {
return super.getBlobStore();
}
@Override

View File

@ -61,7 +61,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
});
}
private final S3Service service;
protected final S3Service service;
public S3RepositoryPlugin(final Settings settings) {
this(settings, new S3Service(settings));
@ -77,7 +77,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
// proxy method for testing
protected S3Repository createRepository(final RepositoryMetaData metadata,
final Settings settings,
final NamedXContentRegistry registry) throws IOException {
final NamedXContentRegistry registry) {
return new S3Repository(metadata, settings, registry, service);
}

View File

@ -80,6 +80,16 @@ public class RepositoryCredentialsTests extends ESTestCase {
ProxyS3RepositoryPlugin(Settings settings) {
super(settings, new ProxyS3Service(settings));
}
@Override
protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry) {
return new S3Repository(metadata, settings, registry, service){
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
}
};
}
}
public void testRepositoryCredentialsOverrideSecureCredentials() throws IOException {
@ -102,8 +112,8 @@ public class RepositoryCredentialsTests extends ESTestCase {
.put(S3Repository.ACCESS_KEY_SETTING.getKey(), "insecure_aws_key")
.put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret").build());
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings);
S3Repository s3repo = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY);
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin);
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials();
assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key"));
assertThat(credentials.getAWSSecretKey(), is("insecure_aws_secret"));
@ -125,8 +135,8 @@ public class RepositoryCredentialsTests extends ESTestCase {
.put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret")
.build());
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(Settings.EMPTY);
S3Repository s3repo = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY);
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin);
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials();
assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key"));
assertThat(credentials.getAWSSecretKey(), is("insecure_aws_secret"));
@ -140,6 +150,12 @@ public class RepositoryCredentialsTests extends ESTestCase {
+ " See the breaking changes documentation for the next major version.");
}
private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin) {
final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY);
repository.start();
return repository;
}
public void testReinitSecureCredentials() throws IOException {
final String clientName = randomFrom("default", "some_client");
// initial client node settings
@ -156,7 +172,7 @@ public class RepositoryCredentialsTests extends ESTestCase {
}
final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", builder.build());
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings);
S3Repository s3repo = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY)) {
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin)) {
try (AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials
.getCredentials();

View File

@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;
@ -84,8 +85,11 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa
}
@Override
protected void createTestRepository(final String name) {
assertAcked(client().admin().cluster().preparePutRepository(name).setType(S3Repository.TYPE).setSettings(Settings.builder()
protected void createTestRepository(final String name, boolean verify) {
assertAcked(client().admin().cluster().preparePutRepository(name)
.setType(S3Repository.TYPE)
.setVerify(verify)
.setSettings(Settings.builder()
.put(S3Repository.BUCKET_SETTING.getKey(), bucket)
.put(S3Repository.CLIENT_NAME.getKey(), client)
.put(S3Repository.BUFFER_SIZE_SETTING.getKey(), bufferSize)
@ -96,6 +100,11 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa
.put(S3Repository.SECRET_KEY_SETTING.getKey(), "not_used_but_this_is_a_secret")));
}
@Override
protected void afterCreationCheck(Repository repository) {
assertThat(repository, instanceOf(S3Repository.class));
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(TestS3RepositoryPlugin.class);
@ -125,7 +134,7 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa
public void testInsecureRepositoryCredentials() throws Exception {
final String repositoryName = "testInsecureRepositoryCredentials";
createTestRepository(repositoryName);
createAndCheckTestRepository(repositoryName);
final NodeClient nodeClient = internalCluster().getInstance(NodeClient.class);
final RestGetRepositoriesAction getRepoAction = new RestGetRepositoriesAction(Settings.EMPTY, mock(RestController.class),
internalCluster().getInstance(SettingsFilter.class));

View File

@ -29,11 +29,13 @@ import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
public class S3RepositoryTests extends ESTestCase {
@ -70,27 +72,27 @@ public class S3RepositoryTests extends ESTestCase {
}
}
public void testInvalidChunkBufferSizeSettings() throws IOException {
public void testInvalidChunkBufferSizeSettings() {
// chunk < buffer should fail
final Settings s1 = bufferAndChunkSettings(10, 5);
final Exception e1 = expectThrows(RepositoryException.class,
() -> new S3Repository(getRepositoryMetaData(s1), Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()));
() -> createS3Repo(getRepositoryMetaData(s1)));
assertThat(e1.getMessage(), containsString("chunk_size (5mb) can't be lower than buffer_size (10mb)"));
// chunk > buffer should pass
final Settings s2 = bufferAndChunkSettings(5, 10);
new S3Repository(getRepositoryMetaData(s2), Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()).close();
createS3Repo(getRepositoryMetaData(s2)).close();
// chunk = buffer should pass
final Settings s3 = bufferAndChunkSettings(5, 5);
new S3Repository(getRepositoryMetaData(s3), Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()).close();
createS3Repo(getRepositoryMetaData(s3)).close();
// buffer < 5mb should fail
final Settings s4 = bufferAndChunkSettings(4, 10);
final IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class,
() -> new S3Repository(getRepositoryMetaData(s4), Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service())
() -> createS3Repo(getRepositoryMetaData(s4))
.close());
assertThat(e2.getMessage(), containsString("failed to parse value [4mb] for setting [buffer_size], must be >= [5mb]"));
final Settings s5 = bufferAndChunkSettings(5, 6000000);
final IllegalArgumentException e3 = expectThrows(IllegalArgumentException.class,
() -> new S3Repository(getRepositoryMetaData(s5), Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service())
() -> createS3Repo(getRepositoryMetaData(s5))
.close());
assertThat(e3.getMessage(), containsString("failed to parse value [6000000mb] for setting [chunk_size], must be <= [5tb]"));
}
@ -106,20 +108,32 @@ public class S3RepositoryTests extends ESTestCase {
return new RepositoryMetaData("dummy-repo", "mock", Settings.builder().put(settings).build());
}
public void testBasePathSetting() throws IOException {
public void testBasePathSetting() {
final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", Settings.builder()
.put(S3Repository.BASE_PATH_SETTING.getKey(), "foo/bar").build());
try (S3Repository s3repo = new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service())) {
try (S3Repository s3repo = createS3Repo(metadata)) {
assertEquals("foo/bar/", s3repo.basePath().buildAsString());
}
}
public void testDefaultBufferSize() throws IOException {
public void testDefaultBufferSize() {
final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", Settings.EMPTY);
try (S3Repository s3repo = new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service())) {
final long defaultBufferSize = ((S3BlobStore) s3repo.blobStore()).bufferSizeInBytes();
try (S3Repository s3repo = createS3Repo(metadata)) {
assertThat(s3repo.getBlobStore(), is(nullValue()));
s3repo.start();
final long defaultBufferSize = ((S3BlobStore)s3repo.blobStore()).bufferSizeInBytes();
assertThat(s3repo.getBlobStore(), not(nullValue()));
assertThat(defaultBufferSize, Matchers.lessThanOrEqualTo(100L * 1024 * 1024));
assertThat(defaultBufferSize, Matchers.greaterThanOrEqualTo(5L * 1024 * 1024));
}
}
private S3Repository createS3Repo(RepositoryMetaData metadata) {
return new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
}
};
}
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -58,16 +59,20 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final VerifyNodeRepositoryAction verifyAction;
private volatile Map<String, Repository> repositories = Collections.emptyMap();
@Inject
public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService,
Map<String, Repository.Factory> typesRegistry) {
Map<String, Repository.Factory> typesRegistry,
ThreadPool threadPool) {
super(settings);
this.typesRegistry = typesRegistry;
this.clusterService = clusterService;
this.threadPool = threadPool;
// Doesn't make sense to maintain repositories on non-master and non-data nodes
// Nothing happens there anyway
if (DiscoveryNode.isDataNode(settings) || DiscoveryNode.isMasterNode(settings)) {
@ -208,39 +213,51 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
public void verifyRepository(final String repositoryName, final ActionListener<VerifyResponse> listener) {
final Repository repository = repository(repositoryName);
try {
final String verificationToken = repository.startVerification();
if (verificationToken != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
verifyAction.verify(repositoryName, verificationToken, new ActionListener<VerifyResponse>() {
@Override
public void onResponse(VerifyResponse verifyResponse) {
try {
repository.endVerification(verificationToken);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to finish repository verification", repositoryName), e);
listener.onFailure(e);
return;
}
listener.onResponse(verifyResponse);
}
final String verificationToken = repository.startVerification();
if (verificationToken != null) {
try {
verifyAction.verify(repositoryName, verificationToken, new ActionListener<VerifyResponse>() {
@Override
public void onResponse(VerifyResponse verifyResponse) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
repository.endVerification(verificationToken);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] failed to finish repository verification", repositoryName), e);
listener.onFailure(e);
return;
}
listener.onResponse(verifyResponse);
});
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} catch (Exception e) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
repository.endVerification(verificationToken);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"[{}] failed to finish repository verification", repositoryName), inner);
}
listener.onFailure(e);
});
}
});
} catch (Exception e) {
try {
repository.endVerification(verificationToken);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage("[{}] failed to finish repository verification", repositoryName), inner);
} else {
listener.onResponse(new VerifyResponse(new DiscoveryNode[0], new VerificationFailure[0]));
}
} catch (Exception e) {
listener.onFailure(e);
}
} else {
listener.onResponse(new VerifyResponse(new DiscoveryNode[0], new VerificationFailure[0]));
}
});
} catch (Exception e) {
listener.onFailure(e);
}

View File

@ -61,7 +61,7 @@ public class VerifyNodeRepositoryAction extends AbstractComponent {
this.transportService = transportService;
this.clusterService = clusterService;
this.repositoriesService = repositoriesService;
transportService.registerRequestHandler(ACTION_NAME, VerifyNodeRepositoryRequest::new, ThreadPool.Names.SAME, new VerifyNodeRepositoryRequestHandler());
transportService.registerRequestHandler(ACTION_NAME, VerifyNodeRepositoryRequest::new, ThreadPool.Names.SNAPSHOT, new VerifyNodeRepositoryRequestHandler());
}
public void verify(String repository, String verificationToken, final ActionListener<VerifyResponse> listener) {

View File

@ -34,6 +34,7 @@ import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
@ -102,6 +103,7 @@ import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.FilterInputStream;
import java.io.IOException;
@ -126,8 +128,8 @@ import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSna
/**
* BlobStore - based implementation of Snapshot Repository
* <p>
* This repository works with any {@link BlobStore} implementation. The blobStore should be initialized in the derived
* class before {@link #doStart()} is called.
* This repository works with any {@link BlobStore} implementation. The blobStore could be (and preferred) lazy initialized in
* {@link #createBlobStore()}.
* <p>
* BlobStoreRepository maintains the following structure in the blob store
* <pre>
@ -169,8 +171,6 @@ import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSna
*/
public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
private BlobContainer snapshotsBlobContainer;
protected final RepositoryMetaData metadata;
protected final NamedXContentRegistry namedXContentRegistry;
@ -225,6 +225,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private final ChecksumBlobStoreFormat<BlobStoreIndexShardSnapshots> indexShardSnapshotsFormat;
private final Object lock = new Object();
private final SetOnce<BlobContainer> blobContainer = new SetOnce<>();
private final SetOnce<BlobStore> blobStore = new SetOnce<>();
/**
* Constructs new BlobStoreRepository
*
@ -251,7 +257,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
protected void doStart() {
this.snapshotsBlobContainer = blobStore().blobContainer(basePath());
globalMetaDataFormat = new ChecksumBlobStoreFormat<>(METADATA_CODEC, METADATA_NAME_FORMAT,
MetaData::fromXContent, namedXContentRegistry, isCompress());
indexMetaDataFormat = new ChecksumBlobStoreFormat<>(INDEX_METADATA_CODEC, METADATA_NAME_FORMAT,
@ -265,17 +270,82 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
protected void doClose() {
try {
blobStore().close();
} catch (Exception t) {
logger.warn("cannot close blob store", t);
BlobStore store;
// to close blobStore if blobStore initialization is started during close
synchronized (lock) {
store = blobStore.get();
}
if (store != null) {
try {
store.close();
} catch (Exception t) {
logger.warn("cannot close blob store", t);
}
}
}
// package private, only use for testing
BlobContainer getBlobContainer() {
return blobContainer.get();
}
// for test purposes only
protected BlobStore getBlobStore() {
return blobStore.get();
}
/**
* Returns the BlobStore to read and write data.
* maintains single lazy instance of {@link BlobContainer}
*/
protected abstract BlobStore blobStore();
protected BlobContainer blobContainer() {
assertSnapshotOrGenericThread();
BlobContainer blobContainer = this.blobContainer.get();
if (blobContainer == null) {
synchronized (lock) {
blobContainer = this.blobContainer.get();
if (blobContainer == null) {
blobContainer = blobStore().blobContainer(basePath());
this.blobContainer.set(blobContainer);
}
}
}
return blobContainer;
}
/**
* maintains single lazy instance of {@link BlobStore}
*/
protected BlobStore blobStore() {
assertSnapshotOrGenericThread();
BlobStore store = blobStore.get();
if (store == null) {
synchronized (lock) {
store = blobStore.get();
if (store == null) {
if (lifecycle.started() == false) {
throw new RepositoryException(metadata.name(), "repository is not in started state");
}
try {
store = createBlobStore();
} catch (RepositoryException e) {
throw e;
} catch (Exception e) {
throw new RepositoryException(metadata.name(), "cannot create blob store" , e);
}
blobStore.set(store);
}
}
}
return store;
}
/**
* Creates new BlobStore to read and write data.
*/
protected abstract BlobStore createBlobStore() throws Exception;
/**
* Returns base path of the repository
@ -319,12 +389,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
if (repositoryData.getAllSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists");
}
if (snapshotFormat.exists(snapshotsBlobContainer, snapshotId.getUUID())) {
if (snapshotFormat.exists(blobContainer(), snapshotId.getUUID())) {
throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists");
}
// Write Global MetaData
globalMetaDataFormat.write(clusterMetaData, snapshotsBlobContainer, snapshotId.getUUID());
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID());
// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
@ -421,7 +491,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private void deleteSnapshotBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) {
try {
snapshotFormat.delete(snapshotsBlobContainer, blobId);
snapshotFormat.delete(blobContainer(), blobId);
} catch (IOException e) {
if (snapshotInfo != null) {
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete snapshot file [{}]",
@ -434,7 +504,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private void deleteGlobalMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) {
try {
globalMetaDataFormat.delete(snapshotsBlobContainer, blobId);
globalMetaDataFormat.delete(blobContainer(), blobId);
} catch (IOException e) {
if (snapshotInfo != null) {
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata file [{}]",
@ -472,7 +542,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
includeGlobalState);
try {
snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, snapshotId.getUUID());
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID());
final RepositoryData repositoryData = getRepositoryData();
writeIndexGen(repositoryData.addSnapshot(snapshotId, blobStoreSnapshot.state(), indices), repositoryStateId);
} catch (FileAlreadyExistsException ex) {
@ -490,7 +560,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
try {
return snapshotFormat.read(snapshotsBlobContainer, snapshotId.getUUID());
return snapshotFormat.read(blobContainer(), snapshotId.getUUID());
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException | NotXContentException ex) {
@ -501,7 +571,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public MetaData getSnapshotGlobalMetaData(final SnapshotId snapshotId) {
try {
return globalMetaDataFormat.read(snapshotsBlobContainer, snapshotId.getUUID());
return globalMetaDataFormat.read(blobContainer(), snapshotId.getUUID());
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
@ -543,11 +613,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return restoreRateLimitingTimeInNanos.count();
}
protected void assertSnapshotOrGenericThread() {
assert Thread.currentThread().getName().contains(ThreadPool.Names.SNAPSHOT)
|| Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC) :
"Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";
}
@Override
public String startVerification() {
try {
if (isReadOnly()) {
// It's readonly - so there is not much we can do here to verify it
// TODO: add repository verification for read-only repositories
// It's readonly - so there is not much we can do here to verify it apart try to create blobStore()
// and check that is is accessible on the master
blobStore();
return null;
} else {
String seed = UUIDs.randomBase64UUID();
@ -584,7 +664,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final String snapshotsIndexBlobName = INDEX_FILE_PREFIX + Long.toString(indexGen);
RepositoryData repositoryData;
try (InputStream blob = snapshotsBlobContainer.readBlob(snapshotsIndexBlobName)) {
try (InputStream blob = blobContainer().readBlob(snapshotsIndexBlobName)) {
BytesStreamOutput out = new BytesStreamOutput();
Streams.copy(blob, out);
// EMPTY is safe here because RepositoryData#fromXContent calls namedObject
@ -598,7 +678,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
// now load the incompatible snapshot ids, if they exist
try (InputStream blob = snapshotsBlobContainer.readBlob(INCOMPATIBLE_SNAPSHOTS_BLOB)) {
try (InputStream blob = blobContainer().readBlob(INCOMPATIBLE_SNAPSHOTS_BLOB)) {
BytesStreamOutput out = new BytesStreamOutput();
Streams.copy(blob, out);
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY,
@ -636,11 +716,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return readOnly;
}
// package private, only use for testing
BlobContainer blobContainer() {
return snapshotsBlobContainer;
}
protected void writeIndexGen(final RepositoryData repositoryData, final long repositoryStateId) throws IOException {
assert isReadOnly() == false; // can not write to a read only repository
final long currentGen = latestIndexBlobId();
@ -668,7 +743,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// delete the N-2 index file if it exists, keep the previous one around as a backup
if (isReadOnly() == false && newGen - 2 >= 0) {
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
snapshotsBlobContainer.deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
}
// write the current generation to the index-latest file
@ -736,7 +811,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// package private for testing
long readSnapshotIndexLatestBlob() throws IOException {
try (InputStream blob = snapshotsBlobContainer.readBlob(INDEX_LATEST_BLOB)) {
try (InputStream blob = blobContainer().readBlob(INDEX_LATEST_BLOB)) {
BytesStreamOutput out = new BytesStreamOutput();
Streams.copy(blob, out);
return Numbers.bytesToLong(out.bytes().toBytesRef());
@ -744,7 +819,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
private long listBlobsToGetLatestIndexId() throws IOException {
Map<String, BlobMetaData> blobs = snapshotsBlobContainer.listBlobsByPrefix(INDEX_FILE_PREFIX);
Map<String, BlobMetaData> blobs = blobContainer().listBlobsByPrefix(INDEX_FILE_PREFIX);
long latest = RepositoryData.EMPTY_REPO_GEN;
if (blobs.isEmpty()) {
// no snapshot index blobs have been written yet
@ -766,7 +841,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private void writeAtomic(final String blobName, final BytesReference bytesRef, boolean failIfAlreadyExists) throws IOException {
try (InputStream stream = bytesRef.streamInput()) {
snapshotsBlobContainer.writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists);
blobContainer().writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists);
}
}
@ -806,6 +881,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public void verify(String seed, DiscoveryNode localNode) {
assertSnapshotOrGenericThread();
BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
if (testBlobContainer.blobExists("master.dat")) {
try {

View File

@ -31,7 +31,6 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
import java.nio.file.Path;
import java.util.function.Function;
@ -61,8 +60,7 @@ public class FsRepository extends BlobStoreRepository {
public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
public static final Setting<Boolean> REPOSITORIES_COMPRESS_SETTING =
Setting.boolSetting("repositories.fs.compress", false, Property.NodeScope);
private final FsBlobStore blobStore;
private final Environment environment;
private ByteSizeValue chunkSize;
@ -74,37 +72,45 @@ public class FsRepository extends BlobStoreRepository {
* Constructs a shared file system repository.
*/
public FsRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) throws IOException {
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), namedXContentRegistry);
this.environment = environment;
String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
if (location.isEmpty()) {
logger.warn("the repository location is missing, it should point to a shared file system location that is available on all master and data nodes");
logger.warn("the repository location is missing, it should point to a shared file system location"
+ " that is available on all master and data nodes");
throw new RepositoryException(metadata.name(), "missing location");
}
Path locationFile = environment.resolveRepoFile(location);
if (locationFile == null) {
if (environment.repoFiles().length > 0) {
logger.warn("The specified location [{}] doesn't start with any repository paths specified by the path.repo setting: [{}] ", location, environment.repoFiles());
throw new RepositoryException(metadata.name(), "location [" + location + "] doesn't match any of the locations specified by path.repo");
logger.warn("The specified location [{}] doesn't start with any "
+ "repository paths specified by the path.repo setting: [{}] ", location, environment.repoFiles());
throw new RepositoryException(metadata.name(), "location [" + location
+ "] doesn't match any of the locations specified by path.repo");
} else {
logger.warn("The specified location [{}] should start with a repository path specified by the path.repo setting, but the path.repo setting was not set on this node", location);
throw new RepositoryException(metadata.name(), "location [" + location + "] doesn't match any of the locations specified by path.repo because this setting is empty");
logger.warn("The specified location [{}] should start with a repository path specified by"
+ " the path.repo setting, but the path.repo setting was not set on this node", location);
throw new RepositoryException(metadata.name(), "location [" + location
+ "] doesn't match any of the locations specified by path.repo because this setting is empty");
}
}
blobStore = new FsBlobStore(settings, locationFile);
if (CHUNK_SIZE_SETTING.exists(metadata.settings())) {
this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings());
} else {
this.chunkSize = REPOSITORIES_CHUNK_SIZE_SETTING.get(settings);
}
this.compress = COMPRESS_SETTING.exists(metadata.settings()) ? COMPRESS_SETTING.get(metadata.settings()) : REPOSITORIES_COMPRESS_SETTING.get(settings);
this.compress = COMPRESS_SETTING.exists(metadata.settings())
? COMPRESS_SETTING.get(metadata.settings()) : REPOSITORIES_COMPRESS_SETTING.get(settings);
this.basePath = BlobPath.cleanPath();
}
@Override
protected BlobStore blobStore() {
return blobStore;
protected BlobStore createBlobStore() throws Exception {
final String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
final Path locationFile = environment.resolveRepoFile(location);
return new FsBlobStore(settings, locationFile);
}
@Override

View File

@ -406,7 +406,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
Collections.emptySet());
final ClusterService clusterService = mock(ClusterService.class);
final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService,
transportService, null);
transportService, null, threadPool);
final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(settings, threadPool,
transportService, null, clusterService);
final ShardStateAction shardStateAction = mock(ShardStateAction.class);

View File

@ -173,10 +173,17 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
}
/** Create a {@link Repository} with a random name **/
private Repository createRepository() throws IOException {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry());
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry()) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually
}
};
repository.start();
return repository;
}
/** Create a {@link Environment} with random path.home and path.repo **/

View File

@ -24,10 +24,16 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
import org.elasticsearch.client.Client;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
@ -37,18 +43,42 @@ import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
/**
* Tests for the {@link BlobStoreRepository} and its subclasses.
*/
public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
static final String REPO_TYPE = "fsLike";
protected Collection<Class<? extends Plugin>> getPlugins() {
return Arrays.asList(FsLikeRepoPlugin.class);
}
// the reason for this plug-in is to drop any assertSnapshotOrGenericThread as mostly all access in this test goes from test threads
public static class FsLikeRepoPlugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.singletonMap(REPO_TYPE,
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we access blobStore on test/main threads
}
});
}
}
public void testRetrieveSnapshots() throws Exception {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
@ -57,7 +87,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse =
client.admin().cluster().preparePutRepository(repositoryName)
.setType("fs")
.setType(REPO_TYPE)
.setSettings(Settings.builder().put(node().settings()).put("location", location))
.get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@ -209,7 +239,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
PutRepositoryResponse putRepositoryResponse =
client.admin().cluster().preparePutRepository(repositoryName)
.setType("fs")
.setType(REPO_TYPE)
.setSettings(Settings.builder().put(node().settings()).put("location", location))
.get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@ -217,6 +247,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
final RepositoriesService repositoriesService = getInstanceFromNode(RepositoriesService.class);
@SuppressWarnings("unchecked") final BlobStoreRepository repository =
(BlobStoreRepository) repositoriesService.repository(repositoryName);
assertThat("getBlobContainer has to be lazy initialized", repository.getBlobContainer(), nullValue());
return repository;
}

View File

@ -16,22 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
package org.elasticsearch.repositories.fs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.instanceOf;
public class FsBlobStoreRepositoryIT extends ESBlobStoreRepositoryIntegTestCase {
@Override
protected void createTestRepository(String name) {
protected void createTestRepository(String name, boolean verify) {
assertAcked(client().admin().cluster().preparePutRepository(name)
.setVerify(verify)
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
}
@Override
protected void afterCreationCheck(Repository repository) {
assertThat(repository, instanceOf(FsRepository.class));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.snapshots;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
@ -93,6 +94,7 @@ import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.StoredScriptsIT;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
@ -1262,7 +1264,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
Repository repository = service.repository("test-repo");
final Map<String, IndexId> indexIds = repository.getRepositoryData().getIndices();
final Map<String, IndexId> indexIds = getRepositoryData(repository).getIndices();
final Path indicesPath = repo.resolve("indices");
logger.info("--> delete index metadata and shard metadata");
@ -1739,6 +1741,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> trying to create a repository with different name");
assertAcked(client.admin().cluster().preparePutRepository("test-repo-2")
.setVerify(false) // do not do verification itself as snapshot threads could be fully blocked
.setType("fs").setSettings(Settings.builder().put("location", repositoryLocation.resolve("test"))));
logger.info("--> unblocking blocked node");
@ -2563,7 +2566,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> emulate an orphan snapshot");
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();
final RepositoryData repositoryData = getRepositoryData(repositoriesService.repository(repositoryName));
final IndexId indexId = repositoryData.resolveIndexId(idxName);
clusterService.submitStateUpdateTask("orphan snapshot test", new ClusterStateUpdateTask() {
@ -2784,7 +2787,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
Repository repository = service.repository("test-repo");
final Map<String, IndexId> indexIds = repository.getRepositoryData().getIndices();
final RepositoryData repositoryData = getRepositoryData(repository);
final Map<String, IndexId> indexIds = repositoryData.getIndices();
assertThat(indexIds.size(), equalTo(nbIndices));
// Choose a random index from the snapshot
@ -3445,6 +3449,19 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
}
private RepositoryData getRepositoryData(Repository repository) throws InterruptedException {
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
final SetOnce<RepositoryData> repositoryData = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
repositoryData.set(repository.getRepositoryData());
latch.countDown();
});
latch.await();
return repositoryData.get();
}
private void verifySnapshotInfo(final GetSnapshotsResponse response, final Map<String, List<String>> indicesPerSnapshot) {
for (SnapshotInfo snapshotInfo : response.getSnapshots()) {
final List<String> expected = snapshotInfo.indices();

View File

@ -92,8 +92,6 @@ public class MockRepository extends FsRepository {
private final long waitAfterUnblock;
private final MockBlobStore mockBlobStore;
private final String randomPrefix;
private volatile boolean blockOnInitialization;
@ -128,7 +126,6 @@ public class MockRepository extends FsRepository {
waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
allowAtomicOperations = metadata.settings().getAsBoolean("allow_atomic_operations", true);
logger.info("starting mock repository with random prefix {}", randomPrefix);
mockBlobStore = new MockBlobStore(super.blobStore());
}
@Override
@ -163,8 +160,8 @@ public class MockRepository extends FsRepository {
}
@Override
protected BlobStore blobStore() {
return mockBlobStore;
protected BlobStore createBlobStore() throws Exception {
return new MockBlobStore(super.createBlobStore());
}
public synchronized void unblock() {
@ -195,7 +192,7 @@ public class MockRepository extends FsRepository {
}
private synchronized boolean blockExecution() {
logger.debug("Blocking execution");
logger.debug("[{}] Blocking execution", metadata.name());
boolean wasBlocked = false;
try {
while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile ||
@ -207,7 +204,7 @@ public class MockRepository extends FsRepository {
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
logger.debug("Unblocking execution");
logger.debug("[{}] Unblocking execution", metadata.name());
return wasBlocked;
}
@ -285,7 +282,7 @@ public class MockRepository extends FsRepository {
}
private void blockExecutionAndMaybeWait(final String blobName) {
logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
logger.info("[{}] blocking I/O operation for file [{}] at path [{}]", metadata.name(), blobName, path());
if (blockExecution() && waitAfterUnblock > 0) {
try {
// Delay operation after unblocking

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.repositories.blobstore;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
@ -27,34 +28,61 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
/**
* Basic integration tests for blob-based repository validation.
*/
public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase {
protected abstract void createTestRepository(String name);
protected abstract void createTestRepository(String name, boolean verify);
protected void afterCreationCheck(Repository repository) {
}
protected void createAndCheckTestRepository(String name) {
final boolean verify = randomBoolean();
createTestRepository(name, verify);
final Iterable<RepositoriesService> repositoriesServices =
internalCluster().getDataOrMasterNodeInstances(RepositoriesService.class);
for (RepositoriesService repositoriesService : repositoriesServices) {
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(name);
afterCreationCheck(repository);
assertThat("blob store has to be lazy initialized",
repository.getBlobStore(), verify ? is(notNullValue()) : is(nullValue()));
}
}
public void testSnapshotAndRestore() throws Exception {
final String repoName = randomAsciiName();
logger.info("--> creating repository {}", repoName);
createTestRepository(repoName);
createAndCheckTestRepository(repoName);
int indexCount = randomIntBetween(1, 5);
int[] docCounts = new int[indexCount];
String[] indexNames = generateRandomNames(indexCount);
@ -125,7 +153,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
public void testMultipleSnapshotAndRollback() throws Exception {
String repoName = randomAsciiName();
logger.info("--> creating repository {}", repoName);
createTestRepository(repoName);
createAndCheckTestRepository(repoName);
int iterationCount = randomIntBetween(2, 5);
int[] docCounts = new int[iterationCount];
String indexName = randomAsciiName();
@ -177,12 +205,12 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
}
}
public void testIndicesDeletedFromRepository() {
public void testIndicesDeletedFromRepository() throws Exception {
Client client = client();
logger.info("--> creating repository");
final String repoName = "test-repo";
createTestRepository(repoName);
createAndCheckTestRepository(repoName);
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();
@ -219,12 +247,22 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
logger.info("--> verify index folder deleted from blob container");
RepositoriesService repositoriesSvc = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
@SuppressWarnings("unchecked") BlobStoreRepository repository = (BlobStoreRepository) repositoriesSvc.repository(repoName);
BlobContainer indicesBlobContainer = repository.blobStore().blobContainer(repository.basePath().add("indices"));
RepositoryData repositoryData = repository.getRepositoryData();
for (IndexId indexId : repositoryData.getIndices().values()) {
final SetOnce<BlobContainer> indicesBlobContainer = new SetOnce<>();
final SetOnce<RepositoryData> repositoryData = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
indicesBlobContainer.set(repository.blobStore().blobContainer(repository.basePath().add("indices")));
repositoryData.set(repository.getRepositoryData());
latch.countDown();
});
latch.await();
for (IndexId indexId : repositoryData.get().getIndices().values()) {
if (indexId.getName().equals("test-idx-3")) {
assertFalse(indicesBlobContainer.blobExists(indexId.getId())); // deleted index
assertFalse(indicesBlobContainer.get().blobExists(indexId.getId())); // deleted index
}
}
}