diff --git a/core/src/main/java/org/apache/druid/common/utils/CurrentTimeMillisSupplier.java b/core/src/main/java/org/apache/druid/common/utils/CurrentTimeMillisSupplier.java new file mode 100644 index 00000000000..d540c781e06 --- /dev/null +++ b/core/src/main/java/org/apache/druid/common/utils/CurrentTimeMillisSupplier.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.druid.common.utils; + +import java.util.function.LongSupplier; + +public class CurrentTimeMillisSupplier implements LongSupplier +{ + @Override + public long getAsLong() + { + return System.currentTimeMillis(); + } +} diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java index 6df4161d846..cffc3d36399 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java @@ -20,6 +20,7 @@ package org.apache.druid.storage.s3; import com.amazonaws.AmazonServiceException; +import com.google.common.base.Predicates; import com.google.inject.Inject; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -27,20 +28,30 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.timeline.DataSegment; +import java.io.IOException; import java.util.Map; /** + * */ public class S3DataSegmentKiller implements DataSegmentKiller { private static final Logger log = new Logger(S3DataSegmentKiller.class); private final ServerSideEncryptingAmazonS3 s3Client; + private final S3DataSegmentPusherConfig segmentPusherConfig; + private final S3InputDataConfig inputDataConfig; @Inject - public S3DataSegmentKiller(ServerSideEncryptingAmazonS3 s3Client) + public S3DataSegmentKiller( + ServerSideEncryptingAmazonS3 s3Client, + S3DataSegmentPusherConfig segmentPusherConfig, + S3InputDataConfig inputDataConfig + ) { this.s3Client = s3Client; + this.segmentPusherConfig = segmentPusherConfig; + this.inputDataConfig = inputDataConfig; } @Override @@ -69,8 +80,23 @@ public class S3DataSegmentKiller implements DataSegmentKiller } @Override - public void killAll() + public void killAll() throws IOException { - throw new UnsupportedOperationException("not implemented"); + log.info("Deleting all segment files from s3 location [bucket: '%s' prefix: '%s']", + segmentPusherConfig.getBucket(), segmentPusherConfig.getBaseKey() + ); + try { + S3Utils.deleteObjectsInPath( + s3Client, + inputDataConfig, + segmentPusherConfig.getBucket(), + segmentPusherConfig.getBaseKey(), + Predicates.alwaysTrue() + ); + } + catch (Exception e) { + log.error("Error occurred while deleting segment files from s3. Error: %s", e.getMessage()); + throw new IOException(e); + } } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java index 2676b6d4aa9..723548b1fcb 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java @@ -155,6 +155,7 @@ public class S3StorageDruidModule implements DruidModule .to(S3DataSegmentArchiver.class) .in(LazySingleton.class); Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(S3DataSegmentPusher.class).in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.storage", S3InputDataConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3StorageConfig.class); diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java index 9cee08b177e..75fadf6e17e 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java @@ -27,6 +27,7 @@ import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.io.ByteSource; import com.google.inject.Inject; +import org.apache.druid.common.utils.CurrentTimeMillisSupplier; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -35,6 +36,7 @@ import org.apache.druid.tasklogs.TaskLogs; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.util.Date; /** * Provides task logs archived on S3. @@ -45,12 +47,21 @@ public class S3TaskLogs implements TaskLogs private final ServerSideEncryptingAmazonS3 service; private final S3TaskLogsConfig config; + private final S3InputDataConfig inputDataConfig; + private final CurrentTimeMillisSupplier timeSupplier; @Inject - public S3TaskLogs(ServerSideEncryptingAmazonS3 service, S3TaskLogsConfig config) + public S3TaskLogs( + ServerSideEncryptingAmazonS3 service, + S3TaskLogsConfig config, + S3InputDataConfig inputDataConfig, + CurrentTimeMillisSupplier timeSupplier + ) { this.service = service; this.config = config; + this.inputDataConfig = inputDataConfig; + this.timeSupplier = timeSupplier; } @Override @@ -152,14 +163,34 @@ public class S3TaskLogs implements TaskLogs } @Override - public void killAll() + public void killAll() throws IOException { - throw new UnsupportedOperationException("not implemented"); + log.info("Deleting all task logs from s3 location [bucket: %s prefix: %s].", + config.getS3Bucket(), config.getS3Prefix() + ); + + long now = timeSupplier.getAsLong(); + killOlderThan(now); } @Override - public void killOlderThan(long timestamp) + public void killOlderThan(long timestamp) throws IOException { - throw new UnsupportedOperationException("not implemented"); + log.info("Deleting all task logs from s3 location [bucket: '%s' prefix: '%s'] older than %s.", + config.getS3Bucket(), config.getS3Prefix(), new Date(timestamp) + ); + try { + S3Utils.deleteObjectsInPath( + service, + inputDataConfig, + config.getS3Bucket(), + config.getS3Prefix(), + (object) -> object.getLastModified().getTime() < timestamp + ); + } + catch (Exception e) { + log.error("Error occurred while deleting task log files from s3. Error: %s", e.getMessage()); + throw new IOException(e); + } } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java index e61dbf79592..6fc04113f5a 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java @@ -61,6 +61,12 @@ public class S3TaskLogsConfig return s3Prefix; } + @VisibleForTesting + void setS3Prefix(String s3Prefix) + { + this.s3Prefix = s3Prefix; + } + public boolean getDisableAcl() { return disableAcl; diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index eb704ed819e..e19639a41d7 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -23,6 +23,7 @@ import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CanonicalGrantee; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.Grant; import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; @@ -31,6 +32,7 @@ import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Joiner; import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RetryUtils; @@ -41,7 +43,9 @@ import org.apache.druid.java.util.common.logger.Logger; import java.io.File; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; /** * @@ -200,6 +204,54 @@ public class S3Utils return objectSummary; } + public static void deleteObjectsInPath( + ServerSideEncryptingAmazonS3 s3Client, + S3InputDataConfig config, + String bucket, + String prefix, + Predicate filter + ) + throws Exception + { + final List keysToDelete = new ArrayList<>(config.getMaxListingLength()); + final ObjectSummaryIterator iterator = new ObjectSummaryIterator( + s3Client, + ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")), + config.getMaxListingLength() + ); + + while (iterator.hasNext()) { + final S3ObjectSummary nextObject = iterator.next(); + if (filter.apply(nextObject)) { + keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey())); + if (keysToDelete.size() == config.getMaxListingLength()) { + deleteBucketKeys(s3Client, bucket, keysToDelete); + log.info("Deleted %d files", keysToDelete.size()); + keysToDelete.clear(); + } + } + } + + if (keysToDelete.size() > 0) { + deleteBucketKeys(s3Client, bucket, keysToDelete); + log.info("Deleted %d files", keysToDelete.size()); + } + } + + public static void deleteBucketKeys( + ServerSideEncryptingAmazonS3 s3Client, + String bucket, + List keysToDelete + ) + throws Exception + { + DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete); + S3Utils.retryS3Operation(() -> { + s3Client.deleteObjects(deleteRequest); + return null; + }); + } + /** * Uploads a file to S3 if possible. First trying to set ACL to give the bucket owner full control of the file before uploading. * diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java index 236cb2c8748..6c353959dcd 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java @@ -25,6 +25,7 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.CopyObjectResult; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.ListObjectsV2Request; @@ -128,6 +129,11 @@ public class ServerSideEncryptingAmazonS3 amazonS3.deleteObject(bucket, key); } + public void deleteObjects(DeleteObjectsRequest request) + { + amazonS3.deleteObjects(request); + } + public static class Builder { private AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client.builder(); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java new file mode 100644 index 00000000000..6260d3a25d2 --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.s3; + +import com.amazonaws.SdkClientException; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.StringUtils; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.IOException; +import java.net.URI; + +@RunWith(EasyMockRunner.class) +public class S3DataSegmentKillerTest extends EasyMockSupport +{ + private static final String KEY_1 = "key1"; + private static final String KEY_2 = "key2"; + private static final String TEST_BUCKET = "test_bucket"; + private static final String TEST_PREFIX = "test_prefix"; + private static final URI PREFIX_URI = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, TEST_PREFIX)); + private static final long TIME_0 = 0L; + private static final long TIME_1 = 1L; + private static final int MAX_KEYS = 1; + private static final Exception RECOVERABLE_EXCEPTION = new SdkClientException(new IOException()); + private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException()); + + @Mock + private ServerSideEncryptingAmazonS3 s3Client; + @Mock + private S3DataSegmentPusherConfig segmentPusherConfig; + @Mock + private S3InputDataConfig inputDataConfig; + + private S3DataSegmentKiller segmentKiller; + + @Test + public void test_killAll_noException_deletesAllSegments() throws IOException + { + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + S3ObjectSummary objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_1); + + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1, objectSummary2) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_2) + )); + + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(deleteRequest1, deleteRequest2), + ImmutableMap.of() + ); + + EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig); + + segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig); + segmentKiller.killAll(); + EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig); + } + + @Test + public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegments() throws IOException + { + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(deleteRequest1), + ImmutableMap.of(deleteRequest1, RECOVERABLE_EXCEPTION) + ); + + EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig); + + segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig); + segmentKiller.killAll(); + EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig); + } + + @Test + public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments() + { + boolean ioExceptionThrown = false; + try { + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(), + ImmutableMap.of(deleteRequest1, NON_RECOVERABLE_EXCEPTION) + ); + + + EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig); + + segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig); + segmentKiller.killAll(); + } + catch (IOException e) { + ioExceptionThrown = true; + } + + Assert.assertTrue(ioExceptionThrown); + EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig); + } +} diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java index a342b3b0cb9..502897f0104 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java @@ -19,24 +19,56 @@ package org.apache.druid.storage.s3; +import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.Grant; import com.amazonaws.services.s3.model.Owner; import com.amazonaws.services.s3.model.Permission; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.utils.CurrentTimeMillisSupplier; +import org.apache.druid.java.util.common.StringUtils; import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; import java.io.File; +import java.io.IOException; +import java.net.URI; import java.util.List; -public class S3TaskLogsTest +@RunWith(EasyMockRunner.class) +public class S3TaskLogsTest extends EasyMockSupport { + private static final String KEY_1 = "key1"; + private static final String KEY_2 = "key2"; + private static final String TEST_BUCKET = "test_bucket"; + private static final String TEST_PREFIX = "test_prefix"; + private static final URI PREFIX_URI = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, TEST_PREFIX)); + private static final long TIME_0 = 0L; + private static final long TIME_1 = 1L; + private static final long TIME_NOW = 2L; + private static final long TIME_FUTURE = 3L; + private static final int MAX_KEYS = 1; + private static final Exception RECOVERABLE_EXCEPTION = new SdkClientException(new IOException()); + private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException()); + + @Mock + private CurrentTimeMillisSupplier timeSupplier; + @Mock + private ServerSideEncryptingAmazonS3 s3Client; + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); @@ -48,7 +80,7 @@ public class S3TaskLogsTest List grantList = testPushInternal(true, ownerId, ownerDisplayName); - Assert.assertTrue("Grant list should not be null", grantList != null); + Assert.assertNotNull("Grant list should not be null", grantList); Assert.assertEquals("Grant list should be empty as ACL is disabled", 0, grantList.size()); } @@ -60,47 +92,284 @@ public class S3TaskLogsTest List grantList = testPushInternal(false, ownerId, ownerDisplayName); - Assert.assertTrue("Grant list should not be null", grantList != null); + Assert.assertNotNull("Grant list should not be null", grantList); Assert.assertEquals("Grant list size should be equal to 1", 1, grantList.size()); Grant grant = grantList.get(0); - Assert.assertEquals("The Grantee identifier should be test_owner", "test_owner", grant.getGrantee().getIdentifier()); + Assert.assertEquals( + "The Grantee identifier should be test_owner", + "test_owner", + grant.getGrantee().getIdentifier() + ); Assert.assertEquals("The Grant should have full control permission", Permission.FullControl, grant.getPermission()); } + @Test + public void test_killAll_noException_deletesAllTaskLogs() throws IOException + { + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + S3ObjectSummary objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_1); + + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1, objectSummary2) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_2) + )); + + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(deleteRequest1, deleteRequest2), + ImmutableMap.of() + ); + + EasyMock.replay(s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killAll(); + + EasyMock.verify(s3Client, timeSupplier); + } + + @Test + public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException + { + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(deleteRequest1), + ImmutableMap.of(deleteRequest1, RECOVERABLE_EXCEPTION) + ); + + EasyMock.replay(s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killAll(); + + EasyMock.verify(s3Client, timeSupplier); + } + + @Test + public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() + { + boolean ioExceptionThrown = false; + try { + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(), + ImmutableMap.of(deleteRequest1, NON_RECOVERABLE_EXCEPTION) + ); + + EasyMock.replay(s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killAll(); + } + catch (IOException e) { + ioExceptionThrown = true; + } + + Assert.assertTrue(ioExceptionThrown); + + EasyMock.verify(s3Client, timeSupplier); + } + + @Test + public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws IOException + { + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + S3ObjectSummary objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_FUTURE); + + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1, objectSummary2) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + + S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1), ImmutableMap.of()); + + EasyMock.replay(s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killOlderThan(TIME_NOW); + + EasyMock.verify(s3Client, timeSupplier); + } + + @Test + public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException + { + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(deleteRequest1), + ImmutableMap.of(deleteRequest1, RECOVERABLE_EXCEPTION) + ); + + EasyMock.replay(s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killOlderThan(TIME_NOW); + + EasyMock.verify(s3Client, timeSupplier); + } + + @Test + public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() + { + boolean ioExceptionThrown = false; + try { + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(), + ImmutableMap.of(deleteRequest1, NON_RECOVERABLE_EXCEPTION) + ); + + EasyMock.replay(s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killOlderThan(TIME_NOW); + } + catch (IOException e) { + ioExceptionThrown = true; + } + + Assert.assertTrue(ioExceptionThrown); + + EasyMock.verify(s3Client, timeSupplier); + } + private List testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception { - ServerSideEncryptingAmazonS3 s3Client = EasyMock.createMock(ServerSideEncryptingAmazonS3.class); - EasyMock.expect(s3Client.putObject(EasyMock.anyObject())) - .andReturn(new PutObjectResult()) - .once(); + .andReturn(new PutObjectResult()) + .once(); AccessControlList aclExpected = new AccessControlList(); aclExpected.setOwner(new Owner(ownerId, ownerDisplayName)); - EasyMock.expect(s3Client.getBucketAcl("test_bucket")) - .andReturn(aclExpected) - .once(); + EasyMock.expect(s3Client.getBucketAcl(TEST_BUCKET)) + .andReturn(aclExpected) + .once(); EasyMock.expect(s3Client.putObject(EasyMock.anyObject(PutObjectRequest.class))) - .andReturn(new PutObjectResult()) - .once(); + .andReturn(new PutObjectResult()) + .once(); EasyMock.replay(s3Client); S3TaskLogsConfig config = new S3TaskLogsConfig(); config.setDisableAcl(disableAcl); - config.setS3Bucket("test_bucket"); - S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config); + config.setS3Bucket(TEST_BUCKET); + CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier(); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z"; File logFile = tempFolder.newFile("test_log_file"); s3TaskLogs.pushTaskLog(taskId, logFile); - List grantsAsList = aclExpected.getGrantsAsList(); - - return grantsAsList; + return aclExpected.getGrantsAsList(); } - } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java new file mode 100644 index 00000000000..b178bb1014f --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.s3; + +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; +import org.easymock.IArgumentMatcher; +import org.easymock.IExpectationSetters; +import org.joda.time.DateTime; + +import java.net.URI; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class S3TestUtils extends EasyMockSupport +{ + private static final DateTime NOW = DateTimes.nowUtc(); + private static final byte[] CONTENT = + StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis())); + + public static DeleteObjectsRequest deleteObjectsRequestArgumentMatcher(DeleteObjectsRequest deleteObjectsRequest) + { + EasyMock.reportMatcher(new IArgumentMatcher() + { + @Override + public boolean matches(Object argument) + { + + boolean matches = argument instanceof DeleteObjectsRequest + && deleteObjectsRequest.getBucketName() + .equals(((DeleteObjectsRequest) argument).getBucketName()) + && deleteObjectsRequest.getKeys().size() == ((DeleteObjectsRequest) argument).getKeys() + .size(); + if (matches) { + Map expectedKeysAndVersions = deleteObjectsRequest.getKeys().stream().collect( + Collectors.toMap(DeleteObjectsRequest.KeyVersion::getKey, x -> { + return x.getVersion() == null ? "null" : x.getVersion(); + })); + Map actualKeysAndVersions = ((DeleteObjectsRequest) argument).getKeys().stream().collect( + Collectors.toMap(DeleteObjectsRequest.KeyVersion::getKey, x -> { + return x.getVersion() == null ? "null" : x.getVersion(); + })); + matches = expectedKeysAndVersions.equals(actualKeysAndVersions); + } + return matches; + } + + @Override + public void appendTo(StringBuffer buffer) + { + String str = "DeleteObjectsRequest(\"bucketName:\" \"" + + deleteObjectsRequest.getBucketName() + + "\", \"keys:\"" + + deleteObjectsRequest.getKeys() + + "\")"; + buffer.append(str); + } + }); + return null; + } + + public static void expectListObjects( + ServerSideEncryptingAmazonS3 s3Client, + URI prefix, + List objectSummaries) + { + final ListObjectsV2Result result = new ListObjectsV2Result(); + result.setBucketName(prefix.getAuthority()); + result.setKeyCount(objectSummaries.size()); + for (S3ObjectSummary objectSummary : objectSummaries) { + result.getObjectSummaries().add(objectSummary); + } + + EasyMock.expect( + s3Client.listObjectsV2(matchListObjectsRequest(prefix)) + ).andReturn(result).once(); + } + + public static void mockS3ClientDeleteObjects( + ServerSideEncryptingAmazonS3 s3Client, + List deleteRequestsExpected, + Map requestToException + ) + { + Map> requestToResultExpectationSetter = new HashMap<>(); + + for (Map.Entry requestsAndErrors : requestToException.entrySet()) { + DeleteObjectsRequest request = requestsAndErrors.getKey(); + Exception exception = requestsAndErrors.getValue(); + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(request); + if (resultExpectationSetter == null) { + s3Client.deleteObjects( + S3TestUtils.deleteObjectsRequestArgumentMatcher(request)); + resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception); + requestToResultExpectationSetter.put(request, resultExpectationSetter); + } else { + resultExpectationSetter.andThrow(exception); + } + } + + for (DeleteObjectsRequest request : deleteRequestsExpected) { + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(request); + if (resultExpectationSetter == null) { + s3Client.deleteObjects(S3TestUtils.deleteObjectsRequestArgumentMatcher(request)); + resultExpectationSetter = EasyMock.expectLastCall(); + requestToResultExpectationSetter.put(request, resultExpectationSetter); + } + resultExpectationSetter.andVoid(); + } + } + + public static ListObjectsV2Request matchListObjectsRequest(final URI prefixUri) + { + // Use an IArgumentMatcher to verify that the request has the correct bucket and prefix. + EasyMock.reportMatcher( + new IArgumentMatcher() + { + @Override + public boolean matches(Object argument) + { + if (!(argument instanceof ListObjectsV2Request)) { + return false; + } + + final ListObjectsV2Request request = (ListObjectsV2Request) argument; + return prefixUri.getAuthority().equals(request.getBucketName()) + && S3Utils.extractS3Key(prefixUri).equals(request.getPrefix()); + } + + @Override + public void appendTo(StringBuffer buffer) + { + buffer.append(""); + } + } + ); + + return null; + } + + public static S3ObjectSummary newS3ObjectSummary( + String bucket, + String key, + long lastModifiedTimestamp) + { + S3ObjectSummary objectSummary = new S3ObjectSummary(); + objectSummary.setBucketName(bucket); + objectSummary.setKey(key); + objectSummary.setLastModified(new Date(lastModifiedTimestamp)); + objectSummary.setETag("etag"); + objectSummary.setSize(CONTENT.length); + return objectSummary; + } +}