Allow per-repo protocols
Also add an integration test for custom endpoints/protocols.
This commit is contained in:
parent
2203f439e2
commit
30d80cc27d
|
@ -164,6 +164,7 @@ The following settings are supported:
|
||||||
* `bucket`: The name of the bucket to be used for snapshots. (Mandatory)
|
* `bucket`: The name of the bucket to be used for snapshots. (Mandatory)
|
||||||
* `region`: The region where bucket is located. Defaults to US Standard
|
* `region`: The region where bucket is located. Defaults to US Standard
|
||||||
* `endpoint`: The endpoint to the S3 API. Defaults to AWS's default S3 endpoint. Note that setting a region overrides the endpoint setting.
|
* `endpoint`: The endpoint to the S3 API. Defaults to AWS's default S3 endpoint. Note that setting a region overrides the endpoint setting.
|
||||||
|
* `protocol`: The protocol to use (`http` or `https`). Defaults to `https`.
|
||||||
* `base_path`: Specifies the path within bucket to repository data. Defaults to root directory.
|
* `base_path`: Specifies the path within bucket to repository data. Defaults to root directory.
|
||||||
* `access_key`: The access key to use for authentication. Defaults to value of `cloud.aws.access_key`.
|
* `access_key`: The access key to use for authentication. Defaults to value of `cloud.aws.access_key`.
|
||||||
* `secret_key`: The secret key to use for authentication. Defaults to value of `cloud.aws.secret_key`.
|
* `secret_key`: The secret key to use for authentication. Defaults to value of `cloud.aws.secret_key`.
|
||||||
|
@ -290,10 +291,16 @@ repositories:
|
||||||
remote-bucket:
|
remote-bucket:
|
||||||
bucket: <bucket in other region>
|
bucket: <bucket in other region>
|
||||||
region: <region>
|
region: <region>
|
||||||
|
external-bucket:
|
||||||
|
bucket: <bucket>
|
||||||
|
access_key: <access key>
|
||||||
|
secret_key: <secret key>
|
||||||
|
endpoint: <endpoint>
|
||||||
|
protocol: <protocol>
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
Replace all occurrences of `access_key`, `secret_key`, `bucket` and `region` with your settings. Please, note that the test will delete all snapshot/restore related files in the specified buckets.
|
Replace all occurrences of `access_key`, `secret_key`, `endpoint`, `protocol`, `bucket` and `region` with your settings. Please, note that the test will delete all snapshot/restore related files in the specified buckets.
|
||||||
|
|
||||||
To run test:
|
To run test:
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ import org.elasticsearch.common.component.LifecycleComponent;
|
||||||
public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {
|
public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {
|
||||||
AmazonS3 client();
|
AmazonS3 client();
|
||||||
|
|
||||||
AmazonS3 client(String endpoint, String region, String account, String key);
|
AmazonS3 client(String endpoint, String protocol, String region, String account, String key);
|
||||||
|
|
||||||
AmazonS3 client(String endpoint, String region, String account, String key, Integer maxRetries);
|
AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries);
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,34 +60,32 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
|
||||||
String account = componentSettings.get("access_key", settings.get("cloud.account"));
|
String account = componentSettings.get("access_key", settings.get("cloud.account"));
|
||||||
String key = componentSettings.get("secret_key", settings.get("cloud.key"));
|
String key = componentSettings.get("secret_key", settings.get("cloud.key"));
|
||||||
|
|
||||||
return getClient(endpoint, account, key, null);
|
return getClient(endpoint, "https", account, key, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AmazonS3 client(String endpoint, String region, String account, String key) {
|
public AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
|
||||||
return client(endpoint, region, account, key, null);
|
return client(endpoint, protocol, region, account, key, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized AmazonS3 client(String endpoint, String region, String account, String key, Integer maxRetries) {
|
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
|
||||||
if (endpoint == null) {
|
if (region != null && endpoint == null) {
|
||||||
endpoint = getDefaultEndpoint();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (region != null) {
|
|
||||||
endpoint = getEndpoint(region);
|
endpoint = getEndpoint(region);
|
||||||
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
|
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
|
||||||
|
} else if (endpoint == null) {
|
||||||
|
endpoint = getDefaultEndpoint();
|
||||||
}
|
}
|
||||||
if (account == null || key == null) {
|
if (account == null || key == null) {
|
||||||
account = componentSettings.get("access_key", settings.get("cloud.account"));
|
account = componentSettings.get("access_key", settings.get("cloud.account"));
|
||||||
key = componentSettings.get("secret_key", settings.get("cloud.key"));
|
key = componentSettings.get("secret_key", settings.get("cloud.key"));
|
||||||
}
|
}
|
||||||
|
|
||||||
return getClient(endpoint, account, key, maxRetries);
|
return getClient(endpoint, protocol, account, key, maxRetries);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private synchronized AmazonS3 getClient(String endpoint, String account, String key, Integer maxRetries) {
|
private synchronized AmazonS3 getClient(String endpoint, String protocol, String account, String key, Integer maxRetries) {
|
||||||
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
|
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
|
||||||
AmazonS3Client client = clients.get(clientDescriptor);
|
AmazonS3Client client = clients.get(clientDescriptor);
|
||||||
if (client != null) {
|
if (client != null) {
|
||||||
|
@ -95,8 +93,10 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
|
||||||
}
|
}
|
||||||
|
|
||||||
ClientConfiguration clientConfiguration = new ClientConfiguration();
|
ClientConfiguration clientConfiguration = new ClientConfiguration();
|
||||||
String protocol = componentSettings.get("protocol", "https").toLowerCase();
|
if (protocol == null) {
|
||||||
protocol = componentSettings.get("s3.protocol", protocol).toLowerCase();
|
protocol = "https";
|
||||||
|
}
|
||||||
|
|
||||||
if ("http".equals(protocol)) {
|
if ("http".equals(protocol)) {
|
||||||
clientConfiguration.setProtocol(Protocol.HTTP);
|
clientConfiguration.setProtocol(Protocol.HTTP);
|
||||||
} else if ("https".equals(protocol)) {
|
} else if ("https".equals(protocol)) {
|
||||||
|
|
|
@ -80,6 +80,9 @@ public class S3Repository extends BlobStoreRepository {
|
||||||
}
|
}
|
||||||
|
|
||||||
String endpoint = repositorySettings.settings().get("endpoint", componentSettings.get("endpoint"));
|
String endpoint = repositorySettings.settings().get("endpoint", componentSettings.get("endpoint"));
|
||||||
|
String protocol = componentSettings.get("protocol", "https").toLowerCase();
|
||||||
|
protocol = componentSettings.get("s3.protocol", protocol).toLowerCase();
|
||||||
|
protocol = repositorySettings.settings().get("protocol", protocol);
|
||||||
|
|
||||||
String region = repositorySettings.settings().get("region", componentSettings.get("region"));
|
String region = repositorySettings.settings().get("region", componentSettings.get("region"));
|
||||||
if (region == null) {
|
if (region == null) {
|
||||||
|
@ -126,10 +129,10 @@ public class S3Repository extends BlobStoreRepository {
|
||||||
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
|
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
|
||||||
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
|
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
|
||||||
|
|
||||||
logger.debug("using bucket [{}], region [{}], endpoint [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}]",
|
logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}]",
|
||||||
bucket, region, endpoint, chunkSize, serverSideEncryption, bufferSize, maxRetries);
|
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries);
|
||||||
|
|
||||||
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries);
|
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries);
|
||||||
String basePath = repositorySettings.settings().get("base_path", null);
|
String basePath = repositorySettings.settings().get("base_path", null);
|
||||||
if (Strings.hasLength(basePath)) {
|
if (Strings.hasLength(basePath)) {
|
||||||
BlobPath path = new BlobPath();
|
BlobPath path = new BlobPath();
|
||||||
|
|
|
@ -45,13 +45,13 @@ public class TestAwsS3Service extends InternalAwsS3Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized AmazonS3 client(String endpoint, String region, String account, String key) {
|
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
|
||||||
return cachedWrapper(super.client(endpoint, region, account, key));
|
return cachedWrapper(super.client(endpoint, protocol, region, account, key));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized AmazonS3 client(String region, String account, String key, Integer maxRetries) {
|
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
|
||||||
return cachedWrapper(super.client(region, account, key, maxRetries));
|
return cachedWrapper(super.client(endpoint, protocol, region, account, key, maxRetries));
|
||||||
}
|
}
|
||||||
|
|
||||||
private AmazonS3 cachedWrapper(AmazonS3 client) {
|
private AmazonS3 cachedWrapper(AmazonS3 client) {
|
||||||
|
|
|
@ -193,6 +193,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest {
|
||||||
Settings bucket = settings.getByPrefix("repositories.s3.");
|
Settings bucket = settings.getByPrefix("repositories.s3.");
|
||||||
AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(
|
AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(
|
||||||
bucket.get("endpoint", settings.get("repositories.s3.endpoint")),
|
bucket.get("endpoint", settings.get("repositories.s3.endpoint")),
|
||||||
|
bucket.get("protocol", settings.get("repositories.s3.protocol")),
|
||||||
bucket.get("region", settings.get("repositories.s3.region")),
|
bucket.get("region", settings.get("repositories.s3.region")),
|
||||||
bucket.get("access_key", settings.get("cloud.aws.access_key")),
|
bucket.get("access_key", settings.get("cloud.aws.access_key")),
|
||||||
bucket.get("secret_key", settings.get("cloud.aws.secret_key")));
|
bucket.get("secret_key", settings.get("cloud.aws.secret_key")));
|
||||||
|
@ -279,6 +280,22 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest {
|
||||||
assertRepositoryIsOperational(client, "test-repo");
|
assertRepositoryIsOperational(client, "test-repo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRepositoryWithCustomEndpointProtocol() {
|
||||||
|
Client client = client();
|
||||||
|
Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.external-bucket.");
|
||||||
|
logger.info("--> creating s3 repostoriy with endpoint [{}], bucket[{}] and path [{}]", bucketSettings.get("endpoint"), bucketSettings.get("bucket"), basePath);
|
||||||
|
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
||||||
|
.setType("s3").setSettings(ImmutableSettings.settingsBuilder()
|
||||||
|
.put("protocol", bucketSettings.get("protocol"))
|
||||||
|
.put("endpoint", bucketSettings.get("endpoint"))
|
||||||
|
.put("access_key", bucketSettings.get("access_key"))
|
||||||
|
.put("secret_key", bucketSettings.get("secret_key"))
|
||||||
|
).get();
|
||||||
|
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||||
|
assertRepositoryIsOperational(client, "test-repo");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This test verifies that the test configuration is set up in a manner that
|
* This test verifies that the test configuration is set up in a manner that
|
||||||
* does not make the test {@link #testRepositoryInRemoteRegion()} pointless.
|
* does not make the test {@link #testRepositoryInRemoteRegion()} pointless.
|
||||||
|
@ -432,6 +449,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest {
|
||||||
};
|
};
|
||||||
for (Settings bucket : buckets) {
|
for (Settings bucket : buckets) {
|
||||||
String endpoint = bucket.get("endpoint", settings.get("repositories.s3.endpoint"));
|
String endpoint = bucket.get("endpoint", settings.get("repositories.s3.endpoint"));
|
||||||
|
String protocol = bucket.get("protocol", settings.get("repositories.s3.protocol"));
|
||||||
String region = bucket.get("region", settings.get("repositories.s3.region"));
|
String region = bucket.get("region", settings.get("repositories.s3.region"));
|
||||||
String accessKey = bucket.get("access_key", settings.get("cloud.aws.access_key"));
|
String accessKey = bucket.get("access_key", settings.get("cloud.aws.access_key"));
|
||||||
String secretKey = bucket.get("secret_key", settings.get("cloud.aws.secret_key"));
|
String secretKey = bucket.get("secret_key", settings.get("cloud.aws.secret_key"));
|
||||||
|
@ -440,7 +458,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest {
|
||||||
// We check that settings has been set in elasticsearch.yml integration test file
|
// We check that settings has been set in elasticsearch.yml integration test file
|
||||||
// as described in README
|
// as described in README
|
||||||
assertThat("Your settings in elasticsearch.yml are incorrects. Check README file.", bucketName, notNullValue());
|
assertThat("Your settings in elasticsearch.yml are incorrects. Check README file.", bucketName, notNullValue());
|
||||||
AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, region, accessKey, secretKey);
|
AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey);
|
||||||
try {
|
try {
|
||||||
ObjectListing prevListing = null;
|
ObjectListing prevListing = null;
|
||||||
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
|
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
|
||||||
|
|
Loading…
Reference in New Issue