mirror of https://github.com/apache/jclouds.git
Optimize clearContainer for large folders
Previously we built up a list of all blobs then executed removeBlob on all of them simultaneously. For sufficiently large folders this would cause an OutOfMemoryError. There are cleaner ways to write this but this approach mimimizes the patch. I will continue working on this in subsequent commits, including optimizing for deep subdirectories. This commit also fixes a bug in the transient blobstore where deleting elements while reading a directory resulted in NoSuchElementException.
This commit is contained in:
parent
588a7c38ad
commit
e4514240e0
|
@ -179,11 +179,13 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
|
||||||
final String finalMarker = options.getMarker();
|
final String finalMarker = options.getMarker();
|
||||||
StorageMetadata lastMarkerMetadata = find(contents, new Predicate<StorageMetadata>() {
|
StorageMetadata lastMarkerMetadata = find(contents, new Predicate<StorageMetadata>() {
|
||||||
public boolean apply(StorageMetadata metadata) {
|
public boolean apply(StorageMetadata metadata) {
|
||||||
return metadata.getName().equals(finalMarker);
|
return metadata.getName().compareTo(finalMarker) >= 0;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
contents = contents.tailSet(lastMarkerMetadata);
|
contents = contents.tailSet(lastMarkerMetadata);
|
||||||
contents.remove(lastMarkerMetadata);
|
if (finalMarker.equals(lastMarkerMetadata.getName())) {
|
||||||
|
contents.remove(lastMarkerMetadata);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final String prefix = options.getDir();
|
final String prefix = options.getDir();
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursi
|
||||||
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
@ -31,17 +32,18 @@ import javax.inject.Singleton;
|
||||||
|
|
||||||
import org.jclouds.Constants;
|
import org.jclouds.Constants;
|
||||||
import org.jclouds.blobstore.AsyncBlobStore;
|
import org.jclouds.blobstore.AsyncBlobStore;
|
||||||
|
import org.jclouds.blobstore.domain.PageSet;
|
||||||
import org.jclouds.blobstore.domain.StorageMetadata;
|
import org.jclouds.blobstore.domain.StorageMetadata;
|
||||||
import org.jclouds.blobstore.internal.BlobRuntimeException;
|
import org.jclouds.blobstore.internal.BlobRuntimeException;
|
||||||
import org.jclouds.blobstore.options.ListContainerOptions;
|
import org.jclouds.blobstore.options.ListContainerOptions;
|
||||||
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||||
import org.jclouds.blobstore.strategy.ClearContainerStrategy;
|
import org.jclouds.blobstore.strategy.ClearContainerStrategy;
|
||||||
import org.jclouds.blobstore.strategy.ClearListStrategy;
|
import org.jclouds.blobstore.strategy.ClearListStrategy;
|
||||||
import org.jclouds.blobstore.strategy.ListContainerStrategy;
|
|
||||||
import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
|
import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
|
||||||
import org.jclouds.logging.Logger;
|
import org.jclouds.logging.Logger;
|
||||||
|
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
@ -57,7 +59,6 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
|
||||||
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||||
protected Logger logger = Logger.NULL;
|
protected Logger logger = Logger.NULL;
|
||||||
|
|
||||||
protected final ListContainerStrategy listContainer;
|
|
||||||
protected final BackoffLimitedRetryHandler retryHandler;
|
protected final BackoffLimitedRetryHandler retryHandler;
|
||||||
private final ExecutorService userExecutor;
|
private final ExecutorService userExecutor;
|
||||||
|
|
||||||
|
@ -71,12 +72,11 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
DeleteAllKeysInList(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userExecutor,
|
DeleteAllKeysInList(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userExecutor,
|
||||||
AsyncBlobStore connection, ListContainerStrategy listContainer,
|
AsyncBlobStore connection,
|
||||||
BackoffLimitedRetryHandler retryHandler) {
|
BackoffLimitedRetryHandler retryHandler) {
|
||||||
|
|
||||||
this.userExecutor = userExecutor;
|
this.userExecutor = userExecutor;
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.listContainer = listContainer;
|
|
||||||
this.retryHandler = retryHandler;
|
this.retryHandler = retryHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,19 +84,31 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
|
||||||
execute(containerName, recursive());
|
execute(containerName, recursive());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void execute(final String containerName, final ListContainerOptions options) {
|
public void execute(final String containerName, ListContainerOptions options) {
|
||||||
String message = options.getDir() != null ? String.format("clearing path %s/%s",
|
String message = options.getDir() != null ? String.format("clearing path %s/%s",
|
||||||
containerName, options.getDir()) : String.format("clearing container %s",
|
containerName, options.getDir()) : String.format("clearing container %s",
|
||||||
containerName);
|
containerName);
|
||||||
|
options = options.clone();
|
||||||
if (options.isRecursive())
|
if (options.isRecursive())
|
||||||
message = message + " recursively";
|
message = message + " recursively";
|
||||||
Map<StorageMetadata, Exception> exceptions = Maps.newHashMap();
|
Map<StorageMetadata, Exception> exceptions = Maps.newHashMap();
|
||||||
|
PageSet<? extends StorageMetadata> listing;
|
||||||
Iterable<? extends StorageMetadata> toDelete;
|
Iterable<? extends StorageMetadata> toDelete;
|
||||||
for (int i = 0; i < 3; i++) { // TODO parameterize
|
int maxErrors = 3; // TODO parameterize
|
||||||
toDelete = getResourcesToDelete(containerName, options);
|
for (int i = 0; i < maxErrors; ) {
|
||||||
if (Iterables.isEmpty(toDelete)) {
|
try {
|
||||||
break;
|
listing = connection.list(containerName, options).get();
|
||||||
|
} catch (ExecutionException ee) {
|
||||||
|
++i;
|
||||||
|
if (i == maxErrors) {
|
||||||
|
throw new BlobRuntimeException("list error", ee.getCause());
|
||||||
|
}
|
||||||
|
retryHandler.imposeBackoffExponentialDelay(i, message);
|
||||||
|
continue;
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
throw Throwables.propagate(ie);
|
||||||
}
|
}
|
||||||
|
toDelete = filterListing(listing, options);
|
||||||
|
|
||||||
Map<StorageMetadata, Future<?>> responses = Maps.newHashMap();
|
Map<StorageMetadata, Future<?>> responses = Maps.newHashMap();
|
||||||
try {
|
try {
|
||||||
|
@ -127,24 +139,30 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
|
||||||
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message);
|
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message);
|
||||||
}
|
}
|
||||||
if (!exceptions.isEmpty()) {
|
if (!exceptions.isEmpty()) {
|
||||||
retryHandler.imposeBackoffExponentialDelay(i + 1, message);
|
++i;
|
||||||
|
retryHandler.imposeBackoffExponentialDelay(i, message);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String marker = listing.getNextMarker();
|
||||||
|
if (marker == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
options = options.afterMarker(marker);
|
||||||
}
|
}
|
||||||
if (!exceptions.isEmpty())
|
if (!exceptions.isEmpty())
|
||||||
throw new BlobRuntimeException(String.format("error %s: %s", message, exceptions));
|
throw new BlobRuntimeException(String.format("error %s: %s", message, exceptions));
|
||||||
toDelete = getResourcesToDelete(containerName, options);
|
|
||||||
assert Iterables.isEmpty(toDelete) : String.format("items remaining %s: %s", message,
|
|
||||||
toDelete);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean parentIsFolder(final ListContainerOptions options, final StorageMetadata md) {
|
private boolean parentIsFolder(final ListContainerOptions options, final StorageMetadata md) {
|
||||||
return (options.getDir() != null && md.getName().indexOf('/') == -1);
|
return (options.getDir() != null && md.getName().indexOf('/') == -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Iterable<? extends StorageMetadata> getResourcesToDelete(final String containerName,
|
private Iterable<? extends StorageMetadata> filterListing(
|
||||||
|
final PageSet<? extends StorageMetadata> listing,
|
||||||
final ListContainerOptions options) {
|
final ListContainerOptions options) {
|
||||||
Iterable<? extends StorageMetadata> toDelete = Iterables.filter(listContainer.execute(
|
Iterable<? extends StorageMetadata> toDelete = Iterables.filter(listing,
|
||||||
containerName, options), new Predicate<StorageMetadata>() {
|
new Predicate<StorageMetadata>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(StorageMetadata input) {
|
public boolean apply(StorageMetadata input) {
|
||||||
|
|
Loading…
Reference in New Issue