Add per repository credentials

Changed AwsS3Service to use one client per region and credentials combination.
Made S3Repository specify credentials if such exists in the repository settings.

Updated readme with repository specific credentials settings.

Closes #54.
Closes #55.
Closes #56.
(cherry picked from commit d4ea2dd)
This commit is contained in:
Konrad Beiske 2014-03-26 18:52:22 +01:00 committed by David Pilato
parent 254fb81708
commit 7f271fd37a
5 changed files with 272 additions and 90 deletions

View File

@ -119,6 +119,8 @@ The following settings are supported:
* `bucket`: The name of the bucket to be used for snapshots. (Mandatory)
* `region`: The region where bucket is located. Defaults to US Standard
* `base_path`: Specifies the path within bucket to repository data. Defaults to root directory.
* `access_key`: The access key to use for authentication. Defaults to value of `cloud.aws.access_key`.
* `secret_key`: The secret key to use for authentication. Defaults to value of `cloud.aws.secret_key`.
* `concurrent_streams`: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5`.
* `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `100m`.
* `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index files that are already compressed by default. Defaults to `false`.
@ -131,11 +133,11 @@ The S3 repositories are using the same credentials as the rest of the S3 service
secret_key: vExyMThREXeRMm/b/LRzEB8jWwvzQeXgjqMX+6br
Multiple S3 repositories can be created as long as they share the same credential.
Multiple S3 repositories can be created. If the buckets require different credentials, then define them as part of the repository settings.
## Testing
Integrations tests in this plugin require working AWS configuration and therefore disabled by default. To enable tests prepare a config file elasticsearch.yml with the following content:
Integrations tests in this plugin require working AWS configuration and therefore disabled by default. Three buckets and two iam users have to be created. The first iam user needs access to two buckets in different regions and the final bucket is exclusive for the other iam user. To enable tests prepare a config file elasticsearch.yml with the following content:
```
cloud:
@ -147,10 +149,17 @@ repositories:
s3:
bucket: "bucket_name"
region: "us-west-2"
private-bucket:
bucket: <bucket not accessible by default key>
access_key: <access key>
secret_key: <access key>
remote-bucket:
bucket: <bucket in other region>
region: <region>
```
Replaces `access_key`, `secret_key`, `bucket` and `region` with your settings. Please, note that the test will delete all snapshot/restore related files in the specified bucket.
Replace all occurrences of `access_key`, `secret_key`, `bucket` and `region` with your settings. Please, note that the test will delete all snapshot/restore related files in the specified buckets.
To run test:

View File

@ -19,6 +19,9 @@
package org.elasticsearch.cloud.aws;
import java.util.HashMap;
import java.util.Map;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.*;
@ -27,6 +30,7 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -37,7 +41,10 @@ import org.elasticsearch.common.settings.SettingsFilter;
*/
public class AwsS3Service extends AbstractLifecycleComponent<AwsS3Service> {
private AmazonS3Client client;
/**
* (acceskey, endpoint) -> client
*/
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<Tuple<String,String>, AmazonS3Client>();
@Inject
public AwsS3Service(Settings settings, SettingsFilter settingsFilter) {
@ -47,6 +54,33 @@ public class AwsS3Service extends AbstractLifecycleComponent<AwsS3Service> {
}
public synchronized AmazonS3 client() {
String endpoint = getDefaultEndpoint();
String account = componentSettings.get("access_key", settings.get("cloud.account"));
String key = componentSettings.get("secret_key", settings.get("cloud.key"));
return getClient(endpoint, account, key);
}
public synchronized AmazonS3 client(String region, String account, String key) {
String endpoint;
if (region == null) {
endpoint = getDefaultEndpoint();
} else {
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
}
if (account == null || key == null) {
account = componentSettings.get("access_key", settings.get("cloud.account"));
key = componentSettings.get("secret_key", settings.get("cloud.key"));
}
return getClient(endpoint, account, key);
}
private synchronized AmazonS3 getClient(String endpoint, String account, String key) {
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
AmazonS3Client client = clients.get(clientDescriptor);
if (client != null) {
return client;
}
@ -60,8 +94,6 @@ public class AwsS3Service extends AbstractLifecycleComponent<AwsS3Service> {
} else {
throw new ElasticsearchIllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String account = componentSettings.get("access_key", settings.get("cloud.account"));
String key = componentSettings.get("secret_key", settings.get("cloud.key"));
String proxyHost = componentSettings.get("proxy_host");
if (proxyHost != null) {
@ -88,53 +120,60 @@ public class AwsS3Service extends AbstractLifecycleComponent<AwsS3Service> {
new StaticCredentialsProvider(new BasicAWSCredentials(account, key))
);
}
this.client = new AmazonS3Client(credentials, clientConfiguration);
client = new AmazonS3Client(credentials, clientConfiguration);
if (componentSettings.get("s3.endpoint") != null) {
String endpoint = componentSettings.get("s3.endpoint");
logger.debug("using explicit s3 endpoint [{}]", endpoint);
if (endpoint != null) {
client.setEndpoint(endpoint);
} else if (componentSettings.get("region") != null) {
String endpoint;
String region = componentSettings.get("region").toLowerCase();
if ("us-east".equals(region)) {
endpoint = "s3.amazonaws.com";
} else if ("us-east-1".equals(region)) {
endpoint = "s3.amazonaws.com";
} else if ("us-west".equals(region)) {
endpoint = "s3-us-west-1.amazonaws.com";
} else if ("us-west-1".equals(region)) {
endpoint = "s3-us-west-1.amazonaws.com";
} else if ("us-west-2".equals(region)) {
endpoint = "s3-us-west-2.amazonaws.com";
} else if ("ap-southeast".equals(region)) {
endpoint = "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-1".equals(region)) {
endpoint = "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-2".equals(region)) {
endpoint = "s3-ap-southeast-2.amazonaws.com";
} else if ("ap-northeast".equals(region)) {
endpoint = "s3-ap-northeast-1.amazonaws.com";
} else if ("ap-northeast-1".equals(region)) {
endpoint = "s3-ap-northeast-1.amazonaws.com";
} else if ("eu-west".equals(region)) {
endpoint = "s3-eu-west-1.amazonaws.com";
} else if ("eu-west-1".equals(region)) {
endpoint = "s3-eu-west-1.amazonaws.com";
} else if ("sa-east".equals(region)) {
endpoint = "s3-sa-east-1.amazonaws.com";
} else if ("sa-east-1".equals(region)) {
endpoint = "s3-sa-east-1.amazonaws.com";
} else {
throw new ElasticsearchIllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]");
}
if (endpoint != null) {
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
client.setEndpoint(endpoint);
}
}
clients.put(clientDescriptor, client);
return client;
}
return this.client;
private String getDefaultEndpoint() {
String endpoint = null;
if (componentSettings.get("s3.endpoint") != null) {
endpoint = componentSettings.get("s3.endpoint");
logger.debug("using explicit s3 endpoint [{}]", endpoint);
} else if (componentSettings.get("region") != null) {
String region = componentSettings.get("region").toLowerCase();
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
}
return endpoint;
}
private static String getEndpoint(String region) {
if ("us-east".equals(region)) {
return "s3.amazonaws.com";
} else if ("us-east-1".equals(region)) {
return "s3.amazonaws.com";
} else if ("us-west".equals(region)) {
return "s3-us-west-1.amazonaws.com";
} else if ("us-west-1".equals(region)) {
return "s3-us-west-1.amazonaws.com";
} else if ("us-west-2".equals(region)) {
return "s3-us-west-2.amazonaws.com";
} else if ("ap-southeast".equals(region)) {
return "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-1".equals(region)) {
return "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-2".equals(region)) {
return "s3-ap-southeast-2.amazonaws.com";
} else if ("ap-northeast".equals(region)) {
return "s3-ap-northeast-1.amazonaws.com";
} else if ("ap-northeast-1".equals(region)) {
return "s3-ap-northeast-1.amazonaws.com";
} else if ("eu-west".equals(region)) {
return "s3-eu-west-1.amazonaws.com";
} else if ("eu-west-1".equals(region)) {
return "s3-eu-west-1.amazonaws.com";
} else if ("sa-east".equals(region)) {
return "s3-sa-east-1.amazonaws.com";
} else if ("sa-east-1".equals(region)) {
return "s3-sa-east-1.amazonaws.com";
} else {
throw new ElasticsearchIllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]");
}
}
@Override
@ -147,7 +186,7 @@ public class AwsS3Service extends AbstractLifecycleComponent<AwsS3Service> {
@Override
protected void doClose() throws ElasticsearchException {
if (client != null) {
for (AmazonS3Client client : clients.values()) {
client.shutdown();
}
}

View File

@ -124,7 +124,7 @@ public class S3Repository extends BlobStoreRepository {
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[s3_stream]"));
logger.debug("using bucket [{}], region [{}], chunk_size [{}], concurrent_streams [{}]", bucket, region, chunkSize, concurrentStreams);
blobStore = new S3BlobStore(settings, s3Service.client(), bucket, region, concurrentStreamPool);
blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, concurrentStreamPool);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
String basePath = repositorySettings.settings().get("base_path", null);

View File

@ -23,6 +23,7 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
@ -33,6 +34,7 @@ import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
@ -50,7 +52,7 @@ import static org.hamcrest.Matchers.greaterThan;
/**
*/
@AwsTest
@ClusterScope(scope = Scope.TEST, numNodes = 2)
@ClusterScope(scope = Scope.SUITE, numNodes = 2)
public class S3SnapshotRestoreTest extends AbstractAwsTest {
@Override
@ -151,6 +153,117 @@ public class S3SnapshotRestoreTest extends AbstractAwsTest {
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
}
/**
* This test verifies that the test configuration is set up in a manner that
* does not make the test {@link #testRepositoryWithCustomCredentials()} pointless.
*/
@Test(expected = UncategorizedExecutionException.class)
public void assertRepositoryWithCustomCredentialsIsNotAccessibleByDefaultCredentials() {
Client client = client();
Settings bucketSettings = cluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket.");
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(ImmutableSettings.settingsBuilder()
.put("base_path", basePath)
.put("bucket", bucketSettings.get("bucket"))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
}
@Test
public void testRepositoryWithCustomCredentials() {
Client client = client();
Settings bucketSettings = cluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket.");
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(ImmutableSettings.settingsBuilder()
.put("base_path", basePath)
.put("access_key", bucketSettings.get("access_key"))
.put("secret_key", bucketSettings.get("secret_key"))
.put("bucket", bucketSettings.get("bucket"))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
}
/**
* This test verifies that the test configuration is set up in a manner that
* does not make the test {@link #testRepositoryInRemoteRegion()} pointless.
*/
@Test(expected = UncategorizedExecutionException.class)
public void assertRepositoryInRemoteRegionIsRemote() {
Client client = client();
Settings bucketSettings = cluster().getInstance(Settings.class).getByPrefix("repositories.s3.remote-bucket.");
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(ImmutableSettings.settingsBuilder()
.put("base_path", basePath)
.put("bucket", bucketSettings.get("bucket"))
// Below setting intentionally omitted to assert bucket is not available in default region.
// .put("region", privateBucketSettings.get("region"))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
}
@Test
public void testRepositoryInRemoteRegion() {
Client client = client();
Settings settings = cluster().getInstance(Settings.class);
Settings bucketSettings = settings.getByPrefix("repositories.s3.remote-bucket.");
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(ImmutableSettings.settingsBuilder()
.put("base_path", basePath)
.put("bucket", bucketSettings.get("bucket"))
.put("region", bucketSettings.get("region"))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
}
private void assertRepositoryIsOperational(Client client, String repository) {
createIndex("test-idx-1");
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repository, "test-snap").setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots(repository).setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get();
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L));
logger.info("--> close indices");
client.admin().indices().prepareClose("test-idx-1").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot(repository, "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
}
/**
* Deletes repositories, supports wildcard notation.
*/
@ -172,45 +285,55 @@ public class S3SnapshotRestoreTest extends AbstractAwsTest {
* Deletes content of the repository files in the bucket
*/
public void cleanRepositoryFiles(String basePath) {
String bucket = cluster().getInstance(Settings.class).get("repositories.s3.bucket");
AmazonS3 client = cluster().getInstance(AwsS3Service.class).client();
try {
ObjectListing prevListing = null;
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
//we can do at most 1K objects per delete
//We don't know the bucket name until first object listing
DeleteObjectsRequest multiObjectDeleteRequest = null;
ArrayList<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<DeleteObjectsRequest.KeyVersion>();
while (true) {
ObjectListing list;
if (prevListing != null) {
list = client.listNextBatchOfObjects(prevListing);
} else {
list = client.listObjects(bucket, basePath);
multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
//Every 500 objects batch the delete request
if (keys.size() > 500) {
multiObjectDeleteRequest.setKeys(keys);
client.deleteObjects(multiObjectDeleteRequest);
Settings settings = cluster().getInstance(Settings.class);
Settings[] buckets = {
settings.getByPrefix("repositories.s3."),
settings.getByPrefix("repositories.s3.private-bucket."),
settings.getByPrefix("repositories.s3.remote-bucket.")
};
for (Settings bucket : buckets) {
AmazonS3 client = cluster().getInstance(AwsS3Service.class).client(
bucket.get("region", settings.get("repositories.s3.region")),
bucket.get("access_key", settings.get("cloud.aws.access_key")),
bucket.get("secret_key", settings.get("cloud.aws.secret_key")));
try {
ObjectListing prevListing = null;
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
//we can do at most 1K objects per delete
//We don't know the bucket name until first object listing
DeleteObjectsRequest multiObjectDeleteRequest = null;
ArrayList<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<DeleteObjectsRequest.KeyVersion>();
while (true) {
ObjectListing list;
if (prevListing != null) {
list = client.listNextBatchOfObjects(prevListing);
} else {
list = client.listObjects(bucket.get("bucket"), basePath);
multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
keys.clear();
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
//Every 500 objects batch the delete request
if (keys.size() > 500) {
multiObjectDeleteRequest.setKeys(keys);
client.deleteObjects(multiObjectDeleteRequest);
multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
keys.clear();
}
}
if (list.isTruncated()) {
prevListing = list;
} else {
break;
}
}
if (list.isTruncated()) {
prevListing = list;
} else {
break;
if (!keys.isEmpty()) {
multiObjectDeleteRequest.setKeys(keys);
client.deleteObjects(multiObjectDeleteRequest);
}
} catch (Throwable ex) {
logger.warn("Failed to delete S3 repository", ex);
}
if (!keys.isEmpty()) {
multiObjectDeleteRequest.setKeys(keys);
client.deleteObjects(multiObjectDeleteRequest);
}
} catch (Throwable ex) {
logger.warn("Failed to delete S3 repository", ex);
}
}
}

View File

@ -1,10 +1,21 @@
# Replace this access_key / secret_key and bucket name with your own if you want
# to run tests.
#cloud:
# cloud:
# aws:
# access_key: AKVAIQBF2RECL7FJWGJQ
# secret_key: vExyMThREXeRMm/b/LRzEB8jWwvzQeXgjqMX+6br
# access_key: <default access key>
# secret_key: <default secret key>
#
#discovery:
# type: ec2
#
#repositories:
# s3:
# bucket: <default bucket>
# region: <default region>
# private-bucket:
# bucket: <bucket not accessible by default key>
# access_key: <access key>
# secret_key: <access key>
# remote-bucket:
# bucket: <bucket in other region>
# region: <region>