diff --git a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
index eed9ab124b..1c0ba0c313 100644
--- a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
+++ b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
@@ -23,6 +23,7 @@ import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
 import static org.jclouds.util.Predicates2.retry;
 
 import java.io.File;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -57,6 +58,7 @@ import org.jclouds.http.HttpResponse;
 import org.jclouds.http.HttpResponseException;
 import org.jclouds.io.ContentMetadata;
 import org.jclouds.io.Payload;
+import org.jclouds.io.Payloads;
 import org.jclouds.io.PayloadSlicer;
 import org.jclouds.util.Closeables2;
 
@@ -352,6 +354,26 @@ public abstract class BaseBlobStore implements BlobStore {
    protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) {
      ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>();
      MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), overrides);
+      // Cannot slice InputStream Payload since slice and close mutate the
+      // underlying stream. Also issue synchronous uploads to avoid buffering
+      // arbitrary amounts of data in-memory.
+      Payload payload = blob.getPayload();
+      boolean repeatable = blob.getPayload().isRepeatable();
+      if (!repeatable) {
+         payload = Payloads.newInputStreamPayload(new FilterInputStream((InputStream) payload.getRawContent()) {
+            @Override
+            public long skip(long offset) throws IOException {
+               // intentionally not implemented
+               return offset;
+            }
+
+            @Override
+            public void close() throws IOException {
+               // intentionally not implemented
+            }
+         });
+      }
+
       try {
          long contentLength = blob.getMetadata().getContentMetadata().getContentLength();
          // TODO: inject MultipartUploadSlicingAlgorithm to override default part size
@@ -359,19 +381,16 @@ public abstract class BaseBlobStore implements BlobStore {
                getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts());
          long partSize = algorithm.calculateChunkSize(contentLength);
          int partNumber = 1;
-         // TODO: for InputStream payloads, this buffers all parts in-memory!
          while (partNumber <= algorithm.getParts()) {
-            Payload payload = slicer.slice(blob.getPayload(), algorithm.getCopied(), partSize);
-            BlobUploader b =
-                  new BlobUploader(mpu, partNumber++, payload);
-            parts.add(executor.submit(b));
+            Payload slice = slicer.slice(payload, algorithm.getCopied(), partSize);
+            BlobUploader b = new BlobUploader(mpu, partNumber++, slice);
+            parts.add(repeatable ? executor.submit(b) : Futures.immediateFuture(b.call()));
             algorithm.addCopied(partSize);
          }
          if (algorithm.getRemaining() != 0) {
-            Payload payload = slicer.slice(blob.getPayload(), algorithm.getCopied(), algorithm.getRemaining());
-            BlobUploader b =
-                  new BlobUploader(mpu, partNumber, payload);
-            parts.add(executor.submit(b));
+            Payload slice = slicer.slice(payload, algorithm.getCopied(), algorithm.getRemaining());
+            BlobUploader b = new BlobUploader(mpu, partNumber, slice);
+            parts.add(repeatable ? executor.submit(b) : Futures.immediateFuture(b.call()));
          }
          return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts)));
      } catch (RuntimeException re) {
diff --git a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java
index 35c43acc65..f250a60477 100644
--- a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java
+++ b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java
@@ -679,7 +679,13 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
 
    @Test(groups = { "integration", "live" })
    public void testPutMultipartInputStream() throws Exception {
-      long length = getMinimumMultipartBlobSize();
+      long length = Math.max(getMinimumMultipartBlobSize(), MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE + 1);
+      BlobStore blobStore = view.getBlobStore();
+      MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(
+            blobStore.getMinimumMultipartPartSize(), blobStore.getMaximumMultipartPartSize(),
+            blobStore.getMaximumNumberOfParts());
+      // make sure that we are creating multiple parts
+      assertThat(algorithm.calculateChunkSize(length)).isLessThan(length);
       ByteSource byteSource = TestUtils.randomByteSource().slice(0, length);
       Payload payload = new InputStreamPayload(byteSource.openStream());
       testPut(payload, null, new ByteSourcePayload(byteSource), length, new PutOptions().multipart(true));
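
For context, here is a caller-side sketch of the code path this patch changes: a blob built from a raw InputStream has a non-repeatable payload, so putBlob with multipart enabled now slices the stream and uploads the parts one after another instead of submitting repeatable slices to the executor. This is a minimal sketch under stated assumptions; the provider name, credentials, container name, and file path are placeholders and are not part of the patch.

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;

import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.options.PutOptions;

public class InputStreamMultipartExample {
   public static void main(String[] args) throws Exception {
      // Placeholder provider and credentials; any provider whose blobstore
      // extends BaseBlobStore (e.g. an S3-compatible one) goes through
      // putMultipartBlob when multipart is requested.
      BlobStoreContext context = ContextBuilder.newBuilder("aws-s3")
            .credentials("IDENTITY", "CREDENTIAL")
            .buildView(BlobStoreContext.class);
      try {
         BlobStore blobStore = context.getBlobStore();

         File file = new File("/tmp/large-file.bin"); // hypothetical large file
         InputStream in = new FileInputStream(file);

         // An InputStream payload is non-repeatable, so it cannot be re-read or
         // sliced freely; the content length must still be known up front so the
         // slicing algorithm can compute part sizes.
         Blob blob = blobStore.blobBuilder("large-object")
               .payload(in)
               .contentLength(file.length())
               .build();

         // With this patch, each part is read from the live stream and uploaded
         // synchronously before the next slice is taken, so only one part's worth
         // of data is in flight at a time.
         String etag = blobStore.putBlob("my-container", blob, new PutOptions().multipart(true));
         System.out.println("multipart upload complete, etag=" + etag);
      } finally {
         context.close();
      }
   }
}

The integration test added above exercises the same path by wrapping a random ByteSource in an InputStreamPayload and uploading it with PutOptions().multipart(true), while asserting that the chosen length actually produces more than one part.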