JCLOUDS-1366: JCLOUDS-1472: Fix InputStream MPU

Previously jclouds attempted to slice non-repeatable InputStream
Payloads in order to upload sequentially.  This never worked, because
slicing mutated the single underlying stream via skip and close.  Also
backfill a test that previously succeeded spuriously.
This commit is contained in:
This commit is contained in:
Andrew Gaul 2019-01-04 14:56:29 -08:00
parent a36c9dcef0
commit 2393c7920b
2 changed files with 35 additions and 10 deletions

View File

@ -23,6 +23,7 @@ import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursi
import static org.jclouds.util.Predicates2.retry; import static org.jclouds.util.Predicates2.retry;
import java.io.File; import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
@ -57,6 +58,7 @@ import org.jclouds.http.HttpResponse;
import org.jclouds.http.HttpResponseException; import org.jclouds.http.HttpResponseException;
import org.jclouds.io.ContentMetadata; import org.jclouds.io.ContentMetadata;
import org.jclouds.io.Payload; import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.io.PayloadSlicer; import org.jclouds.io.PayloadSlicer;
import org.jclouds.util.Closeables2; import org.jclouds.util.Closeables2;
@ -352,6 +354,26 @@ public abstract class BaseBlobStore implements BlobStore {
protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) { protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) {
ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>(); ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>();
MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), overrides); MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), overrides);
// Cannot slice InputStream Payload since slice and close mutate the
// underlying stream. Also issue synchronous uploads to avoid buffering
// arbitrary amounts of data in-memory.
Payload payload = blob.getPayload();
boolean repeatable = blob.getPayload().isRepeatable();
if (!repeatable) {
payload = Payloads.newInputStreamPayload(new FilterInputStream((InputStream) payload.getRawContent()) {
@Override
public long skip(long offset) throws IOException {
// intentionally not implemented
return offset;
}
@Override
public void close() throws IOException {
// intentionally not implemented
}
});
}
try { try {
long contentLength = blob.getMetadata().getContentMetadata().getContentLength(); long contentLength = blob.getMetadata().getContentMetadata().getContentLength();
// TODO: inject MultipartUploadSlicingAlgorithm to override default part size // TODO: inject MultipartUploadSlicingAlgorithm to override default part size
@ -359,19 +381,16 @@ public abstract class BaseBlobStore implements BlobStore {
getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts()); getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts());
long partSize = algorithm.calculateChunkSize(contentLength); long partSize = algorithm.calculateChunkSize(contentLength);
int partNumber = 1; int partNumber = 1;
// TODO: for InputStream payloads, this buffers all parts in-memory!
while (partNumber <= algorithm.getParts()) { while (partNumber <= algorithm.getParts()) {
Payload payload = slicer.slice(blob.getPayload(), algorithm.getCopied(), partSize); Payload slice = slicer.slice(payload, algorithm.getCopied(), partSize);
BlobUploader b = BlobUploader b = new BlobUploader(mpu, partNumber++, slice);
new BlobUploader(mpu, partNumber++, payload); parts.add(repeatable ? executor.submit(b) : Futures.immediateFuture(b.call()));
parts.add(executor.submit(b));
algorithm.addCopied(partSize); algorithm.addCopied(partSize);
} }
if (algorithm.getRemaining() != 0) { if (algorithm.getRemaining() != 0) {
Payload payload = slicer.slice(blob.getPayload(), algorithm.getCopied(), algorithm.getRemaining()); Payload slice = slicer.slice(payload, algorithm.getCopied(), algorithm.getRemaining());
BlobUploader b = BlobUploader b = new BlobUploader(mpu, partNumber, slice);
new BlobUploader(mpu, partNumber, payload); parts.add(repeatable ? executor.submit(b) : Futures.immediateFuture(b.call()));
parts.add(executor.submit(b));
} }
return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts))); return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts)));
} catch (RuntimeException re) { } catch (RuntimeException re) {

View File

@ -679,7 +679,13 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
@Test(groups = { "integration", "live" }) @Test(groups = { "integration", "live" })
public void testPutMultipartInputStream() throws Exception { public void testPutMultipartInputStream() throws Exception {
long length = getMinimumMultipartBlobSize(); long length = Math.max(getMinimumMultipartBlobSize(), MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE + 1);
BlobStore blobStore = view.getBlobStore();
MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(
blobStore.getMinimumMultipartPartSize(), blobStore.getMaximumMultipartPartSize(),
blobStore.getMaximumNumberOfParts());
// make sure that we are creating multiple parts
assertThat(algorithm.calculateChunkSize(length)).isLessThan(length);
ByteSource byteSource = TestUtils.randomByteSource().slice(0, length); ByteSource byteSource = TestUtils.randomByteSource().slice(0, length);
Payload payload = new InputStreamPayload(byteSource.openStream()); Payload payload = new InputStreamPayload(byteSource.openStream());
testPut(payload, null, new ByteSourcePayload(byteSource), length, new PutOptions().multipart(true)); testPut(payload, null, new ByteSourcePayload(byteSource), length, new PutOptions().multipart(true));