JCLOUDS-510 Delete objects in a container efficiently.

The existing approach for deleting objects in a container suffers
from a head-of-line blocking problem. This commit implements a better
scheme which does not have that problem. This scheme uses a counting
semaphore for making sure that a certain number of futures are
issued in parallel. As each of these futures is completed, one
permit of the semaphore is released.

Added unit tests for testing this new scheme.
This commit is contained in:
Shri Javadekar 2013-11-24 23:35:57 -08:00 committed by Andrew Gaul
parent b93cfa42e1
commit 655aa444d7
4 changed files with 500 additions and 113 deletions

View File

@ -16,12 +16,16 @@
*/ */
package org.jclouds.blobstore.strategy.internal; package org.jclouds.blobstore.strategy.internal;
import static com.google.common.base.Throwables.propagate;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
import java.util.Map; import java.util.HashSet;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.inject.Named; import javax.inject.Named;
@ -29,6 +33,7 @@ import javax.inject.Singleton;
import org.jclouds.Constants; import org.jclouds.Constants;
import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.ContainerNotFoundException;
import org.jclouds.blobstore.domain.PageSet; import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata; import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.internal.BlobRuntimeException; import org.jclouds.blobstore.internal.BlobRuntimeException;
@ -39,15 +44,18 @@ import org.jclouds.blobstore.strategy.ClearListStrategy;
import org.jclouds.http.handlers.BackoffLimitedRetryHandler; import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
import org.jclouds.logging.Logger; import org.jclouds.logging.Logger;
import com.google.common.collect.Maps; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject; import com.google.inject.Inject;
/** /**
* Deletes all keys in the container * Deletes all keys in the container
* *
* @author Adrian Cole * @author Adrian Cole
* @author Shrinand Javadekar
*/ */
@Singleton @Singleton
public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStrategy { public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStrategy {
@ -56,6 +64,7 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
protected Logger logger = Logger.NULL; protected Logger logger = Logger.NULL;
protected final BackoffLimitedRetryHandler retryHandler; protected final BackoffLimitedRetryHandler retryHandler;
private final ListeningExecutorService executorService; private final ListeningExecutorService executorService;
protected final BlobStore blobStore; protected final BlobStore blobStore;
@ -66,12 +75,17 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
/** Maximum times to retry an operation. */ /** Maximum times to retry an operation. */
protected int maxErrors = 3; protected int maxErrors = 3;
/** Maximum parallel deletes. */
private int maxParallelDeletes;
@Inject @Inject
DeleteAllKeysInList(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService executorService, DeleteAllKeysInList(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService executorService,
BlobStore blobStore, BackoffLimitedRetryHandler retryHandler) { BlobStore blobStore, BackoffLimitedRetryHandler retryHandler,
@Named(Constants.PROPERTY_MAX_PARALLEL_DELETES) int maxParallelDeletes) {
this.executorService = executorService; this.executorService = executorService;
this.blobStore = blobStore; this.blobStore = blobStore;
this.retryHandler = retryHandler; this.retryHandler = retryHandler;
this.maxParallelDeletes = maxParallelDeletes;
} }
@Inject(optional = true) @Inject(optional = true)
@ -88,121 +102,337 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
execute(containerName, recursive()); execute(containerName, recursive());
} }
public void execute(final String containerName, ListContainerOptions options) { private boolean parentIsFolder(final ListContainerOptions options,
String message = options.getDir() != null ? String.format("clearing path %s/%s", final StorageMetadata md) {
containerName, options.getDir()) : String.format("clearing container %s", return options.getDir() != null && md.getName().indexOf('/') == -1;
containerName); }
options = options.clone();
if (options.isRecursive())
message += " recursively";
logger.debug(message);
Map<StorageMetadata, Exception> exceptions = Maps.newHashMap();
for (int numErrors = 0; numErrors < maxErrors; ) {
// fetch partial directory listing
PageSet<? extends StorageMetadata> listing =
blobStore.list(containerName, options);
// recurse on subdirectories private void cancelOutstandingFutures(
if (options.isRecursive()) { final Set<ListenableFuture<Void>> outstandingFutures) {
for (StorageMetadata md : listing) { for (ListenableFuture<Void> future : outstandingFutures) {
String fullPath = parentIsFolder(options, md) ? options.getDir() + "/" future.cancel(/*mayInterruptIfRunning=*/ true);
+ md.getName() : md.getName(); }
switch (md.getType()) { }
case BLOB:
break;
case FOLDER:
case RELATIVE_PATH:
if (options.isRecursive() && !fullPath.equals(options.getDir())) {
execute(containerName, options.clone().inDirectory(fullPath));
}
break;
case CONTAINER:
throw new IllegalArgumentException("Container type not supported");
}
}
}
// remove blobs and now-empty subdirectories private String getMessage(final String containerName,
Map<StorageMetadata, ListenableFuture<?>> responses = Maps.newHashMap(); final ListContainerOptions options) {
for (final StorageMetadata md : listing) { return options.getDir() != null ? String.format("clearing path %s/%s",
final String fullPath = parentIsFolder(options, md) ? options.getDir() + "/" containerName, options.getDir()) : String.format(
+ md.getName() : md.getName(); "clearing container %s", containerName);
}
/**
* Get the object listing from a given container based on the options. For
* recursive listing of directories, identify a directory and call execute()
* with the appropriate options to get listing inside the directory.
*
* @param containerName
* The container from which to get the object list.
* @param options
* The options used for getting the listing.
* @returns A PageSet of StorageMetadata objects.
*/
private PageSet<? extends StorageMetadata> getListing(
final String containerName,
final ListContainerOptions options,
final Semaphore semaphore,
final Set<ListenableFuture<Void>> outstandingFutures,
final AtomicBoolean deleteFailure) {
// fetch partial directory listing
PageSet<? extends StorageMetadata> listing = null;
// There's nothing much to do if the container doesn't exist.
// Note that if the container has just been created, trying to get the
// container listing might throw a ContainerNotFoundException because of
// eventual consistency.
try {
listing = blobStore.list(containerName, options);
} catch (ContainerNotFoundException ce) {
return listing;
}
// recurse on subdirectories
if (options.isRecursive()) {
for (StorageMetadata md : listing) {
String fullPath = parentIsFolder(options, md) ? options.getDir()
+ "/" + md.getName() : md.getName();
switch (md.getType()) { switch (md.getType()) {
case BLOB: case BLOB:
responses.put(md, executorService.submit(new Runnable() {
@Override
public void run() {
blobStore.removeBlob(containerName, fullPath);
}
}));
break;
case FOLDER:
if (options.isRecursive()) {
responses.put(md, executorService.submit(new Runnable() {
@Override
public void run() {
blobStore.deleteDirectory(containerName, fullPath);
}
}));
}
break;
case RELATIVE_PATH:
if (options.isRecursive()) {
responses.put(md, executorService.submit(new Runnable() {
@Override
public void run() {
blobStore.deleteDirectory(containerName, md.getName());
}
}));
}
break;
case CONTAINER:
throw new IllegalArgumentException("Container type not supported");
}
}
try {
exceptions = awaitCompletion(responses, executorService, maxTime, logger, message);
} catch (TimeoutException te) {
++numErrors;
if (numErrors == maxErrors) {
throw propagate(te);
}
retryHandler.imposeBackoffExponentialDelay(numErrors, message);
continue;
} finally {
for (ListenableFuture<?> future : responses.values()) {
future.cancel(true);
}
}
if (!exceptions.isEmpty()) {
++numErrors;
if (numErrors == maxErrors) {
break; break;
case FOLDER:
case RELATIVE_PATH:
if (!fullPath.equals(options.getDir())) {
executeOneIteration(containerName,
options.clone().inDirectory(fullPath), semaphore,
outstandingFutures, deleteFailure, /*blocking=*/ true);
}
break;
case CONTAINER:
throw new IllegalArgumentException(
"Container type not supported");
} }
retryHandler.imposeBackoffExponentialDelay(numErrors, message); }
continue; }
return listing;
}
/**
* Delete the blobs from a given PageSet. The PageSet may contain blobs or
* directories. If there are directories, they are expected to be empty.
*
* The logic of acquiring a semaphore, submitting a callable to the
* executorService and releasing the semaphore resides here.
*
* @param containerName
* The container from which the objects are listed.
* @param options
* The options used for getting the container listing.
* @param listing
* The actual list of objects.
* @param semaphore
* The semaphore used for making sure that only a certain number of
* futures are outstanding.
* @param deleteFailure
* This is set to true if any future used for deleting blobs
* failed.
* @param outstandingFutures
* The List of outstanding futures.
* @throws TimeoutException
* If any blob deletion takes too long.
*/
private void deleteBlobsAndEmptyDirs(final String containerName,
ListContainerOptions options,
PageSet<? extends StorageMetadata> listing, final Semaphore semaphore,
final AtomicBoolean deleteFailure,
final Set<ListenableFuture<Void>> outstandingFutures)
throws TimeoutException {
for (final StorageMetadata md : listing) {
final String fullPath = parentIsFolder(options, md) ? options.getDir()
+ "/" + md.getName() : md.getName();
// Attempt to acquire a semaphore within the time limit. At least
// one outstanding future should complete within this period for the
// semaphore to be acquired.
try {
if (!semaphore.tryAcquire(maxTime, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timeout waiting for semaphore");
}
} catch (InterruptedException ie) {
logger.debug("Interrupted while deleting blobs");
Thread.currentThread().interrupt();
}
final ListenableFuture<Void> blobDelFuture;
switch (md.getType()) {
case BLOB:
blobDelFuture = executorService.submit(new Callable<Void>() {
@Override
public Void call() {
blobStore.removeBlob(containerName, fullPath);
return null;
}
});
break;
case FOLDER:
if (options.isRecursive()) {
blobDelFuture = executorService.submit(new Callable<Void>() {
@Override
public Void call() {
blobStore.deleteDirectory(containerName, fullPath);
return null;
}
});
} else {
blobDelFuture = null;
}
break;
case RELATIVE_PATH:
if (options.isRecursive()) {
blobDelFuture = executorService.submit(new Callable<Void>() {
@Override
public Void call() {
blobStore.deleteDirectory(containerName, md.getName());
return null;
}
});
} else {
blobDelFuture = null;
}
break;
case CONTAINER:
throw new IllegalArgumentException("Container type not supported");
default:
blobDelFuture = null;
}
// If a future to delete a blob/directory actually got created above,
// keep a reference of that in the outstandingFutures list. This is
// useful in case of a timeout exception. All outstanding futures can
// then be cancelled.
if (blobDelFuture != null) {
outstandingFutures.add(blobDelFuture);
// Add a callback to release the semaphore. This is required for
// other threads waiting to acquire a semaphore above to make
// progress.
Futures.addCallback(blobDelFuture, new FutureCallback<Object>() {
@Override
public void onSuccess(final Object o) {
outstandingFutures.remove(blobDelFuture);
semaphore.release();
}
@Override
public void onFailure(final Throwable t) {
// Make a note the fact that some blob/directory could not be
// deleted successfully. This is used for retrying later.
deleteFailure.set(true);
outstandingFutures.remove(blobDelFuture);
semaphore.release();
}
});
} else {
// It is possible above to acquire a semaphore but not submit any
// task to the executorService. For e.g. if the listing contains
// an object of type 'FOLDER' and the ListContianerOptions are *not*
// recursive. In this case, there is no blobDelFuture and therefore
// no FutureCallback to release the semaphore. This semaphore is
// released here.
semaphore.release();
}
}
}
/**
* This method goes through all the blobs from a container and attempts to
* create futures for deleting them. If there is a TimeoutException when
* doing this, sets the deleteFailure flag to true and returns. If there are
* more retries left, this will get called again.
*
* @param containerName
* The container from which to get the object list.
* @param listOptions
* The options used for getting the listing.
* @param semaphore
* The semaphore used for controlling number of outstanding
* futures.
* @param outstandingFutures
* A list of outstanding futures.
* @param deleteFailure
* A flag used to track of whether there was a failure while
* deleting any blob.
* @param blocking
* when true, block until all outstanding operations have completed
* @return A PageSet of StorageMetadata objects.
*/
@VisibleForTesting
void executeOneIteration(
final String containerName,
ListContainerOptions listOptions, final Semaphore semaphore,
final Set<ListenableFuture<Void>> outstandingFutures,
final AtomicBoolean deleteFailure, final boolean blocking) {
ListContainerOptions options = listOptions.clone();
String message = getMessage(containerName, listOptions);
if (options.isRecursive()) {
message += " recursively";
}
logger.debug(message);
PageSet<? extends StorageMetadata> listing = getListing(containerName,
options, semaphore, outstandingFutures, deleteFailure);
while (listing != null && !listing.isEmpty()) {
try {
// Remove blobs and now-empty subdirectories.
deleteBlobsAndEmptyDirs(containerName, options, listing, semaphore,
deleteFailure, outstandingFutures);
} catch (TimeoutException te) {
logger.debug("TimeoutException while deleting blobs: {}",
te.getMessage());
cancelOutstandingFutures(outstandingFutures);
deleteFailure.set(true);
} }
String marker = listing.getNextMarker(); String marker = listing.getNextMarker();
if (marker == null) { if (marker != null) {
logger.debug("%s with marker %s", message, marker);
options = options.afterMarker(marker);
listing = getListing(containerName, options, semaphore,
outstandingFutures, deleteFailure);
} else {
break; break;
} }
logger.debug("%s with marker %s", message, marker);
options = options.afterMarker(marker);
// Reset numErrors if we execute a successful iteration. This ensures
// that we only try an unsuccessful operation maxErrors times but
// allow progress with directories containing many blobs in the face
// of some failures.
numErrors = 0;
} }
if (!exceptions.isEmpty())
throw new BlobRuntimeException(String.format("error %s: %s", message, exceptions)); if (blocking) {
waitForCompletion(semaphore, outstandingFutures);
}
} }
private boolean parentIsFolder(final ListContainerOptions options, final StorageMetadata md) { private void waitForCompletion(final Semaphore semaphore,
return options.getDir() != null && md.getName().indexOf('/') == -1; final Set<ListenableFuture<Void>> outstandingFutures) {
// Wait for all futures to complete by waiting to acquire all
// semaphores.
try {
semaphore.acquire(maxParallelDeletes);
semaphore.release(maxParallelDeletes);
} catch (InterruptedException e) {
logger.debug("Interrupted while waiting for blobs to be deleted");
cancelOutstandingFutures(outstandingFutures);
Thread.currentThread().interrupt();
}
}
public void execute(final String containerName,
ListContainerOptions listOptions) {
final AtomicBoolean deleteFailure = new AtomicBoolean();
int retries = maxErrors;
/*
* A Semaphore is used to control the number of outstanding delete
* requests. One permit of the semaphore is acquired before submitting a
* request to the executorService to delete a blob. As requests complete,
* their FutureCallback will release the semaphore permit. That will allow
* the next delete request to proceed.
*
* If no Future completes in 'maxTime', i.e. a semaphore cannot be
* acquired in 'maxTime', a TimeoutException is thrown. Any outstanding
* futures at that time are cancelled.
*/
final Semaphore semaphore = new Semaphore(maxParallelDeletes);
/*
* When a future is created, a reference for that is added to the
* outstandingFutures list. This reference is removed from the list in the
* FutureCallback since it no longer needs to be cancelled in the event of
* a timeout. Also, when the reference is removed from this list and when
* the executorService removes the reference that it has maintained, the
* future will be marked for GC since there should be no other references
* to it. This is important because this code can generate an unbounded
* number of futures.
*/
final Set<ListenableFuture<Void>> outstandingFutures = Collections
.synchronizedSet(new HashSet<ListenableFuture<Void>>());
// TODO: Remove this retry loop.
while (retries > 0) {
deleteFailure.set(false);
executeOneIteration(containerName, listOptions.clone(), semaphore,
outstandingFutures, deleteFailure, /*blocking=*/ false);
waitForCompletion(semaphore, outstandingFutures);
// Try again if there was any failure while deleting blobs and the max
// retry count hasn't been reached.
if (deleteFailure.get() && --retries > 0) {
String message = getMessage(containerName, listOptions);
retryHandler.imposeBackoffExponentialDelay(maxErrors - retries,
message);
} else {
break;
}
}
if (retries == 0) {
cancelOutstandingFutures(outstandingFutures);
throw new BlobRuntimeException("Exceeded maximum retry attempts");
}
} }
} }

View File

@ -16,35 +16,64 @@
*/ */
package org.jclouds.blobstore.strategy.internal; package org.jclouds.blobstore.strategy.internal;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.createMockBuilder;
import static org.easymock.EasyMock.createControl;
import static org.easymock.EasyMock.isA;
import static org.easymock.EasyMock.replay;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
import org.jclouds.ContextBuilder; import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.ContainerNotFoundException;
import org.jclouds.blobstore.options.ListContainerOptions; import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.util.Closeables2; import org.jclouds.util.Closeables2;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.internal.BlobRuntimeException;
import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Injector; import com.google.inject.Injector;
/** /**
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
@Test(testName = "DeleteAllKeysInListTest", singleThreaded = true) @Test(testName = "DeleteAllKeysInListTest", singleThreaded = true)
public class DeleteAllKeysInListTest { public class DeleteAllKeysInListTest {
private BlobStore blobstore; private BlobStore blobstore;
private DeleteAllKeysInList deleter; private DeleteAllKeysInList deleter;
private BackoffLimitedRetryHandler retryHandler;
private static final String containerName = "container"; private static final String containerName = "container";
private static final String directoryName = "directory"; private static final String directoryName = "directory";
private static final int maxParallelDeletes = 1024;
@BeforeMethod @BeforeMethod
void setupBlobStore() { void setupBlobStore() {
Injector injector = ContextBuilder.newBuilder("transient").buildInjector(); Injector injector = ContextBuilder.newBuilder("transient")
.buildInjector();
blobstore = injector.getInstance(BlobStore.class); blobstore = injector.getInstance(BlobStore.class);
deleter = injector.getInstance(DeleteAllKeysInList.class); deleter = injector.getInstance(DeleteAllKeysInList.class);
retryHandler = injector.getInstance(BackoffLimitedRetryHandler.class);
createDataSet(); createDataSet();
} }
@ -73,6 +102,122 @@ public class DeleteAllKeysInListTest {
assertEquals(blobstore.countBlobs(containerName), 1111); assertEquals(blobstore.countBlobs(containerName), 1111);
} }
public void testContainerNotFound() {
IMocksControl mockControl = createControl();
BlobStore blobStore = mockControl.createMock(BlobStore.class);
ListeningExecutorService executorService = mockControl
.createMock(ListeningExecutorService.class);
DeleteAllKeysInList testDeleter = createMockBuilder(
DeleteAllKeysInList.class).withConstructor(executorService,
blobStore, retryHandler, maxParallelDeletes).createMock();
EasyMock.<PageSet<? extends StorageMetadata>> expect(blobStore.list(
isA(String.class), isA(ListContainerOptions.class)))
.andThrow(new ContainerNotFoundException()).once();
replay(blobStore);
testDeleter.execute(containerName,
ListContainerOptions.Builder.recursive());
// No blobs will be deleted since blobStore.list will throw a
// ContainerNotFoundException.
assertEquals(blobstore.countBlobs(containerName), 3333);
}
@SuppressWarnings("unchecked")
public void testDeleteAfterFutureFailure() {
IMocksControl mockControl = createControl();
ListeningExecutorService executorService = mockControl
.createMock(ListeningExecutorService.class);
DeleteAllKeysInList testDeleter = createMockBuilder(
DeleteAllKeysInList.class).withConstructor(executorService,
blobstore, retryHandler, maxParallelDeletes).createMock();
// Fail the first future that is created for deleting blobs.
EasyMock.<ListenableFuture<?>> expect(
executorService.submit(isA(Callable.class)))
.andReturn(
Futures.<Void> immediateFailedFuture(new RuntimeException()))
.once();
// There should be at least another 3333 calls to executorService.submit
// since there are 3333 blobs.
EasyMock.expectLastCall().andReturn(Futures.<Void> immediateFuture(null))
.times(3333, Integer.MAX_VALUE);
replay(executorService);
testDeleter.execute(containerName,
ListContainerOptions.Builder.recursive());
}
@SuppressWarnings("unchecked")
public void testExceptionThrownAfterMaxRetries() {
IMocksControl mockControl = createControl();
ListeningExecutorService executorService = mockControl
.createMock(ListeningExecutorService.class);
DeleteAllKeysInList testDeleter = createMockBuilder(
DeleteAllKeysInList.class).withConstructor(executorService,
blobstore, retryHandler, maxParallelDeletes).createMock();
// Fail the first future that is created for deleting blobs.
EasyMock.<ListenableFuture<?>> expect(
executorService.submit(isA(Callable.class)))
.andReturn(
Futures.<Void> immediateFailedFuture(new RuntimeException()))
.once();
EasyMock.expectLastCall().andReturn(Futures.<Void> immediateFuture(null))
.anyTimes();
replay(executorService);
testDeleter.setMaxErrors(1);
boolean blobRunTimeExceptionThrown = false;
try {
testDeleter.execute(containerName,
ListContainerOptions.Builder.recursive());
} catch (BlobRuntimeException be) {
blobRunTimeExceptionThrown = true;
}
assertTrue(blobRunTimeExceptionThrown, "Expected a BlobRunTimeException");
}
@SuppressWarnings("unchecked")
public void testFuturesCancelledOnFailure() {
IMocksControl mockControl = createControl();
ListeningExecutorService executorService = mockControl
.createMock(ListeningExecutorService.class);
DeleteAllKeysInList testDeleter = createMockBuilder(
DeleteAllKeysInList.class).withConstructor(executorService,
blobstore, retryHandler, maxParallelDeletes).createMock();
final AtomicBoolean deleteFailure = new AtomicBoolean();
final Semaphore semaphore = createMock(Semaphore.class);
final Set<ListenableFuture<Void>> outstandingFutures = Collections
.synchronizedSet(new HashSet<ListenableFuture<Void>>());
final ListenableFuture<Void> blobDelFuture = createMock(ListenableFuture.class);
try {
// Allow the first semaphore acquire to succeed.
EasyMock.expect(semaphore.tryAcquire(Long.MAX_VALUE,
TimeUnit.MILLISECONDS)).andReturn(true).once();
EasyMock.<ListenableFuture<?>> expect(
executorService.submit(isA(Callable.class)))
.andReturn(blobDelFuture).once();
// Fail the second semaphore acquire.
EasyMock.expect(semaphore.tryAcquire(Long.MAX_VALUE,
TimeUnit.MILLISECONDS))
.andReturn(false).anyTimes();
blobDelFuture.addListener(isA(Runnable.class), isA(Executor.class));
EasyMock.expectLastCall();
EasyMock.expect(blobDelFuture.cancel(true)).andReturn(true)
.atLeastOnce();
} catch (InterruptedException e) {
fail();
}
replay(semaphore, executorService, blobDelFuture);
testDeleter.setMaxErrors(1);
testDeleter.executeOneIteration(containerName,
ListContainerOptions.Builder.recursive(), semaphore,
outstandingFutures, deleteFailure, /* blocking = */false);
assertEquals(outstandingFutures.size(), 1);
assertTrue(deleteFailure.get());
}
/** /**
* Create a container "container" with 1111 blobs named "blob-%d". Create a * Create a container "container" with 1111 blobs named "blob-%d". Create a
* subdirectory "directory" which contains 2222 more blobs named * subdirectory "directory" which contains 2222 more blobs named

View File

@ -299,4 +299,9 @@ public interface Constants {
* providers that don't properly support Expect headers. Defaults to false. * providers that don't properly support Expect headers. Defaults to false.
*/ */
public static final String PROPERTY_STRIP_EXPECT_HEADER = "jclouds.strip-expect-header"; public static final String PROPERTY_STRIP_EXPECT_HEADER = "jclouds.strip-expect-header";
/**
* The maximum number of blob deletes happening in parallel at any point in time.
*/
public static final String PROPERTY_MAX_PARALLEL_DELETES = "jclouds.max-parallel-deletes";
} }

View File

@ -24,6 +24,7 @@ import static org.jclouds.Constants.PROPERTY_ISO3166_CODES;
import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_CONTEXT; import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_CONTEXT;
import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_HOST; import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_HOST;
import static org.jclouds.Constants.PROPERTY_MAX_CONNECTION_REUSE; import static org.jclouds.Constants.PROPERTY_MAX_CONNECTION_REUSE;
import static org.jclouds.Constants.PROPERTY_MAX_PARALLEL_DELETES;
import static org.jclouds.Constants.PROPERTY_MAX_SESSION_FAILURES; import static org.jclouds.Constants.PROPERTY_MAX_SESSION_FAILURES;
import static org.jclouds.Constants.PROPERTY_PRETTY_PRINT_PAYLOADS; import static org.jclouds.Constants.PROPERTY_PRETTY_PRINT_PAYLOADS;
import static org.jclouds.Constants.PROPERTY_SCHEDULER_THREADS; import static org.jclouds.Constants.PROPERTY_SCHEDULER_THREADS;
@ -60,6 +61,8 @@ public abstract class BaseApiMetadata implements ApiMetadata {
public static Properties defaultProperties() { public static Properties defaultProperties() {
Properties props = new Properties(); Properties props = new Properties();
// TODO: move this to ApiMetadata // TODO: move this to ApiMetadata
final int numUserThreads = 50;
props.setProperty(PROPERTY_ISO3166_CODES, ""); props.setProperty(PROPERTY_ISO3166_CODES, "");
props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_CONTEXT, 20 + ""); props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_CONTEXT, 20 + "");
props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_HOST, 0 + ""); props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_HOST, 0 + "");
@ -67,16 +70,20 @@ public abstract class BaseApiMetadata implements ApiMetadata {
props.setProperty(PROPERTY_CONNECTION_TIMEOUT, 60000 + ""); props.setProperty(PROPERTY_CONNECTION_TIMEOUT, 60000 + "");
props.setProperty(PROPERTY_IO_WORKER_THREADS, 20 + ""); props.setProperty(PROPERTY_IO_WORKER_THREADS, 20 + "");
// Successfully tested 50 user threads with BlobStore.clearContainer. // Successfully tested 50 user threads with BlobStore.clearContainer.
props.setProperty(PROPERTY_USER_THREADS, 50 + ""); props.setProperty(PROPERTY_USER_THREADS, numUserThreads + "");
props.setProperty(PROPERTY_SCHEDULER_THREADS, 10 + ""); props.setProperty(PROPERTY_SCHEDULER_THREADS, 10 + "");
props.setProperty(PROPERTY_MAX_CONNECTION_REUSE, 75 + ""); props.setProperty(PROPERTY_MAX_CONNECTION_REUSE, 75 + "");
props.setProperty(PROPERTY_MAX_SESSION_FAILURES, 2 + ""); props.setProperty(PROPERTY_MAX_SESSION_FAILURES, 2 + "");
props.setProperty(PROPERTY_SESSION_INTERVAL, 60 + ""); props.setProperty(PROPERTY_SESSION_INTERVAL, 60 + "");
props.setProperty(PROPERTY_PRETTY_PRINT_PAYLOADS, "true"); props.setProperty(PROPERTY_PRETTY_PRINT_PAYLOADS, "true");
props.setProperty(PROPERTY_STRIP_EXPECT_HEADER, "false"); props.setProperty(PROPERTY_STRIP_EXPECT_HEADER, "false");
// By default, we allow maximum parallel deletes to be equal to the number
// of user threads since one thread is used to delete on blob.
props.setProperty(PROPERTY_MAX_PARALLEL_DELETES, numUserThreads + "");
return props; return props;
} }
public abstract static class Builder<T extends Builder<T>> implements ApiMetadata.Builder<T> { public abstract static class Builder<T extends Builder<T>> implements ApiMetadata.Builder<T> {
protected abstract T self(); protected abstract T self();