From b18dd2b7a935b7802a32ca099b5bd771126788fb Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 18 Mar 2020 17:59:17 -0700 Subject: [PATCH] Ability to Delete task logs and segments from Azure Storage (#9523) * Ability to Delete task logs and segments from Azure Storage * implement ability to delete all task logs or all task logs written before a particular date when written to Azure storage * implement ability to delete all segments from Azure deep storage * Address review comments --- extensions-core/azure-extensions/pom.xml | 10 +- .../storage/azure/AzureDataSegmentKiller.java | 39 ++- .../druid/storage/azure/AzureTaskLogs.java | 53 +++- .../druid/storage/azure/AzureUtils.java | 53 ++++ .../storage/azure/blob/CloudBlobHolder.java | 6 + .../azure/AzureDataSegmentKillerTest.java | 148 ++++++++++- .../storage/azure/AzureTaskLogsTest.java | 238 +++++++++++++++++- .../druid/storage/azure/AzureTestUtils.java | 68 ++++- 8 files changed, 590 insertions(+), 25 deletions(-) diff --git a/extensions-core/azure-extensions/pom.xml b/extensions-core/azure-extensions/pom.xml index ba6e3740f18..7b321bcd3bc 100644 --- a/extensions-core/azure-extensions/pom.xml +++ b/extensions-core/azure-extensions/pom.xml @@ -180,27 +180,27 @@ INSTRUCTION COVEREDRATIO - 0.85 + 0.86 LINE COVEREDRATIO - 0.84 + 0.86 BRANCH COVEREDRATIO - 0.87 + 0.89 COMPLEXITY COVEREDRATIO - 0.79 + 0.80 METHOD COVEREDRATIO - 0.78 + 0.79 CLASS diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java index 19f9bbf6bd0..e4cfc359217 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java @@ -19,6 +19,7 @@ package org.apache.druid.storage.azure; +import 
com.google.common.base.Predicates; import com.google.inject.Inject; import com.microsoft.azure.storage.StorageException; import org.apache.druid.java.util.common.MapUtils; @@ -27,6 +28,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.timeline.DataSegment; +import java.io.IOException; import java.net.URISyntaxException; import java.nio.file.Paths; import java.util.Map; @@ -38,14 +40,26 @@ public class AzureDataSegmentKiller implements DataSegmentKiller { private static final Logger log = new Logger(AzureDataSegmentKiller.class); + private final AzureDataSegmentConfig segmentConfig; + private final AzureInputDataConfig inputDataConfig; + private final AzureAccountConfig accountConfig; private final AzureStorage azureStorage; + private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory; @Inject public AzureDataSegmentKiller( - final AzureStorage azureStorage + AzureDataSegmentConfig segmentConfig, + AzureInputDataConfig inputDataConfig, + AzureAccountConfig accountConfig, + final AzureStorage azureStorage, + AzureCloudBlobIterableFactory azureCloudBlobIterableFactory ) { + this.segmentConfig = segmentConfig; + this.inputDataConfig = inputDataConfig; + this.accountConfig = accountConfig; this.azureStorage = azureStorage; + this.azureCloudBlobIterableFactory = azureCloudBlobIterableFactory; } @Override @@ -72,9 +86,28 @@ public class AzureDataSegmentKiller implements DataSegmentKiller } @Override - public void killAll() + public void killAll() throws IOException { - throw new UnsupportedOperationException("not implemented"); + log.info( + "Deleting all segment files from Azure storage location [bucket: '%s' prefix: '%s']", + segmentConfig.getContainer(), + segmentConfig.getPrefix() + ); + try { + AzureUtils.deleteObjectsInPath( + azureStorage, + inputDataConfig, + accountConfig, + azureCloudBlobIterableFactory, + segmentConfig.getContainer(), + 
segmentConfig.getPrefix(), + Predicates.alwaysTrue() + ); + } + catch (Exception e) { + log.error("Error occurred while deleting segment files from Azure. Error: %s", e.getMessage()); + throw new IOException(e); + } } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java index 1d2fe864933..46e081eea38 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java @@ -23,6 +23,7 @@ import com.google.common.base.Optional; import com.google.common.io.ByteSource; import com.google.inject.Inject; import com.microsoft.azure.storage.StorageException; +import org.apache.druid.common.utils.CurrentTimeMillisSupplier; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -32,6 +33,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; +import java.util.Date; /** * Deals with reading and writing task logs stored in Azure. 
@@ -42,13 +44,27 @@ public class AzureTaskLogs implements TaskLogs private static final Logger log = new Logger(AzureTaskLogs.class); private final AzureTaskLogsConfig config; + private final AzureInputDataConfig inputDataConfig; + private final AzureAccountConfig accountConfig; private final AzureStorage azureStorage; + private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory; + private final CurrentTimeMillisSupplier timeSupplier; @Inject - public AzureTaskLogs(AzureTaskLogsConfig config, AzureStorage azureStorage) + public AzureTaskLogs( + AzureTaskLogsConfig config, + AzureInputDataConfig inputDataConfig, + AzureAccountConfig accountConfig, + AzureStorage azureStorage, + AzureCloudBlobIterableFactory azureCloudBlobIterableFactory, + CurrentTimeMillisSupplier timeSupplier) { this.config = config; + this.inputDataConfig = inputDataConfig; this.azureStorage = azureStorage; + this.accountConfig = accountConfig; + this.azureCloudBlobIterableFactory = azureCloudBlobIterableFactory; + this.timeSupplier = timeSupplier; } @Override @@ -151,14 +167,41 @@ public class AzureTaskLogs implements TaskLogs } @Override - public void killAll() + public void killAll() throws IOException { - throw new UnsupportedOperationException("not implemented"); + log.info( + "Deleting all task logs from Azure storage location [bucket: %s prefix: %s].", + config.getContainer(), + config.getPrefix() + ); + + long now = timeSupplier.getAsLong(); + killOlderThan(now); } @Override - public void killOlderThan(long timestamp) + public void killOlderThan(long timestamp) throws IOException { - throw new UnsupportedOperationException("not implemented"); + log.info( + "Deleting all task logs from Azure storage location [bucket: '%s' prefix: '%s'] older than %s.", + config.getContainer(), + config.getPrefix(), + new Date(timestamp) + ); + try { + AzureUtils.deleteObjectsInPath( + azureStorage, + inputDataConfig, + accountConfig, + azureCloudBlobIterableFactory, + config.getContainer(), 
+ config.getPrefix(), + (object) -> object.getLastModifed().getTime() < timestamp + ); + } + catch (Exception e) { + log.error("Error occurred while deleting task log files from Azure. Error: %s", e.getMessage()); + throw new IOException(e); + } } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java index 2a79ec87e1b..63322404f08 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java @@ -21,14 +21,18 @@ package org.apache.druid.storage.azure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; import com.microsoft.azure.storage.StorageException; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.RetryUtils.Task; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.storage.azure.blob.CloudBlobHolder; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Iterator; /** * Utility class for miscellaneous things involving Azure. @@ -96,6 +100,55 @@ public class AzureUtils } } + /** + * Delete the files from Azure Storage in a specified bucket, matching a specified prefix and filter + * + * @param storage Azure Storage client + * @param config specifies the configuration to use when finding matching files in Azure Storage to delete + * @param bucket Azure Storage bucket + * @param prefix the file prefix + * @param filter function which returns true if the prefix file found should be deleted and false otherwise. 
+ * @throws Exception + */ + public static void deleteObjectsInPath( + AzureStorage storage, + AzureInputDataConfig config, + AzureAccountConfig accountConfig, + AzureCloudBlobIterableFactory azureCloudBlobIterableFactory, + String bucket, + String prefix, + Predicate filter + ) + throws Exception + { + AzureCloudBlobIterable azureCloudBlobIterable = + azureCloudBlobIterableFactory.create(ImmutableList.of(new CloudObjectLocation( + bucket, + prefix + ).toUri("azure")), config.getMaxListingLength()); + Iterator iterator = azureCloudBlobIterable.iterator(); + + while (iterator.hasNext()) { + final CloudBlobHolder nextObject = iterator.next(); + if (filter.apply(nextObject)) { + deleteBucketKeys(storage, accountConfig.getMaxTries(), nextObject.getContainerName(), nextObject.getName()); + } + } + } + + private static void deleteBucketKeys( + AzureStorage storage, + int maxTries, + String bucket, + String prefix + ) throws Exception + { + AzureUtils.retryAzureOperation(() -> { + storage.emptyCloudBlobDirectory(bucket, prefix); + return null; + }, maxTries); + } + static T retryAzureOperation(Task f, int maxTries) throws Exception { return RetryUtils.retry(f, AZURE_RETRY, maxTries); diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java index 910db5a893d..3391c3c7dd1 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java @@ -23,6 +23,7 @@ import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlob; import java.net.URISyntaxException; +import java.util.Date; /** * Wrapper for {@link CloudBlob}. 
Used to make testing easier, since {@link CloudBlob} @@ -51,4 +52,9 @@ public class CloudBlobHolder { return delegate.getProperties().getLength(); } + + public Date getLastModifed() + { + return delegate.getProperties().getLastModified(); + } } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java index 3ad3842f89f..fc78b2d3a6b 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java @@ -19,18 +19,24 @@ package org.apache.druid.storage.azure; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageExtendedErrorInformation; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.storage.azure.blob.CloudBlobHolder; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Paths; import java.util.ArrayList; @@ -39,7 +45,20 @@ import java.util.List; public class AzureDataSegmentKillerTest extends EasyMockSupport { private static final String CONTAINER_NAME = "container"; + private static final String CONTAINER = "test"; + private static final String PREFIX = "test/log"; + private static final int MAX_TRIES = 3; private static final String BLOB_PATH = 
"test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"; + private static final int MAX_KEYS = 1; + private static final long TIME_0 = 0L; + private static final long TIME_1 = 1L; + private static final long TIME_NOW = 2L; + private static final long TIME_FUTURE = 3L; + private static final String KEY_1 = "key1"; + private static final String KEY_2 = "key2"; + private static final URI PREFIX_URI = URI.create(StringUtils.format("azure://%s/%s", CONTAINER, PREFIX)); + private static final Exception RECOVERABLE_EXCEPTION = new StorageException("", "", null); + private static final Exception NON_RECOVERABLE_EXCEPTION = new URISyntaxException("", ""); private static final DataSegment DATA_SEGMENT = new DataSegment( "test", @@ -56,12 +75,20 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport private static final StorageExtendedErrorInformation NULL_STORAGE_EXTENDED_ERROR_INFORMATION = null; private static final StorageExtendedErrorInformation STORAGE_EXTENDED_ERROR_INFORMATION = new StorageExtendedErrorInformation(); + private AzureDataSegmentConfig segmentConfig; + private AzureInputDataConfig inputDataConfig; + private AzureAccountConfig accountConfig; private AzureStorage azureStorage; + private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory; @Before public void before() { + segmentConfig = createMock(AzureDataSegmentConfig.class); + inputDataConfig = createMock(AzureInputDataConfig.class); + accountConfig = createMock(AzureAccountConfig.class); azureStorage = createMock(AzureStorage.class); + azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class); } @Test @@ -75,7 +102,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport replayAll(); - AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage); + AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory); 
killer.kill(DATA_SEGMENT); @@ -114,18 +141,127 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport replayAll(); - AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage); + AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory); killer.kill(DATA_SEGMENT); verifyAll(); } - @Test(expected = UnsupportedOperationException.class) - public void test_killAll_throwsUnsupportedOperationException() + @Test + public void test_killAll_noException_deletesAllSegments() throws Exception { - AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage); + EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce(); + EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce(); + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + + CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); + CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_1); + + AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects( + azureCloudBlobIterableFactory, + MAX_KEYS, + PREFIX_URI, + ImmutableList.of(object1, object2)); + + EasyMock.replay(object1, object2); + AzureTestUtils.expectDeleteObjects( + azureStorage, + ImmutableList.of(object1, object2), + ImmutableMap.of()); + EasyMock.replay(segmentConfig, inputDataConfig, accountConfig, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); + AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory); killer.killAll(); + EasyMock.verify(segmentConfig, inputDataConfig, accountConfig, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); + } + + @Test + 
public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegments() throws Exception + { + EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce(); + EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce(); + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + + CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); + + AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects( + azureCloudBlobIterableFactory, + MAX_KEYS, + PREFIX_URI, + ImmutableList.of(object1)); + + EasyMock.replay(object1); + AzureTestUtils.expectDeleteObjects( + azureStorage, + ImmutableList.of(object1), + ImmutableMap.of(object1, RECOVERABLE_EXCEPTION)); + EasyMock.replay(segmentConfig, inputDataConfig, accountConfig, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); + AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory); + killer.killAll(); + EasyMock.verify(segmentConfig, inputDataConfig, accountConfig, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); + } + + @Test + public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments() throws Exception + { + boolean ioExceptionThrown = false; + CloudBlobHolder object1 = null; + AzureCloudBlobIterable azureCloudBlobIterable = null; + try { + EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce(); + EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce(); + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + + object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); + + azureCloudBlobIterable = 
AzureTestUtils.expectListObjects( + azureCloudBlobIterableFactory, + MAX_KEYS, + PREFIX_URI, + ImmutableList.of(object1) + ); + + EasyMock.replay(object1); + AzureTestUtils.expectDeleteObjects( + azureStorage, + ImmutableList.of(), + ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION) + ); + EasyMock.replay( + segmentConfig, + inputDataConfig, + accountConfig, + azureCloudBlobIterable, + azureCloudBlobIterableFactory, + azureStorage + ); + AzureDataSegmentKiller killer = new AzureDataSegmentKiller( + segmentConfig, + inputDataConfig, + accountConfig, + azureStorage, + azureCloudBlobIterableFactory + ); + killer.killAll(); + } + catch (IOException e) { + ioExceptionThrown = true; + } + + Assert.assertTrue(ioExceptionThrown); + + EasyMock.verify( + segmentConfig, + inputDataConfig, + accountConfig, + object1, + azureCloudBlobIterable, + azureCloudBlobIterableFactory, + azureStorage + ); } private void common_test_kill_StorageExceptionExtendedError_throwsException(StorageExtendedErrorInformation storageExtendedErrorInformation) @@ -145,7 +281,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport replayAll(); - AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage); + AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory); killer.kill(DATA_SEGMENT); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java index a3733afdd93..519a6e923f1 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java @@ -20,10 +20,15 @@ package org.apache.druid.storage.azure; import com.google.common.base.Optional; +import 
com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSource; +import com.microsoft.azure.storage.StorageException; import org.apache.commons.io.IOUtils; +import org.apache.druid.common.utils.CurrentTimeMillisSupplier; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.storage.azure.blob.CloudBlobHolder; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.junit.After; @@ -35,6 +40,7 @@ import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.StringWriter; +import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; @@ -45,16 +51,41 @@ public class AzureTaskLogsTest extends EasyMockSupport private static final String PREFIX = "test/log"; private static final String TASK_ID = "taskid"; private static final String TASK_ID_NOT_FOUND = "taskidNotFound"; - private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX, 3); + private static final int MAX_TRIES = 3; + private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX, MAX_TRIES); + private static final int MAX_KEYS = 1; + private static final long TIME_0 = 0L; + private static final long TIME_1 = 1L; + private static final long TIME_NOW = 2L; + private static final long TIME_FUTURE = 3L; + private static final String KEY_1 = "key1"; + private static final String KEY_2 = "key2"; + private static final URI PREFIX_URI = URI.create(StringUtils.format("azure://%s/%s", CONTAINER, PREFIX)); + private static final Exception RECOVERABLE_EXCEPTION = new StorageException("", "", null); + private static final Exception NON_RECOVERABLE_EXCEPTION = new URISyntaxException("", ""); + private AzureInputDataConfig inputDataConfig; + private AzureAccountConfig accountConfig; private 
AzureStorage azureStorage; + private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory; + private CurrentTimeMillisSupplier timeSupplier; private AzureTaskLogs azureTaskLogs; @Before public void before() { + inputDataConfig = createMock(AzureInputDataConfig.class); + accountConfig = createMock(AzureAccountConfig.class); azureStorage = createMock(AzureStorage.class); - azureTaskLogs = new AzureTaskLogs(AZURE_TASK_LOGS_CONFIG, azureStorage); + azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class); + timeSupplier = createMock(CurrentTimeMillisSupplier.class); + azureTaskLogs = new AzureTaskLogs( + AZURE_TASK_LOGS_CONFIG, + inputDataConfig, + accountConfig, + azureStorage, + azureCloudBlobIterableFactory, + timeSupplier); } @@ -292,17 +323,214 @@ public class AzureTaskLogsTest extends EasyMockSupport verifyAll(); } - @Test (expected = UnsupportedOperationException.class) - public void test_killAll_throwsUnsupportedOperationException() + @Test + public void test_killAll_noException_deletesAllTaskLogs() throws Exception { + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + + CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); + CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_1); + + AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects( + azureCloudBlobIterableFactory, + MAX_KEYS, + PREFIX_URI, + ImmutableList.of(object1, object2)); + + EasyMock.replay(object1, object2); + AzureTestUtils.expectDeleteObjects( + azureStorage, + ImmutableList.of(object1, object2), + ImmutableMap.of()); + EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); azureTaskLogs.killAll(); + 
EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); } + @Test + public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws Exception + { + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + + CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); + + AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects( + azureCloudBlobIterableFactory, + MAX_KEYS, + PREFIX_URI, + ImmutableList.of(object1)); + + EasyMock.replay(object1); + AzureTestUtils.expectDeleteObjects( + azureStorage, + ImmutableList.of(object1), + ImmutableMap.of(object1, RECOVERABLE_EXCEPTION)); + EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); + azureTaskLogs.killAll(); + EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); + } + + @Test + public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() throws Exception + { + boolean ioExceptionThrown = false; + CloudBlobHolder object1 = null; + AzureCloudBlobIterable azureCloudBlobIterable = null; + try { + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + + object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); + + azureCloudBlobIterable = AzureTestUtils.expectListObjects( + azureCloudBlobIterableFactory, + MAX_KEYS, + PREFIX_URI, + ImmutableList.of(object1) + ); + + EasyMock.replay(object1); + 
AzureTestUtils.expectDeleteObjects( + azureStorage, + ImmutableList.of(), + ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION) + ); + EasyMock.replay( + inputDataConfig, + accountConfig, + timeSupplier, + azureCloudBlobIterable, + azureCloudBlobIterableFactory, + azureStorage + ); + azureTaskLogs.killAll(); + } + catch (IOException e) { + ioExceptionThrown = true; + } + Assert.assertTrue(ioExceptionThrown); + EasyMock.verify( + inputDataConfig, + accountConfig, + timeSupplier, + object1, + azureCloudBlobIterable, + azureCloudBlobIterableFactory, + azureStorage + ); + } + + @Test + public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws Exception + { + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + + CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); + CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_FUTURE); + + AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects( + azureCloudBlobIterableFactory, + MAX_KEYS, + PREFIX_URI, + ImmutableList.of(object1, object2)); + + EasyMock.replay(object1, object2); + AzureTestUtils.expectDeleteObjects( + azureStorage, + ImmutableList.of(object1), + ImmutableMap.of()); + EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); + azureTaskLogs.killOlderThan(TIME_NOW); + EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); + } + + @Test + public void test_killOlderThan_recoverableExceptionWhenDeletingObjects_deletesOnlyTaskLogsOlderThan() throws Exception + { + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + + 
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); + + AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects( + azureCloudBlobIterableFactory, + MAX_KEYS, + PREFIX_URI, + ImmutableList.of(object1)); + + EasyMock.replay(object1); + AzureTestUtils.expectDeleteObjects( + azureStorage, + ImmutableList.of(object1), + ImmutableMap.of(object1, RECOVERABLE_EXCEPTION)); + EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); + azureTaskLogs.killOlderThan(TIME_NOW); + EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); + } + + @Test + public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() throws Exception + { + boolean ioExceptionThrown = false; + CloudBlobHolder object1 = null; + AzureCloudBlobIterable azureCloudBlobIterable = null; + try { + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + + object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); + + azureCloudBlobIterable = AzureTestUtils.expectListObjects( + azureCloudBlobIterableFactory, + MAX_KEYS, + PREFIX_URI, + ImmutableList.of(object1) + ); + + EasyMock.replay(object1); + AzureTestUtils.expectDeleteObjects( + azureStorage, + ImmutableList.of(), + ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION) + ); + EasyMock.replay( + inputDataConfig, + accountConfig, + timeSupplier, + azureCloudBlobIterable, + azureCloudBlobIterableFactory, + azureStorage + ); + azureTaskLogs.killOlderThan(TIME_NOW); + } + catch (IOException e) { + ioExceptionThrown = true; + } + Assert.assertTrue(ioExceptionThrown); + EasyMock.verify( + inputDataConfig, + accountConfig, + timeSupplier, + object1, + azureCloudBlobIterable, + 
azureCloudBlobIterableFactory, + azureStorage + ); + } + + /* @Test (expected = UnsupportedOperationException.class) - public void test_killOlderThan_throwsUnsupportedOperationException() + public void test_killOlderThan_throwsUnsupportedOperationException() throws IOException { azureTaskLogs.killOlderThan(0); } + */ @After public void cleanup() diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java index d73fa3c6175..5c89041b205 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java @@ -19,14 +19,25 @@ package org.apache.druid.storage.azure; +import com.google.common.collect.ImmutableList; +import org.apache.druid.storage.azure.blob.CloudBlobHolder; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; +import org.easymock.IExpectationSetters; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.nio.file.Files; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; -public class AzureTestUtils +public class AzureTestUtils extends EasyMockSupport { public static File createZipTempFile(final String segmentFileName, final String content) throws IOException { @@ -40,4 +51,59 @@ public class AzureTestUtils return zipFile; } + + public static AzureCloudBlobIterable expectListObjects( + AzureCloudBlobIterableFactory azureCloudBlobIterableFactory, + int maxListingLength, + URI PREFIX_URI, + List objects) + { + AzureCloudBlobIterable azureCloudBlobIterable = EasyMock.createMock(AzureCloudBlobIterable.class); + 
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(objects.iterator()); + EasyMock.expect(azureCloudBlobIterableFactory.create(ImmutableList.of(PREFIX_URI), maxListingLength)).andReturn(azureCloudBlobIterable); + return azureCloudBlobIterable; + } + + public static void expectDeleteObjects( + AzureStorage storage, + List deleteRequestsExpected, + Map deleteRequestToException) throws Exception + { + Map> requestToResultExpectationSetter = new HashMap<>(); + + for (Map.Entry requestsAndErrors : deleteRequestToException.entrySet()) { + CloudBlobHolder deleteObject = requestsAndErrors.getKey(); + Exception exception = requestsAndErrors.getValue(); + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject); + if (resultExpectationSetter == null) { + storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName()); + resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception); + requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter); + } else { + resultExpectationSetter.andThrow(exception); + } + } + + for (CloudBlobHolder deleteObject : deleteRequestsExpected) { + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject); + if (resultExpectationSetter == null) { + storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName()); + resultExpectationSetter = EasyMock.expectLastCall(); + requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter); + } + resultExpectationSetter.andReturn(null); + } + } + + public static CloudBlobHolder newCloudBlobHolder( + String container, + String prefix, + long lastModifiedTimestamp) throws Exception + { + CloudBlobHolder object = EasyMock.createMock(CloudBlobHolder.class); + EasyMock.expect(object.getContainerName()).andReturn(container).anyTimes(); + EasyMock.expect(object.getName()).andReturn(prefix).anyTimes(); + 
EasyMock.expect(object.getLastModifed()).andReturn(new Date(lastModifiedTimestamp)).anyTimes(); + return object; + } }