Merge pull request #14297 from xuzha/aws-acl
Add aws canned acl, closes #14103
This commit is contained in:
commit
dd2a6310a8
|
@ -210,6 +210,14 @@ The following settings are supported:
|
||||||
|
|
||||||
Makes repository read-only. coming[2.1.0] Defaults to `false`.
|
Makes repository read-only. coming[2.1.0] Defaults to `false`.
|
||||||
|
|
||||||
|
`canned_acl`::
|
||||||
|
|
||||||
|
The S3 repository supports all http://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl[S3 canned ACLs]
|
||||||
|
: `private`, `public-read`, `public-read-write`, `authenticated-read`, `log-delivery-write`,
|
||||||
|
`bucket-owner-read`, `bucket-owner-full-control`. Defaults to `private`.
|
||||||
|
You could specify a canned ACL using the `canned_acl` setting. When the S3 repository
|
||||||
|
creates buckets and objects, it adds the canned ACL into the buckets and objects.
|
||||||
|
|
||||||
The S3 repositories use the same credentials as the rest of the AWS services
|
The S3 repositories use the same credentials as the rest of the AWS services
|
||||||
provided by this plugin (`discovery`). See <<repository-s3-usage>> for details.
|
provided by this plugin (`discovery`). See <<repository-s3-usage>> for details.
|
||||||
|
|
||||||
|
|
|
@ -133,7 +133,9 @@ public class DefaultS3OutputStream extends S3OutputStream {
|
||||||
// Every implementation of the Java platform is required to support MD5 (see MessageDigest)
|
// Every implementation of the Java platform is required to support MD5 (see MessageDigest)
|
||||||
throw new RuntimeException(impossible);
|
throw new RuntimeException(impossible);
|
||||||
}
|
}
|
||||||
PutObjectResult putObjectResult = blobStore.client().putObject(bucketName, blobName, inputStream, md);
|
|
||||||
|
PutObjectRequest putRequest = new PutObjectRequest(bucketName, blobName, inputStream, md).withCannedAcl(blobStore.getCannedACL());
|
||||||
|
PutObjectResult putObjectResult = blobStore.client().putObject(putRequest);
|
||||||
|
|
||||||
String localMd5 = Base64.encodeAsString(messageDigest.digest());
|
String localMd5 = Base64.encodeAsString(messageDigest.digest());
|
||||||
String remoteMd5 = putObjectResult.getContentMd5();
|
String remoteMd5 = putObjectResult.getContentMd5();
|
||||||
|
@ -165,12 +167,13 @@ public class DefaultS3OutputStream extends S3OutputStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
|
protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
|
||||||
InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName);
|
InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName).withCannedACL(blobStore.getCannedACL());
|
||||||
if (serverSideEncryption) {
|
if (serverSideEncryption) {
|
||||||
ObjectMetadata md = new ObjectMetadata();
|
ObjectMetadata md = new ObjectMetadata();
|
||||||
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
||||||
request.setObjectMetadata(md);
|
request.setObjectMetadata(md);
|
||||||
}
|
}
|
||||||
|
|
||||||
return blobStore.client().initiateMultipartUpload(request).getUploadId();
|
return blobStore.client().initiateMultipartUpload(request).getUploadId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,10 +22,13 @@ package org.elasticsearch.cloud.aws.blobstore;
|
||||||
import com.amazonaws.AmazonClientException;
|
import com.amazonaws.AmazonClientException;
|
||||||
import com.amazonaws.services.s3.AmazonS3;
|
import com.amazonaws.services.s3.AmazonS3;
|
||||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||||
|
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
||||||
|
import com.amazonaws.services.s3.model.CreateBucketRequest;
|
||||||
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
||||||
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
|
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
|
||||||
import com.amazonaws.services.s3.model.ObjectListing;
|
import com.amazonaws.services.s3.model.ObjectListing;
|
||||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||||
|
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
|
@ -57,8 +60,10 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
|
||||||
|
|
||||||
private final int numberOfRetries;
|
private final int numberOfRetries;
|
||||||
|
|
||||||
|
private final CannedAccessControlList cannedACL;
|
||||||
|
|
||||||
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption,
|
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption,
|
||||||
ByteSizeValue bufferSize, int maxRetries) {
|
ByteSizeValue bufferSize, int maxRetries, String cannedACL) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.bucket = bucket;
|
this.bucket = bucket;
|
||||||
|
@ -70,6 +75,7 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
|
||||||
throw new BlobStoreException("Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]");
|
throw new BlobStoreException("Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.cannedACL = initCannedACL(cannedACL);
|
||||||
this.numberOfRetries = maxRetries;
|
this.numberOfRetries = maxRetries;
|
||||||
|
|
||||||
// Note: the method client.doesBucketExist() may return 'true' is the bucket exists
|
// Note: the method client.doesBucketExist() may return 'true' is the bucket exists
|
||||||
|
@ -81,11 +87,14 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
|
||||||
while (retry <= maxRetries) {
|
while (retry <= maxRetries) {
|
||||||
try {
|
try {
|
||||||
if (!client.doesBucketExist(bucket)) {
|
if (!client.doesBucketExist(bucket)) {
|
||||||
|
CreateBucketRequest request = null;
|
||||||
if (region != null) {
|
if (region != null) {
|
||||||
client.createBucket(bucket, region);
|
request = new CreateBucketRequest(bucket, region);
|
||||||
} else {
|
} else {
|
||||||
client.createBucket(bucket);
|
request = new CreateBucketRequest(bucket);
|
||||||
}
|
}
|
||||||
|
request.setCannedAcl(this.cannedACL);
|
||||||
|
client.createBucket(request);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
} catch (AmazonClientException e) {
|
} catch (AmazonClientException e) {
|
||||||
|
@ -182,4 +191,25 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CannedAccessControlList getCannedACL() {
|
||||||
|
return cannedACL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs canned acl from string
|
||||||
|
*/
|
||||||
|
public static CannedAccessControlList initCannedACL(String cannedACL) {
|
||||||
|
if (cannedACL == null || cannedACL.equals("")) {
|
||||||
|
return CannedAccessControlList.Private;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (CannedAccessControlList cur : CannedAccessControlList.values()) {
|
||||||
|
if (cur.toString().equalsIgnoreCase(cannedACL)) {
|
||||||
|
return cur;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new BlobStoreException("cannedACL is not valid: [" + cannedACL + "]");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,10 +118,14 @@ public class S3Repository extends BlobStoreRepository {
|
||||||
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize("repositories.s3.chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
|
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize("repositories.s3.chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
|
||||||
this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean("repositories.s3.compress", false));
|
this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean("repositories.s3.compress", false));
|
||||||
|
|
||||||
logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}]",
|
String cannedACL = repositorySettings.settings().get("canned_acl", settings.get("repositories.s3.canned_acl", null));
|
||||||
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries);
|
|
||||||
|
logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}], cannedACL [{}]",
|
||||||
|
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, cannedACL);
|
||||||
|
|
||||||
|
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries),
|
||||||
|
bucket, region, serverSideEncryption, bufferSize, maxRetries, cannedACL);
|
||||||
|
|
||||||
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries);
|
|
||||||
String basePath = repositorySettings.settings().get("base_path", settings.get("repositories.s3.base_path"));
|
String basePath = repositorySettings.settings().get("base_path", settings.get("repositories.s3.base_path"));
|
||||||
if (Strings.hasLength(basePath)) {
|
if (Strings.hasLength(basePath)) {
|
||||||
BlobPath path = new BlobPath();
|
BlobPath path = new BlobPath();
|
||||||
|
|
|
@ -0,0 +1,61 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.cloud.aws.blobstore;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
||||||
|
import org.elasticsearch.common.blobstore.BlobStoreException;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
|
public class S3BlobStoreTests extends ESTestCase {
|
||||||
|
public void testInitCannedACL() throws IOException {
|
||||||
|
String[] aclList = new String[]{
|
||||||
|
"private", "public-read", "public-read-write", "authenticated-read",
|
||||||
|
"log-delivery-write", "bucket-owner-read", "bucket-owner-full-control"};
|
||||||
|
|
||||||
|
//empty acl
|
||||||
|
assertThat(S3BlobStore.initCannedACL(null), equalTo(CannedAccessControlList.Private));
|
||||||
|
assertThat(S3BlobStore.initCannedACL(""), equalTo(CannedAccessControlList.Private));
|
||||||
|
|
||||||
|
// it should init cannedACL correctly
|
||||||
|
for (String aclString : aclList) {
|
||||||
|
CannedAccessControlList acl = S3BlobStore.initCannedACL(aclString);
|
||||||
|
assertThat(acl.toString(), equalTo(aclString));
|
||||||
|
}
|
||||||
|
|
||||||
|
// it should accept all aws cannedACLs
|
||||||
|
for (CannedAccessControlList awsList : CannedAccessControlList.values()) {
|
||||||
|
CannedAccessControlList acl = S3BlobStore.initCannedACL(awsList.toString());
|
||||||
|
assertThat(acl, equalTo(awsList));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testInvalidCannedACL() throws IOException {
|
||||||
|
try {
|
||||||
|
S3BlobStore.initCannedACL("test_invalid");
|
||||||
|
fail("CannedACL should fail");
|
||||||
|
} catch (BlobStoreException ex) {
|
||||||
|
assertThat(ex.getMessage(), equalTo("cannedACL is not valid: [test_invalid]"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -11,6 +11,7 @@
|
||||||
bucket: "my_bucket_name"
|
bucket: "my_bucket_name"
|
||||||
access_key: "AKVAIQBF2RECL7FJWGJQ"
|
access_key: "AKVAIQBF2RECL7FJWGJQ"
|
||||||
secret_key: "vExyMThREXeRMm/b/LRzEB8jWwvzQeXgjqMX+6br"
|
secret_key: "vExyMThREXeRMm/b/LRzEB8jWwvzQeXgjqMX+6br"
|
||||||
|
canned_acl: "public-read"
|
||||||
|
|
||||||
# Get repositry
|
# Get repositry
|
||||||
- do:
|
- do:
|
||||||
|
@ -21,3 +22,4 @@
|
||||||
- is_true: test_repo_s3_1.settings.bucket
|
- is_true: test_repo_s3_1.settings.bucket
|
||||||
- is_false: test_repo_s3_1.settings.access_key
|
- is_false: test_repo_s3_1.settings.access_key
|
||||||
- is_false: test_repo_s3_1.settings.secret_key
|
- is_false: test_repo_s3_1.settings.secret_key
|
||||||
|
- match: {test_repo_s3_1.settings.canned_acl : "public-read"}
|
||||||
|
|
Loading…
Reference in New Issue