From 655aa444d71d8a24ac831fa4c3d365042c4a8ebb Mon Sep 17 00:00:00 2001 From: Shri Javadekar Date: Sun, 24 Nov 2013 23:35:57 -0800 Subject: [PATCH] JCLOUDS-510 Delete objects in a container efficiently. The existing approach for deleting objects in a container suffers from a head-of-line blocking problem. This commit implements a better scheme which does not have that problem. This scheme uses a counting semaphore for making sure that a certain number of futures are issued in parallel. As each of these futures is completed, one permit of the semaphore is released. Added unit tests for testing this new scheme. --- .../internal/DeleteAllKeysInList.java | 448 +++++++++++++----- .../internal/DeleteAllKeysInListTest.java | 149 +++++- core/src/main/java/org/jclouds/Constants.java | 5 + .../apis/internal/BaseApiMetadata.java | 11 +- 4 files changed, 500 insertions(+), 113 deletions(-) diff --git a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java index 6a9b015a90..fe8aa4dc92 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java @@ -16,12 +16,16 @@ */ package org.jclouds.blobstore.strategy.internal; -import static com.google.common.base.Throwables.propagate; import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; -import static org.jclouds.concurrent.FutureIterables.awaitCompletion; -import java.util.Map; +import java.util.HashSet; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Resource; import javax.inject.Named; @@ -29,6 +33,7 @@ import javax.inject.Singleton; import org.jclouds.Constants; import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.ContainerNotFoundException; import org.jclouds.blobstore.domain.PageSet; import org.jclouds.blobstore.domain.StorageMetadata; import org.jclouds.blobstore.internal.BlobRuntimeException; @@ -39,15 +44,18 @@ import org.jclouds.blobstore.strategy.ClearListStrategy; import org.jclouds.http.handlers.BackoffLimitedRetryHandler; import org.jclouds.logging.Logger; -import com.google.common.collect.Maps; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; /** * Deletes all keys in the container - * + * * @author Adrian Cole + * @author Shrinand Javadekar */ @Singleton public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStrategy { @@ -56,6 +64,7 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr protected Logger logger = Logger.NULL; protected final BackoffLimitedRetryHandler retryHandler; + private final ListeningExecutorService executorService; protected final BlobStore blobStore; @@ -66,12 +75,17 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr /** Maximum times to retry an operation. */ protected int maxErrors = 3; + /** Maximum parallel deletes. */ + private int maxParallelDeletes; + @Inject DeleteAllKeysInList(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService executorService, - BlobStore blobStore, BackoffLimitedRetryHandler retryHandler) { + BlobStore blobStore, BackoffLimitedRetryHandler retryHandler, + @Named(Constants.PROPERTY_MAX_PARALLEL_DELETES) int maxParallelDeletes) { this.executorService = executorService; this.blobStore = blobStore; this.retryHandler = retryHandler; + this.maxParallelDeletes = maxParallelDeletes; } @Inject(optional = true) @@ -88,121 +102,337 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr execute(containerName, recursive()); } - public void execute(final String containerName, ListContainerOptions options) { - String message = options.getDir() != null ? String.format("clearing path %s/%s", - containerName, options.getDir()) : String.format("clearing container %s", - containerName); - options = options.clone(); - if (options.isRecursive()) - message += " recursively"; - logger.debug(message); - Map exceptions = Maps.newHashMap(); - for (int numErrors = 0; numErrors < maxErrors; ) { - // fetch partial directory listing - PageSet listing = - blobStore.list(containerName, options); + private boolean parentIsFolder(final ListContainerOptions options, + final StorageMetadata md) { + return options.getDir() != null && md.getName().indexOf('/') == -1; + } - // recurse on subdirectories - if (options.isRecursive()) { - for (StorageMetadata md : listing) { - String fullPath = parentIsFolder(options, md) ? options.getDir() + "/" - + md.getName() : md.getName(); - switch (md.getType()) { - case BLOB: - break; - case FOLDER: - case RELATIVE_PATH: - if (options.isRecursive() && !fullPath.equals(options.getDir())) { - execute(containerName, options.clone().inDirectory(fullPath)); - } - break; - case CONTAINER: - throw new IllegalArgumentException("Container type not supported"); - } - } - } + private void cancelOutstandingFutures( + final Set> outstandingFutures) { + for (ListenableFuture future : outstandingFutures) { + future.cancel(/*mayInterruptIfRunning=*/ true); + } + } - // remove blobs and now-empty subdirectories - Map> responses = Maps.newHashMap(); - for (final StorageMetadata md : listing) { - final String fullPath = parentIsFolder(options, md) ? options.getDir() + "/" - + md.getName() : md.getName(); + private String getMessage(final String containerName, + final ListContainerOptions options) { + return options.getDir() != null ? String.format("clearing path %s/%s", + containerName, options.getDir()) : String.format( + "clearing container %s", containerName); + } + + /** + * Get the object listing from a given container based on the options. For + * recursive listing of directories, identify a directory and call execute() + * with the appropriate options to get listing inside the directory. + * + * @param containerName + * The container from which to get the object list. + * @param options + * The options used for getting the listing. + * @returns A PageSet of StorageMetadata objects. + */ + private PageSet getListing( + final String containerName, + final ListContainerOptions options, + final Semaphore semaphore, + final Set> outstandingFutures, + final AtomicBoolean deleteFailure) { + // fetch partial directory listing + PageSet listing = null; + + // There's nothing much to do if the container doesn't exist. + // Note that if the container has just been created, trying to get the + // container listing might throw a ContainerNotFoundException because of + // eventual consistency. + try { + listing = blobStore.list(containerName, options); + } catch (ContainerNotFoundException ce) { + return listing; + } + + // recurse on subdirectories + if (options.isRecursive()) { + for (StorageMetadata md : listing) { + String fullPath = parentIsFolder(options, md) ? options.getDir() + + "/" + md.getName() : md.getName(); switch (md.getType()) { - case BLOB: - responses.put(md, executorService.submit(new Runnable() { - @Override - public void run() { - blobStore.removeBlob(containerName, fullPath); - } - })); - break; - case FOLDER: - if (options.isRecursive()) { - responses.put(md, executorService.submit(new Runnable() { - @Override - public void run() { - blobStore.deleteDirectory(containerName, fullPath); - } - })); - } - break; - case RELATIVE_PATH: - if (options.isRecursive()) { - responses.put(md, executorService.submit(new Runnable() { - @Override - public void run() { - blobStore.deleteDirectory(containerName, md.getName()); - } - })); - } - break; - case CONTAINER: - throw new IllegalArgumentException("Container type not supported"); - } - } - - try { - exceptions = awaitCompletion(responses, executorService, maxTime, logger, message); - } catch (TimeoutException te) { - ++numErrors; - if (numErrors == maxErrors) { - throw propagate(te); - } - retryHandler.imposeBackoffExponentialDelay(numErrors, message); - continue; - } finally { - for (ListenableFuture future : responses.values()) { - future.cancel(true); - } - } - - if (!exceptions.isEmpty()) { - ++numErrors; - if (numErrors == maxErrors) { + case BLOB: break; + case FOLDER: + case RELATIVE_PATH: + if (!fullPath.equals(options.getDir())) { + executeOneIteration(containerName, + options.clone().inDirectory(fullPath), semaphore, + outstandingFutures, deleteFailure, /*blocking=*/ true); + } + break; + case CONTAINER: + throw new IllegalArgumentException( + "Container type not supported"); } - retryHandler.imposeBackoffExponentialDelay(numErrors, message); - continue; + } + } + + return listing; + } + + /** + * Delete the blobs from a given PageSet. The PageSet may contain blobs or + * directories. If there are directories, they are expected to be empty. + * + * The logic of acquiring a semaphore, submitting a callable to the + * executorService and releasing the semaphore resides here. + * + * @param containerName + * The container from which the objects are listed. + * @param options + * The options used for getting the container listing. + * @param listing + * The actual list of objects. + * @param semaphore + * The semaphore used for making sure that only a certain number of + * futures are outstanding. + * @param deleteFailure + * This is set to true if any future used for deleting blobs + * failed. + * @param outstandingFutures + * The List of outstanding futures. + * @throws TimeoutException + * If any blob deletion takes too long. + */ + private void deleteBlobsAndEmptyDirs(final String containerName, + ListContainerOptions options, + PageSet listing, final Semaphore semaphore, + final AtomicBoolean deleteFailure, + final Set> outstandingFutures) + throws TimeoutException { + for (final StorageMetadata md : listing) { + final String fullPath = parentIsFolder(options, md) ? options.getDir() + + "/" + md.getName() : md.getName(); + + // Attempt to acquire a semaphore within the time limit. At least + // one outstanding future should complete within this period for the + // semaphore to be acquired. + try { + if (!semaphore.tryAcquire(maxTime, TimeUnit.MILLISECONDS)) { + throw new TimeoutException("Timeout waiting for semaphore"); + } + } catch (InterruptedException ie) { + logger.debug("Interrupted while deleting blobs"); + Thread.currentThread().interrupt(); + } + + final ListenableFuture blobDelFuture; + switch (md.getType()) { + case BLOB: + blobDelFuture = executorService.submit(new Callable() { + @Override + public Void call() { + blobStore.removeBlob(containerName, fullPath); + return null; + } + }); + break; + case FOLDER: + if (options.isRecursive()) { + blobDelFuture = executorService.submit(new Callable() { + @Override + public Void call() { + blobStore.deleteDirectory(containerName, fullPath); + return null; + } + }); + } else { + blobDelFuture = null; + } + break; + case RELATIVE_PATH: + if (options.isRecursive()) { + blobDelFuture = executorService.submit(new Callable() { + @Override + public Void call() { + blobStore.deleteDirectory(containerName, md.getName()); + return null; + } + }); + } else { + blobDelFuture = null; + } + break; + case CONTAINER: + throw new IllegalArgumentException("Container type not supported"); + default: + blobDelFuture = null; + } + + // If a future to delete a blob/directory actually got created above, + // keep a reference of that in the outstandingFutures list. This is + // useful in case of a timeout exception. All outstanding futures can + // then be cancelled. + if (blobDelFuture != null) { + outstandingFutures.add(blobDelFuture); + + // Add a callback to release the semaphore. This is required for + // other threads waiting to acquire a semaphore above to make + // progress. + Futures.addCallback(blobDelFuture, new FutureCallback() { + @Override + public void onSuccess(final Object o) { + outstandingFutures.remove(blobDelFuture); + semaphore.release(); + } + + @Override + public void onFailure(final Throwable t) { + // Make a note the fact that some blob/directory could not be + // deleted successfully. This is used for retrying later. + deleteFailure.set(true); + outstandingFutures.remove(blobDelFuture); + semaphore.release(); + } + }); + } else { + // It is possible above to acquire a semaphore but not submit any + // task to the executorService. For e.g. if the listing contains + // an object of type 'FOLDER' and the ListContianerOptions are *not* + // recursive. In this case, there is no blobDelFuture and therefore + // no FutureCallback to release the semaphore. This semaphore is + // released here. + semaphore.release(); + } + } + } + + /** + * This method goes through all the blobs from a container and attempts to + * create futures for deleting them. If there is a TimeoutException when + * doing this, sets the deleteFailure flag to true and returns. If there are + * more retries left, this will get called again. + * + * @param containerName + * The container from which to get the object list. + * @param listOptions + * The options used for getting the listing. + * @param semaphore + * The semaphore used for controlling number of outstanding + * futures. + * @param outstandingFutures + * A list of outstanding futures. + * @param deleteFailure + * A flag used to track of whether there was a failure while + * deleting any blob. + * @param blocking + * when true, block until all outstanding operations have completed + * @return A PageSet of StorageMetadata objects. + */ + @VisibleForTesting + void executeOneIteration( + final String containerName, + ListContainerOptions listOptions, final Semaphore semaphore, + final Set> outstandingFutures, + final AtomicBoolean deleteFailure, final boolean blocking) { + ListContainerOptions options = listOptions.clone(); + String message = getMessage(containerName, listOptions); + if (options.isRecursive()) { + message += " recursively"; + } + logger.debug(message); + + PageSet listing = getListing(containerName, + options, semaphore, outstandingFutures, deleteFailure); + while (listing != null && !listing.isEmpty()) { + try { + // Remove blobs and now-empty subdirectories. + deleteBlobsAndEmptyDirs(containerName, options, listing, semaphore, + deleteFailure, outstandingFutures); + } catch (TimeoutException te) { + logger.debug("TimeoutException while deleting blobs: {}", + te.getMessage()); + cancelOutstandingFutures(outstandingFutures); + deleteFailure.set(true); } String marker = listing.getNextMarker(); - if (marker == null) { + if (marker != null) { + logger.debug("%s with marker %s", message, marker); + options = options.afterMarker(marker); + listing = getListing(containerName, options, semaphore, + outstandingFutures, deleteFailure); + } else { break; } - logger.debug("%s with marker %s", message, marker); - options = options.afterMarker(marker); - - // Reset numErrors if we execute a successful iteration. This ensures - // that we only try an unsuccessful operation maxErrors times but - // allow progress with directories containing many blobs in the face - // of some failures. - numErrors = 0; } - if (!exceptions.isEmpty()) - throw new BlobRuntimeException(String.format("error %s: %s", message, exceptions)); + + if (blocking) { + waitForCompletion(semaphore, outstandingFutures); + } } - private boolean parentIsFolder(final ListContainerOptions options, final StorageMetadata md) { - return options.getDir() != null && md.getName().indexOf('/') == -1; + private void waitForCompletion(final Semaphore semaphore, + final Set> outstandingFutures) { + // Wait for all futures to complete by waiting to acquire all + // semaphores. + try { + semaphore.acquire(maxParallelDeletes); + semaphore.release(maxParallelDeletes); + } catch (InterruptedException e) { + logger.debug("Interrupted while waiting for blobs to be deleted"); + cancelOutstandingFutures(outstandingFutures); + Thread.currentThread().interrupt(); + } + } + + public void execute(final String containerName, + ListContainerOptions listOptions) { + final AtomicBoolean deleteFailure = new AtomicBoolean(); + int retries = maxErrors; + + /* + * A Semaphore is used to control the number of outstanding delete + * requests. One permit of the semaphore is acquired before submitting a + * request to the executorService to delete a blob. As requests complete, + * their FutureCallback will release the semaphore permit. That will allow + * the next delete request to proceed. + * + * If no Future completes in 'maxTime', i.e. a semaphore cannot be + * acquired in 'maxTime', a TimeoutException is thrown. Any outstanding + * futures at that time are cancelled. + */ + final Semaphore semaphore = new Semaphore(maxParallelDeletes); + /* + * When a future is created, a reference for that is added to the + * outstandingFutures list. This reference is removed from the list in the + * FutureCallback since it no longer needs to be cancelled in the event of + * a timeout. Also, when the reference is removed from this list and when + * the executorService removes the reference that it has maintained, the + * future will be marked for GC since there should be no other references + * to it. This is important because this code can generate an unbounded + * number of futures. + */ + final Set> outstandingFutures = Collections + .synchronizedSet(new HashSet>()); + // TODO: Remove this retry loop. + while (retries > 0) { + deleteFailure.set(false); + executeOneIteration(containerName, listOptions.clone(), semaphore, + outstandingFutures, deleteFailure, /*blocking=*/ false); + waitForCompletion(semaphore, outstandingFutures); + + // Try again if there was any failure while deleting blobs and the max + // retry count hasn't been reached. + if (deleteFailure.get() && --retries > 0) { + String message = getMessage(containerName, listOptions); + retryHandler.imposeBackoffExponentialDelay(maxErrors - retries, + message); + } else { + break; + } + } + + if (retries == 0) { + cancelOutstandingFutures(outstandingFutures); + throw new BlobRuntimeException("Exceeded maximum retry attempts"); + } } } diff --git a/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInListTest.java b/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInListTest.java index 016fc23b8c..10b74f37b9 100644 --- a/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInListTest.java +++ b/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInListTest.java @@ -16,35 +16,64 @@ */ package org.jclouds.blobstore.strategy.internal; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.createMockBuilder; +import static org.easymock.EasyMock.createControl; +import static org.easymock.EasyMock.isA; +import static org.easymock.EasyMock.replay; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.easymock.EasyMock; +import org.easymock.IMocksControl; import org.jclouds.ContextBuilder; import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.ContainerNotFoundException; import org.jclouds.blobstore.options.ListContainerOptions; import org.jclouds.util.Closeables2; +import org.jclouds.blobstore.domain.PageSet; +import org.jclouds.blobstore.domain.StorageMetadata; +import org.jclouds.blobstore.internal.BlobRuntimeException; +import org.jclouds.http.handlers.BackoffLimitedRetryHandler; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Injector; /** - * + * * @author Adrian Cole */ @Test(testName = "DeleteAllKeysInListTest", singleThreaded = true) public class DeleteAllKeysInListTest { private BlobStore blobstore; private DeleteAllKeysInList deleter; + private BackoffLimitedRetryHandler retryHandler; private static final String containerName = "container"; private static final String directoryName = "directory"; + private static final int maxParallelDeletes = 1024; @BeforeMethod void setupBlobStore() { - Injector injector = ContextBuilder.newBuilder("transient").buildInjector(); + Injector injector = ContextBuilder.newBuilder("transient") + .buildInjector(); blobstore = injector.getInstance(BlobStore.class); deleter = injector.getInstance(DeleteAllKeysInList.class); + retryHandler = injector.getInstance(BackoffLimitedRetryHandler.class); createDataSet(); } @@ -73,6 +102,122 @@ public class DeleteAllKeysInListTest { assertEquals(blobstore.countBlobs(containerName), 1111); } + public void testContainerNotFound() { + IMocksControl mockControl = createControl(); + BlobStore blobStore = mockControl.createMock(BlobStore.class); + ListeningExecutorService executorService = mockControl + .createMock(ListeningExecutorService.class); + DeleteAllKeysInList testDeleter = createMockBuilder( + DeleteAllKeysInList.class).withConstructor(executorService, + blobStore, retryHandler, maxParallelDeletes).createMock(); + EasyMock.> expect(blobStore.list( + isA(String.class), isA(ListContainerOptions.class))) + .andThrow(new ContainerNotFoundException()).once(); + replay(blobStore); + testDeleter.execute(containerName, + ListContainerOptions.Builder.recursive()); + // No blobs will be deleted since blobStore.list will throw a + // ContainerNotFoundException. + assertEquals(blobstore.countBlobs(containerName), 3333); + } + + @SuppressWarnings("unchecked") + public void testDeleteAfterFutureFailure() { + IMocksControl mockControl = createControl(); + ListeningExecutorService executorService = mockControl + .createMock(ListeningExecutorService.class); + DeleteAllKeysInList testDeleter = createMockBuilder( + DeleteAllKeysInList.class).withConstructor(executorService, + blobstore, retryHandler, maxParallelDeletes).createMock(); + // Fail the first future that is created for deleting blobs. + EasyMock.> expect( + executorService.submit(isA(Callable.class))) + .andReturn( + Futures. immediateFailedFuture(new RuntimeException())) + .once(); + // There should be at least another 3333 calls to executorService.submit + // since there are 3333 blobs. + EasyMock.expectLastCall().andReturn(Futures. immediateFuture(null)) + .times(3333, Integer.MAX_VALUE); + replay(executorService); + testDeleter.execute(containerName, + ListContainerOptions.Builder.recursive()); + } + + @SuppressWarnings("unchecked") + public void testExceptionThrownAfterMaxRetries() { + IMocksControl mockControl = createControl(); + ListeningExecutorService executorService = mockControl + .createMock(ListeningExecutorService.class); + DeleteAllKeysInList testDeleter = createMockBuilder( + DeleteAllKeysInList.class).withConstructor(executorService, + blobstore, retryHandler, maxParallelDeletes).createMock(); + // Fail the first future that is created for deleting blobs. + EasyMock.> expect( + executorService.submit(isA(Callable.class))) + .andReturn( + Futures. immediateFailedFuture(new RuntimeException())) + .once(); + EasyMock.expectLastCall().andReturn(Futures. immediateFuture(null)) + .anyTimes(); + replay(executorService); + testDeleter.setMaxErrors(1); + + boolean blobRunTimeExceptionThrown = false; + try { + testDeleter.execute(containerName, + ListContainerOptions.Builder.recursive()); + } catch (BlobRuntimeException be) { + blobRunTimeExceptionThrown = true; + } + + assertTrue(blobRunTimeExceptionThrown, "Expected a BlobRunTimeException"); + } + + @SuppressWarnings("unchecked") + public void testFuturesCancelledOnFailure() { + IMocksControl mockControl = createControl(); + ListeningExecutorService executorService = mockControl + .createMock(ListeningExecutorService.class); + DeleteAllKeysInList testDeleter = createMockBuilder( + DeleteAllKeysInList.class).withConstructor(executorService, + blobstore, retryHandler, maxParallelDeletes).createMock(); + final AtomicBoolean deleteFailure = new AtomicBoolean(); + final Semaphore semaphore = createMock(Semaphore.class); + final Set> outstandingFutures = Collections + .synchronizedSet(new HashSet>()); + final ListenableFuture blobDelFuture = createMock(ListenableFuture.class); + try { + + // Allow the first semaphore acquire to succeed. + EasyMock.expect(semaphore.tryAcquire(Long.MAX_VALUE, + TimeUnit.MILLISECONDS)).andReturn(true).once(); + EasyMock.> expect( + executorService.submit(isA(Callable.class))) + .andReturn(blobDelFuture).once(); + + // Fail the second semaphore acquire. + EasyMock.expect(semaphore.tryAcquire(Long.MAX_VALUE, + TimeUnit.MILLISECONDS)) + .andReturn(false).anyTimes(); + + blobDelFuture.addListener(isA(Runnable.class), isA(Executor.class)); + EasyMock.expectLastCall(); + EasyMock.expect(blobDelFuture.cancel(true)).andReturn(true) + .atLeastOnce(); + } catch (InterruptedException e) { + fail(); + } + + replay(semaphore, executorService, blobDelFuture); + testDeleter.setMaxErrors(1); + testDeleter.executeOneIteration(containerName, + ListContainerOptions.Builder.recursive(), semaphore, + outstandingFutures, deleteFailure, /* blocking = */false); + assertEquals(outstandingFutures.size(), 1); + assertTrue(deleteFailure.get()); + } + /** * Create a container "container" with 1111 blobs named "blob-%d". Create a * subdirectory "directory" which contains 2222 more blobs named diff --git a/core/src/main/java/org/jclouds/Constants.java b/core/src/main/java/org/jclouds/Constants.java index 2dc2349aff..45d77cf566 100644 --- a/core/src/main/java/org/jclouds/Constants.java +++ b/core/src/main/java/org/jclouds/Constants.java @@ -299,4 +299,9 @@ public interface Constants { * providers that don't properly support Expect headers. Defaults to false. */ public static final String PROPERTY_STRIP_EXPECT_HEADER = "jclouds.strip-expect-header"; + + /** + * The maximum number of blob deletes happening in parallel at any point in time. + */ + public static final String PROPERTY_MAX_PARALLEL_DELETES = "jclouds.max-parallel-deletes"; } diff --git a/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java b/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java index a814c613eb..f8bc2850bd 100644 --- a/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java +++ b/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java @@ -24,6 +24,7 @@ import static org.jclouds.Constants.PROPERTY_ISO3166_CODES; import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_CONTEXT; import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_HOST; import static org.jclouds.Constants.PROPERTY_MAX_CONNECTION_REUSE; +import static org.jclouds.Constants.PROPERTY_MAX_PARALLEL_DELETES; import static org.jclouds.Constants.PROPERTY_MAX_SESSION_FAILURES; import static org.jclouds.Constants.PROPERTY_PRETTY_PRINT_PAYLOADS; import static org.jclouds.Constants.PROPERTY_SCHEDULER_THREADS; @@ -60,6 +61,8 @@ public abstract class BaseApiMetadata implements ApiMetadata { public static Properties defaultProperties() { Properties props = new Properties(); // TODO: move this to ApiMetadata + final int numUserThreads = 50; + props.setProperty(PROPERTY_ISO3166_CODES, ""); props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_CONTEXT, 20 + ""); props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_HOST, 0 + ""); @@ -67,16 +70,20 @@ public abstract class BaseApiMetadata implements ApiMetadata { props.setProperty(PROPERTY_CONNECTION_TIMEOUT, 60000 + ""); props.setProperty(PROPERTY_IO_WORKER_THREADS, 20 + ""); // Successfully tested 50 user threads with BlobStore.clearContainer. - props.setProperty(PROPERTY_USER_THREADS, 50 + ""); + props.setProperty(PROPERTY_USER_THREADS, numUserThreads + ""); props.setProperty(PROPERTY_SCHEDULER_THREADS, 10 + ""); props.setProperty(PROPERTY_MAX_CONNECTION_REUSE, 75 + ""); props.setProperty(PROPERTY_MAX_SESSION_FAILURES, 2 + ""); props.setProperty(PROPERTY_SESSION_INTERVAL, 60 + ""); props.setProperty(PROPERTY_PRETTY_PRINT_PAYLOADS, "true"); props.setProperty(PROPERTY_STRIP_EXPECT_HEADER, "false"); + + // By default, we allow maximum parallel deletes to be equal to the number + // of user threads since one thread is used to delete on blob. + props.setProperty(PROPERTY_MAX_PARALLEL_DELETES, numUserThreads + ""); return props; } - + public abstract static class Builder> implements ApiMetadata.Builder { protected abstract T self();