Ability to Delete task logs and segments from S3 (#9459)

* Ability to delete task logs and segments from S3

* Implement the ability to delete all task logs, or all task logs
  written before a particular date, when they are stored in S3
* Implement the ability to delete all segments from S3 deep storage
* Upgrade the version of the AWS SDK in use

* Update licenses for the updated AWS SDK version

* Fix a bug in iterating through results from S3
* Revert to the original version of the AWS SDK

* Address review comments

* Fix failing dependency check
zachjsh 2020-03-10 13:13:46 -07:00 committed by GitHub
parent 75a5591448
commit 7e0e767cc2
10 changed files with 820 additions and 27 deletions
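
As context for the diffs that follow, here is a hypothetical caller sketch (not part of this commit). Only TaskLogs.killAll(), TaskLogs.killOlderThan(long), and DataSegmentKiller.killAll() come from the interfaces this change implements for S3; the KillSketch class, its wiring, and the 7-day retention value are illustrative assumptions.

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.tasklogs.TaskLogs;

public class KillSketch
{
  private final TaskLogs taskLogs;               // S3TaskLogs when druid-s3-extensions provides task log storage
  private final DataSegmentKiller segmentKiller; // S3DataSegmentKiller when S3 is the deep storage

  public KillSketch(TaskLogs taskLogs, DataSegmentKiller segmentKiller)
  {
    this.taskLogs = taskLogs;
    this.segmentKiller = segmentKiller;
  }

  public void cleanUp() throws IOException
  {
    // Delete task logs last modified more than 7 days ago; the cutoff is a plain epoch-millis timestamp.
    taskLogs.killOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7));

    // Or wipe everything: all task logs, and all segment files under the configured bucket/baseKey.
    taskLogs.killAll();
    segmentKiller.killAll();
  }
}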

CurrentTimeMillisSupplier.java (new file)

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common.utils;
import java.util.function.LongSupplier;
public class CurrentTimeMillisSupplier implements LongSupplier
{
@Override
public long getAsLong()
{
return System.currentTimeMillis();
}
}
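
CurrentTimeMillisSupplier exists so the clock can be injected rather than read from System.currentTimeMillis() directly, which keeps time-dependent deletes testable. A minimal test-side sketch of the idea, assuming EasyMock as in the new tests further below (the class name and the fixed value are illustrative):

import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.easymock.EasyMock;

public class FixedClockSketch
{
  public static CurrentTimeMillisSupplier fixedClock(long nowMillis)
  {
    // A mocked supplier pins "now", so behavior such as S3TaskLogs#killAll() becomes deterministic in tests.
    CurrentTimeMillisSupplier clock = EasyMock.createMock(CurrentTimeMillisSupplier.class);
    EasyMock.expect(clock.getAsLong()).andReturn(nowMillis).anyTimes();
    EasyMock.replay(clock);
    return clock;
  }
}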

S3DataSegmentKiller.java

@@ -20,6 +20,7 @@
package org.apache.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.google.common.base.Predicates;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -27,20 +28,30 @@ import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.Map;
/**
*
*/
public class S3DataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(S3DataSegmentKiller.class);
private final ServerSideEncryptingAmazonS3 s3Client;
private final S3DataSegmentPusherConfig segmentPusherConfig;
private final S3InputDataConfig inputDataConfig;
@Inject
public S3DataSegmentKiller(ServerSideEncryptingAmazonS3 s3Client)
public S3DataSegmentKiller(
ServerSideEncryptingAmazonS3 s3Client,
S3DataSegmentPusherConfig segmentPusherConfig,
S3InputDataConfig inputDataConfig
)
{
this.s3Client = s3Client;
this.segmentPusherConfig = segmentPusherConfig;
this.inputDataConfig = inputDataConfig;
}
@Override
@@ -69,8 +80,23 @@ public class S3DataSegmentKiller implements DataSegmentKiller
}
@Override
public void killAll()
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
log.info("Deleting all segment files from s3 location [bucket: '%s' prefix: '%s']",
segmentPusherConfig.getBucket(), segmentPusherConfig.getBaseKey()
);
try {
S3Utils.deleteObjectsInPath(
s3Client,
inputDataConfig,
segmentPusherConfig.getBucket(),
segmentPusherConfig.getBaseKey(),
Predicates.alwaysTrue()
);
}
catch (Exception e) {
log.error("Error occurred while deleting segment files from s3. Error: %s", e.getMessage());
throw new IOException(e);
}
}
}

S3StorageDruidModule.java

@@ -155,6 +155,7 @@ public class S3StorageDruidModule implements DruidModule
.to(S3DataSegmentArchiver.class)
.in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(S3DataSegmentPusher.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", S3InputDataConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", S3StorageConfig.class);

S3TaskLogs.java

@@ -27,6 +27,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -35,6 +36,7 @@ import org.apache.druid.tasklogs.TaskLogs;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
/**
* Provides task logs archived on S3.
@@ -45,12 +47,21 @@ public class S3TaskLogs implements TaskLogs
private final ServerSideEncryptingAmazonS3 service;
private final S3TaskLogsConfig config;
private final S3InputDataConfig inputDataConfig;
private final CurrentTimeMillisSupplier timeSupplier;
@Inject
public S3TaskLogs(ServerSideEncryptingAmazonS3 service, S3TaskLogsConfig config)
public S3TaskLogs(
ServerSideEncryptingAmazonS3 service,
S3TaskLogsConfig config,
S3InputDataConfig inputDataConfig,
CurrentTimeMillisSupplier timeSupplier
)
{
this.service = service;
this.config = config;
this.inputDataConfig = inputDataConfig;
this.timeSupplier = timeSupplier;
}
@Override
@@ -152,14 +163,34 @@ public class S3TaskLogs implements TaskLogs
}
@Override
public void killAll()
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
log.info("Deleting all task logs from s3 location [bucket: %s prefix: %s].",
config.getS3Bucket(), config.getS3Prefix()
);
long now = timeSupplier.getAsLong();
killOlderThan(now);
}
@Override
public void killOlderThan(long timestamp)
public void killOlderThan(long timestamp) throws IOException
{
throw new UnsupportedOperationException("not implemented");
log.info("Deleting all task logs from s3 location [bucket: '%s' prefix: '%s'] older than %s.",
config.getS3Bucket(), config.getS3Prefix(), new Date(timestamp)
);
try {
S3Utils.deleteObjectsInPath(
service,
inputDataConfig,
config.getS3Bucket(),
config.getS3Prefix(),
(object) -> object.getLastModified().getTime() < timestamp
);
}
catch (Exception e) {
log.error("Error occurred while deleting task log files from s3. Error: %s", e.getMessage());
throw new IOException(e);
}
}
}

S3TaskLogsConfig.java

@@ -61,6 +61,12 @@ public class S3TaskLogsConfig
return s3Prefix;
}
@VisibleForTesting
void setS3Prefix(String s3Prefix)
{
this.s3Prefix = s3Prefix;
}
public boolean getDisableAcl()
{
return disableAcl;

S3Utils.java

@@ -23,6 +23,7 @@ import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
@@ -31,6 +32,7 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
@@ -41,7 +43,9 @@ import org.apache.druid.java.util.common.logger.Logger;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
*
@@ -200,6 +204,54 @@ public class S3Utils
return objectSummary;
}
public static void deleteObjectsInPath(
ServerSideEncryptingAmazonS3 s3Client,
S3InputDataConfig config,
String bucket,
String prefix,
Predicate<S3ObjectSummary> filter
)
throws Exception
{
final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(config.getMaxListingLength());
final ObjectSummaryIterator iterator = new ObjectSummaryIterator(
s3Client,
ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")),
config.getMaxListingLength()
);
while (iterator.hasNext()) {
final S3ObjectSummary nextObject = iterator.next();
if (filter.apply(nextObject)) {
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey()));
if (keysToDelete.size() == config.getMaxListingLength()) {
deleteBucketKeys(s3Client, bucket, keysToDelete);
log.info("Deleted %d files", keysToDelete.size());
keysToDelete.clear();
}
}
}
if (keysToDelete.size() > 0) {
deleteBucketKeys(s3Client, bucket, keysToDelete);
log.info("Deleted %d files", keysToDelete.size());
}
}
public static void deleteBucketKeys(
ServerSideEncryptingAmazonS3 s3Client,
String bucket,
List<DeleteObjectsRequest.KeyVersion> keysToDelete
)
throws Exception
{
DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
S3Utils.retryS3Operation(() -> {
s3Client.deleteObjects(deleteRequest);
return null;
});
}
/**
* Uploads a file to S3 if possible. First trying to set ACL to give the bucket owner full control of the file before uploading.
*

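For readers unfamiliar with the bulk-delete API used in deleteObjectsInPath and deleteBucketKeys above, here is a minimal standalone sketch of the same list/filter/delete-in-batches pattern against the plain AWS SDK v1. It is illustrative only: it skips Druid's ServerSideEncryptingAmazonS3 wrapper and the retryS3Operation retries, and the bucket, prefix, and batch-size values are placeholders.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import java.util.ArrayList;
import java.util.List;

public class BatchedS3DeleteSketch
{
  public static void deleteOlderThan(AmazonS3 s3, String bucket, String prefix, long cutoffMillis, int batchSize)
  {
    final List<DeleteObjectsRequest.KeyVersion> batch = new ArrayList<>(batchSize);
    final ListObjectsV2Request listRequest = new ListObjectsV2Request()
        .withBucketName(bucket)
        .withPrefix(prefix)
        .withMaxKeys(batchSize);

    ListObjectsV2Result page;
    do {
      // List one page of object summaries under the prefix.
      page = s3.listObjectsV2(listRequest);
      for (S3ObjectSummary summary : page.getObjectSummaries()) {
        if (summary.getLastModified().getTime() < cutoffMillis) {
          batch.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
        }
        if (batch.size() == batchSize) {
          // One bulk DeleteObjects call per full batch, mirroring deleteBucketKeys above.
          s3.deleteObjects(new DeleteObjectsRequest(bucket).withKeys(new ArrayList<>(batch)));
          batch.clear();
        }
      }
      listRequest.setContinuationToken(page.getNextContinuationToken());
    } while (page.isTruncated());

    if (!batch.isEmpty()) {
      // Flush the final partial batch.
      s3.deleteObjects(new DeleteObjectsRequest(bucket).withKeys(batch));
    }
  }

  public static void main(String[] args)
  {
    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    deleteOlderThan(s3, "example-bucket", "druid/indexing-logs", System.currentTimeMillis(), 1000);
  }
}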
ServerSideEncryptingAmazonS3.java

@@ -25,6 +25,7 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectResult;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
@@ -128,6 +129,11 @@ public class ServerSideEncryptingAmazonS3
amazonS3.deleteObject(bucket, key);
}
public void deleteObjects(DeleteObjectsRequest request)
{
amazonS3.deleteObjects(request);
}
public static class Builder
{
private AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client.builder();

S3DataSegmentKillerTest.java (new file)

@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.s3;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.StringUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.io.IOException;
import java.net.URI;
@RunWith(EasyMockRunner.class)
public class S3DataSegmentKillerTest extends EasyMockSupport
{
private static final String KEY_1 = "key1";
private static final String KEY_2 = "key2";
private static final String TEST_BUCKET = "test_bucket";
private static final String TEST_PREFIX = "test_prefix";
private static final URI PREFIX_URI = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, TEST_PREFIX));
private static final long TIME_0 = 0L;
private static final long TIME_1 = 1L;
private static final int MAX_KEYS = 1;
private static final Exception RECOVERABLE_EXCEPTION = new SdkClientException(new IOException());
private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException());
@Mock
private ServerSideEncryptingAmazonS3 s3Client;
@Mock
private S3DataSegmentPusherConfig segmentPusherConfig;
@Mock
private S3InputDataConfig inputDataConfig;
private S3DataSegmentKiller segmentKiller;
@Test
public void test_killAll_noException_deletesAllSegments() throws IOException
{
S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
S3ObjectSummary objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_1);
S3TestUtils.expectListObjects(
s3Client,
PREFIX_URI,
ImmutableList.of(objectSummary1, objectSummary2)
);
DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withBucketName(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_1)
));
DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET)
.withBucketName(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_2)
));
S3TestUtils.mockS3ClientDeleteObjects(
s3Client,
ImmutableList.of(deleteRequest1, deleteRequest2),
ImmutableMap.of()
);
EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET);
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX);
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller.killAll();
EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig);
}
@Test
public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegments() throws IOException
{
S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
S3TestUtils.expectListObjects(
s3Client,
PREFIX_URI,
ImmutableList.of(objectSummary1)
);
DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withBucketName(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_1)
));
S3TestUtils.mockS3ClientDeleteObjects(
s3Client,
ImmutableList.of(deleteRequest1),
ImmutableMap.of(deleteRequest1, RECOVERABLE_EXCEPTION)
);
EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET);
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX);
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller.killAll();
EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig);
}
@Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments()
{
boolean ioExceptionThrown = false;
try {
S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
S3TestUtils.expectListObjects(
s3Client,
PREFIX_URI,
ImmutableList.of(objectSummary1)
);
DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withBucketName(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_1)
));
S3TestUtils.mockS3ClientDeleteObjects(
s3Client,
ImmutableList.of(),
ImmutableMap.of(deleteRequest1, NON_RECOVERABLE_EXCEPTION)
);
EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET);
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX);
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller.killAll();
}
catch (IOException e) {
ioExceptionThrown = true;
}
Assert.assertTrue(ioExceptionThrown);
EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig);
}
}

S3TaskLogsTest.java

@@ -19,24 +19,56 @@
package org.apache.druid.storage.s3;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.StringUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
public class S3TaskLogsTest
@RunWith(EasyMockRunner.class)
public class S3TaskLogsTest extends EasyMockSupport
{
private static final String KEY_1 = "key1";
private static final String KEY_2 = "key2";
private static final String TEST_BUCKET = "test_bucket";
private static final String TEST_PREFIX = "test_prefix";
private static final URI PREFIX_URI = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, TEST_PREFIX));
private static final long TIME_0 = 0L;
private static final long TIME_1 = 1L;
private static final long TIME_NOW = 2L;
private static final long TIME_FUTURE = 3L;
private static final int MAX_KEYS = 1;
private static final Exception RECOVERABLE_EXCEPTION = new SdkClientException(new IOException());
private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException());
@Mock
private CurrentTimeMillisSupplier timeSupplier;
@Mock
private ServerSideEncryptingAmazonS3 s3Client;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@@ -48,7 +80,7 @@ public class S3TaskLogsTest
List<Grant> grantList = testPushInternal(true, ownerId, ownerDisplayName);
Assert.assertTrue("Grant list should not be null", grantList != null);
Assert.assertNotNull("Grant list should not be null", grantList);
Assert.assertEquals("Grant list should be empty as ACL is disabled", 0, grantList.size());
}
@@ -60,17 +92,255 @@ public class S3TaskLogsTest
List<Grant> grantList = testPushInternal(false, ownerId, ownerDisplayName);
Assert.assertTrue("Grant list should not be null", grantList != null);
Assert.assertNotNull("Grant list should not be null", grantList);
Assert.assertEquals("Grant list size should be equal to 1", 1, grantList.size());
Grant grant = grantList.get(0);
Assert.assertEquals("The Grantee identifier should be test_owner", "test_owner", grant.getGrantee().getIdentifier());
Assert.assertEquals(
"The Grantee identifier should be test_owner",
"test_owner",
grant.getGrantee().getIdentifier()
);
Assert.assertEquals("The Grant should have full control permission", Permission.FullControl, grant.getPermission());
}
@Test
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
{
S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
S3ObjectSummary objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_1);
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
S3TestUtils.expectListObjects(
s3Client,
PREFIX_URI,
ImmutableList.of(objectSummary1, objectSummary2)
);
DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withBucketName(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_1)
));
DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET)
.withBucketName(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_2)
));
S3TestUtils.mockS3ClientDeleteObjects(
s3Client,
ImmutableList.of(deleteRequest1, deleteRequest2),
ImmutableMap.of()
);
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
s3TaskLogs.killAll();
EasyMock.verify(s3Client, timeSupplier);
}
@Test
public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException
{
S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
S3TestUtils.expectListObjects(
s3Client,
PREFIX_URI,
ImmutableList.of(objectSummary1)
);
DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withBucketName(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_1)
));
S3TestUtils.mockS3ClientDeleteObjects(
s3Client,
ImmutableList.of(deleteRequest1),
ImmutableMap.of(deleteRequest1, RECOVERABLE_EXCEPTION)
);
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
s3TaskLogs.killAll();
EasyMock.verify(s3Client, timeSupplier);
}
@Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
{
boolean ioExceptionThrown = false;
try {
S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
S3TestUtils.expectListObjects(
s3Client,
PREFIX_URI,
ImmutableList.of(objectSummary1)
);
DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withBucketName(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_1)
));
S3TestUtils.mockS3ClientDeleteObjects(
s3Client,
ImmutableList.of(),
ImmutableMap.of(deleteRequest1, NON_RECOVERABLE_EXCEPTION)
);
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
s3TaskLogs.killAll();
}
catch (IOException e) {
ioExceptionThrown = true;
}
Assert.assertTrue(ioExceptionThrown);
EasyMock.verify(s3Client, timeSupplier);
}
@Test
public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws IOException
{
S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
S3ObjectSummary objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_FUTURE);
S3TestUtils.expectListObjects(
s3Client,
PREFIX_URI,
ImmutableList.of(objectSummary1, objectSummary2)
);
DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withBucketName(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_1)
));
S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1), ImmutableMap.of());
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
s3TaskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(s3Client, timeSupplier);
}
@Test
public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException
{
S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
S3TestUtils.expectListObjects(
s3Client,
PREFIX_URI,
ImmutableList.of(objectSummary1)
);
DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withBucketName(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_1)
));
S3TestUtils.mockS3ClientDeleteObjects(
s3Client,
ImmutableList.of(deleteRequest1),
ImmutableMap.of(deleteRequest1, RECOVERABLE_EXCEPTION)
);
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
s3TaskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(s3Client, timeSupplier);
}
@Test
public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
{
boolean ioExceptionThrown = false;
try {
S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
S3TestUtils.expectListObjects(
s3Client,
PREFIX_URI,
ImmutableList.of(objectSummary1)
);
DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withBucketName(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_1)
));
S3TestUtils.mockS3ClientDeleteObjects(
s3Client,
ImmutableList.of(),
ImmutableMap.of(deleteRequest1, NON_RECOVERABLE_EXCEPTION)
);
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
s3TaskLogs.killOlderThan(TIME_NOW);
}
catch (IOException e) {
ioExceptionThrown = true;
}
Assert.assertTrue(ioExceptionThrown);
EasyMock.verify(s3Client, timeSupplier);
}
private List<Grant> testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception
{
ServerSideEncryptingAmazonS3 s3Client = EasyMock.createMock(ServerSideEncryptingAmazonS3.class);
EasyMock.expect(s3Client.putObject(EasyMock.anyObject()))
.andReturn(new PutObjectResult())
.once();
@@ -78,7 +348,7 @@ public class S3TaskLogsTest
AccessControlList aclExpected = new AccessControlList();
aclExpected.setOwner(new Owner(ownerId, ownerDisplayName));
EasyMock.expect(s3Client.getBucketAcl("test_bucket"))
EasyMock.expect(s3Client.getBucketAcl(TEST_BUCKET))
.andReturn(aclExpected)
.once();
@@ -90,17 +360,16 @@ public class S3TaskLogsTest
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setDisableAcl(disableAcl);
config.setS3Bucket("test_bucket");
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config);
config.setS3Bucket(TEST_BUCKET);
CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier();
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z";
File logFile = tempFolder.newFile("test_log_file");
s3TaskLogs.pushTaskLog(taskId, logFile);
List<Grant> grantsAsList = aclExpected.getGrantsAsList();
return grantsAsList;
return aclExpected.getGrantsAsList();
}
}

S3TestUtils.java (new file)

@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.s3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IArgumentMatcher;
import org.easymock.IExpectationSetters;
import org.joda.time.DateTime;
import java.net.URI;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class S3TestUtils extends EasyMockSupport
{
private static final DateTime NOW = DateTimes.nowUtc();
private static final byte[] CONTENT =
StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));
public static DeleteObjectsRequest deleteObjectsRequestArgumentMatcher(DeleteObjectsRequest deleteObjectsRequest)
{
EasyMock.reportMatcher(new IArgumentMatcher()
{
@Override
public boolean matches(Object argument)
{
boolean matches = argument instanceof DeleteObjectsRequest
&& deleteObjectsRequest.getBucketName()
.equals(((DeleteObjectsRequest) argument).getBucketName())
&& deleteObjectsRequest.getKeys().size() == ((DeleteObjectsRequest) argument).getKeys()
.size();
if (matches) {
Map<String, String> expectedKeysAndVersions = deleteObjectsRequest.getKeys().stream().collect(
Collectors.toMap(DeleteObjectsRequest.KeyVersion::getKey, x -> {
return x.getVersion() == null ? "null" : x.getVersion();
}));
Map<String, String> actualKeysAndVersions = ((DeleteObjectsRequest) argument).getKeys().stream().collect(
Collectors.toMap(DeleteObjectsRequest.KeyVersion::getKey, x -> {
return x.getVersion() == null ? "null" : x.getVersion();
}));
matches = expectedKeysAndVersions.equals(actualKeysAndVersions);
}
return matches;
}
@Override
public void appendTo(StringBuffer buffer)
{
String str = "DeleteObjectsRequest(\"bucketName:\" \""
+ deleteObjectsRequest.getBucketName()
+ "\", \"keys:\""
+ deleteObjectsRequest.getKeys()
+ "\")";
buffer.append(str);
}
});
return null;
}
public static void expectListObjects(
ServerSideEncryptingAmazonS3 s3Client,
URI prefix,
List<S3ObjectSummary> objectSummaries)
{
final ListObjectsV2Result result = new ListObjectsV2Result();
result.setBucketName(prefix.getAuthority());
result.setKeyCount(objectSummaries.size());
for (S3ObjectSummary objectSummary : objectSummaries) {
result.getObjectSummaries().add(objectSummary);
}
EasyMock.expect(
s3Client.listObjectsV2(matchListObjectsRequest(prefix))
).andReturn(result).once();
}
public static void mockS3ClientDeleteObjects(
ServerSideEncryptingAmazonS3 s3Client,
List<DeleteObjectsRequest> deleteRequestsExpected,
Map<DeleteObjectsRequest, Exception> requestToException
)
{
Map<DeleteObjectsRequest, IExpectationSetters<DeleteObjectsRequest>> requestToResultExpectationSetter = new HashMap<>();
for (Map.Entry<DeleteObjectsRequest, Exception> requestsAndErrors : requestToException.entrySet()) {
DeleteObjectsRequest request = requestsAndErrors.getKey();
Exception exception = requestsAndErrors.getValue();
IExpectationSetters<DeleteObjectsRequest> resultExpectationSetter = requestToResultExpectationSetter.get(request);
if (resultExpectationSetter == null) {
s3Client.deleteObjects(
S3TestUtils.deleteObjectsRequestArgumentMatcher(request));
resultExpectationSetter = EasyMock.<DeleteObjectsRequest>expectLastCall().andThrow(exception);
requestToResultExpectationSetter.put(request, resultExpectationSetter);
} else {
resultExpectationSetter.andThrow(exception);
}
}
for (DeleteObjectsRequest request : deleteRequestsExpected) {
IExpectationSetters<DeleteObjectsRequest> resultExpectationSetter = requestToResultExpectationSetter.get(request);
if (resultExpectationSetter == null) {
s3Client.deleteObjects(S3TestUtils.deleteObjectsRequestArgumentMatcher(request));
resultExpectationSetter = EasyMock.expectLastCall();
requestToResultExpectationSetter.put(request, resultExpectationSetter);
}
resultExpectationSetter.andVoid();
}
}
public static ListObjectsV2Request matchListObjectsRequest(final URI prefixUri)
{
// Use an IArgumentMatcher to verify that the request has the correct bucket and prefix.
EasyMock.reportMatcher(
new IArgumentMatcher()
{
@Override
public boolean matches(Object argument)
{
if (!(argument instanceof ListObjectsV2Request)) {
return false;
}
final ListObjectsV2Request request = (ListObjectsV2Request) argument;
return prefixUri.getAuthority().equals(request.getBucketName())
&& S3Utils.extractS3Key(prefixUri).equals(request.getPrefix());
}
@Override
public void appendTo(StringBuffer buffer)
{
buffer.append("<request for prefix [").append(prefixUri).append("]>");
}
}
);
return null;
}
public static S3ObjectSummary newS3ObjectSummary(
String bucket,
String key,
long lastModifiedTimestamp)
{
S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(bucket);
objectSummary.setKey(key);
objectSummary.setLastModified(new Date(lastModifiedTimestamp));
objectSummary.setETag("etag");
objectSummary.setSize(CONTENT.length);
return objectSummary;
}
}