Add support for bulk delete operation in snapshot repository

Currently when we delete files belonging to deleted snapshots we issue one delete command to underlying snapshot store at a time. Some repositories can benefit from bulk deletes of multiple files.

Closes #12533
This commit is contained in:
Igor Motov 2015-07-31 12:33:57 -04:00
parent 39508c4d7a
commit 8fe5d903b7
4 changed files with 41 additions and 14 deletions

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Collection;
import java.util.Map; import java.util.Map;
/** /**
@ -48,10 +49,17 @@ public interface BlobContainer {
/** /**
* Deletes a blob with giving name. * Deletes a blob with giving name.
* *
* If blob exist but cannot be deleted an exception has to be thrown. * If a blob exists but cannot be deleted an exception has to be thrown.
*/ */
void deleteBlob(String blobName) throws IOException; void deleteBlob(String blobName) throws IOException;
/**
* Deletes blobs with giving names.
*
* If a blob exists but cannot be deleted an exception has to be thrown.
*/
void deleteBlobs(Collection<String> blobNames) throws IOException;
/** /**
* Deletes all blobs in the container that match the specified prefix. * Deletes all blobs in the container that match the specified prefix.
*/ */

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.Map; import java.util.Map;
/** /**
@ -50,4 +51,11 @@ public abstract class AbstractBlobContainer implements BlobContainer {
deleteBlob(blob.name()); deleteBlob(blob.name());
} }
} }
@Override
public void deleteBlobs(Collection<String> blobNames) throws IOException {
for(String blob: blobNames) {
deleteBlob(blob);
}
}
} }

View File

@ -352,33 +352,38 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
*/ */
protected void finalize(List<SnapshotFiles> snapshots, int fileListGeneration, Map<String, BlobMetaData> blobs) { protected void finalize(List<SnapshotFiles> snapshots, int fileListGeneration, Map<String, BlobMetaData> blobs) {
BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots); BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
List<String> blobsToDelete = newArrayList();
// delete old index files first // delete old index files first
for (String blobName : blobs.keySet()) { for (String blobName : blobs.keySet()) {
// delete old file lists // delete old file lists
if (indexShardSnapshotsFormat.isTempBlobName(blobName) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) { if (indexShardSnapshotsFormat.isTempBlobName(blobName) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try { blobsToDelete.add(blobName);
blobContainer.deleteBlob(blobName);
} catch (IOException e) {
// We cannot delete index file - this is fatal, we cannot continue, otherwise we might end up
// with references to non-existing files
throw new IndexShardSnapshotFailedException(shardId, "error deleting index file [{}] during cleanup", e);
}
} }
} }
try {
blobContainer.deleteBlobs(blobsToDelete);
} catch (IOException e) {
// We cannot delete index file - this is fatal, we cannot continue, otherwise we might end up
// with references to non-existing files
throw new IndexShardSnapshotFailedException(shardId, "error deleting index files during cleanup, reason: " + e.getMessage(), e);
}
blobsToDelete = newArrayList();
// now go over all the blobs, and if they don't exists in a snapshot, delete them // now go over all the blobs, and if they don't exists in a snapshot, delete them
for (String blobName : blobs.keySet()) { for (String blobName : blobs.keySet()) {
// delete old file lists // delete unused files
if (blobName.startsWith(DATA_BLOB_PREFIX)) { if (blobName.startsWith(DATA_BLOB_PREFIX)) {
if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) { if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) {
try { blobsToDelete.add(blobName);
blobContainer.deleteBlob(blobName);
} catch (IOException e) {
logger.debug("[{}] [{}] error deleting blob [{}] during cleanup", e, snapshotId, shardId, blobName);
}
} }
} }
} }
try {
blobContainer.deleteBlobs(blobsToDelete);
} catch (IOException e) {
logger.debug("[{}] [{}] error deleting some of the blobs [{}] during cleanup", e, snapshotId, shardId, blobsToDelete);
}
// If we deleted all snapshots - we don't need to create the index file // If we deleted all snapshots - we don't need to create the index file
if (snapshots.size() > 0) { if (snapshots.size() > 0) {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.blobstore.BlobPath;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Collection;
import java.util.Map; import java.util.Map;
/** /**
@ -63,6 +64,11 @@ public class BlobContainerWrapper implements BlobContainer {
delegate.deleteBlob(blobName); delegate.deleteBlob(blobName);
} }
@Override
public void deleteBlobs(Collection<String> blobNames) throws IOException {
delegate.deleteBlobs(blobNames);
}
@Override @Override
public void deleteBlobsByPrefix(String blobNamePrefix) throws IOException { public void deleteBlobsByPrefix(String blobNamePrefix) throws IOException {
delegate.deleteBlobsByPrefix(blobNamePrefix); delegate.deleteBlobsByPrefix(blobNamePrefix);