Merge branch 's3-storage-class-support' of https://github.com/socialrank/elasticsearch into socialrank-s3-storage-class-support

# Please enter a commit message to explain why this merge is necessary,
# especially if it merges an updated upstream into a topic branch.
#
# Lines starting with '#' will be ignored, and an empty message aborts
# the commit.
This commit is contained in:
David Pilato 2015-11-19 13:10:08 +01:00
parent 6a2fa73fb5
commit d48d8ef863
6 changed files with 80 additions and 13 deletions

View File

@ -218,6 +218,14 @@ The following settings are supported:
You could specify a canned ACL using the `canned_acl` setting. When the S3 repository You could specify a canned ACL using the `canned_acl` setting. When the S3 repository
creates buckets and objects, it adds the canned ACL into the buckets and objects. creates buckets and objects, it adds the canned ACL into the buckets and objects.
`storage_class`::
Sets the S3 storage class type for the backup files. Values may be
`standard`, `reduced_redundancy`, `standard_ia`. Defaults to `standard`.
Due to the extra complexity with the Glacier class lifecycle, it is not
currently supported by the plugin. For more information about the
different classes, see http://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html[AWS Storage Classes Guide]
The S3 repositories use the same credentials as the rest of the AWS services The S3 repositories use the same credentials as the rest of the AWS services
provided by this plugin (`discovery`). See <<repository-s3-usage>> for details. provided by this plugin (`discovery`). See <<repository-s3-usage>> for details.

View File

@ -134,7 +134,9 @@ public class DefaultS3OutputStream extends S3OutputStream {
throw new RuntimeException(impossible); throw new RuntimeException(impossible);
} }
PutObjectRequest putRequest = new PutObjectRequest(bucketName, blobName, inputStream, md).withCannedAcl(blobStore.getCannedACL()); PutObjectRequest putRequest = new PutObjectRequest(bucketName, blobName, inputStream, md)
.withStorageClass(blobStore.getStorageClass())
.withCannedAcl(blobStore.getCannedACL());
PutObjectResult putObjectResult = blobStore.client().putObject(putRequest); PutObjectResult putObjectResult = blobStore.client().putObject(putRequest);
String localMd5 = Base64.encodeAsString(messageDigest.digest()); String localMd5 = Base64.encodeAsString(messageDigest.digest());
@ -167,7 +169,10 @@ public class DefaultS3OutputStream extends S3OutputStream {
} }
protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) { protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName).withCannedACL(blobStore.getCannedACL()); InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName)
.withCannedACL(blobStore.getCannedACL())
.withStorageClass(blobStore.getStorageClass());
if (serverSideEncryption) { if (serverSideEncryption) {
ObjectMetadata md = new ObjectMetadata(); ObjectMetadata md = new ObjectMetadata();
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);

View File

@ -21,14 +21,8 @@ package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.*;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CreateBucketRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
@ -40,6 +34,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Locale;
/** /**
* *
@ -62,8 +57,10 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
private final CannedAccessControlList cannedACL; private final CannedAccessControlList cannedACL;
private final StorageClass storageClass;
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption, public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption,
ByteSizeValue bufferSize, int maxRetries, String cannedACL) { ByteSizeValue bufferSize, int maxRetries, String cannedACL, String storageClass) {
super(settings); super(settings);
this.client = client; this.client = client;
this.bucket = bucket; this.bucket = bucket;
@ -77,6 +74,7 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
this.cannedACL = initCannedACL(cannedACL); this.cannedACL = initCannedACL(cannedACL);
this.numberOfRetries = maxRetries; this.numberOfRetries = maxRetries;
this.storageClass = initStorageClass(storageClass);
// Note: the method client.doesBucketExist() may return 'true' is the bucket exists // Note: the method client.doesBucketExist() may return 'true' is the bucket exists
// but we don't have access to it (ie, 403 Forbidden response code) // but we don't have access to it (ie, 403 Forbidden response code)
@ -196,6 +194,25 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
return cannedACL; return cannedACL;
} }
public StorageClass getStorageClass() { return storageClass; }
public static StorageClass initStorageClass(String storageClass) {
if (storageClass == null || storageClass.equals("")) {
return StorageClass.Standard;
}
try {
StorageClass _storageClass = StorageClass.fromValue(storageClass.toUpperCase(Locale.ENGLISH));
if(_storageClass.equals(StorageClass.Glacier)) {
throw new BlobStoreException("Glacier storage class is not supported");
}
return _storageClass;
} catch (IllegalArgumentException illegalArgumentException) {
throw new BlobStoreException("`" + storageClass + "` is not a valid S3 Storage Class.");
}
}
/** /**
* Constructs canned acl from string * Constructs canned acl from string
*/ */

View File

@ -118,13 +118,15 @@ public class S3Repository extends BlobStoreRepository {
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize("repositories.s3.chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB))); this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize("repositories.s3.chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean("repositories.s3.compress", false)); this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean("repositories.s3.compress", false));
// Parse and validate the user's S3 Storage Class setting
String storageClass = repositorySettings.settings().get("storage_class", settings.get("repositories.s3.storage_class", null));
String cannedACL = repositorySettings.settings().get("canned_acl", settings.get("repositories.s3.canned_acl", null)); String cannedACL = repositorySettings.settings().get("canned_acl", settings.get("repositories.s3.canned_acl", null));
logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}], cannedACL [{}]", logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}], cannedACL [{}], storageClass [{}]",
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, cannedACL); bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries),
bucket, region, serverSideEncryption, bufferSize, maxRetries, cannedACL); bucket, region, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
String basePath = repositorySettings.settings().get("base_path", settings.get("repositories.s3.base_path")); String basePath = repositorySettings.settings().get("base_path", settings.get("repositories.s3.base_path"));
if (Strings.hasLength(basePath)) { if (Strings.hasLength(basePath)) {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cloud.aws.blobstore; package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.StorageClass;
import org.elasticsearch.common.blobstore.BlobStoreException; import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -58,4 +59,37 @@ public class S3BlobStoreTests extends ESTestCase {
assertThat(ex.getMessage(), equalTo("cannedACL is not valid: [test_invalid]")); assertThat(ex.getMessage(), equalTo("cannedACL is not valid: [test_invalid]"));
} }
} }
public void testInitStorageClass() throws IOException {
// it should default to `standard`
assertThat(S3BlobStore.initStorageClass(null), equalTo(StorageClass.Standard));
assertThat(S3BlobStore.initStorageClass(""), equalTo(StorageClass.Standard));
// it should accept [standard, standard_ia, reduced_redundancy]
assertThat(S3BlobStore.initStorageClass("standard"), equalTo(StorageClass.Standard));
assertThat(S3BlobStore.initStorageClass("standard_ia"), equalTo(StorageClass.StandardInfrequentAccess));
assertThat(S3BlobStore.initStorageClass("reduced_redundancy"), equalTo(StorageClass.ReducedRedundancy));
}
public void testCaseInsensitiveStorageClass() throws IOException {
assertThat(S3BlobStore.initStorageClass("sTandaRd"), equalTo(StorageClass.Standard));
assertThat(S3BlobStore.initStorageClass("sTandaRd_Ia"), equalTo(StorageClass.StandardInfrequentAccess));
assertThat(S3BlobStore.initStorageClass("reduCED_redundancy"), equalTo(StorageClass.ReducedRedundancy));
}
public void testInvalidStorageClass() throws IOException {
try {
S3BlobStore.initStorageClass("whatever");
} catch(BlobStoreException ex) {
assertThat(ex.getMessage(), equalTo("`whatever` is not a valid S3 Storage Class."));
}
}
public void testRejectGlacierStorageClass() throws IOException {
try {
S3BlobStore.initStorageClass("glacier");
} catch(BlobStoreException ex) {
assertThat(ex.getMessage(), equalTo("Glacier storage class is not supported"));
}
}
} }

View File

@ -12,6 +12,7 @@
access_key: "AKVAIQBF2RECL7FJWGJQ" access_key: "AKVAIQBF2RECL7FJWGJQ"
secret_key: "vExyMThREXeRMm/b/LRzEB8jWwvzQeXgjqMX+6br" secret_key: "vExyMThREXeRMm/b/LRzEB8jWwvzQeXgjqMX+6br"
canned_acl: "public-read" canned_acl: "public-read"
storage_class: "standard"
# Get repositry # Get repositry
- do: - do: