Ability to Delete task logs and segments from Google Storage (#9519)

* Ability to Delete task logs and segments from Google Storage

* implement ability to delete all task logs, or only the task logs
  written before a particular date, when logs are written to Google storage

* implement ability to delete all segments from Google deep storage

* * Address review comments
This commit is contained in:
zachjsh 2020-03-18 18:00:43 -07:00 committed by GitHub
parent b18dd2b7a9
commit 838735411f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 606 additions and 18 deletions

View File

@ -20,6 +20,7 @@
package org.apache.druid.storage.google;
import com.google.api.client.http.HttpResponseException;
import com.google.common.base.Predicates;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.RE;
@ -37,11 +38,19 @@ public class GoogleDataSegmentKiller implements DataSegmentKiller
private static final Logger LOG = new Logger(GoogleDataSegmentKiller.class);
// Client through which this killer issues its Google Storage calls.
private final GoogleStorage storage;
// Supplies the bucket and prefix that killAll() logs and clears.
private final GoogleAccountConfig accountConfig;
// Handed to GoogleUtils.deleteObjectsInPath by killAll().
private final GoogleInputDataConfig inputDataConfig;
/**
 * Stores the injected storage client and the two configuration objects for later use.
 *
 * @param storage         Google Storage client
 * @param accountConfig   source of the bucket and prefix used by killAll()
 * @param inputDataConfig configuration passed on to GoogleUtils.deleteObjectsInPath
 */
@Inject
// NOTE(review): the one-argument signature on the next line looks like the removed side of the
// diff; the three-argument form after it is the one whose body follows.
public GoogleDataSegmentKiller(final GoogleStorage storage)
public GoogleDataSegmentKiller(
final GoogleStorage storage,
GoogleAccountConfig accountConfig,
GoogleInputDataConfig inputDataConfig
)
{
this.storage = storage;
this.accountConfig = accountConfig;
this.inputDataConfig = inputDataConfig;
}
@Override
@ -93,8 +102,25 @@ public class GoogleDataSegmentKiller implements DataSegmentKiller
}
@Override
public void killAll()
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
LOG.info(
"Deleting all segment files from gs location [bucket: '%s' prefix: '%s']",
accountConfig.getBucket(),
accountConfig.getPrefix()
);
try {
GoogleUtils.deleteObjectsInPath(
storage,
inputDataConfig,
accountConfig.getBucket(),
accountConfig.getPrefix(),
Predicates.alwaysTrue()
);
}
catch (Exception e) {
LOG.error("Error occurred while deleting task log files from gs. Error: %s", e.getMessage());
throw new IOException(e);
}
}
}

View File

@ -23,6 +23,7 @@ import com.google.api.client.http.InputStreamContent;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
@ -33,6 +34,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Date;
public class GoogleTaskLogs implements TaskLogs
{
@ -40,12 +42,21 @@ public class GoogleTaskLogs implements TaskLogs
// Supplies the bucket and prefix under which task logs are stored.
private final GoogleTaskLogsConfig config;
// Client through which this class issues its Google Storage calls.
private final GoogleStorage storage;
// Handed to GoogleUtils.deleteObjectsInPath by killOlderThan().
private final GoogleInputDataConfig inputDataConfig;
// Source of "now" for killAll(); injected so callers (e.g. tests) can control the clock.
private final CurrentTimeMillisSupplier timeSupplier;
/**
 * Stores the injected configuration, storage client and time supplier for later use.
 *
 * @param config          source of the task-log bucket and prefix
 * @param storage         Google Storage client
 * @param inputDataConfig configuration passed on to GoogleUtils.deleteObjectsInPath
 * @param timeSupplier    supplies the current time used by killAll()
 */
@Inject
// NOTE(review): the two-argument signature on the next line looks like the removed side of the
// diff; the four-argument form after it is the one whose body follows.
public GoogleTaskLogs(GoogleTaskLogsConfig config, GoogleStorage storage)
public GoogleTaskLogs(
GoogleTaskLogsConfig config,
GoogleStorage storage,
GoogleInputDataConfig inputDataConfig,
CurrentTimeMillisSupplier timeSupplier
)
{
this.config = config;
this.storage = storage;
this.inputDataConfig = inputDataConfig;
this.timeSupplier = timeSupplier;
}
@Override
@ -159,14 +170,39 @@ public class GoogleTaskLogs implements TaskLogs
}
/**
 * Deletes the task logs under the configured bucket and prefix by calling
 * killOlderThan with the current time read from the time supplier.
 *
 * Because killOlderThan compares with a strict "less than", an object whose update time
 * equals the value read here is not deleted.
 *
 * @throws IOException propagated from killOlderThan
 */
@Override
// NOTE(review): the signature without "throws" and the UnsupportedOperationException line below
// look like the removed side of the diff, kept here next to their replacements.
public void killAll()
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
LOG.info(
"Deleting all task logs from gs location [bucket: '%s' prefix: '%s'].",
config.getBucket(),
config.getPrefix()
);
// Read the clock once so a single cut-off is used for the whole run.
long now = timeSupplier.getAsLong();
killOlderThan(now);
}
/**
 * Deletes the task logs under the configured bucket and prefix whose "updated" time is
 * strictly earlier than the given timestamp.
 *
 * @param timestamp cut-off in milliseconds since the epoch (it is rendered with java.util.Date
 *                  for logging); objects updated at or after it are kept
 * @throws IOException wrapping any exception raised while listing or deleting the objects
 */
@Override
// NOTE(review): the signature without "throws" and the UnsupportedOperationException line below
// look like the removed side of the diff, kept here next to their replacements.
public void killOlderThan(long timestamp)
public void killOlderThan(long timestamp) throws IOException
{
throw new UnsupportedOperationException("not implemented");
LOG.info(
"Deleting all task logs from gs location [bucket: '%s' prefix: '%s'] older than %s.",
config.getBucket(),
config.getPrefix(),
new Date(timestamp)
);
try {
GoogleUtils.deleteObjectsInPath(
storage,
inputDataConfig,
config.getBucket(),
config.getPrefix(),
// NOTE(review): assumes every listed object reports a non-null getUpdated() - confirm.
(object) -> object.getUpdated().getValue() < timestamp
);
}
catch (Exception e) {
// Only the message is logged here; the exception itself travels on as the IOException's cause.
LOG.error("Error occurred while deleting task log files from gs. Error: %s", e.getMessage());
throw new IOException(e);
}
}
}

View File

@ -22,9 +22,11 @@ package org.apache.druid.storage.google;
import com.google.api.client.http.HttpResponseException;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.IOException;
import java.net.URI;
@ -32,6 +34,7 @@ import java.util.Iterator;
public class GoogleUtils
{
private static final Logger log = new Logger(GoogleUtils.class);
public static final Predicate<Throwable> GOOGLE_RETRY = GoogleUtils::isRetryable;
public static boolean isRetryable(Throwable t)
@ -66,4 +69,40 @@ public class GoogleUtils
{
return new ObjectStorageIterator(storage, uris, maxListingLength);
}
/**
 * Deletes the Google Storage objects found under a bucket and prefix that a filter accepts.
 *
 * Objects are listed through {@code lazyFetchingStorageObjectsIterator}; each accepted object is
 * deleted through {@code retryGoogleCloudStorageOperation}. Objects the filter rejects are left
 * untouched.
 *
 * @param storage Google Storage client used for listing and deleting
 * @param config  supplies the maximum listing length handed to the listing iterator
 * @param bucket  Google Storage bucket to look in
 * @param prefix  prefix the listed objects must be under
 * @param filter  returns true for an object that should be deleted and false for one to keep
 * @throws Exception whatever the listing iterator or retryGoogleCloudStorageOperation propagates
 */
public static void deleteObjectsInPath(
    GoogleStorage storage,
    GoogleInputDataConfig config,
    String bucket,
    String prefix,
    Predicate<StorageObject> filter
)
    throws Exception
{
  final Iterator<URI> prefixUris =
      ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("gs")).iterator();
  final Iterator<StorageObject> listedObjects = lazyFetchingStorageObjectsIterator(
      storage,
      prefixUris,
      config.getMaxListingLength()
  );

  while (listedObjects.hasNext()) {
    final StorageObject candidate = listedObjects.next();
    if (!filter.apply(candidate)) {
      // Rejected by the caller's filter: keep the object.
      continue;
    }
    retryGoogleCloudStorageOperation(() -> {
      storage.delete(candidate.getBucket(), candidate.getName());
      return null;
    });
  }
}
}

View File

@ -21,25 +21,43 @@ package org.apache.druid.storage.google;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.testing.json.GoogleJsonResponseExceptionFactoryTesting;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
public class GoogleDataSegmentKillerTest extends EasyMockSupport
{
// Object names used for the fake storage objects in the killAll tests.
private static final String KEY_1 = "key1";
private static final String KEY_2 = "key2";
// Bucket and prefix the mocked account config reports; PREFIX_URI is the matching gs:// URI.
private static final String BUCKET = "bucket";
private static final String PREFIX = "test/log";
private static final URI PREFIX_URI = URI.create(StringUtils.format("gs://%s/%s", BUCKET, PREFIX));
private static final String INDEX_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
private static final String DESCRIPTOR_PATH = DataSegmentKiller.descriptorPath(INDEX_PATH);
// Last-modified timestamps given to the fake storage objects.
private static final long TIME_0 = 0L;
private static final long TIME_1 = 1L;
// Value the mocked input data config returns for the maximum listing length.
private static final int MAX_KEYS = 1;
// HTTP 429 response; the killAll tests expect a delete that fails with it to be attempted again.
private static final Exception RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(429, "recoverable", new HttpHeaders()).build();
// HTTP 404 response; the killAll tests expect it to surface to the caller as an IOException.
private static final Exception NON_RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(404, "non recoverable", new HttpHeaders()).build();
private static final DataSegment DATA_SEGMENT = new DataSegment(
"test",
@ -54,10 +72,14 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
);
// Mocks recreated before every test; see before().
private GoogleStorage storage;
private GoogleAccountConfig accountConfig;
private GoogleInputDataConfig inputDataConfig;

/**
 * Creates a fresh mock for the storage client and for both configuration objects.
 */
@Before
public void before()
{
  storage = createMock(GoogleStorage.class);
  accountConfig = createMock(GoogleAccountConfig.class);
  inputDataConfig = createMock(GoogleInputDataConfig.class);
}
@ -71,7 +93,7 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
replayAll();
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage);
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
killer.kill(DATA_SEGMENT);
@ -91,7 +113,7 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
replayAll();
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage);
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
killer.kill(DATA_SEGMENT);
@ -113,10 +135,117 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
replayAll();
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage);
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
killer.kill(DATA_SEGMENT);
verifyAll();
}
/**
 * killAll() must delete every object listed under the configured prefix when nothing fails.
 * The method name mentions task logs, but the objects here are the ones the segment killer removes.
 */
@Test
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
{
  final ImmutableList<StorageObject> listedObjects = ImmutableList.of(
      GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0),
      GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1)
  );

  // One listing returns both objects, and both are expected to be deleted without error.
  final Storage.Objects.List listing = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
  GoogleTestUtils.expectListObjects(listing, PREFIX_URI, MAX_KEYS, listedObjects);
  GoogleTestUtils.expectDeleteObjects(storage, listedObjects, ImmutableMap.of());

  EasyMock.expect(accountConfig.getBucket()).andReturn(BUCKET).anyTimes();
  EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes();
  EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

  EasyMock.replay(listing, accountConfig, inputDataConfig, storage);

  new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig).killAll();

  EasyMock.verify(listing, accountConfig, inputDataConfig, storage);
}
/**
 * A delete that first fails with the recoverable exception must be attempted again and succeed,
 * so killAll() completes without throwing.
 */
@Test
public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException
{
  final StorageObject flakyObject = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
  final ImmutableList<StorageObject> listedObjects = ImmutableList.of(flakyObject);

  final Storage.Objects.List listing = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
  GoogleTestUtils.expectListObjects(listing, PREFIX_URI, MAX_KEYS, listedObjects);
  // The same delete call is recorded to throw once and then to succeed.
  GoogleTestUtils.expectDeleteObjects(
      storage,
      listedObjects,
      ImmutableMap.of(flakyObject, RECOVERABLE_EXCEPTION)
  );

  EasyMock.expect(accountConfig.getBucket()).andReturn(BUCKET).anyTimes();
  EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes();
  EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

  EasyMock.replay(listing, accountConfig, inputDataConfig, storage);

  new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig).killAll();

  EasyMock.verify(listing, accountConfig, inputDataConfig, storage);
}
/**
 * A non-recoverable failure must reach the caller of killAll() as an IOException.
 * Despite the method name, the failure is injected on the delete call rather than on the listing.
 */
@Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
{
  final StorageObject failingObject = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
  Storage.Objects.List listing = null;
  boolean sawIoException = false;

  try {
    listing = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
    GoogleTestUtils.expectListObjects(listing, PREFIX_URI, MAX_KEYS, ImmutableList.of(failingObject));
    // No successful delete is expected; the only recorded delete throws.
    GoogleTestUtils.expectDeleteObjects(
        storage,
        ImmutableList.of(),
        ImmutableMap.of(failingObject, NON_RECOVERABLE_EXCEPTION)
    );

    EasyMock.expect(accountConfig.getBucket()).andReturn(BUCKET).anyTimes();
    EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes();
    EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

    EasyMock.replay(listing, accountConfig, inputDataConfig, storage);

    new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig).killAll();
  }
  catch (IOException e) {
    sawIoException = true;
  }

  Assert.assertTrue(sawIoException);
  EasyMock.verify(listing, accountConfig, inputDataConfig, storage);
}
}

View File

@ -19,10 +19,17 @@
package org.apache.druid.storage.google;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.InputStreamContent;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import org.apache.commons.io.IOUtils;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.easymock.EasyMock;
@ -34,25 +41,43 @@ import org.junit.Test;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
public class GoogleTaskLogsTest extends EasyMockSupport
{
// Object names used for the fake storage objects in the kill tests.
private static final String KEY_1 = "key1";
private static final String KEY_2 = "key2";
// Bucket and prefix given to GoogleTaskLogsConfig; PREFIX_URI is the matching gs:// URI.
private static final String BUCKET = "test";
private static final String PREFIX = "test/log";
private static final URI PREFIX_URI = URI.create(StringUtils.format("gs://%s/%s", BUCKET, PREFIX));
private static final String TASKID = "taskid";
// Timestamps for the kill tests: TIME_0 and TIME_1 lie before TIME_NOW, TIME_FUTURE after it.
private static final long TIME_0 = 0L;
private static final long TIME_1 = 1L;
private static final long TIME_NOW = 2L;
private static final long TIME_FUTURE = 3L;
// Value the mocked input data config returns for the maximum listing length.
private static final int MAX_KEYS = 1;
// HTTP 429 response; the kill tests expect a delete that fails with it to be attempted again.
private static final Exception RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(429, "recoverable", new HttpHeaders()).build();
// HTTP 404 response; the kill tests expect it to surface to the caller as an IOException.
private static final Exception NON_RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(404, "non recoverable", new HttpHeaders()).build();
// Collaborators and the object under test, rebuilt in before().
private GoogleStorage storage;
private GoogleTaskLogs googleTaskLogs;
private GoogleTaskLogsConfig config;
private GoogleInputDataConfig inputDataConfig;
private CurrentTimeMillisSupplier timeSupplier;
/**
 * Builds the mocks and a GoogleTaskLogs instance pointed at BUCKET and PREFIX.
 */
@Before
public void before()
{
storage = createMock(GoogleStorage.class);
// NOTE(review): the next two lines (local config, two-argument constructor) look like the
// removed side of the diff; the four lines after them are their replacement.
GoogleTaskLogsConfig config = new GoogleTaskLogsConfig(BUCKET, PREFIX);
googleTaskLogs = new GoogleTaskLogs(config, storage);
inputDataConfig = createMock(GoogleInputDataConfig.class);
timeSupplier = createMock(CurrentTimeMillisSupplier.class);
// A real config object is used here; only the storage, input config and clock are mocked.
config = new GoogleTaskLogsConfig(BUCKET, PREFIX);
googleTaskLogs = new GoogleTaskLogs(config, storage, inputDataConfig, timeSupplier);
}
@Test
@ -146,4 +171,201 @@ public class GoogleTaskLogsTest extends EasyMockSupport
verifyAll();
}
/**
 * With the clock at TIME_NOW, killAll() must delete both listed objects, whose update times
 * (TIME_0 and TIME_1) are earlier than that.
 */
@Test
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
{
  final ImmutableList<StorageObject> listedObjects = ImmutableList.of(
      GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0),
      GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1)
  );

  EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
  EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

  final Storage.Objects.List listing = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
  GoogleTestUtils.expectListObjects(listing, PREFIX_URI, MAX_KEYS, listedObjects);
  GoogleTestUtils.expectDeleteObjects(storage, listedObjects, ImmutableMap.of());

  EasyMock.replay(listing, inputDataConfig, storage, timeSupplier);

  googleTaskLogs.killAll();

  EasyMock.verify(listing, inputDataConfig, storage, timeSupplier);
}
/**
 * A delete that first fails with the recoverable exception must be attempted again and succeed,
 * so killAll() completes without throwing.
 */
@Test
public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException
{
  final StorageObject flakyObject = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
  final ImmutableList<StorageObject> listedObjects = ImmutableList.of(flakyObject);

  EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
  EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

  final Storage.Objects.List listing = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
  GoogleTestUtils.expectListObjects(listing, PREFIX_URI, MAX_KEYS, listedObjects);
  // The same delete call is recorded to throw once and then to succeed.
  GoogleTestUtils.expectDeleteObjects(
      storage,
      listedObjects,
      ImmutableMap.of(flakyObject, RECOVERABLE_EXCEPTION)
  );

  EasyMock.replay(listing, inputDataConfig, storage, timeSupplier);

  googleTaskLogs.killAll();

  EasyMock.verify(listing, inputDataConfig, storage, timeSupplier);
}
/**
 * A non-recoverable failure must reach the caller of killAll() as an IOException.
 * Despite the method name, the failure is injected on the delete call rather than on the listing.
 */
@Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
{
  final StorageObject failingObject = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
  Storage.Objects.List listing = null;
  boolean sawIoException = false;

  try {
    EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);

    listing = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
    GoogleTestUtils.expectListObjects(listing, PREFIX_URI, MAX_KEYS, ImmutableList.of(failingObject));
    // No successful delete is expected; the only recorded delete throws.
    GoogleTestUtils.expectDeleteObjects(
        storage,
        ImmutableList.of(),
        ImmutableMap.of(failingObject, NON_RECOVERABLE_EXCEPTION)
    );

    EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

    EasyMock.replay(listing, inputDataConfig, storage, timeSupplier);

    googleTaskLogs.killAll();
  }
  catch (IOException e) {
    sawIoException = true;
  }

  Assert.assertTrue(sawIoException);
  EasyMock.verify(listing, inputDataConfig, storage, timeSupplier);
}
/**
 * killOlderThan(TIME_NOW) must delete the object updated at TIME_0 and leave the one updated
 * at TIME_FUTURE alone.
 */
@Test
public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws IOException
{
  final StorageObject oldObject = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
  final StorageObject futureObject = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_FUTURE);

  EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

  final Storage.Objects.List listing = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
  GoogleTestUtils.expectListObjects(
      listing,
      PREFIX_URI,
      MAX_KEYS,
      ImmutableList.of(oldObject, futureObject)
  );
  // Only the old object has a delete recorded; a delete of the future object would fail verify.
  GoogleTestUtils.expectDeleteObjects(storage, ImmutableList.of(oldObject), ImmutableMap.of());

  EasyMock.replay(listing, inputDataConfig, storage);

  googleTaskLogs.killOlderThan(TIME_NOW);

  EasyMock.verify(listing, inputDataConfig, storage);
}
/**
 * A delete that first fails with the recoverable exception must be attempted again and succeed.
 * Despite the method name, the failure is injected on the delete call rather than on the listing.
 */
@Test
public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException
{
  final StorageObject flakyObject = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
  final ImmutableList<StorageObject> listedObjects = ImmutableList.of(flakyObject);

  EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

  final Storage.Objects.List listing = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
  GoogleTestUtils.expectListObjects(listing, PREFIX_URI, MAX_KEYS, listedObjects);
  // The same delete call is recorded to throw once and then to succeed.
  GoogleTestUtils.expectDeleteObjects(
      storage,
      listedObjects,
      ImmutableMap.of(flakyObject, RECOVERABLE_EXCEPTION)
  );

  EasyMock.replay(listing, inputDataConfig, storage);

  googleTaskLogs.killOlderThan(TIME_NOW);

  EasyMock.verify(listing, inputDataConfig, storage);
}
/**
 * A non-recoverable failure must reach the caller of killOlderThan() as an IOException.
 * Despite the method name, the failure is injected on the delete call rather than on the listing.
 */
@Test
public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
{
  final StorageObject failingObject = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
  Storage.Objects.List listing = null;
  boolean sawIoException = false;

  try {
    listing = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
    GoogleTestUtils.expectListObjects(listing, PREFIX_URI, MAX_KEYS, ImmutableList.of(failingObject));
    // No successful delete is expected; the only recorded delete throws.
    GoogleTestUtils.expectDeleteObjects(
        storage,
        ImmutableList.of(),
        ImmutableMap.of(failingObject, NON_RECOVERABLE_EXCEPTION)
    );

    EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

    EasyMock.replay(listing, inputDataConfig, storage);

    googleTaskLogs.killOlderThan(TIME_NOW);
  }
  catch (IOException e) {
    sawIoException = true;
  }

  Assert.assertTrue(sawIoException);
  EasyMock.verify(listing, inputDataConfig, storage);
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.google;
import com.google.api.client.util.DateTime;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IExpectationSetters;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Static helpers for the Google storage tests: they build fake storage objects and record the
 * EasyMock expectations for listing and deleting them.
 */
public class GoogleTestUtils extends EasyMockSupport
{
  private static final org.joda.time.DateTime NOW = DateTimes.nowUtc();
  // Only the length of this payload is used, as the reported size of every fake object.
  private static final byte[] CONTENT =
      StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));

  /**
   * Builds a storage object with the given bucket, name and "updated" time, a fixed etag and
   * the size of the shared sample content.
   *
   * @param bucket                bucket to report
   * @param key                   object name to report
   * @param lastModifiedTimestamp value wrapped in the object's "updated" DateTime
   * @return the populated storage object
   */
  public static StorageObject newStorageObject(
      String bucket,
      String key,
      long lastModifiedTimestamp
  )
  {
    return new StorageObject()
        .setBucket(bucket)
        .setName(key)
        .setUpdated(new DateTime(lastModifiedTimestamp))
        .setEtag("etag")
        .setSize(BigInteger.valueOf(CONTENT.length));
  }

  /**
   * Records that {@code storage.list} is called exactly once for the bucket named by the URI's
   * authority, and returns the mocked list request it will hand back.
   *
   * @param storage mocked storage client, still in record state
   * @param prefix  URI whose authority is the bucket
   * @return a new list-request mock, still in record state
   * @throws IOException declared by the mocked {@code list} call
   */
  public static Storage.Objects.List expectListRequest(
      GoogleStorage storage,
      URI prefix
  ) throws IOException
  {
    final String bucket = prefix.getAuthority();
    final Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class);
    EasyMock.expect(storage.list(bucket)).andReturn(listRequest).once();
    return listRequest;
  }

  /**
   * Records the calls expected on a list request: the prefix (URI path without its leading
   * slash), the maximum result count, any number of page-token calls, and exactly one
   * {@code execute()} that returns the given objects.
   *
   * @param listRequest      list-request mock, still in record state
   * @param prefix           URI whose path is the object prefix
   * @param maxListingLength expected argument of {@code setMaxResults}
   * @param objects          items returned by the single {@code execute()} call
   * @throws IOException declared by the mocked {@code execute} call
   */
  public static void expectListObjects(
      Storage.Objects.List listRequest,
      URI prefix,
      long maxListingLength,
      List<StorageObject> objects
  ) throws IOException
  {
    final String objectPrefix = StringUtils.maybeRemoveLeadingSlash(prefix.getPath());
    EasyMock.expect(listRequest.setPrefix(objectPrefix)).andReturn(listRequest);
    EasyMock.expect(listRequest.setMaxResults(maxListingLength)).andReturn(listRequest);
    EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).anyTimes();

    final Objects page = new Objects();
    page.setItems(objects);
    EasyMock.expect(listRequest.execute()).andReturn(page).once();
  }

  /**
   * Records the expected {@code storage.delete} calls.
   *
   * Each entry of {@code deleteObjectToException} records a delete of that object which throws
   * the mapped exception. Each object in {@code deleteObjectExpected} records a delete that
   * returns normally; when the object also has an exception entry, the normal return is appended
   * to the same expectation, so the throwing call comes first.
   *
   * @param storage                 mocked storage client, still in record state
   * @param deleteObjectExpected    objects whose delete should (eventually) succeed
   * @param deleteObjectToException objects whose delete should throw, with the exception to throw
   * @throws IOException declared by the mocked {@code delete} call
   */
  public static void expectDeleteObjects(
      GoogleStorage storage,
      List<StorageObject> deleteObjectExpected,
      Map<StorageObject, Exception> deleteObjectToException
  ) throws IOException
  {
    final Map<StorageObject, IExpectationSetters<StorageObject>> settersByObject = new HashMap<>();

    for (Map.Entry<StorageObject, Exception> failure : deleteObjectToException.entrySet()) {
      final StorageObject target = failure.getKey();
      final Exception toThrow = failure.getValue();
      final IExpectationSetters<StorageObject> existing = settersByObject.get(target);
      if (existing != null) {
        // Only reachable if the input map yields a key equal to one already recorded.
        existing.andThrow(toThrow);
        continue;
      }
      storage.delete(target.getBucket(), target.getName());
      settersByObject.put(target, EasyMock.<StorageObject>expectLastCall().andThrow(toThrow));
    }

    for (StorageObject target : deleteObjectExpected) {
      IExpectationSetters<StorageObject> setter = settersByObject.get(target);
      if (setter == null) {
        storage.delete(target.getBucket(), target.getName());
        setter = EasyMock.expectLastCall();
        settersByObject.put(target, setter);
      }
      setter.andVoid();
    }
  }
}

View File

@ -165,8 +165,10 @@ public class S3TaskLogs implements TaskLogs
@Override
public void killAll() throws IOException
{
log.info("Deleting all task logs from s3 location [bucket: %s prefix: %s].",
config.getS3Bucket(), config.getS3Prefix()
log.info(
"Deleting all task logs from s3 location [bucket: '%s' prefix: '%s'].",
config.getS3Bucket(),
config.getS3Prefix()
);
long now = timeSupplier.getAsLong();
@ -176,8 +178,11 @@ public class S3TaskLogs implements TaskLogs
@Override
public void killOlderThan(long timestamp) throws IOException
{
log.info("Deleting all task logs from s3 location [bucket: '%s' prefix: '%s'] older than %s.",
config.getS3Bucket(), config.getS3Prefix(), new Date(timestamp)
log.info(
"Deleting all task logs from s3 location [bucket: '%s' prefix: '%s'] older than %s.",
config.getS3Bucket(),
config.getS3Prefix(),
new Date(timestamp)
);
try {
S3Utils.deleteObjectsInPath(

View File

@ -204,6 +204,15 @@ public class S3Utils
return objectSummary;
}
/**
* Delete the files from S3 in a specified bucket, matching a specified prefix and filter
* @param s3Client s3 client
* @param config specifies the configuration to use when finding matching files in S3 to delete
* @param bucket s3 bucket
* @param prefix the file prefix
* @param filter function which returns true if the prefix file found should be deleted and false otherwise.
* @throws Exception
*/
public static void deleteObjectsInPath(
ServerSideEncryptingAmazonS3 s3Client,
S3InputDataConfig config,
@ -238,7 +247,7 @@ public class S3Utils
}
}
public static void deleteBucketKeys(
private static void deleteBucketKeys(
ServerSideEncryptingAmazonS3 s3Client,
String bucket,
List<DeleteObjectsRequest.KeyVersion> keysToDelete