Ability to Delete task logs and segments from Azure Storage (#9523)

* Ability to Delete task logs and segments from Azure Storage

* implement ability to delete all task logs, or only those task logs
  written before a particular date, when logs are stored in Azure storage

* implement ability to delete all segments from Azure deep storage

* Address review comments
This commit is contained in:
zachjsh 2020-03-18 17:59:17 -07:00 committed by GitHub
parent 3b536eea7f
commit b18dd2b7a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 590 additions and 25 deletions

View File

@ -180,27 +180,27 @@
<limit> <limit>
<counter>INSTRUCTION</counter> <counter>INSTRUCTION</counter>
<value>COVEREDRATIO</value> <value>COVEREDRATIO</value>
<minimum>0.85</minimum> <minimum>0.86</minimum>
</limit> </limit>
<limit> <limit>
<counter>LINE</counter> <counter>LINE</counter>
<value>COVEREDRATIO</value> <value>COVEREDRATIO</value>
<minimum>0.84</minimum> <minimum>0.86</minimum>
</limit> </limit>
<limit> <limit>
<counter>BRANCH</counter> <counter>BRANCH</counter>
<value>COVEREDRATIO</value> <value>COVEREDRATIO</value>
<minimum>0.87</minimum> <minimum>0.89</minimum>
</limit> </limit>
<limit> <limit>
<counter>COMPLEXITY</counter> <counter>COMPLEXITY</counter>
<value>COVEREDRATIO</value> <value>COVEREDRATIO</value>
<minimum>0.79</minimum> <minimum>0.80</minimum>
</limit> </limit>
<limit> <limit>
<counter>METHOD</counter> <counter>METHOD</counter>
<value>COVEREDRATIO</value> <value>COVEREDRATIO</value>
<minimum>0.78</minimum> <minimum>0.79</minimum>
</limit> </limit>
<limit> <limit>
<counter>CLASS</counter> <counter>CLASS</counter>

View File

@ -19,6 +19,7 @@
package org.apache.druid.storage.azure; package org.apache.druid.storage.azure;
import com.google.common.base.Predicates;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.MapUtils;
@ -27,6 +28,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Map; import java.util.Map;
@ -38,14 +40,26 @@ public class AzureDataSegmentKiller implements DataSegmentKiller
{ {
private static final Logger log = new Logger(AzureDataSegmentKiller.class); private static final Logger log = new Logger(AzureDataSegmentKiller.class);
private final AzureDataSegmentConfig segmentConfig;
private final AzureInputDataConfig inputDataConfig;
private final AzureAccountConfig accountConfig;
private final AzureStorage azureStorage; private final AzureStorage azureStorage;
private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
@Inject @Inject
public AzureDataSegmentKiller( public AzureDataSegmentKiller(
final AzureStorage azureStorage AzureDataSegmentConfig segmentConfig,
AzureInputDataConfig inputDataConfig,
AzureAccountConfig accountConfig,
final AzureStorage azureStorage,
AzureCloudBlobIterableFactory azureCloudBlobIterableFactory
) )
{ {
this.segmentConfig = segmentConfig;
this.inputDataConfig = inputDataConfig;
this.accountConfig = accountConfig;
this.azureStorage = azureStorage; this.azureStorage = azureStorage;
this.azureCloudBlobIterableFactory = azureCloudBlobIterableFactory;
} }
@Override @Override
@ -72,9 +86,28 @@ public class AzureDataSegmentKiller implements DataSegmentKiller
} }
@Override @Override
public void killAll() public void killAll() throws IOException
{ {
throw new UnsupportedOperationException("not implemented"); log.info(
"Deleting all segment files from Azure storage location [bucket: '%s' prefix: '%s']",
segmentConfig.getContainer(),
segmentConfig.getPrefix()
);
try {
AzureUtils.deleteObjectsInPath(
azureStorage,
inputDataConfig,
accountConfig,
azureCloudBlobIterableFactory,
segmentConfig.getContainer(),
segmentConfig.getPrefix(),
Predicates.alwaysTrue()
);
}
catch (Exception e) {
log.error("Error occurred while deleting segment files from Azure. Error: %s", e.getMessage());
throw new IOException(e);
}
} }
} }

View File

@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
@ -32,6 +33,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Date;
/** /**
* Deals with reading and writing task logs stored in Azure. * Deals with reading and writing task logs stored in Azure.
@ -42,13 +44,27 @@ public class AzureTaskLogs implements TaskLogs
private static final Logger log = new Logger(AzureTaskLogs.class); private static final Logger log = new Logger(AzureTaskLogs.class);
private final AzureTaskLogsConfig config; private final AzureTaskLogsConfig config;
private final AzureInputDataConfig inputDataConfig;
private final AzureAccountConfig accountConfig;
private final AzureStorage azureStorage; private final AzureStorage azureStorage;
private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
private final CurrentTimeMillisSupplier timeSupplier;
@Inject @Inject
public AzureTaskLogs(AzureTaskLogsConfig config, AzureStorage azureStorage) public AzureTaskLogs(
AzureTaskLogsConfig config,
AzureInputDataConfig inputDataConfig,
AzureAccountConfig accountConfig,
AzureStorage azureStorage,
AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
CurrentTimeMillisSupplier timeSupplier)
{ {
this.config = config; this.config = config;
this.inputDataConfig = inputDataConfig;
this.azureStorage = azureStorage; this.azureStorage = azureStorage;
this.accountConfig = accountConfig;
this.azureCloudBlobIterableFactory = azureCloudBlobIterableFactory;
this.timeSupplier = timeSupplier;
} }
@Override @Override
@ -151,14 +167,41 @@ public class AzureTaskLogs implements TaskLogs
} }
@Override @Override
public void killAll() public void killAll() throws IOException
{ {
throw new UnsupportedOperationException("not implemented"); log.info(
"Deleting all task logs from Azure storage location [bucket: %s prefix: %s].",
config.getContainer(),
config.getPrefix()
);
long now = timeSupplier.getAsLong();
killOlderThan(now);
} }
@Override @Override
public void killOlderThan(long timestamp) public void killOlderThan(long timestamp) throws IOException
{ {
throw new UnsupportedOperationException("not implemented"); log.info(
"Deleting all task logs from Azure storage location [bucket: '%s' prefix: '%s'] older than %s.",
config.getContainer(),
config.getPrefix(),
new Date(timestamp)
);
try {
AzureUtils.deleteObjectsInPath(
azureStorage,
inputDataConfig,
accountConfig,
azureCloudBlobIterableFactory,
config.getContainer(),
config.getPrefix(),
(object) -> object.getLastModifed().getTime() < timestamp
);
}
catch (Exception e) {
log.error("Error occurred while deleting task log files from Azure. Error: %s", e.getMessage());
throw new IOException(e);
}
} }
} }

View File

@ -21,14 +21,18 @@ package org.apache.druid.storage.azure;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.RetryUtils.Task; import org.apache.druid.java.util.common.RetryUtils.Task;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Iterator;
/** /**
* Utility class for miscellaneous things involving Azure. * Utility class for miscellaneous things involving Azure.
@ -96,6 +100,55 @@ public class AzureUtils
} }
} }
/**
 * Deletes every blob listed under the given container and prefix in Azure Storage that the supplied
 * filter accepts. Blobs are listed through an {@link AzureCloudBlobIterable} and each accepted blob
 * is removed with its own retried delete call.
 *
 * @param storage                       Azure Storage client used to perform the deletions
 * @param config                        supplies the maximum listing length used when listing candidate blobs
 * @param accountConfig                 supplies the maximum number of tries passed to each delete call
 * @param azureCloudBlobIterableFactory factory that creates the iterable used to list blobs under the prefix
 * @param bucket                        Azure Storage container ("bucket") to search
 * @param prefix                        prefix under which candidate blobs are listed
 * @param filter                        returns true for a listed blob that should be deleted, false to keep it
 * @throws Exception propagated from listing the blobs or from the retried delete call
 */
public static void deleteObjectsInPath(
    AzureStorage storage,
    AzureInputDataConfig config,
    AzureAccountConfig accountConfig,
    AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
    String bucket,
    String prefix,
    Predicate<CloudBlobHolder> filter
)
    throws Exception
{
  // Build the azure://<bucket>/<prefix> location that the listing starts from.
  final URI searchRoot = new CloudObjectLocation(bucket, prefix).toUri("azure");
  final AzureCloudBlobIterable listing = azureCloudBlobIterableFactory.create(
      ImmutableList.of(searchRoot),
      config.getMaxListingLength()
  );

  for (Iterator<CloudBlobHolder> blobs = listing.iterator(); blobs.hasNext(); ) {
    final CloudBlobHolder candidate = blobs.next();
    if (!filter.apply(candidate)) {
      // Rejected by the filter: leave this blob in place.
      continue;
    }
    deleteBucketKeys(storage, accountConfig.getMaxTries(), candidate.getContainerName(), candidate.getName());
  }
}
/**
 * Empties one blob path in the given container, delegating retry handling to
 * {@link #retryAzureOperation} with the supplied {@code maxTries}.
 *
 * @param storage  Azure Storage client used to perform the deletion
 * @param maxTries number of tries handed to the retry helper
 * @param bucket   container holding the path to empty
 * @param prefix   path handed to {@code emptyCloudBlobDirectory}; the caller in this class passes a blob name
 * @throws Exception propagated from the retry helper
 */
private static void deleteBucketKeys(
    AzureStorage storage,
    int maxTries,
    String bucket,
    String prefix
) throws Exception
{
  // The storage call's result is not needed; the task only exists to be retried.
  final Task<Void> emptyDirectory = () -> {
    storage.emptyCloudBlobDirectory(bucket, prefix);
    return null;
  };
  retryAzureOperation(emptyDirectory, maxTries);
}
static <T> T retryAzureOperation(Task<T> f, int maxTries) throws Exception static <T> T retryAzureOperation(Task<T> f, int maxTries) throws Exception
{ {
return RetryUtils.retry(f, AZURE_RETRY, maxTries); return RetryUtils.retry(f, AZURE_RETRY, maxTries);

View File

@ -23,6 +23,7 @@ import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlob;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Date;
/** /**
* Wrapper for {@link CloudBlob}. Used to make testing easier, since {@link CloudBlob} * Wrapper for {@link CloudBlob}. Used to make testing easier, since {@link CloudBlob}
@ -51,4 +52,9 @@ public class CloudBlobHolder
{ {
return delegate.getProperties().getLength(); return delegate.getProperties().getLength();
} }
/**
 * Returns the last-modified timestamp read from the wrapped blob's properties.
 *
 * NOTE(review): the method name is misspelled ("Modifed"). AzureTaskLogs#killOlderThan calls it
 * under this spelling, so a rename needs a coordinated update of all callers.
 */
public Date getLastModifed()
{
return delegate.getProperties().getLastModified();
}
} }

View File

@ -19,18 +19,24 @@
package org.apache.druid.storage.azure; package org.apache.druid.storage.azure;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageExtendedErrorInformation; import com.microsoft.azure.storage.StorageExtendedErrorInformation;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.easymock.EasyMockSupport; import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
@ -39,7 +45,20 @@ import java.util.List;
public class AzureDataSegmentKillerTest extends EasyMockSupport public class AzureDataSegmentKillerTest extends EasyMockSupport
{ {
private static final String CONTAINER_NAME = "container"; private static final String CONTAINER_NAME = "container";
private static final String CONTAINER = "test";
private static final String PREFIX = "test/log";
private static final int MAX_TRIES = 3;
private static final String BLOB_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"; private static final String BLOB_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
private static final int MAX_KEYS = 1;
private static final long TIME_0 = 0L;
private static final long TIME_1 = 1L;
private static final long TIME_NOW = 2L;
private static final long TIME_FUTURE = 3L;
private static final String KEY_1 = "key1";
private static final String KEY_2 = "key2";
private static final URI PREFIX_URI = URI.create(StringUtils.format("azure://%s/%s", CONTAINER, PREFIX));
private static final Exception RECOVERABLE_EXCEPTION = new StorageException("", "", null);
private static final Exception NON_RECOVERABLE_EXCEPTION = new URISyntaxException("", "");
private static final DataSegment DATA_SEGMENT = new DataSegment( private static final DataSegment DATA_SEGMENT = new DataSegment(
"test", "test",
@ -56,12 +75,20 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
private static final StorageExtendedErrorInformation NULL_STORAGE_EXTENDED_ERROR_INFORMATION = null; private static final StorageExtendedErrorInformation NULL_STORAGE_EXTENDED_ERROR_INFORMATION = null;
private static final StorageExtendedErrorInformation STORAGE_EXTENDED_ERROR_INFORMATION = new StorageExtendedErrorInformation(); private static final StorageExtendedErrorInformation STORAGE_EXTENDED_ERROR_INFORMATION = new StorageExtendedErrorInformation();
private AzureDataSegmentConfig segmentConfig;
private AzureInputDataConfig inputDataConfig;
private AzureAccountConfig accountConfig;
private AzureStorage azureStorage; private AzureStorage azureStorage;
private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
@Before @Before
public void before() public void before()
{ {
segmentConfig = createMock(AzureDataSegmentConfig.class);
inputDataConfig = createMock(AzureInputDataConfig.class);
accountConfig = createMock(AzureAccountConfig.class);
azureStorage = createMock(AzureStorage.class); azureStorage = createMock(AzureStorage.class);
azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class);
} }
@Test @Test
@ -75,7 +102,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
replayAll(); replayAll();
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage); AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(DATA_SEGMENT); killer.kill(DATA_SEGMENT);
@ -114,18 +141,127 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
replayAll(); replayAll();
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage); AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(DATA_SEGMENT); killer.kill(DATA_SEGMENT);
verifyAll(); verifyAll();
} }
@Test(expected = UnsupportedOperationException.class) @Test
public void test_killAll_throwsUnsupportedOperationException() public void test_killAll_noException_deletesAllSegments() throws Exception
{ {
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage); EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce();
EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_1);
AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1, object2));
EasyMock.replay(object1, object2);
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1, object2),
ImmutableMap.of());
EasyMock.replay(segmentConfig, inputDataConfig, accountConfig, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.killAll(); killer.killAll();
EasyMock.verify(segmentConfig, inputDataConfig, accountConfig, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
}
/**
 * Verifies that killAll() completes without throwing when RECOVERABLE_EXCEPTION (a StorageException)
 * is registered against object1 in the delete expectations.
 *
 * NOTE(review): the method name says "WhenListingObjects", but the exception map is handed to
 * AzureTestUtils.expectDeleteObjects, not to the listing helper - confirm the intended name.
 */
@Test
public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegments() throws Exception
{
// Record expectations on the config mocks read by killAll().
EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce();
EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
// The listing under PREFIX_URI yields exactly object1.
AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1));
EasyMock.replay(object1);
// object1 is expected to be deleted; presumably the helper makes its delete fail with the mapped
// exception before succeeding - verify against AzureTestUtils.expectDeleteObjects.
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1),
ImmutableMap.of(object1, RECOVERABLE_EXCEPTION));
EasyMock.replay(segmentConfig, inputDataConfig, accountConfig, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.killAll();
EasyMock.verify(segmentConfig, inputDataConfig, accountConfig, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
}
/**
 * Verifies that killAll() throws an IOException when NON_RECOVERABLE_EXCEPTION (a URISyntaxException)
 * is registered against object1 in the delete expectations. AzureDataSegmentKiller#killAll wraps any
 * Exception from the delete path in an IOException.
 *
 * NOTE(review): the name ends in "deletesAllSegments" and says "WhenListingObjects", yet the list of
 * objects expected to be deleted is empty and the exception is passed to expectDeleteObjects -
 * confirm the intended name.
 */
@Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments() throws Exception
{
boolean ioExceptionThrown = false;
// Declared outside the try block so they are still in scope for the final verify().
CloudBlobHolder object1 = null;
AzureCloudBlobIterable azureCloudBlobIterable = null;
try {
EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce();
EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1)
);
EasyMock.replay(object1);
// No object is expected to be deleted successfully; object1 is mapped to the non-recoverable exception.
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(),
ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION)
);
EasyMock.replay(
segmentConfig,
inputDataConfig,
accountConfig,
azureCloudBlobIterable,
azureCloudBlobIterableFactory,
azureStorage
);
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(
segmentConfig,
inputDataConfig,
accountConfig,
azureStorage,
azureCloudBlobIterableFactory
);
killer.killAll();
}
catch (IOException e) {
// Expected path: record that the wrapped exception surfaced.
ioExceptionThrown = true;
}
Assert.assertTrue(ioExceptionThrown);
EasyMock.verify(
segmentConfig,
inputDataConfig,
accountConfig,
object1,
azureCloudBlobIterable,
azureCloudBlobIterableFactory,
azureStorage
);
}
private void common_test_kill_StorageExceptionExtendedError_throwsException(StorageExtendedErrorInformation storageExtendedErrorInformation) private void common_test_kill_StorageExceptionExtendedError_throwsException(StorageExtendedErrorInformation storageExtendedErrorInformation)
@ -145,7 +281,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
replayAll(); replayAll();
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage); AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(DATA_SEGMENT); killer.kill(DATA_SEGMENT);

View File

@ -20,10 +20,15 @@
package org.apache.druid.storage.azure; package org.apache.druid.storage.azure;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import com.microsoft.azure.storage.StorageException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.easymock.EasyMockSupport; import org.easymock.EasyMockSupport;
import org.junit.After; import org.junit.After;
@ -35,6 +40,7 @@ import java.io.ByteArrayInputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.StringWriter; import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -45,16 +51,41 @@ public class AzureTaskLogsTest extends EasyMockSupport
private static final String PREFIX = "test/log"; private static final String PREFIX = "test/log";
private static final String TASK_ID = "taskid"; private static final String TASK_ID = "taskid";
private static final String TASK_ID_NOT_FOUND = "taskidNotFound"; private static final String TASK_ID_NOT_FOUND = "taskidNotFound";
private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX, 3); private static final int MAX_TRIES = 3;
private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX, MAX_TRIES);
private static final int MAX_KEYS = 1;
private static final long TIME_0 = 0L;
private static final long TIME_1 = 1L;
private static final long TIME_NOW = 2L;
private static final long TIME_FUTURE = 3L;
private static final String KEY_1 = "key1";
private static final String KEY_2 = "key2";
private static final URI PREFIX_URI = URI.create(StringUtils.format("azure://%s/%s", CONTAINER, PREFIX));
private static final Exception RECOVERABLE_EXCEPTION = new StorageException("", "", null);
private static final Exception NON_RECOVERABLE_EXCEPTION = new URISyntaxException("", "");
private AzureInputDataConfig inputDataConfig;
private AzureAccountConfig accountConfig;
private AzureStorage azureStorage; private AzureStorage azureStorage;
private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
private CurrentTimeMillisSupplier timeSupplier;
private AzureTaskLogs azureTaskLogs; private AzureTaskLogs azureTaskLogs;
@Before @Before
public void before() public void before()
{ {
inputDataConfig = createMock(AzureInputDataConfig.class);
accountConfig = createMock(AzureAccountConfig.class);
azureStorage = createMock(AzureStorage.class); azureStorage = createMock(AzureStorage.class);
azureTaskLogs = new AzureTaskLogs(AZURE_TASK_LOGS_CONFIG, azureStorage); azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class);
timeSupplier = createMock(CurrentTimeMillisSupplier.class);
azureTaskLogs = new AzureTaskLogs(
AZURE_TASK_LOGS_CONFIG,
inputDataConfig,
accountConfig,
azureStorage,
azureCloudBlobIterableFactory,
timeSupplier);
} }
@ -292,17 +323,214 @@ public class AzureTaskLogsTest extends EasyMockSupport
verifyAll(); verifyAll();
} }
@Test (expected = UnsupportedOperationException.class) @Test
public void test_killAll_throwsUnsupportedOperationException() public void test_killAll_noException_deletesAllTaskLogs() throws Exception
{ {
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_1);
AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1, object2));
EasyMock.replay(object1, object2);
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1, object2),
ImmutableMap.of());
EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
azureTaskLogs.killAll(); azureTaskLogs.killAll();
EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
} }
/**
 * Verifies that AzureTaskLogs#killAll completes without throwing when RECOVERABLE_EXCEPTION
 * (a StorageException) is registered against object1 in the delete expectations.
 */
@Test
public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws Exception
{
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
// killAll() reads the current time once and passes it to killOlderThan().
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
// Presumably TIME_0 is the blob's last-modified time (helper not visible here); 0 < TIME_NOW, so it qualifies.
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1));
EasyMock.replay(object1);
// object1 is expected to be deleted even though its delete is mapped to a recoverable exception.
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1),
ImmutableMap.of(object1, RECOVERABLE_EXCEPTION));
EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
azureTaskLogs.killAll();
EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
}
/**
 * Verifies that AzureTaskLogs#killAll throws an IOException when NON_RECOVERABLE_EXCEPTION
 * (a URISyntaxException) is registered against object1 in the delete expectations, and that no
 * object is expected to be deleted successfully.
 *
 * NOTE(review): the name says "WhenListingObjects", but the exception map is handed to
 * AzureTestUtils.expectDeleteObjects - confirm the intended name.
 */
@Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() throws Exception
{
boolean ioExceptionThrown = false;
// Declared outside the try block so they are still in scope for the final verify().
CloudBlobHolder object1 = null;
AzureCloudBlobIterable azureCloudBlobIterable = null;
try {
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1)
);
EasyMock.replay(object1);
// Empty "deleted" list: nothing is expected to be removed successfully.
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(),
ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION)
);
EasyMock.replay(
inputDataConfig,
accountConfig,
timeSupplier,
azureCloudBlobIterable,
azureCloudBlobIterableFactory,
azureStorage
);
azureTaskLogs.killAll();
}
catch (IOException e) {
// Expected path: killOlderThan() wraps the failure in an IOException.
ioExceptionThrown = true;
}
Assert.assertTrue(ioExceptionThrown);
EasyMock.verify(
inputDataConfig,
accountConfig,
timeSupplier,
object1,
azureCloudBlobIterable,
azureCloudBlobIterableFactory,
azureStorage
);
}
/**
 * Verifies that killOlderThan(TIME_NOW) deletes only object1 when the listing returns object1
 * (created with TIME_0) and object2 (created with TIME_FUTURE).
 */
@Test
public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws Exception
{
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
// Presumably the third argument is the blob's last-modified time (helper not visible here):
// TIME_0 (0) is before TIME_NOW (2), TIME_FUTURE (3) is after it.
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_FUTURE);
AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1, object2));
EasyMock.replay(object1, object2);
// Only object1 is expected to be deleted; no exceptions are registered.
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1),
ImmutableMap.of());
// timeSupplier has no recorded expectations: killOlderThan() takes the timestamp as an argument.
EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
azureTaskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
}
/**
 * Verifies that killOlderThan(TIME_NOW) completes without throwing when RECOVERABLE_EXCEPTION
 * (a StorageException) is registered against object1 in the delete expectations.
 */
@Test
public void test_killOlderThan_recoverableExceptionWhenDeletingObjects_deletesOnlyTaskLogsOlderThan() throws Exception
{
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1));
EasyMock.replay(object1);
// object1 is expected to be deleted even though its delete is mapped to a recoverable exception.
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1),
ImmutableMap.of(object1, RECOVERABLE_EXCEPTION));
// timeSupplier has no recorded expectations: killOlderThan() takes the timestamp as an argument.
EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
azureTaskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
}
/**
 * Verifies that killOlderThan(TIME_NOW) throws an IOException when NON_RECOVERABLE_EXCEPTION
 * (a URISyntaxException) is registered against object1 in the delete expectations, and that no
 * object is expected to be deleted successfully.
 *
 * NOTE(review): the name says "WhenListingObjects", but the exception map is handed to
 * AzureTestUtils.expectDeleteObjects - confirm the intended name.
 */
@Test
public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() throws Exception
{
boolean ioExceptionThrown = false;
// Declared outside the try block so they are still in scope for the final verify().
CloudBlobHolder object1 = null;
AzureCloudBlobIterable azureCloudBlobIterable = null;
try {
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1)
);
EasyMock.replay(object1);
// Empty "deleted" list: nothing is expected to be removed successfully.
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(),
ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION)
);
EasyMock.replay(
inputDataConfig,
accountConfig,
timeSupplier,
azureCloudBlobIterable,
azureCloudBlobIterableFactory,
azureStorage
);
azureTaskLogs.killOlderThan(TIME_NOW);
}
catch (IOException e) {
// Expected path: killOlderThan() wraps the failure in an IOException.
ioExceptionThrown = true;
}
Assert.assertTrue(ioExceptionThrown);
EasyMock.verify(
inputDataConfig,
accountConfig,
timeSupplier,
object1,
azureCloudBlobIterable,
azureCloudBlobIterableFactory,
azureStorage
);
}
/*
@Test (expected = UnsupportedOperationException.class) @Test (expected = UnsupportedOperationException.class)
public void test_killOlderThan_throwsUnsupportedOperationException() public void test_killOlderThan_throwsUnsupportedOperationException() throws IOException
{ {
azureTaskLogs.killOlderThan(0); azureTaskLogs.killOlderThan(0);
} }
*/
@After @After
public void cleanup() public void cleanup()

View File

@ -19,14 +19,25 @@
package org.apache.druid.storage.azure; package org.apache.druid.storage.azure;
import com.google.common.collect.ImmutableList;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IExpectationSetters;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
public class AzureTestUtils public class AzureTestUtils extends EasyMockSupport
{ {
public static File createZipTempFile(final String segmentFileName, final String content) throws IOException public static File createZipTempFile(final String segmentFileName, final String content) throws IOException
{ {
@ -40,4 +51,59 @@ public class AzureTestUtils
return zipFile; return zipFile;
} }
/**
 * Records the expectations needed for a blob listing under a single prefix.
 *
 * A fresh {@link AzureCloudBlobIterable} mock is created whose {@code iterator()} is expected to hand
 * back an iterator over {@code objects} (the iterator is obtained here, at record time). The supplied
 * factory is then expected to be asked for an iterable for exactly {@code PREFIX_URI} with the given
 * listing length, and to answer with that mock.
 *
 * Neither the factory nor the returned mock is replayed by this method; the caller is responsible
 * for calling {@code EasyMock.replay} on both.
 *
 * @param azureCloudBlobIterableFactory factory on which the {@code create} expectation is recorded
 * @param maxListingLength              listing length the factory is expected to be called with
 * @param PREFIX_URI                    the single prefix the factory is expected to be called with
 * @param objects                       blobs the listing should yield
 * @return the iterable mock, still in record state
 */
public static AzureCloudBlobIterable expectListObjects(
    AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
    int maxListingLength,
    URI PREFIX_URI,
    List<CloudBlobHolder> objects)
{
  final AzureCloudBlobIterable listing = EasyMock.createMock(AzureCloudBlobIterable.class);
  EasyMock.expect(listing.iterator())
          .andReturn(objects.iterator());

  final ImmutableList<URI> prefixes = ImmutableList.of(PREFIX_URI);
  EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, maxListingLength))
          .andReturn(listing);

  return listing;
}
/**
 * Records {@code emptyCloudBlobDirectory} expectations on {@code storage} for a set of blobs.
 *
 * The expectations are built in two passes:
 * <ol>
 * <li>For every entry of {@code deleteRequestToException}, a delete call for that blob's container
 * and name is recorded and set to throw the mapped exception.</li>
 * <li>For every blob in {@code deleteRequestsExpected}, a {@code null} return is appended. If the
 * blob already has a recorded call from the first pass, the return is chained after the throw
 * (i.e. fail first, then succeed); otherwise a new call is recorded first.</li>
 * </ol>
 *
 * {@code storage} must be a mock in record state; it is not replayed here.
 *
 * @param storage                  mock on which the delete expectations are recorded
 * @param deleteRequestsExpected   blobs whose delete call should (eventually) return normally
 * @param deleteRequestToException blobs whose delete call should throw, with the exception to throw
 * @throws Exception declared because the recorded storage call and the blob accessors may declare
 *                   checked exceptions
 */
public static void expectDeleteObjects(
    AzureStorage storage,
    List<CloudBlobHolder> deleteRequestsExpected,
    Map<CloudBlobHolder, Exception> deleteRequestToException) throws Exception
{
  // One expectation-setter per blob, so later behaviours can be chained onto the same recorded call.
  final Map<CloudBlobHolder, IExpectationSetters<CloudBlobHolder>> settersByBlob = new HashMap<>();

  // Pass 1: failures.
  for (Map.Entry<CloudBlobHolder, Exception> failure : deleteRequestToException.entrySet()) {
    final CloudBlobHolder blob = failure.getKey();
    final Exception toThrow = failure.getValue();
    final IExpectationSetters<CloudBlobHolder> recorded = settersByBlob.get(blob);

    if (recorded != null) {
      // A call for this blob was already recorded: just append another throw to it.
      recorded.andThrow(toThrow);
      continue;
    }

    storage.emptyCloudBlobDirectory(blob.getContainerName(), blob.getName());
    settersByBlob.put(blob, EasyMock.<CloudBlobHolder>expectLastCall().andThrow(toThrow));
  }

  // Pass 2: successes, chained after any failure recorded above for the same blob.
  for (CloudBlobHolder blob : deleteRequestsExpected) {
    IExpectationSetters<CloudBlobHolder> setter = settersByBlob.get(blob);
    if (setter == null) {
      storage.emptyCloudBlobDirectory(blob.getContainerName(), blob.getName());
      setter = EasyMock.expectLastCall();
      settersByBlob.put(blob, setter);
    }
    setter.andReturn(null);
  }
}
/**
 * Builds a {@link CloudBlobHolder} mock that answers its three accessors any number of times.
 *
 * The returned mock is still in record state; the caller must {@code EasyMock.replay} it before use.
 *
 * @param container             value returned by {@code getContainerName()}
 * @param prefix                value returned by {@code getName()}
 * @param lastModifiedTimestamp epoch milliseconds wrapped in the {@link Date} returned by
 *                              {@code getLastModifed()}
 * @return the configured, not yet replayed mock
 */
public static CloudBlobHolder newCloudBlobHolder(
    String container,
    String prefix,
    long lastModifiedTimestamp) throws Exception
{
  final CloudBlobHolder blobHolder = EasyMock.createMock(CloudBlobHolder.class);

  EasyMock.expect(blobHolder.getContainerName())
          .andReturn(container)
          .anyTimes();
  EasyMock.expect(blobHolder.getName())
          .andReturn(prefix)
          .anyTimes();

  final Date lastModified = new Date(lastModifiedTimestamp);
  EasyMock.expect(blobHolder.getLastModifed())
          .andReturn(lastModified)
          .anyTimes();

  return blobHolder;
}
} }