From 838735411f495e355820da35ab0765a56a95b090 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 18 Mar 2020 18:00:43 -0700 Subject: [PATCH] Ability to Delete task logs and segments from Google Storage (#9519) * Ability to Delete task logs and segments from Google Storage * implement ability to delete all tasks logs or all task logs written before a particular date when written to Google storage * implement ability to delete all segments from Google deep storage * * Address review comments --- .../google/GoogleDataSegmentKiller.java | 32 ++- .../druid/storage/google/GoogleTaskLogs.java | 46 +++- .../druid/storage/google/GoogleUtils.java | 39 +++ .../google/GoogleDataSegmentKillerTest.java | 135 ++++++++++- .../storage/google/GoogleTaskLogsTest.java | 226 +++++++++++++++++- .../druid/storage/google/GoogleTestUtils.java | 122 ++++++++++ .../apache/druid/storage/s3/S3TaskLogs.java | 13 +- .../org/apache/druid/storage/s3/S3Utils.java | 11 +- 8 files changed, 606 insertions(+), 18 deletions(-) create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentKiller.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentKiller.java index 2f925efee81..1c50f52e7b7 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentKiller.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentKiller.java @@ -20,6 +20,7 @@ package org.apache.druid.storage.google; import com.google.api.client.http.HttpResponseException; +import com.google.common.base.Predicates; import com.google.inject.Inject; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.RE; @@ -37,11 +38,19 @@ public class GoogleDataSegmentKiller implements DataSegmentKiller 
private static final Logger LOG = new Logger(GoogleDataSegmentKiller.class); private final GoogleStorage storage; + private final GoogleAccountConfig accountConfig; + private final GoogleInputDataConfig inputDataConfig; @Inject - public GoogleDataSegmentKiller(final GoogleStorage storage) + public GoogleDataSegmentKiller( + final GoogleStorage storage, + GoogleAccountConfig accountConfig, + GoogleInputDataConfig inputDataConfig + ) { this.storage = storage; + this.accountConfig = accountConfig; + this.inputDataConfig = inputDataConfig; } @Override @@ -93,8 +102,25 @@ public class GoogleDataSegmentKiller implements DataSegmentKiller } @Override - public void killAll() + public void killAll() throws IOException { - throw new UnsupportedOperationException("not implemented"); + LOG.info( + "Deleting all segment files from gs location [bucket: '%s' prefix: '%s']", + accountConfig.getBucket(), + accountConfig.getPrefix() + ); + try { + GoogleUtils.deleteObjectsInPath( + storage, + inputDataConfig, + accountConfig.getBucket(), + accountConfig.getPrefix(), + Predicates.alwaysTrue() + ); + } + catch (Exception e) { + LOG.error("Error occurred while deleting segment files from gs. 
Error: %s", e.getMessage()); + throw new IOException(e); + } } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java index a5f1b99f563..1b3c5319ff2 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java @@ -23,6 +23,7 @@ import com.google.api.client.http.InputStreamContent; import com.google.common.base.Optional; import com.google.common.io.ByteSource; import com.google.inject.Inject; +import org.apache.druid.common.utils.CurrentTimeMillisSupplier; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RetryUtils; @@ -33,6 +34,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; +import java.util.Date; public class GoogleTaskLogs implements TaskLogs { @@ -40,12 +42,21 @@ public class GoogleTaskLogs implements TaskLogs private final GoogleTaskLogsConfig config; private final GoogleStorage storage; + private final GoogleInputDataConfig inputDataConfig; + private final CurrentTimeMillisSupplier timeSupplier; @Inject - public GoogleTaskLogs(GoogleTaskLogsConfig config, GoogleStorage storage) + public GoogleTaskLogs( + GoogleTaskLogsConfig config, + GoogleStorage storage, + GoogleInputDataConfig inputDataConfig, + CurrentTimeMillisSupplier timeSupplier + ) { this.config = config; this.storage = storage; + this.inputDataConfig = inputDataConfig; + this.timeSupplier = timeSupplier; } @Override @@ -159,14 +170,39 @@ public class GoogleTaskLogs implements TaskLogs } @Override - public void killAll() + public void killAll() throws IOException { - throw new UnsupportedOperationException("not implemented"); + LOG.info( + 
"Deleting all task logs from gs location [bucket: '%s' prefix: '%s'].", + config.getBucket(), + config.getPrefix() + ); + + long now = timeSupplier.getAsLong(); + killOlderThan(now); } @Override - public void killOlderThan(long timestamp) + public void killOlderThan(long timestamp) throws IOException { - throw new UnsupportedOperationException("not implemented"); + LOG.info( + "Deleting all task logs from gs location [bucket: '%s' prefix: '%s'] older than %s.", + config.getBucket(), + config.getPrefix(), + new Date(timestamp) + ); + try { + GoogleUtils.deleteObjectsInPath( + storage, + inputDataConfig, + config.getBucket(), + config.getPrefix(), + (object) -> object.getUpdated().getValue() < timestamp + ); + } + catch (Exception e) { + LOG.error("Error occurred while deleting task log files from gs. Error: %s", e.getMessage()); + throw new IOException(e); + } } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 2c181785065..2ed80fedb27 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -22,9 +22,11 @@ package org.apache.druid.storage.google; import com.google.api.client.http.HttpResponseException; import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.google.GoogleCloudStorageInputSource; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.logger.Logger; import java.io.IOException; import java.net.URI; @@ -32,6 +34,7 @@ import java.util.Iterator; public class GoogleUtils { + private static final Logger log = 
new Logger(GoogleUtils.class); public static final Predicate GOOGLE_RETRY = GoogleUtils::isRetryable; public static boolean isRetryable(Throwable t) @@ -66,4 +69,40 @@ public class GoogleUtils { return new ObjectStorageIterator(storage, uris, maxListingLength); } + + /** + * Delete the files from Google Storage in a specified bucket, matching a specified prefix and filter + * + * @param storage Google Storage client + * @param config specifies the configuration to use when finding matching files in Google Storage to delete + * @param bucket Google Storage bucket + * @param prefix the file prefix + * @param filter predicate applied to each object listed under the prefix; returns true if the object should be deleted and false otherwise. + * @throws Exception if listing the objects or deleting one of them fails + */ + public static void deleteObjectsInPath( + GoogleStorage storage, + GoogleInputDataConfig config, + String bucket, + String prefix, + Predicate filter + ) + throws Exception + { + final Iterator iterator = lazyFetchingStorageObjectsIterator( + storage, + ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("gs")).iterator(), + config.getMaxListingLength() + ); + + while (iterator.hasNext()) { + final StorageObject nextObject = iterator.next(); + if (filter.apply(nextObject)) { + retryGoogleCloudStorageOperation(() -> { + storage.delete(nextObject.getBucket(), nextObject.getName()); + return null; + }); + } + } + } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java index bf99cf3e188..82cda77a018 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java @@ -21,25 +21,43 @@ package org.apache.druid.storage.google; import 
com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.googleapis.testing.json.GoogleJsonResponseExceptionFactoryTesting; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpResponseException; import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.StorageObject; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.net.URI; public class GoogleDataSegmentKillerTest extends EasyMockSupport { + private static final String KEY_1 = "key1"; + private static final String KEY_2 = "key2"; private static final String BUCKET = "bucket"; + private static final String PREFIX = "test/log"; + private static final URI PREFIX_URI = URI.create(StringUtils.format("gs://%s/%s", BUCKET, PREFIX)); private static final String INDEX_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"; private static final String DESCRIPTOR_PATH = DataSegmentKiller.descriptorPath(INDEX_PATH); + private static final long TIME_0 = 0L; + private static final long TIME_1 = 1L; + private static final int MAX_KEYS = 1; + private static final Exception RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(429, "recoverable", new HttpHeaders()).build(); + private static final Exception NON_RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(404, "non 
recoverable", new HttpHeaders()).build(); + private static final DataSegment DATA_SEGMENT = new DataSegment( "test", @@ -54,10 +72,14 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport ); private GoogleStorage storage; + private GoogleAccountConfig accountConfig; + private GoogleInputDataConfig inputDataConfig; @Before public void before() { + accountConfig = createMock(GoogleAccountConfig.class); + inputDataConfig = createMock(GoogleInputDataConfig.class); storage = createMock(GoogleStorage.class); } @@ -71,7 +93,7 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport replayAll(); - GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage); + GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); killer.kill(DATA_SEGMENT); @@ -91,7 +113,7 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport replayAll(); - GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage); + GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); killer.kill(DATA_SEGMENT); @@ -113,10 +135,117 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport replayAll(); - GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage); + GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); killer.kill(DATA_SEGMENT); verifyAll(); } + + @Test + public void test_killAll_noException_deletesAllTaskLogs() throws IOException + { + StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1); + + Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + + GoogleTestUtils.expectListObjects( + listRequest, + PREFIX_URI, + MAX_KEYS, + ImmutableList.of(object1, object2) + ); + + GoogleTestUtils.expectDeleteObjects( + storage, + 
ImmutableList.of(object1, object2), + ImmutableMap.of() + ); + EasyMock.expect(accountConfig.getBucket()).andReturn(BUCKET).anyTimes(); + EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes(); + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + + EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage); + + GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); + + killer.killAll(); + + EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage); + } + + + @Test + public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException + { + StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + + Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + + GoogleTestUtils.expectListObjects( + listRequest, + PREFIX_URI, + MAX_KEYS, + ImmutableList.of(object1) + ); + + GoogleTestUtils.expectDeleteObjects( + storage, + ImmutableList.of(object1), + ImmutableMap.of(object1, RECOVERABLE_EXCEPTION) + ); + + EasyMock.expect(accountConfig.getBucket()).andReturn(BUCKET).anyTimes(); + EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes(); + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + + EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage); + + GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); + killer.killAll(); + + EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage); + } + + @Test + public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() + { + boolean ioExceptionThrown = false; + Storage.Objects.List listRequest = null; + try { + StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + + listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + + 
GoogleTestUtils.expectListObjects( + listRequest, + PREFIX_URI, + MAX_KEYS, + ImmutableList.of(object1) + ); + + GoogleTestUtils.expectDeleteObjects( + storage, + ImmutableList.of(), + ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION) + ); + + EasyMock.expect(accountConfig.getBucket()).andReturn(BUCKET).anyTimes(); + EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes(); + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + + EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage); + + GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); + killer.killAll(); + } + catch (IOException e) { + ioExceptionThrown = true; + } + + Assert.assertTrue(ioExceptionThrown); + + EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage); + } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java index 127f4d40bb7..b399e7f5eba 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java @@ -19,10 +19,17 @@ package org.apache.druid.storage.google; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpResponseException; import com.google.api.client.http.InputStreamContent; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSource; import org.apache.commons.io.IOUtils; +import org.apache.druid.common.utils.CurrentTimeMillisSupplier; import 
org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; import org.easymock.EasyMock; @@ -34,25 +41,43 @@ import org.junit.Test; import java.io.BufferedWriter; import java.io.ByteArrayInputStream; import java.io.File; +import java.io.IOException; import java.io.StringWriter; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.nio.file.Files; public class GoogleTaskLogsTest extends EasyMockSupport { + private static final String KEY_1 = "key1"; + private static final String KEY_2 = "key2"; private static final String BUCKET = "test"; private static final String PREFIX = "test/log"; + private static final URI PREFIX_URI = URI.create(StringUtils.format("gs://%s/%s", BUCKET, PREFIX)); private static final String TASKID = "taskid"; + private static final long TIME_0 = 0L; + private static final long TIME_1 = 1L; + private static final long TIME_NOW = 2L; + private static final long TIME_FUTURE = 3L; + private static final int MAX_KEYS = 1; + private static final Exception RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(429, "recoverable", new HttpHeaders()).build(); + private static final Exception NON_RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(404, "non recoverable", new HttpHeaders()).build(); private GoogleStorage storage; private GoogleTaskLogs googleTaskLogs; + private GoogleTaskLogsConfig config; + private GoogleInputDataConfig inputDataConfig; + private CurrentTimeMillisSupplier timeSupplier; @Before public void before() { storage = createMock(GoogleStorage.class); - GoogleTaskLogsConfig config = new GoogleTaskLogsConfig(BUCKET, PREFIX); - googleTaskLogs = new GoogleTaskLogs(config, storage); + inputDataConfig = createMock(GoogleInputDataConfig.class); + timeSupplier = createMock(CurrentTimeMillisSupplier.class); + + config = new GoogleTaskLogsConfig(BUCKET, PREFIX); + googleTaskLogs = new GoogleTaskLogs(config, storage, inputDataConfig, timeSupplier); } @Test @@ -146,4 
+171,201 @@ public class GoogleTaskLogsTest extends EasyMockSupport verifyAll(); } + + @Test + public void test_killAll_noException_deletesAllTaskLogs() throws IOException + { + StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1); + + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + + GoogleTestUtils.expectListObjects( + listRequest, + PREFIX_URI, + MAX_KEYS, + ImmutableList.of(object1, object2) + ); + + GoogleTestUtils.expectDeleteObjects( + storage, + ImmutableList.of(object1, object2), + ImmutableMap.of() + ); + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + + EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier); + + googleTaskLogs.killAll(); + + EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier); + } + + + @Test + public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException + { + StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + + Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + + GoogleTestUtils.expectListObjects( + listRequest, + PREFIX_URI, + MAX_KEYS, + ImmutableList.of(object1) + ); + + GoogleTestUtils.expectDeleteObjects( + storage, + ImmutableList.of(object1), + ImmutableMap.of(object1, RECOVERABLE_EXCEPTION) + ); + + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + + EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier); + + googleTaskLogs.killAll(); + + EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier); + } + + @Test + public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() + { + boolean 
ioExceptionThrown = false; + Storage.Objects.List listRequest = null; + try { + StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + + listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + + GoogleTestUtils.expectListObjects( + listRequest, + PREFIX_URI, + MAX_KEYS, + ImmutableList.of(object1) + ); + + GoogleTestUtils.expectDeleteObjects( + storage, + ImmutableList.of(), + ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION) + ); + + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + + EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier); + + googleTaskLogs.killAll(); + } + catch (IOException e) { + ioExceptionThrown = true; + } + + Assert.assertTrue(ioExceptionThrown); + + EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier); + } + + @Test + public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws IOException + { + StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_FUTURE); + + Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + + GoogleTestUtils.expectListObjects( + listRequest, + PREFIX_URI, + MAX_KEYS, + ImmutableList.of(object1, object2) + ); + + GoogleTestUtils.expectDeleteObjects( + storage, + ImmutableList.of(object1), + ImmutableMap.of() + ); + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + + EasyMock.replay(listRequest, inputDataConfig, storage); + googleTaskLogs.killOlderThan(TIME_NOW); + + EasyMock.verify(listRequest, inputDataConfig, storage); + } + + @Test + public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException + { + StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + + Storage.Objects.List 
listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + + GoogleTestUtils.expectListObjects( + listRequest, + PREFIX_URI, + MAX_KEYS, + ImmutableList.of(object1) + ); + + GoogleTestUtils.expectDeleteObjects( + storage, + ImmutableList.of(object1), + ImmutableMap.of(object1, RECOVERABLE_EXCEPTION) + ); + + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + + EasyMock.replay(listRequest, inputDataConfig, storage); + + googleTaskLogs.killOlderThan(TIME_NOW); + + EasyMock.verify(listRequest, inputDataConfig, storage); + } + + @Test + public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() + { + boolean ioExceptionThrown = false; + Storage.Objects.List listRequest = null; + try { + StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + + listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + + GoogleTestUtils.expectListObjects( + listRequest, + PREFIX_URI, + MAX_KEYS, + ImmutableList.of(object1) + ); + + GoogleTestUtils.expectDeleteObjects( + storage, + ImmutableList.of(), + ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION) + ); + + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + + EasyMock.replay(listRequest, inputDataConfig, storage); + + googleTaskLogs.killOlderThan(TIME_NOW); + } + catch (IOException e) { + ioExceptionThrown = true; + } + + Assert.assertTrue(ioExceptionThrown); + + EasyMock.verify(listRequest, inputDataConfig, storage); + } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java new file mode 100644 index 00000000000..d95bbb4f684 --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import com.google.api.client.util.DateTime; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; +import org.easymock.IExpectationSetters; + +import java.io.IOException; +import java.math.BigInteger; +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class GoogleTestUtils extends EasyMockSupport +{ + private static final org.joda.time.DateTime NOW = DateTimes.nowUtc(); + private static final byte[] CONTENT = + StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis())); + + public static StorageObject newStorageObject( + String bucket, + String key, + long lastModifiedTimestamp + ) + { + StorageObject object = new StorageObject(); + object.setBucket(bucket); + object.setName(key); + object.setUpdated(new DateTime(lastModifiedTimestamp)); + object.setEtag("etag"); + object.setSize(BigInteger.valueOf(CONTENT.length)); + return 
object; + } + + public static Storage.Objects.List expectListRequest( + GoogleStorage storage, + URI prefix + ) throws IOException + { + Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class); + String bucket = prefix.getAuthority(); + EasyMock.expect( + storage.list(bucket) + ).andReturn(listRequest).once(); + return listRequest; + } + + public static void expectListObjects( + Storage.Objects.List listRequest, + URI prefix, + long maxListingLength, + List objects + ) throws IOException + { + EasyMock.expect(listRequest.setPrefix(StringUtils.maybeRemoveLeadingSlash(prefix.getPath()))).andReturn(listRequest); + EasyMock.expect(listRequest.setMaxResults(maxListingLength)).andReturn(listRequest); + EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).anyTimes(); + + Objects resultObjects = new Objects(); + resultObjects.setItems(objects); + + EasyMock.expect( + listRequest.execute() + ).andReturn(resultObjects).once(); + } + + public static void expectDeleteObjects( + GoogleStorage storage, + List deleteObjectExpected, + Map deleteObjectToException + ) throws IOException + { + Map> requestToResultExpectationSetter = new HashMap<>(); + for (Map.Entry deleteObjectAndException : deleteObjectToException.entrySet()) { + StorageObject deleteObject = deleteObjectAndException.getKey(); + Exception exception = deleteObjectAndException.getValue(); + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject); + if (resultExpectationSetter == null) { + storage.delete(deleteObject.getBucket(), deleteObject.getName()); + resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception); + requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter); + } else { + resultExpectationSetter.andThrow(exception); + } + } + + for (StorageObject deleteObject : deleteObjectExpected) { + IExpectationSetters resultExpectationSetter = 
requestToResultExpectationSetter.get(deleteObject); + if (resultExpectationSetter == null) { + storage.delete(deleteObject.getBucket(), deleteObject.getName()); + resultExpectationSetter = EasyMock.expectLastCall(); + requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter); + } + resultExpectationSetter.andVoid(); + } + } +} diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java index 75fadf6e17e..838b47174a2 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java @@ -165,8 +165,10 @@ public class S3TaskLogs implements TaskLogs @Override public void killAll() throws IOException { - log.info("Deleting all task logs from s3 location [bucket: %s prefix: %s].", - config.getS3Bucket(), config.getS3Prefix() + log.info( + "Deleting all task logs from s3 location [bucket: '%s' prefix: '%s'].", + config.getS3Bucket(), + config.getS3Prefix() ); long now = timeSupplier.getAsLong(); @@ -176,8 +178,11 @@ public class S3TaskLogs implements TaskLogs @Override public void killOlderThan(long timestamp) throws IOException { - log.info("Deleting all task logs from s3 location [bucket: '%s' prefix: '%s'] older than %s.", - config.getS3Bucket(), config.getS3Prefix(), new Date(timestamp) + log.info( + "Deleting all task logs from s3 location [bucket: '%s' prefix: '%s'] older than %s.", + config.getS3Bucket(), + config.getS3Prefix(), + new Date(timestamp) ); try { S3Utils.deleteObjectsInPath( diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index e19639a41d7..97fde22b03f 100644 --- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -204,6 +204,15 @@ public class S3Utils return objectSummary; } + /** + * Delete the files from S3 in a specified bucket, matching a specified prefix and filter + * @param s3Client s3 client + * @param config specifies the configuration to use when finding matching files in S3 to delete + * @param bucket s3 bucket + * @param prefix the file prefix + * @param filter function which returns true if the prefix file found should be deleted and false otherwise. + * @throws Exception + */ public static void deleteObjectsInPath( ServerSideEncryptingAmazonS3 s3Client, S3InputDataConfig config, @@ -238,7 +247,7 @@ public class S3Utils } } - public static void deleteBucketKeys( + private static void deleteBucketKeys( ServerSideEncryptingAmazonS3 s3Client, String bucket, List keysToDelete