From 13f32b28c90f4dda7ea21c95380d8c0879ba91fe Mon Sep 17 00:00:00 2001 From: Andrew Gaul Date: Mon, 28 Jan 2019 21:45:20 -0800 Subject: [PATCH] Lazily open InputStream during complete MPU Previously the filesystem provider could exhaust file descriptors by eagerly opening up to 10,000 parts. This partially undoes JCLOUDS-1367. --- .../FilesystemBlobIntegrationTest.java | 28 +++++++++ .../blobstore/config/LocalBlobStore.java | 59 +++++++++++++++---- 2 files changed, 77 insertions(+), 10 deletions(-) diff --git a/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java b/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java index cda138c24a..ff23e3f673 100644 --- a/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java +++ b/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java @@ -164,6 +164,34 @@ public class FilesystemBlobIntegrationTest extends BaseBlobIntegrationTest { } } + @Test(groups = { "integration", "live" }) + public void test10000PartMultipartUpload() throws Exception { + BlobStore blobStore = view.getBlobStore(); + String container = getContainerName(); + int partSize = (int) blobStore.getMinimumMultipartPartSize(); + try { + String name = "blob-name"; + BlobBuilder blobBuilder = blobStore.blobBuilder(name); + Blob blob = blobBuilder.build(); + MultipartUpload mpu = blobStore.initiateMultipartUpload(container, blob.getMetadata(), new PutOptions()); + ImmutableList.Builder parts = ImmutableList.builder(); + byte[] content = new byte[partSize]; + + for (int i = 0; i < 10 * 1000; ++i) { + Payload payload = Payloads.newByteArrayPayload(content); + payload.getContentMetadata().setContentLength((long) partSize); + parts.add(blobStore.uploadMultipartPart(mpu, i, payload)); + } + + blobStore.completeMultipartUpload(mpu, parts.build()); + + BlobMetadata newBlobMetadata = blobStore.blobMetadata(container, name); + assertThat(newBlobMetadata.getSize()).isEqualTo(10 * 1000 * partSize); + } finally { + returnContainer(container); + } + } + protected void checkExtendedAttributesSupport() { if (isMacOSX()) { throw new SkipException("filesystem does not support extended attributes in Mac OSX"); diff --git a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java index b62ad188e6..81c6ae32fe 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java @@ -28,12 +28,14 @@ import static com.google.common.collect.Sets.newTreeSet; import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.SequenceInputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -756,7 +758,14 @@ public final class LocalBlobStore implements BlobStore { // return InputStream to more closely follow real blobstore Payload payload; try { - payload = new InputStreamPayload(blob.getPayload().openStream()); + InputStream is = blob.getPayload().openStream(); + if (is instanceof FileInputStream) { + // except for FileInputStream since large MPU can open too many fds + is.close(); + payload = blob.getPayload(); + } else { + payload = new InputStreamPayload(blob.getPayload().openStream()); + } } catch (IOException ioe) { throw new RuntimeException(ioe); } @@ -825,20 +834,14 @@ public final class LocalBlobStore implements BlobStore { @Override public String completeMultipartUpload(MultipartUpload mpu, List parts) { - ImmutableList.Builder streams = ImmutableList.builder(); + ImmutableList.Builder blobs = ImmutableList.builder(); long contentLength = 0; Hasher md5Hasher = Hashing.md5().newHasher(); for (MultipartPart part : parts) { Blob blobPart = getBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber()); contentLength += blobPart.getMetadata().getContentMetadata().getContentLength(); - InputStream is; - try { - is = blobPart.getPayload().openStream(); - } catch (IOException ioe) { - throw propagate(ioe); - } - streams.add(is); + blobs.add(blobPart); md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(blobPart.getMetadata().getETag())); } String mpuETag = new StringBuilder("\"") @@ -849,7 +852,7 @@ public final class LocalBlobStore implements BlobStore { .toString(); PayloadBlobBuilder blobBuilder = blobBuilder(mpu.blobName()) .userMetadata(mpu.blobMetadata().getUserMetadata()) - .payload(new SequenceInputStream(Iterators.asEnumeration(streams.build().iterator()))) + .payload(new MultiBlobInputStream(blobs.build())) .contentLength(contentLength) .eTag(mpuETag); String cacheControl = mpu.blobMetadata().getContentMetadata().getCacheControl(); @@ -998,5 +1001,41 @@ public final class LocalBlobStore implements BlobStore { return eTag; } + private static final class MultiBlobInputStream extends InputStream { + private final Iterator blobs; + private InputStream current; + MultiBlobInputStream(List blobs) { + this.blobs = blobs.iterator(); + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + int result = read(b, 0, b.length); + if (result == -1) { + return -1; + } + return b[0] & 0x000000FF; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + while (true) { + if (current == null) { + if (!blobs.hasNext()) { + return -1; + } + current = blobs.next().getPayload().openStream(); + } + int result = current.read(b, off, len); + if (result == -1) { + current.close(); + current = null; + continue; + } + return result; + } + } + } }