mirror of https://github.com/apache/druid.git
* #7875: Setting ACL on S3 task logs on similar lines as that of data segment pushed to S3 * #7875 1. Extracting a method (which uploads a file to S3 setting appropriate access control list to the file being uploaded) and moving it to utils class. 2. Adding S3TaskLogsTest.java file to test acl (permissions) on the task log files pushed to S3. * fixing checkstyle errors * #7875 Incorporating review comments
This commit is contained in:
parent
0d5fbfa0eb
commit
6cc8802b8e
|
@ -20,7 +20,6 @@
|
||||||
package org.apache.druid.storage.s3;
|
package org.apache.druid.storage.s3;
|
||||||
|
|
||||||
import com.amazonaws.AmazonServiceException;
|
import com.amazonaws.AmazonServiceException;
|
||||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
@ -96,7 +95,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
|
||||||
try {
|
try {
|
||||||
return S3Utils.retryS3Operation(
|
return S3Utils.retryS3Operation(
|
||||||
() -> {
|
() -> {
|
||||||
uploadFileIfPossible(config.getBucket(), s3Path, zipOutFile);
|
S3Utils.uploadFileIfPossible(s3Client, config.getDisableAcl(), config.getBucket(), s3Path, zipOutFile);
|
||||||
|
|
||||||
return outSegment;
|
return outSegment;
|
||||||
}
|
}
|
||||||
|
@ -139,14 +138,4 @@ public class S3DataSegmentPusher implements DataSegmentPusher
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void uploadFileIfPossible(String bucket, String key, File file)
|
|
||||||
{
|
|
||||||
final PutObjectRequest indexFilePutRequest = new PutObjectRequest(bucket, key, file);
|
|
||||||
|
|
||||||
if (!config.getDisableAcl()) {
|
|
||||||
indexFilePutRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(s3Client, bucket));
|
|
||||||
}
|
|
||||||
log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key);
|
|
||||||
s3Client.putObject(indexFilePutRequest);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,7 +135,7 @@ public class S3TaskLogs implements TaskLogs
|
||||||
try {
|
try {
|
||||||
S3Utils.retryS3Operation(
|
S3Utils.retryS3Operation(
|
||||||
() -> {
|
() -> {
|
||||||
service.putObject(config.getS3Bucket(), taskKey, logFile);
|
S3Utils.uploadFileIfPossible(service, config.getDisableAcl(), config.getS3Bucket(), taskKey, logFile);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
@ -146,7 +146,7 @@ public class S3TaskLogs implements TaskLogs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getTaskLogKey(String taskid, String filename)
|
String getTaskLogKey(String taskid, String filename)
|
||||||
{
|
{
|
||||||
return StringUtils.format("%s/%s/%s", config.getS3Prefix(), taskid, filename);
|
return StringUtils.format("%s/%s/%s", config.getS3Prefix(), taskid, filename);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.apache.druid.storage.s3;
|
package org.apache.druid.storage.s3;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import javax.validation.constraints.NotNull;
|
import javax.validation.constraints.NotNull;
|
||||||
|
|
||||||
|
@ -35,13 +36,34 @@ public class S3TaskLogsConfig
|
||||||
@NotNull
|
@NotNull
|
||||||
private String s3Prefix = null;
|
private String s3Prefix = null;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private boolean disableAcl = false;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setDisableAcl(boolean disableAcl)
|
||||||
|
{
|
||||||
|
this.disableAcl = disableAcl;
|
||||||
|
}
|
||||||
|
|
||||||
public String getS3Bucket()
|
public String getS3Bucket()
|
||||||
{
|
{
|
||||||
return s3Bucket;
|
return s3Bucket;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setS3Bucket(String s3Bucket)
|
||||||
|
{
|
||||||
|
this.s3Bucket = s3Bucket;
|
||||||
|
}
|
||||||
|
|
||||||
public String getS3Prefix()
|
public String getS3Prefix()
|
||||||
{
|
{
|
||||||
return s3Prefix;
|
return s3Prefix;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean getDisableAcl()
|
||||||
|
{
|
||||||
|
return disableAcl;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,13 +28,16 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
import com.amazonaws.services.s3.model.Permission;
|
import com.amazonaws.services.s3.model.Permission;
|
||||||
|
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
import org.apache.druid.java.util.common.ISE;
|
import org.apache.druid.java.util.common.ISE;
|
||||||
import org.apache.druid.java.util.common.RetryUtils;
|
import org.apache.druid.java.util.common.RetryUtils;
|
||||||
import org.apache.druid.java.util.common.RetryUtils.Task;
|
import org.apache.druid.java.util.common.RetryUtils.Task;
|
||||||
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -47,6 +50,7 @@ public class S3Utils
|
||||||
{
|
{
|
||||||
private static final Joiner JOINER = Joiner.on("/").skipNulls();
|
private static final Joiner JOINER = Joiner.on("/").skipNulls();
|
||||||
private static final String MIMETYPE_JETS3T_DIRECTORY = "application/x-directory";
|
private static final String MIMETYPE_JETS3T_DIRECTORY = "application/x-directory";
|
||||||
|
private static final Logger log = new Logger(S3Utils.class);
|
||||||
|
|
||||||
static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
|
static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
|
||||||
{
|
{
|
||||||
|
@ -242,4 +246,23 @@ public class S3Utils
|
||||||
|
|
||||||
return objectSummary;
|
return objectSummary;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Uploads a file to S3 if possible. First trying to set ACL to give the bucket owner full control of the file before uploading.
|
||||||
|
*
|
||||||
|
* @param service S3 client
|
||||||
|
* @param disableAcl true if ACL shouldn't be set for the file
|
||||||
|
* @param key The key under which to store the new object.
|
||||||
|
* @param file The path of the file to upload to Amazon S3.
|
||||||
|
*/
|
||||||
|
public static void uploadFileIfPossible(ServerSideEncryptingAmazonS3 service, boolean disableAcl, String bucket, String key, File file)
|
||||||
|
{
|
||||||
|
final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, file);
|
||||||
|
|
||||||
|
if (!disableAcl) {
|
||||||
|
putObjectRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(service, bucket));
|
||||||
|
}
|
||||||
|
log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key);
|
||||||
|
service.putObject(putObjectRequest);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,106 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.storage.s3;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.model.AccessControlList;
|
||||||
|
import com.amazonaws.services.s3.model.Grant;
|
||||||
|
import com.amazonaws.services.s3.model.Owner;
|
||||||
|
import com.amazonaws.services.s3.model.Permission;
|
||||||
|
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||||
|
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class S3TaskLogsTest
|
||||||
|
{
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public final TemporaryFolder tempFolder = new TemporaryFolder();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTaskLogsPushWithAclDisabled() throws Exception
|
||||||
|
{
|
||||||
|
String ownerId = "test_owner";
|
||||||
|
String ownerDisplayName = "test_owner";
|
||||||
|
|
||||||
|
List<Grant> grantList = testPushInternal(true, ownerId, ownerDisplayName);
|
||||||
|
|
||||||
|
Assert.assertTrue("Grant list should not be null", grantList != null);
|
||||||
|
Assert.assertEquals("Grant list should be empty as ACL is disabled", 0, grantList.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTaskLogsPushWithAclEnabled() throws Exception
|
||||||
|
{
|
||||||
|
String ownerId = "test_owner";
|
||||||
|
String ownerDisplayName = "test_owner";
|
||||||
|
|
||||||
|
List<Grant> grantList = testPushInternal(false, ownerId, ownerDisplayName);
|
||||||
|
|
||||||
|
Assert.assertTrue("Grant list should not be null", grantList != null);
|
||||||
|
Assert.assertEquals("Grant list size should be equal to 1", 1, grantList.size());
|
||||||
|
Grant grant = grantList.get(0);
|
||||||
|
Assert.assertEquals("The Grantee identifier should be test_owner", "test_owner", grant.getGrantee().getIdentifier());
|
||||||
|
Assert.assertEquals("The Grant should have full control permission", Permission.FullControl, grant.getPermission());
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Grant> testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception
|
||||||
|
{
|
||||||
|
ServerSideEncryptingAmazonS3 s3Client = EasyMock.createMock(ServerSideEncryptingAmazonS3.class);
|
||||||
|
|
||||||
|
EasyMock.expect(s3Client.putObject(EasyMock.anyObject()))
|
||||||
|
.andReturn(new PutObjectResult())
|
||||||
|
.once();
|
||||||
|
|
||||||
|
AccessControlList aclExpected = new AccessControlList();
|
||||||
|
aclExpected.setOwner(new Owner(ownerId, ownerDisplayName));
|
||||||
|
|
||||||
|
EasyMock.expect(s3Client.getBucketAcl("test_bucket"))
|
||||||
|
.andReturn(aclExpected)
|
||||||
|
.once();
|
||||||
|
|
||||||
|
EasyMock.expect(s3Client.putObject(EasyMock.anyObject(PutObjectRequest.class)))
|
||||||
|
.andReturn(new PutObjectResult())
|
||||||
|
.once();
|
||||||
|
|
||||||
|
EasyMock.replay(s3Client);
|
||||||
|
|
||||||
|
S3TaskLogsConfig config = new S3TaskLogsConfig();
|
||||||
|
config.setDisableAcl(disableAcl);
|
||||||
|
config.setS3Bucket("test_bucket");
|
||||||
|
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config);
|
||||||
|
|
||||||
|
String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z";
|
||||||
|
File logFile = tempFolder.newFile("test_log_file");
|
||||||
|
|
||||||
|
s3TaskLogs.pushTaskLog(taskId, logFile);
|
||||||
|
|
||||||
|
List<Grant> grantsAsList = aclExpected.getGrantsAsList();
|
||||||
|
|
||||||
|
return grantsAsList;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue