From b7f28f1e6a2632fce579360c7900b6dc548ff25f Mon Sep 17 00:00:00 2001 From: Andrew Gaul Date: Sun, 22 Jan 2023 13:20:59 +0900 Subject: [PATCH] Lazily open parts during LocalBlobStore complete MPU This removes a previous workaround for opening too many FileInputStream and exhausting rlimits. --- .../blobstore/config/LocalBlobStore.java | 43 +++++++++---------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java index c50a405817..abb7bd0d43 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java @@ -28,7 +28,6 @@ import static com.google.common.collect.Sets.newTreeSet; import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -731,14 +730,8 @@ public final class LocalBlobStore implements BlobStore { Payload payload; try { InputStream is = blob.getPayload().openStream(); - if (is instanceof FileInputStream) { - // except for FileInputStream since large MPU can open too many fds - is.close(); - payload = blob.getPayload(); - } else { - blob.resetPayload(/*release=*/ false); - payload = new InputStreamPayload(is); - } + blob.resetPayload(/*release=*/ false); + payload = new InputStreamPayload(is); } catch (IOException ioe) { throw new RuntimeException(ioe); } @@ -825,16 +818,16 @@ public final class LocalBlobStore implements BlobStore { @Override public String completeMultipartUpload(MultipartUpload mpu, List parts) { - ImmutableList.Builder blobs = ImmutableList.builder(); + ImmutableList.Builder metas = ImmutableList.builder(); long contentLength = 0; Hasher md5Hasher = Hashing.md5().newHasher(); for (MultipartPart part : parts) { - Blob blobPart = getBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber()); - contentLength += blobPart.getMetadata().getContentMetadata().getContentLength(); - blobs.add(blobPart); - if (blobPart.getMetadata().getETag() != null) { - md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(blobPart.getMetadata().getETag())); + BlobMetadata meta = blobMetadata(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber()); + contentLength += meta.getContentMetadata().getContentLength(); + metas.add(meta); + if (meta.getETag() != null) { + md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(meta.getETag())); } } String mpuETag = new StringBuilder("\"") @@ -845,7 +838,7 @@ public final class LocalBlobStore implements BlobStore { .toString(); PayloadBlobBuilder blobBuilder = blobBuilder(mpu.blobName()) .userMetadata(mpu.blobMetadata().getUserMetadata()) - .payload(new MultiBlobInputStream(blobs.build())) + .payload(new MultiBlobInputStream(this, metas.build())) .contentLength(contentLength) .eTag(mpuETag); String cacheControl = mpu.blobMetadata().getContentMetadata().getCacheControl(); @@ -995,21 +988,24 @@ public final class LocalBlobStore implements BlobStore { } private static final class MultiBlobInputStream extends InputStream { - private final Iterator blobs; + private final BlobStore blobStore; + private final Iterator metas; private InputStream current; - MultiBlobInputStream(List blobs) { - this.blobs = blobs.iterator(); + MultiBlobInputStream(BlobStore blobStore, List metas) { + this.blobStore = blobStore; + this.metas = metas.iterator(); } @Override public int read() throws IOException { while (true) { if (current == null) { - if (!blobs.hasNext()) { + if (!metas.hasNext()) { return -1; } - current = blobs.next().getPayload().openStream(); + BlobMetadata meta = metas.next(); + current = blobStore.getBlob(meta.getContainer(), meta.getName()).getPayload().openStream(); } int result = current.read(); if (result == -1) { @@ -1025,10 +1021,11 @@ public final class LocalBlobStore implements BlobStore { public int read(byte[] b, int off, int len) throws IOException { while (true) { if (current == null) { - if (!blobs.hasNext()) { + if (!metas.hasNext()) { return -1; } - current = blobs.next().getPayload().openStream(); + BlobMetadata meta = metas.next(); + current = blobStore.getBlob(meta.getContainer(), meta.getName()).getPayload().openStream(); } int result = current.read(b, off, len); if (result == -1) {