Clean up S3 BlobContainer Listing Logic (#43088) (#44406)

* Clean up duplication in creating and looping over IO Requests
This commit is contained in:
Armin Braun 2019-07-16 12:19:20 +02:00 committed by GitHub
parent a09389c511
commit 940aa71930
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 47 additions and 70 deletions

View File

@ -45,19 +45,17 @@ import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException; import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE; import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
@ -239,36 +237,12 @@ class S3BlobContainer extends AbstractBlobContainer {
@Override @Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException { public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
final MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
try (AmazonS3Reference clientReference = blobStore.clientReference()) { try (AmazonS3Reference clientReference = blobStore.clientReference()) {
ObjectListing prevListing = null; return executeListing(clientReference, listObjectsRequest(blobNamePrefix == null ? keyPath : buildKey(blobNamePrefix)))
while (true) { .stream()
ObjectListing list; .flatMap(listing -> listing.getObjectSummaries().stream())
if (prevListing != null) { .map(summary -> new PlainBlobMetaData(summary.getKey().substring(keyPath.length()), summary.getSize()))
final ObjectListing finalPrevListing = prevListing; .collect(Collectors.toMap(PlainBlobMetaData::name, Function.identity()));
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
} else {
final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
listObjectsRequest.setBucketName(blobStore.bucket());
listObjectsRequest.setDelimiter("/");
if (blobNamePrefix != null) {
listObjectsRequest.setPrefix(buildKey(blobNamePrefix));
} else {
listObjectsRequest.setPrefix(keyPath);
}
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
}
for (final S3ObjectSummary summary : list.getObjectSummaries()) {
final String name = summary.getKey().substring(keyPath.length());
blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize()));
}
if (list.isTruncated()) {
prevListing = list;
} else {
break;
}
}
return blobsBuilder.immutableMap();
} catch (final AmazonClientException e) { } catch (final AmazonClientException e) {
throw new IOException("Exception when listing blobs by prefix [" + blobNamePrefix + "]", e); throw new IOException("Exception when listing blobs by prefix [" + blobNamePrefix + "]", e);
} }
@ -282,49 +256,52 @@ class S3BlobContainer extends AbstractBlobContainer {
@Override @Override
public Map<String, BlobContainer> children() throws IOException { public Map<String, BlobContainer> children() throws IOException {
try (AmazonS3Reference clientReference = blobStore.clientReference()) { try (AmazonS3Reference clientReference = blobStore.clientReference()) {
ObjectListing prevListing = null; return executeListing(clientReference, listObjectsRequest(keyPath)).stream().flatMap(listing -> {
final Map<String, BlobContainer> entries = new HashMap<>(); assert listing.getObjectSummaries().stream().noneMatch(s -> {
while (true) { for (String commonPrefix : listing.getCommonPrefixes()) {
ObjectListing list; if (s.getKey().substring(keyPath.length()).startsWith(commonPrefix)) {
if (prevListing != null) { return true;
final ObjectListing finalPrevListing = prevListing; }
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
} else {
final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
listObjectsRequest.setBucketName(blobStore.bucket());
listObjectsRequest.setPrefix(keyPath);
listObjectsRequest.setDelimiter("/");
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
}
for (final String summary : list.getCommonPrefixes()) {
final String name = summary.substring(keyPath.length());
if (name.isEmpty() == false) {
// Stripping the trailing slash off of the common prefix
final String last = name.substring(0, name.length() - 1);
final BlobPath path = path().add(last);
entries.put(last, blobStore.blobContainer(path));
}
}
assert list.getObjectSummaries().stream().noneMatch(s -> {
for (String commonPrefix : list.getCommonPrefixes()) {
if (s.getKey().substring(keyPath.length()).startsWith(commonPrefix)) {
return true;
} }
} return false;
return false; }) : "Response contained children for listed common prefixes.";
}) : "Response contained children for listed common prefixes."; return listing.getCommonPrefixes().stream();
if (list.isTruncated()) { })
prevListing = list; .map(prefix -> prefix.substring(keyPath.length()))
} else { .filter(name -> name.isEmpty() == false)
break; // Stripping the trailing slash off of the common prefix
} .map(name -> name.substring(0, name.length() - 1))
} .collect(Collectors.toMap(Function.identity(), name -> blobStore.blobContainer(path().add(name))));
return Collections.unmodifiableMap(entries);
} catch (final AmazonClientException e) { } catch (final AmazonClientException e) {
throw new IOException("Exception when listing children of [" + path().buildAsString() + ']', e); throw new IOException("Exception when listing children of [" + path().buildAsString() + ']', e);
} }
} }
/**
 * Executes the given listing request and keeps fetching follow-up batches for as long as the
 * most recent response reports itself as truncated.
 *
 * Every client call is issued through {@code SocketAccess.doPrivileged}.
 *
 * @param clientReference    reference whose client is used for all listing calls
 * @param listObjectsRequest request used for the first listing call
 * @return every {@link ObjectListing} received, in the order they were fetched (always at least one)
 */
private static List<ObjectListing> executeListing(AmazonS3Reference clientReference, ListObjectsRequest listObjectsRequest) {
    final List<ObjectListing> pages = new ArrayList<>();
    // The first page comes from the request itself; later pages are derived from the preceding page.
    ObjectListing current = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
    pages.add(current);
    while (current.isTruncated()) {
        // Lambdas may only capture effectively final locals, hence the extra variable.
        final ObjectListing previous = current;
        current = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(previous));
        pages.add(current);
    }
    return pages;
}
/**
 * Builds a listing request against this container's blob store bucket, using {@code "/"} as the
 * delimiter.
 *
 * @param keyPath key prefix to list under (the parameter is used here, not any same-named member)
 * @return a new request with bucket name, prefix and delimiter set
 */
private ListObjectsRequest listObjectsRequest(String keyPath) {
    final ListObjectsRequest request = new ListObjectsRequest();
    request.setBucketName(blobStore.bucket());
    request.setPrefix(keyPath);
    request.setDelimiter("/");
    return request;
}
private String buildKey(String blobName) { private String buildKey(String blobName) {
return keyPath + blobName; return keyPath + blobName;
} }