diff --git a/extensions-core/azure-extensions/pom.xml b/extensions-core/azure-extensions/pom.xml
index ba6e3740f18..7b321bcd3bc 100644
--- a/extensions-core/azure-extensions/pom.xml
+++ b/extensions-core/azure-extensions/pom.xml
@@ -180,27 +180,27 @@
INSTRUCTION
COVEREDRATIO
- 0.85
+ 0.86
LINE
COVEREDRATIO
- 0.84
+ 0.86
BRANCH
COVEREDRATIO
- 0.87
+ 0.89
COMPLEXITY
COVEREDRATIO
- 0.79
+ 0.80
METHOD
COVEREDRATIO
- 0.78
+ 0.79
CLASS
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
index 19f9bbf6bd0..e4cfc359217 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
@@ -19,6 +19,7 @@
package org.apache.druid.storage.azure;
+import com.google.common.base.Predicates;
import com.google.inject.Inject;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.java.util.common.MapUtils;
@@ -27,6 +28,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
+import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.Map;
@@ -38,14 +40,26 @@ public class AzureDataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(AzureDataSegmentKiller.class);
+ private final AzureDataSegmentConfig segmentConfig;
+ private final AzureInputDataConfig inputDataConfig;
+ private final AzureAccountConfig accountConfig;
private final AzureStorage azureStorage;
+ private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
@Inject
public AzureDataSegmentKiller(
- final AzureStorage azureStorage
+ AzureDataSegmentConfig segmentConfig,
+ AzureInputDataConfig inputDataConfig,
+ AzureAccountConfig accountConfig,
+ final AzureStorage azureStorage,
+ AzureCloudBlobIterableFactory azureCloudBlobIterableFactory
)
{
+ this.segmentConfig = segmentConfig;
+ this.inputDataConfig = inputDataConfig;
+ this.accountConfig = accountConfig;
this.azureStorage = azureStorage;
+ this.azureCloudBlobIterableFactory = azureCloudBlobIterableFactory;
}
@Override
@@ -72,9 +86,28 @@ public class AzureDataSegmentKiller implements DataSegmentKiller
}
@Override
- public void killAll()
+ public void killAll() throws IOException
{
- throw new UnsupportedOperationException("not implemented");
+ log.info(
+ "Deleting all segment files from Azure storage location [bucket: '%s' prefix: '%s']",
+ segmentConfig.getContainer(),
+ segmentConfig.getPrefix()
+ );
+ try {
+ AzureUtils.deleteObjectsInPath(
+ azureStorage,
+ inputDataConfig,
+ accountConfig,
+ azureCloudBlobIterableFactory,
+ segmentConfig.getContainer(),
+ segmentConfig.getPrefix(),
+ Predicates.alwaysTrue()
+ );
+ }
+ catch (Exception e) {
+ log.error("Error occurred while deleting segment files from Azure. Error: %s", e.getMessage());
+ throw new IOException(e);
+ }
}
}
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
index 1d2fe864933..46e081eea38 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.microsoft.azure.storage.StorageException;
+import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -32,6 +33,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
+import java.util.Date;
/**
* Deals with reading and writing task logs stored in Azure.
@@ -42,13 +44,27 @@ public class AzureTaskLogs implements TaskLogs
private static final Logger log = new Logger(AzureTaskLogs.class);
private final AzureTaskLogsConfig config;
+ private final AzureInputDataConfig inputDataConfig;
+ private final AzureAccountConfig accountConfig;
private final AzureStorage azureStorage;
+ private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
+ private final CurrentTimeMillisSupplier timeSupplier;
@Inject
- public AzureTaskLogs(AzureTaskLogsConfig config, AzureStorage azureStorage)
+ public AzureTaskLogs(
+ AzureTaskLogsConfig config,
+ AzureInputDataConfig inputDataConfig,
+ AzureAccountConfig accountConfig,
+ AzureStorage azureStorage,
+ AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
+ CurrentTimeMillisSupplier timeSupplier)
{
this.config = config;
+ this.inputDataConfig = inputDataConfig;
this.azureStorage = azureStorage;
+ this.accountConfig = accountConfig;
+ this.azureCloudBlobIterableFactory = azureCloudBlobIterableFactory;
+ this.timeSupplier = timeSupplier;
}
@Override
@@ -151,14 +167,41 @@ public class AzureTaskLogs implements TaskLogs
}
@Override
- public void killAll()
+ public void killAll() throws IOException
{
- throw new UnsupportedOperationException("not implemented");
+ log.info(
+ "Deleting all task logs from Azure storage location [bucket: %s prefix: %s].",
+ config.getContainer(),
+ config.getPrefix()
+ );
+
+ long now = timeSupplier.getAsLong();
+ killOlderThan(now);
}
@Override
- public void killOlderThan(long timestamp)
+ public void killOlderThan(long timestamp) throws IOException
{
- throw new UnsupportedOperationException("not implemented");
+ log.info(
+ "Deleting all task logs from Azure storage location [bucket: '%s' prefix: '%s'] older than %s.",
+ config.getContainer(),
+ config.getPrefix(),
+ new Date(timestamp)
+ );
+ try {
+ AzureUtils.deleteObjectsInPath(
+ azureStorage,
+ inputDataConfig,
+ accountConfig,
+ azureCloudBlobIterableFactory,
+ config.getContainer(),
+ config.getPrefix(),
+ (object) -> object.getLastModifed().getTime() < timestamp
+ );
+ }
+ catch (Exception e) {
+ log.error("Error occurred while deleting task log files from Azure. Error: %s", e.getMessage());
+ throw new IOException(e);
+ }
}
}
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
index 2a79ec87e1b..63322404f08 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
@@ -21,14 +21,18 @@ package org.apache.druid.storage.azure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
import com.microsoft.azure.storage.StorageException;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.RetryUtils.Task;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Iterator;
/**
* Utility class for miscellaneous things involving Azure.
@@ -96,6 +100,55 @@ public class AzureUtils
}
}
+ /**
+ * Delete the files from Azure Storage in a specified bucket, matching a specified prefix and filter
+ *
+ * @param storage Azure Storage client
+ * @param config specifies the configuration to use when finding matching files in Azure Storage to delete
+ * @param bucket Azure Storage bucket
+ * @param prefix the file prefix
+ * @param filter function which returns true if the prefix file found should be deleted and false otherwise.
+ * @throws Exception
+ */
+ public static void deleteObjectsInPath(
+ AzureStorage storage,
+ AzureInputDataConfig config,
+ AzureAccountConfig accountConfig,
+ AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
+ String bucket,
+ String prefix,
+ Predicate filter
+ )
+ throws Exception
+ {
+ AzureCloudBlobIterable azureCloudBlobIterable =
+ azureCloudBlobIterableFactory.create(ImmutableList.of(new CloudObjectLocation(
+ bucket,
+ prefix
+ ).toUri("azure")), config.getMaxListingLength());
+ Iterator iterator = azureCloudBlobIterable.iterator();
+
+ while (iterator.hasNext()) {
+ final CloudBlobHolder nextObject = iterator.next();
+ if (filter.apply(nextObject)) {
+ deleteBucketKeys(storage, accountConfig.getMaxTries(), nextObject.getContainerName(), nextObject.getName());
+ }
+ }
+ }
+
+ private static void deleteBucketKeys(
+ AzureStorage storage,
+ int maxTries,
+ String bucket,
+ String prefix
+ ) throws Exception
+ {
+ AzureUtils.retryAzureOperation(() -> {
+ storage.emptyCloudBlobDirectory(bucket, prefix);
+ return null;
+ }, maxTries);
+ }
+
static T retryAzureOperation(Task f, int maxTries) throws Exception
{
return RetryUtils.retry(f, AZURE_RETRY, maxTries);
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java
index 910db5a893d..3391c3c7dd1 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java
@@ -23,6 +23,7 @@ import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import java.net.URISyntaxException;
+import java.util.Date;
/**
* Wrapper for {@link CloudBlob}. Used to make testing easier, since {@link CloudBlob}
@@ -51,4 +52,9 @@ public class CloudBlobHolder
{
return delegate.getProperties().getLength();
}
+
+ public Date getLastModifed()
+ {
+ return delegate.getProperties().getLastModified();
+ }
}
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
index 3ad3842f89f..fc78b2d3a6b 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
@@ -19,18 +19,24 @@
package org.apache.druid.storage.azure;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageExtendedErrorInformation;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -39,7 +45,20 @@ import java.util.List;
public class AzureDataSegmentKillerTest extends EasyMockSupport
{
private static final String CONTAINER_NAME = "container";
+ private static final String CONTAINER = "test";
+ private static final String PREFIX = "test/log";
+ private static final int MAX_TRIES = 3;
private static final String BLOB_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
+ private static final int MAX_KEYS = 1;
+ private static final long TIME_0 = 0L;
+ private static final long TIME_1 = 1L;
+ private static final long TIME_NOW = 2L;
+ private static final long TIME_FUTURE = 3L;
+ private static final String KEY_1 = "key1";
+ private static final String KEY_2 = "key2";
+ private static final URI PREFIX_URI = URI.create(StringUtils.format("azure://%s/%s", CONTAINER, PREFIX));
+ private static final Exception RECOVERABLE_EXCEPTION = new StorageException("", "", null);
+ private static final Exception NON_RECOVERABLE_EXCEPTION = new URISyntaxException("", "");
private static final DataSegment DATA_SEGMENT = new DataSegment(
"test",
@@ -56,12 +75,20 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
private static final StorageExtendedErrorInformation NULL_STORAGE_EXTENDED_ERROR_INFORMATION = null;
private static final StorageExtendedErrorInformation STORAGE_EXTENDED_ERROR_INFORMATION = new StorageExtendedErrorInformation();
+ private AzureDataSegmentConfig segmentConfig;
+ private AzureInputDataConfig inputDataConfig;
+ private AzureAccountConfig accountConfig;
private AzureStorage azureStorage;
+ private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
@Before
public void before()
{
+ segmentConfig = createMock(AzureDataSegmentConfig.class);
+ inputDataConfig = createMock(AzureInputDataConfig.class);
+ accountConfig = createMock(AzureAccountConfig.class);
azureStorage = createMock(AzureStorage.class);
+ azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class);
}
@Test
@@ -75,7 +102,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
replayAll();
- AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
+ AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(DATA_SEGMENT);
@@ -114,18 +141,127 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
replayAll();
- AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
+ AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(DATA_SEGMENT);
verifyAll();
}
- @Test(expected = UnsupportedOperationException.class)
- public void test_killAll_throwsUnsupportedOperationException()
+ @Test
+ public void test_killAll_noException_deletesAllSegments() throws Exception
{
- AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
+ EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce();
+ EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce();
+ EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
+ EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
+
+ CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
+ CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_1);
+
+ AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
+ azureCloudBlobIterableFactory,
+ MAX_KEYS,
+ PREFIX_URI,
+ ImmutableList.of(object1, object2));
+
+ EasyMock.replay(object1, object2);
+ AzureTestUtils.expectDeleteObjects(
+ azureStorage,
+ ImmutableList.of(object1, object2),
+ ImmutableMap.of());
+ EasyMock.replay(segmentConfig, inputDataConfig, accountConfig, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
+ AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.killAll();
+ EasyMock.verify(segmentConfig, inputDataConfig, accountConfig, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
+ }
+
+ @Test
+ public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegments() throws Exception
+ {
+ EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce();
+ EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce();
+ EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
+ EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
+
+ CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
+
+ AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
+ azureCloudBlobIterableFactory,
+ MAX_KEYS,
+ PREFIX_URI,
+ ImmutableList.of(object1));
+
+ EasyMock.replay(object1);
+ AzureTestUtils.expectDeleteObjects(
+ azureStorage,
+ ImmutableList.of(object1),
+ ImmutableMap.of(object1, RECOVERABLE_EXCEPTION));
+ EasyMock.replay(segmentConfig, inputDataConfig, accountConfig, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
+ AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
+ killer.killAll();
+ EasyMock.verify(segmentConfig, inputDataConfig, accountConfig, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
+ }
+
+ @Test
+ public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments() throws Exception
+ {
+ boolean ioExceptionThrown = false;
+ CloudBlobHolder object1 = null;
+ AzureCloudBlobIterable azureCloudBlobIterable = null;
+ try {
+ EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce();
+ EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce();
+ EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
+ EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
+
+ object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
+
+ azureCloudBlobIterable = AzureTestUtils.expectListObjects(
+ azureCloudBlobIterableFactory,
+ MAX_KEYS,
+ PREFIX_URI,
+ ImmutableList.of(object1)
+ );
+
+ EasyMock.replay(object1);
+ AzureTestUtils.expectDeleteObjects(
+ azureStorage,
+ ImmutableList.of(),
+ ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION)
+ );
+ EasyMock.replay(
+ segmentConfig,
+ inputDataConfig,
+ accountConfig,
+ azureCloudBlobIterable,
+ azureCloudBlobIterableFactory,
+ azureStorage
+ );
+ AzureDataSegmentKiller killer = new AzureDataSegmentKiller(
+ segmentConfig,
+ inputDataConfig,
+ accountConfig,
+ azureStorage,
+ azureCloudBlobIterableFactory
+ );
+ killer.killAll();
+ }
+ catch (IOException e) {
+ ioExceptionThrown = true;
+ }
+
+ Assert.assertTrue(ioExceptionThrown);
+
+ EasyMock.verify(
+ segmentConfig,
+ inputDataConfig,
+ accountConfig,
+ object1,
+ azureCloudBlobIterable,
+ azureCloudBlobIterableFactory,
+ azureStorage
+ );
}
private void common_test_kill_StorageExceptionExtendedError_throwsException(StorageExtendedErrorInformation storageExtendedErrorInformation)
@@ -145,7 +281,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
replayAll();
- AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
+ AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(DATA_SEGMENT);
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index a3733afdd93..519a6e923f1 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -20,10 +20,15 @@
package org.apache.druid.storage.azure;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
+import com.microsoft.azure.storage.StorageException;
import org.apache.commons.io.IOUtils;
+import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.After;
@@ -35,6 +40,7 @@ import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
+import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
@@ -45,16 +51,41 @@ public class AzureTaskLogsTest extends EasyMockSupport
private static final String PREFIX = "test/log";
private static final String TASK_ID = "taskid";
private static final String TASK_ID_NOT_FOUND = "taskidNotFound";
- private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX, 3);
+ private static final int MAX_TRIES = 3;
+ private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX, MAX_TRIES);
+ private static final int MAX_KEYS = 1;
+ private static final long TIME_0 = 0L;
+ private static final long TIME_1 = 1L;
+ private static final long TIME_NOW = 2L;
+ private static final long TIME_FUTURE = 3L;
+ private static final String KEY_1 = "key1";
+ private static final String KEY_2 = "key2";
+ private static final URI PREFIX_URI = URI.create(StringUtils.format("azure://%s/%s", CONTAINER, PREFIX));
+ private static final Exception RECOVERABLE_EXCEPTION = new StorageException("", "", null);
+ private static final Exception NON_RECOVERABLE_EXCEPTION = new URISyntaxException("", "");
+ private AzureInputDataConfig inputDataConfig;
+ private AzureAccountConfig accountConfig;
private AzureStorage azureStorage;
+ private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
+ private CurrentTimeMillisSupplier timeSupplier;
private AzureTaskLogs azureTaskLogs;
@Before
public void before()
{
+ inputDataConfig = createMock(AzureInputDataConfig.class);
+ accountConfig = createMock(AzureAccountConfig.class);
azureStorage = createMock(AzureStorage.class);
- azureTaskLogs = new AzureTaskLogs(AZURE_TASK_LOGS_CONFIG, azureStorage);
+ azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class);
+ timeSupplier = createMock(CurrentTimeMillisSupplier.class);
+ azureTaskLogs = new AzureTaskLogs(
+ AZURE_TASK_LOGS_CONFIG,
+ inputDataConfig,
+ accountConfig,
+ azureStorage,
+ azureCloudBlobIterableFactory,
+ timeSupplier);
}
@@ -292,17 +323,214 @@ public class AzureTaskLogsTest extends EasyMockSupport
verifyAll();
}
- @Test (expected = UnsupportedOperationException.class)
- public void test_killAll_throwsUnsupportedOperationException()
+ @Test
+ public void test_killAll_noException_deletesAllTaskLogs() throws Exception
{
+ EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
+ EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
+ EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
+
+ CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
+ CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_1);
+
+ AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
+ azureCloudBlobIterableFactory,
+ MAX_KEYS,
+ PREFIX_URI,
+ ImmutableList.of(object1, object2));
+
+ EasyMock.replay(object1, object2);
+ AzureTestUtils.expectDeleteObjects(
+ azureStorage,
+ ImmutableList.of(object1, object2),
+ ImmutableMap.of());
+ EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
azureTaskLogs.killAll();
+ EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
}
+ @Test
+ public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws Exception
+ {
+ EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
+ EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
+ EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
+
+ CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
+
+ AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
+ azureCloudBlobIterableFactory,
+ MAX_KEYS,
+ PREFIX_URI,
+ ImmutableList.of(object1));
+
+ EasyMock.replay(object1);
+ AzureTestUtils.expectDeleteObjects(
+ azureStorage,
+ ImmutableList.of(object1),
+ ImmutableMap.of(object1, RECOVERABLE_EXCEPTION));
+ EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
+ azureTaskLogs.killAll();
+ EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
+ }
+
+ @Test
+ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() throws Exception
+ {
+ boolean ioExceptionThrown = false;
+ CloudBlobHolder object1 = null;
+ AzureCloudBlobIterable azureCloudBlobIterable = null;
+ try {
+ EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
+ EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
+ EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
+
+ object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
+
+ azureCloudBlobIterable = AzureTestUtils.expectListObjects(
+ azureCloudBlobIterableFactory,
+ MAX_KEYS,
+ PREFIX_URI,
+ ImmutableList.of(object1)
+ );
+
+ EasyMock.replay(object1);
+ AzureTestUtils.expectDeleteObjects(
+ azureStorage,
+ ImmutableList.of(),
+ ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION)
+ );
+ EasyMock.replay(
+ inputDataConfig,
+ accountConfig,
+ timeSupplier,
+ azureCloudBlobIterable,
+ azureCloudBlobIterableFactory,
+ azureStorage
+ );
+ azureTaskLogs.killAll();
+ }
+ catch (IOException e) {
+ ioExceptionThrown = true;
+ }
+ Assert.assertTrue(ioExceptionThrown);
+ EasyMock.verify(
+ inputDataConfig,
+ accountConfig,
+ timeSupplier,
+ object1,
+ azureCloudBlobIterable,
+ azureCloudBlobIterableFactory,
+ azureStorage
+ );
+ }
+
+ @Test
+ public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws Exception
+ {
+ EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
+ EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
+
+ CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
+ CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_FUTURE);
+
+ AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
+ azureCloudBlobIterableFactory,
+ MAX_KEYS,
+ PREFIX_URI,
+ ImmutableList.of(object1, object2));
+
+ EasyMock.replay(object1, object2);
+ AzureTestUtils.expectDeleteObjects(
+ azureStorage,
+ ImmutableList.of(object1),
+ ImmutableMap.of());
+ EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
+ azureTaskLogs.killOlderThan(TIME_NOW);
+ EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
+ }
+
+ @Test
+ public void test_killOlderThan_recoverableExceptionWhenDeletingObjects_deletesOnlyTaskLogsOlderThan() throws Exception
+ {
+ EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
+ EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
+
+ CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
+
+ AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
+ azureCloudBlobIterableFactory,
+ MAX_KEYS,
+ PREFIX_URI,
+ ImmutableList.of(object1));
+
+ EasyMock.replay(object1);
+ AzureTestUtils.expectDeleteObjects(
+ azureStorage,
+ ImmutableList.of(object1),
+ ImmutableMap.of(object1, RECOVERABLE_EXCEPTION));
+ EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
+ azureTaskLogs.killOlderThan(TIME_NOW);
+ EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
+ }
+
+ @Test
+ public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() throws Exception
+ {
+ boolean ioExceptionThrown = false;
+ CloudBlobHolder object1 = null;
+ AzureCloudBlobIterable azureCloudBlobIterable = null;
+ try {
+ EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
+ EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
+
+ object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
+
+ azureCloudBlobIterable = AzureTestUtils.expectListObjects(
+ azureCloudBlobIterableFactory,
+ MAX_KEYS,
+ PREFIX_URI,
+ ImmutableList.of(object1)
+ );
+
+ EasyMock.replay(object1);
+ AzureTestUtils.expectDeleteObjects(
+ azureStorage,
+ ImmutableList.of(),
+ ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION)
+ );
+ EasyMock.replay(
+ inputDataConfig,
+ accountConfig,
+ timeSupplier,
+ azureCloudBlobIterable,
+ azureCloudBlobIterableFactory,
+ azureStorage
+ );
+ azureTaskLogs.killOlderThan(TIME_NOW);
+ }
+ catch (IOException e) {
+ ioExceptionThrown = true;
+ }
+ Assert.assertTrue(ioExceptionThrown);
+ EasyMock.verify(
+ inputDataConfig,
+ accountConfig,
+ timeSupplier,
+ object1,
+ azureCloudBlobIterable,
+ azureCloudBlobIterableFactory,
+ azureStorage
+ );
+ }
+
+ /*
@Test (expected = UnsupportedOperationException.class)
- public void test_killOlderThan_throwsUnsupportedOperationException()
+ public void test_killOlderThan_throwsUnsupportedOperationException() throws IOException
{
azureTaskLogs.killOlderThan(0);
}
+ */
@After
public void cleanup()
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java
index d73fa3c6175..5c89041b205 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java
@@ -19,14 +19,25 @@
package org.apache.druid.storage.azure;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.storage.azure.blob.CloudBlobHolder;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.easymock.IExpectationSetters;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
import java.nio.file.Files;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
-public class AzureTestUtils
+public class AzureTestUtils extends EasyMockSupport
{
public static File createZipTempFile(final String segmentFileName, final String content) throws IOException
{
@@ -40,4 +51,59 @@ public class AzureTestUtils
return zipFile;
}
+
+ public static AzureCloudBlobIterable expectListObjects(
+ AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
+ int maxListingLength,
+ URI PREFIX_URI,
+ List objects)
+ {
+ AzureCloudBlobIterable azureCloudBlobIterable = EasyMock.createMock(AzureCloudBlobIterable.class);
+ EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(objects.iterator());
+ EasyMock.expect(azureCloudBlobIterableFactory.create(ImmutableList.of(PREFIX_URI), maxListingLength)).andReturn(azureCloudBlobIterable);
+ return azureCloudBlobIterable;
+ }
+
+ public static void expectDeleteObjects(
+ AzureStorage storage,
+ List deleteRequestsExpected,
+ Map deleteRequestToException) throws Exception
+ {
+ Map> requestToResultExpectationSetter = new HashMap<>();
+
+ for (Map.Entry requestsAndErrors : deleteRequestToException.entrySet()) {
+ CloudBlobHolder deleteObject = requestsAndErrors.getKey();
+ Exception exception = requestsAndErrors.getValue();
+ IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject);
+ if (resultExpectationSetter == null) {
+ storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName());
+ resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception);
+ requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter);
+ } else {
+ resultExpectationSetter.andThrow(exception);
+ }
+ }
+
+ for (CloudBlobHolder deleteObject : deleteRequestsExpected) {
+ IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject);
+ if (resultExpectationSetter == null) {
+ storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName());
+ resultExpectationSetter = EasyMock.expectLastCall();
+ requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter);
+ }
+ resultExpectationSetter.andReturn(null);
+ }
+ }
+
+ public static CloudBlobHolder newCloudBlobHolder(
+ String container,
+ String prefix,
+ long lastModifiedTimestamp) throws Exception
+ {
+ CloudBlobHolder object = EasyMock.createMock(CloudBlobHolder.class);
+ EasyMock.expect(object.getContainerName()).andReturn(container).anyTimes();
+ EasyMock.expect(object.getName()).andReturn(prefix).anyTimes();
+ EasyMock.expect(object.getLastModifed()).andReturn(new Date(lastModifiedTimestamp)).anyTimes();
+ return object;
+ }
}