S3: Attach SSE key to doesObjectExist calls. (#14290)

* S3: Attach SSE key to doesObjectExist calls.

We did not previously attach the SSE key to the doesObjectExist request,
unlike requests such as getObject and putObject, which are decorated
with it. This inconsistency may cause problems on "S3-compatible"
implementations. This patch implements doesObjectExist using logic
similar to the S3 client's own: a 404 response means the object does
not exist, and any other error is re-thrown. The difference is that it
calls our implementation of getObjectMetadata rather than the S3
client's, ensuring the request is decorated with the SSE key.

* Fix tests.
This commit is contained in:
Gian Merlino 2023-06-23 15:23:59 -07:00 committed by GitHub
parent 155fde33ff
commit ddd0fc1b85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 115 additions and 73 deletions

View File

@ -25,6 +25,7 @@ import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.CopyObjectRequest;
@ -43,11 +44,8 @@ import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult; import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import java.io.ByteArrayInputStream;
import java.io.File; import java.io.File;
import java.io.InputStream;
/** /**
* {@link AmazonS3} wrapper with {@link ServerSideEncryption}. Every {@link AmazonS3#putObject}, * {@link AmazonS3} wrapper with {@link ServerSideEncryption}. Every {@link AmazonS3#putObject},
@ -76,7 +74,20 @@ public class ServerSideEncryptingAmazonS3
public boolean doesObjectExist(String bucket, String objectName) public boolean doesObjectExist(String bucket, String objectName)
{ {
return amazonS3.doesObjectExist(bucket, objectName); try {
// Ignore return value, just want to see if we can get the metadata at all.
getObjectMetadata(bucket, objectName);
return true;
}
catch (AmazonS3Exception e) {
if (e.getStatusCode() == 404) {
// Object not found.
return false;
} else {
// Some other error: re-throw.
throw e;
}
}
} }
public ListObjectsV2Result listObjectsV2(ListObjectsV2Request request) public ListObjectsV2Result listObjectsV2(ListObjectsV2Request request)
@ -107,22 +118,11 @@ public class ServerSideEncryptingAmazonS3
return amazonS3.getObject(serverSideEncryption.decorate(request)); return amazonS3.getObject(serverSideEncryption.decorate(request));
} }
public PutObjectResult putObject(String bucket, String key, String content)
{
final InputStream in = new ByteArrayInputStream(StringUtils.toUtf8(content));
return putObject(new PutObjectRequest(bucket, key, in, new ObjectMetadata()));
}
public PutObjectResult putObject(String bucket, String key, File file) public PutObjectResult putObject(String bucket, String key, File file)
{ {
return putObject(new PutObjectRequest(bucket, key, file)); return putObject(new PutObjectRequest(bucket, key, file));
} }
public PutObjectResult putObject(String bucket, String key, InputStream in, ObjectMetadata objectMetadata)
{
return putObject(new PutObjectRequest(bucket, key, in, objectMetadata));
}
public PutObjectResult putObject(PutObjectRequest request) public PutObjectResult putObject(PutObjectRequest request)
{ {
return amazonS3.putObject(serverSideEncryption.decorate(request)); return amazonS3.putObject(serverSideEncryption.decorate(request));

View File

@ -20,7 +20,9 @@
package org.apache.druid.storage.s3.output; package org.apache.druid.storage.s3.output;
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.ListObjectsV2Result;
@ -34,10 +36,13 @@ import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.easymock.Capture; import org.easymock.Capture;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import java.io.BufferedReader; import java.io.BufferedReader;
@ -52,22 +57,20 @@ import java.util.stream.Collectors;
public class S3StorageConnectorTest public class S3StorageConnectorTest
{ {
private static final AmazonS3Client S3_CLIENT = EasyMock.createMock(AmazonS3Client.class);
private static final ServerSideEncryptingAmazonS3 SERVICE = new ServerSideEncryptingAmazonS3(
S3_CLIENT,
new NoopServerSideEncryption()
);
private static final String BUCKET = "BUCKET"; private static final String BUCKET = "BUCKET";
private static final String PREFIX = "P/R/E/F/I/X"; private static final String PREFIX = "P/R/E/F/I/X";
public static final String TEST_FILE = "test.csv"; public static final String TEST_FILE = "test.csv";
private final AmazonS3Client s3Client = EasyMock.createMock(AmazonS3Client.class);
private final ServerSideEncryptingAmazonS3 service = new ServerSideEncryptingAmazonS3(
s3Client,
new NoopServerSideEncryption()
);
private final ListObjectsV2Result testResult = EasyMock.createMock(ListObjectsV2Result.class);
@Rule @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder(); public TemporaryFolder temporaryFolder = new TemporaryFolder();
public static final ListObjectsV2Result TEST_RESULT = EasyMock.createMock(ListObjectsV2Result.class);
private StorageConnector storageConnector; private StorageConnector storageConnector;
@Before @Before
@ -81,40 +84,79 @@ public class S3StorageConnectorTest
null, null,
null, null,
true true
), SERVICE); ), service);
} }
catch (IOException e) { catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@Test @Test
public void pathExists() throws IOException public void pathExists_yes() throws IOException
{ {
EasyMock.reset(S3_CLIENT); final Capture<GetObjectMetadataRequest> request = Capture.newInstance();
EasyMock.expect(S3_CLIENT.doesObjectExist(BUCKET, PREFIX + "/" + TEST_FILE)).andReturn(true); EasyMock.reset(s3Client);
EasyMock.replay(S3_CLIENT); EasyMock.expect(s3Client.getObjectMetadata(EasyMock.capture(request)))
.andReturn(new ObjectMetadata());
EasyMock.replay(s3Client);
Assert.assertTrue(storageConnector.pathExists(TEST_FILE)); Assert.assertTrue(storageConnector.pathExists(TEST_FILE));
EasyMock.reset(S3_CLIENT); Assert.assertEquals(BUCKET, request.getValue().getBucketName());
Assert.assertFalse(storageConnector.pathExists("test1.csv")); Assert.assertEquals(PREFIX + "/" + TEST_FILE, request.getValue().getKey());
EasyMock.verify(s3Client);
} }
@Test
public void pathExists_notFound() throws IOException
{
final Capture<GetObjectMetadataRequest> request = Capture.newInstance();
final AmazonS3Exception e = new AmazonS3Exception("not found");
e.setStatusCode(404);
EasyMock.reset(s3Client);
EasyMock.expect(s3Client.getObjectMetadata(EasyMock.capture(request)))
.andThrow(e);
EasyMock.replay(s3Client);
Assert.assertFalse(storageConnector.pathExists(TEST_FILE));
Assert.assertEquals(BUCKET, request.getValue().getBucketName());
Assert.assertEquals(PREFIX + "/" + TEST_FILE, request.getValue().getKey());
EasyMock.verify(s3Client);
}
@Test
public void pathExists_error()
{
final Capture<GetObjectMetadataRequest> request = Capture.newInstance();
final AmazonS3Exception e = new AmazonS3Exception("not found");
e.setStatusCode(403);
EasyMock.reset(s3Client);
EasyMock.expect(s3Client.getObjectMetadata(EasyMock.capture(request)))
.andThrow(e);
EasyMock.replay(s3Client);
final IOException e2 = Assert.assertThrows(
IOException.class,
() -> storageConnector.pathExists(TEST_FILE)
);
Assert.assertEquals(BUCKET, request.getValue().getBucketName());
Assert.assertEquals(PREFIX + "/" + TEST_FILE, request.getValue().getKey());
MatcherAssert.assertThat(e2, ThrowableCauseMatcher.hasCause(CoreMatchers.instanceOf(AmazonS3Exception.class)));
EasyMock.verify(s3Client);
}
@Test @Test
public void pathRead() throws IOException public void pathRead() throws IOException
{ {
EasyMock.reset(S3_CLIENT); EasyMock.reset(s3Client);
ObjectMetadata objectMetadata = new ObjectMetadata(); ObjectMetadata objectMetadata = new ObjectMetadata();
long contentLength = "test".getBytes(StandardCharsets.UTF_8).length; long contentLength = "test".getBytes(StandardCharsets.UTF_8).length;
objectMetadata.setContentLength(contentLength); objectMetadata.setContentLength(contentLength);
S3Object s3Object = new S3Object(); S3Object s3Object = new S3Object();
s3Object.setObjectContent(new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8))); s3Object.setObjectContent(new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)));
EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject())).andReturn(objectMetadata); EasyMock.expect(s3Client.getObjectMetadata(EasyMock.anyObject())).andReturn(objectMetadata);
EasyMock.expect(S3_CLIENT.getObject( EasyMock.expect(s3Client.getObject(
new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, contentLength - 1)) new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, contentLength - 1))
).andReturn(s3Object); ).andReturn(s3Object);
EasyMock.replay(S3_CLIENT); EasyMock.replay(s3Client);
String readText = new BufferedReader( String readText = new BufferedReader(
new InputStreamReader(storageConnector.read(TEST_FILE), StandardCharsets.UTF_8)) new InputStreamReader(storageConnector.read(TEST_FILE), StandardCharsets.UTF_8))
@ -122,7 +164,7 @@ public class S3StorageConnectorTest
.collect(Collectors.joining("\n")); .collect(Collectors.joining("\n"));
Assert.assertEquals("test", readText); Assert.assertEquals("test", readText);
EasyMock.reset(S3_CLIENT); EasyMock.reset(s3Client);
} }
@Test @Test
@ -134,123 +176,124 @@ public class S3StorageConnectorTest
for (int start = 0; start < data.length(); start++) { for (int start = 0; start < data.length(); start++) {
for (int length = 1; length <= data.length() - start; length++) { for (int length = 1; length <= data.length() - start; length++) {
String dataQueried = data.substring(start, start + length); String dataQueried = data.substring(start, start + length);
EasyMock.reset(S3_CLIENT); EasyMock.reset(s3Client);
S3Object s3Object = new S3Object(); S3Object s3Object = new S3Object();
s3Object.setObjectContent( s3Object.setObjectContent(
new ByteArrayInputStream(dataQueried.getBytes(StandardCharsets.UTF_8)) new ByteArrayInputStream(dataQueried.getBytes(StandardCharsets.UTF_8))
); );
EasyMock.expect( EasyMock.expect(
S3_CLIENT.getObject( s3Client.getObject(
new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(start, start + length - 1) new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(start, start + length - 1)
) )
).andReturn(s3Object); ).andReturn(s3Object);
EasyMock.replay(S3_CLIENT); EasyMock.replay(s3Client);
InputStream is = storageConnector.readRange(TEST_FILE, start, length); InputStream is = storageConnector.readRange(TEST_FILE, start, length);
byte[] dataBytes = new byte[length]; byte[] dataBytes = new byte[length];
Assert.assertEquals(length, is.read(dataBytes)); Assert.assertEquals(length, is.read(dataBytes));
Assert.assertEquals(-1, is.read()); // reading further produces no data Assert.assertEquals(-1, is.read()); // reading further produces no data
Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8)); Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8));
EasyMock.reset(S3_CLIENT); EasyMock.reset(s3Client);
} }
} }
// empty read // empty read
EasyMock.reset(S3_CLIENT); EasyMock.reset(s3Client);
S3Object s3Object = new S3Object(); S3Object s3Object = new S3Object();
s3Object.setObjectContent( s3Object.setObjectContent(
new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)) new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8))
); );
EasyMock.expect( EasyMock.expect(
S3_CLIENT.getObject( s3Client.getObject(
new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, -1) new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, -1)
) )
).andReturn(s3Object); ).andReturn(s3Object);
EasyMock.replay(S3_CLIENT); EasyMock.replay(s3Client);
InputStream is = storageConnector.readRange(TEST_FILE, 0, 0); InputStream is = storageConnector.readRange(TEST_FILE, 0, 0);
byte[] dataBytes = new byte[0]; byte[] dataBytes = new byte[0];
Assert.assertEquals(is.read(dataBytes), -1); Assert.assertEquals(is.read(dataBytes), -1);
Assert.assertEquals("", new String(dataBytes, StandardCharsets.UTF_8)); Assert.assertEquals("", new String(dataBytes, StandardCharsets.UTF_8));
EasyMock.reset(S3_CLIENT); EasyMock.reset(s3Client);
} }
@Test @Test
public void testDeleteSinglePath() throws IOException public void testDeleteSinglePath() throws IOException
{ {
EasyMock.reset(S3_CLIENT); EasyMock.reset(s3Client);
S3_CLIENT.deleteObject(BUCKET, PREFIX + "/" + TEST_FILE); s3Client.deleteObject(BUCKET, PREFIX + "/" + TEST_FILE);
EasyMock.expectLastCall(); EasyMock.expectLastCall();
storageConnector.deleteFile(TEST_FILE); storageConnector.deleteFile(TEST_FILE);
EasyMock.reset(S3_CLIENT); EasyMock.reset(s3Client);
} }
@Test @Test
public void testDeleteMultiplePaths() throws IOException public void testDeleteMultiplePaths() throws IOException
{ {
EasyMock.reset(S3_CLIENT); EasyMock.reset(s3Client);
String testFile2 = "file2"; String testFile2 = "file2";
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(BUCKET); DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(BUCKET);
deleteObjectsRequest.withKeys(PREFIX + "/" + TEST_FILE, PREFIX + "/" + testFile2); deleteObjectsRequest.withKeys(PREFIX + "/" + TEST_FILE, PREFIX + "/" + testFile2);
Capture<DeleteObjectsRequest> capturedArgument = EasyMock.newCapture(); Capture<DeleteObjectsRequest> capturedArgument = EasyMock.newCapture();
EasyMock.expect(S3_CLIENT.deleteObjects(EasyMock.capture(capturedArgument))).andReturn(null).once(); EasyMock.expect(s3Client.deleteObjects(EasyMock.capture(capturedArgument))).andReturn(null).once();
EasyMock.replay(S3_CLIENT); EasyMock.replay(s3Client);
storageConnector.deleteFiles(Lists.newArrayList(TEST_FILE, testFile2)); storageConnector.deleteFiles(Lists.newArrayList(TEST_FILE, testFile2));
Assert.assertEquals(convertDeleteObjectsRequestToString(deleteObjectsRequest), convertDeleteObjectsRequestToString(capturedArgument.getValue())); Assert.assertEquals(
EasyMock.reset(S3_CLIENT); convertDeleteObjectsRequestToString(deleteObjectsRequest),
convertDeleteObjectsRequestToString(capturedArgument.getValue())
);
EasyMock.reset(s3Client);
} }
@Test @Test
public void testPathDeleteRecursively() throws IOException public void testPathDeleteRecursively() throws IOException
{ {
EasyMock.reset(S3_CLIENT, TEST_RESULT); EasyMock.reset(s3Client, testResult);
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary(); S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
s3ObjectSummary.setBucketName(BUCKET); s3ObjectSummary.setBucketName(BUCKET);
s3ObjectSummary.setKey(PREFIX + "/test/" + TEST_FILE); s3ObjectSummary.setKey(PREFIX + "/test/" + TEST_FILE);
s3ObjectSummary.setSize(1); s3ObjectSummary.setSize(1);
EasyMock.expect(S3_CLIENT.listObjectsV2((ListObjectsV2Request) EasyMock.anyObject())) EasyMock.expect(s3Client.listObjectsV2((ListObjectsV2Request) EasyMock.anyObject()))
.andReturn(TEST_RESULT); .andReturn(testResult);
EasyMock.expect(TEST_RESULT.getBucketName()).andReturn("123").anyTimes(); EasyMock.expect(testResult.getBucketName()).andReturn("123").anyTimes();
EasyMock.expect(TEST_RESULT.getObjectSummaries()).andReturn(Collections.singletonList(s3ObjectSummary)).anyTimes(); EasyMock.expect(testResult.getObjectSummaries()).andReturn(Collections.singletonList(s3ObjectSummary)).anyTimes();
EasyMock.expect(TEST_RESULT.isTruncated()).andReturn(false).times(1); EasyMock.expect(testResult.isTruncated()).andReturn(false).times(1);
EasyMock.expect(TEST_RESULT.getNextContinuationToken()).andReturn(null); EasyMock.expect(testResult.getNextContinuationToken()).andReturn(null);
Capture<DeleteObjectsRequest> capturedArgument = EasyMock.newCapture(); Capture<DeleteObjectsRequest> capturedArgument = EasyMock.newCapture();
EasyMock.expect(S3_CLIENT.deleteObjects(EasyMock.and( EasyMock.expect(s3Client.deleteObjects(EasyMock.and(
EasyMock.capture(capturedArgument), EasyMock.capture(capturedArgument),
EasyMock.isA(DeleteObjectsRequest.class) EasyMock.isA(DeleteObjectsRequest.class)
))).andReturn(null); ))).andReturn(null);
EasyMock.replay(S3_CLIENT, TEST_RESULT); EasyMock.replay(s3Client, testResult);
storageConnector.deleteRecursively("test"); storageConnector.deleteRecursively("test");
Assert.assertEquals(1, capturedArgument.getValue().getKeys().size()); Assert.assertEquals(1, capturedArgument.getValue().getKeys().size());
Assert.assertEquals(PREFIX + "/test/" + TEST_FILE, capturedArgument.getValue().getKeys().get(0).getKey()); Assert.assertEquals(PREFIX + "/test/" + TEST_FILE, capturedArgument.getValue().getKeys().get(0).getKey());
EasyMock.reset(S3_CLIENT, TEST_RESULT); EasyMock.reset(s3Client, testResult);
} }
@Test @Test
public void testListDir() throws IOException public void testListDir() throws IOException
{ {
EasyMock.reset(S3_CLIENT, TEST_RESULT); EasyMock.reset(s3Client, testResult);
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary(); S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
s3ObjectSummary.setBucketName(BUCKET); s3ObjectSummary.setBucketName(BUCKET);
s3ObjectSummary.setKey(PREFIX + "/test/" + TEST_FILE); s3ObjectSummary.setKey(PREFIX + "/test/" + TEST_FILE);
s3ObjectSummary.setSize(1); s3ObjectSummary.setSize(1);
EasyMock.expect(TEST_RESULT.getObjectSummaries()).andReturn(Collections.singletonList(s3ObjectSummary)).times(2); EasyMock.expect(testResult.getObjectSummaries()).andReturn(Collections.singletonList(s3ObjectSummary)).times(2);
EasyMock.expect(TEST_RESULT.isTruncated()).andReturn(false); EasyMock.expect(testResult.isTruncated()).andReturn(false);
EasyMock.expect(TEST_RESULT.getNextContinuationToken()).andReturn(null); EasyMock.expect(testResult.getNextContinuationToken()).andReturn(null);
EasyMock.expect(S3_CLIENT.listObjectsV2((ListObjectsV2Request) EasyMock.anyObject())) EasyMock.expect(s3Client.listObjectsV2((ListObjectsV2Request) EasyMock.anyObject()))
.andReturn(TEST_RESULT); .andReturn(testResult);
EasyMock.replay(S3_CLIENT, TEST_RESULT); EasyMock.replay(s3Client, testResult);
List<String> listDirResult = Lists.newArrayList(storageConnector.listDir("test/")); List<String> listDirResult = Lists.newArrayList(storageConnector.listDir("test/"));
Assert.assertEquals(ImmutableList.of(TEST_FILE), listDirResult); Assert.assertEquals(ImmutableList.of(TEST_FILE), listDirResult);
@ -264,5 +307,4 @@ public class S3StorageConnectorTest
.collect( .collect(
Collectors.joining()); Collectors.joining());
} }
} }