diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java index 8ec1b2a7232..e393f42f04a 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java @@ -20,7 +20,6 @@ package org.apache.druid.storage.s3; import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.s3.model.PutObjectRequest; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; @@ -96,7 +95,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher try { return S3Utils.retryS3Operation( () -> { - uploadFileIfPossible(config.getBucket(), s3Path, zipOutFile); + S3Utils.uploadFileIfPossible(s3Client, config.getDisableAcl(), config.getBucket(), s3Path, zipOutFile); return outSegment; } @@ -139,14 +138,4 @@ public class S3DataSegmentPusher implements DataSegmentPusher ); } - private void uploadFileIfPossible(String bucket, String key, File file) - { - final PutObjectRequest indexFilePutRequest = new PutObjectRequest(bucket, key, file); - - if (!config.getDisableAcl()) { - indexFilePutRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(s3Client, bucket)); - } - log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key); - s3Client.putObject(indexFilePutRequest); - } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java index b437c621e94..9cee08b177e 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java @@ -135,7 +135,7 @@ public class S3TaskLogs implements TaskLogs try { S3Utils.retryS3Operation( () -> { - service.putObject(config.getS3Bucket(), taskKey, logFile); + S3Utils.uploadFileIfPossible(service, config.getDisableAcl(), config.getS3Bucket(), taskKey, logFile); return null; } ); @@ -146,7 +146,7 @@ public class S3TaskLogs implements TaskLogs } } - private String getTaskLogKey(String taskid, String filename) + String getTaskLogKey(String taskid, String filename) { return StringUtils.format("%s/%s/%s", config.getS3Prefix(), taskid, filename); } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java index 30f8e87e0b4..e61dbf79592 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java @@ -20,6 +20,7 @@ package org.apache.druid.storage.s3; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import javax.validation.constraints.NotNull; @@ -35,13 +36,34 @@ public class S3TaskLogsConfig @NotNull private String s3Prefix = null; + @JsonProperty + private boolean disableAcl = false; + + @VisibleForTesting + void setDisableAcl(boolean disableAcl) + { + this.disableAcl = disableAcl; + } + public String getS3Bucket() { return s3Bucket; } + @VisibleForTesting + void setS3Bucket(String s3Bucket) + { + this.s3Bucket = s3Bucket; + } + public String getS3Prefix() { return s3Prefix; } + + public boolean getDisableAcl() + { + return disableAcl; + } + } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index 58ecdced699..71ceba67446 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -28,13 +28,16 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.Permission; +import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Joiner; import com.google.common.base.Predicate; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.RetryUtils.Task; +import org.apache.druid.java.util.common.logger.Logger; +import java.io.File; import java.io.IOException; import java.net.URI; import java.util.Iterator; @@ -47,6 +50,7 @@ public class S3Utils { private static final Joiner JOINER = Joiner.on("/").skipNulls(); private static final String MIMETYPE_JETS3T_DIRECTORY = "application/x-directory"; + private static final Logger log = new Logger(S3Utils.class); static boolean isServiceExceptionRecoverable(AmazonServiceException ex) { @@ -242,4 +246,23 @@ public class S3Utils return objectSummary; } + + /** + * Uploads a file to S3 if possible. First trying to set ACL to give the bucket owner full control of the file before uploading. + * + * @param service S3 client + * @param disableAcl true if ACL shouldn't be set for the file + * @param key The key under which to store the new object. + * @param file The path of the file to upload to Amazon S3. + */ + public static void uploadFileIfPossible(ServerSideEncryptingAmazonS3 service, boolean disableAcl, String bucket, String key, File file) + { + final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, file); + + if (!disableAcl) { + putObjectRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(service, bucket)); + } + log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key); + service.putObject(putObjectRequest); + } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java new file mode 100644 index 00000000000..a342b3b0cb9 --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.s3; + +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.Grant; +import com.amazonaws.services.s3.model.Owner; +import com.amazonaws.services.s3.model.Permission; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.List; + +public class S3TaskLogsTest +{ + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testTaskLogsPushWithAclDisabled() throws Exception + { + String ownerId = "test_owner"; + String ownerDisplayName = "test_owner"; + + List grantList = testPushInternal(true, ownerId, ownerDisplayName); + + Assert.assertTrue("Grant list should not be null", grantList != null); + Assert.assertEquals("Grant list should be empty as ACL is disabled", 0, grantList.size()); + } + + @Test + public void testTaskLogsPushWithAclEnabled() throws Exception + { + String ownerId = "test_owner"; + String ownerDisplayName = "test_owner"; + + List grantList = testPushInternal(false, ownerId, ownerDisplayName); + + Assert.assertTrue("Grant list should not be null", grantList != null); + Assert.assertEquals("Grant list size should be equal to 1", 1, grantList.size()); + Grant grant = grantList.get(0); + Assert.assertEquals("The Grantee identifier should be test_owner", "test_owner", grant.getGrantee().getIdentifier()); + Assert.assertEquals("The Grant should have full control permission", Permission.FullControl, grant.getPermission()); + } + + private List testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception + { + ServerSideEncryptingAmazonS3 s3Client = EasyMock.createMock(ServerSideEncryptingAmazonS3.class); + + EasyMock.expect(s3Client.putObject(EasyMock.anyObject())) + .andReturn(new PutObjectResult()) + .once(); + + AccessControlList aclExpected = new AccessControlList(); + aclExpected.setOwner(new Owner(ownerId, ownerDisplayName)); + + EasyMock.expect(s3Client.getBucketAcl("test_bucket")) + .andReturn(aclExpected) + .once(); + + EasyMock.expect(s3Client.putObject(EasyMock.anyObject(PutObjectRequest.class))) + .andReturn(new PutObjectResult()) + .once(); + + EasyMock.replay(s3Client); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setDisableAcl(disableAcl); + config.setS3Bucket("test_bucket"); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config); + + String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z"; + File logFile = tempFolder.newFile("test_log_file"); + + s3TaskLogs.pushTaskLog(taskId, logFile); + + List grantsAsList = aclExpected.getGrantsAsList(); + + return grantsAsList; + } + +}