diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 47f7ee26e83..93ad583f221 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -45,19 +45,17 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStoreException; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; -import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.Tuple; import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE; @@ -239,36 +237,12 @@ class S3BlobContainer extends AbstractBlobContainer { @Override public Map listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException { - final MapBuilder blobsBuilder = MapBuilder.newMapBuilder(); try (AmazonS3Reference clientReference = blobStore.clientReference()) { - ObjectListing prevListing = null; - while (true) { - ObjectListing list; - if (prevListing != null) { - final ObjectListing finalPrevListing = prevListing; - list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing)); - } else { - final ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); - listObjectsRequest.setBucketName(blobStore.bucket()); - listObjectsRequest.setDelimiter("/"); - if (blobNamePrefix != null) { - listObjectsRequest.setPrefix(buildKey(blobNamePrefix)); - } else { - listObjectsRequest.setPrefix(keyPath); - } - list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest)); - } - for (final S3ObjectSummary summary : list.getObjectSummaries()) { - final String name = summary.getKey().substring(keyPath.length()); - blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize())); - } - if (list.isTruncated()) { - prevListing = list; - } else { - break; - } - } - return blobsBuilder.immutableMap(); + return executeListing(clientReference, listObjectsRequest(blobNamePrefix == null ? keyPath : buildKey(blobNamePrefix))) + .stream() + .flatMap(listing -> listing.getObjectSummaries().stream()) + .map(summary -> new PlainBlobMetaData(summary.getKey().substring(keyPath.length()), summary.getSize())) + .collect(Collectors.toMap(PlainBlobMetaData::name, Function.identity())); } catch (final AmazonClientException e) { throw new IOException("Exception when listing blobs by prefix [" + blobNamePrefix + "]", e); } @@ -282,49 +256,52 @@ class S3BlobContainer extends AbstractBlobContainer { @Override public Map children() throws IOException { try (AmazonS3Reference clientReference = blobStore.clientReference()) { - ObjectListing prevListing = null; - final Map entries = new HashMap<>(); - while (true) { - ObjectListing list; - if (prevListing != null) { - final ObjectListing finalPrevListing = prevListing; - list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing)); - } else { - final ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); - listObjectsRequest.setBucketName(blobStore.bucket()); - listObjectsRequest.setPrefix(keyPath); - listObjectsRequest.setDelimiter("/"); - list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest)); - } - for (final String summary : list.getCommonPrefixes()) { - final String name = summary.substring(keyPath.length()); - if (name.isEmpty() == false) { - // Stripping the trailing slash off of the common prefix - final String last = name.substring(0, name.length() - 1); - final BlobPath path = path().add(last); - entries.put(last, blobStore.blobContainer(path)); - } - } - assert list.getObjectSummaries().stream().noneMatch(s -> { - for (String commonPrefix : list.getCommonPrefixes()) { - if (s.getKey().substring(keyPath.length()).startsWith(commonPrefix)) { - return true; + return executeListing(clientReference, listObjectsRequest(keyPath)).stream().flatMap(listing -> { + assert listing.getObjectSummaries().stream().noneMatch(s -> { + for (String commonPrefix : listing.getCommonPrefixes()) { + if (s.getKey().substring(keyPath.length()).startsWith(commonPrefix)) { + return true; + } } - } - return false; - }) : "Response contained children for listed common prefixes."; - if (list.isTruncated()) { - prevListing = list; - } else { - break; - } - } - return Collections.unmodifiableMap(entries); + return false; + }) : "Response contained children for listed common prefixes."; + return listing.getCommonPrefixes().stream(); + }) + .map(prefix -> prefix.substring(keyPath.length())) + .filter(name -> name.isEmpty() == false) + // Stripping the trailing slash off of the common prefix + .map(name -> name.substring(0, name.length() - 1)) + .collect(Collectors.toMap(Function.identity(), name -> blobStore.blobContainer(path().add(name)))); } catch (final AmazonClientException e) { - throw new IOException("Exception when listing children of [" + path().buildAsString() + ']', e); + throw new IOException("Exception when listing children of [" + path().buildAsString() + ']', e); } } + private static List executeListing(AmazonS3Reference clientReference, ListObjectsRequest listObjectsRequest) { + final List results = new ArrayList<>(); + ObjectListing prevListing = null; + while (true) { + ObjectListing list; + if (prevListing != null) { + final ObjectListing finalPrevListing = prevListing; + list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing)); + } else { + list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest)); + } + results.add(list); + if (list.isTruncated()) { + prevListing = list; + } else { + break; + } + } + return results; + } + + private ListObjectsRequest listObjectsRequest(String keyPath) { + return new ListObjectsRequest().withBucketName(blobStore.bucket()).withPrefix(keyPath).withDelimiter("/"); + } + private String buildKey(String blobName) { return keyPath + blobName; }