JCLOUDS-356 multipart uploads with InputStream payloads

This changeset introduces an alternative to PayloadSlicer,
IterablePayloadSlicer, with a method for returning a Payload iterator.

...swift.blobstore.strategy.internal.SequentialMultipartUploadStrategy
has been updated to to use a payload iterator.
This commit is contained in:
Eric Evans 2013-10-24 15:33:13 -05:00 committed by Andrew Phillips
parent c40dc996d9
commit 15a3c04fb7
5 changed files with 258 additions and 20 deletions

View File

@ -66,22 +66,13 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg
long chunkSize = algorithm.calculateChunkSize(length);
int partCount = algorithm.getParts();
if (partCount > 0) {
int part;
while ((part = algorithm.getNextPart()) <= partCount) {
Payload chunkedPart = slicer.slice(payload, algorithm.getNextChunkOffset(), chunkSize);
for (Payload part : slicer.slice(payload, chunkSize)) {
int partNum = algorithm.getNextPart();
Blob blobPart = blobBuilders.get()
.name(key + PART_SEPARATOR + part)
.payload(chunkedPart)
.contentDisposition(key + PART_SEPARATOR + part).build();
client.putObject(container, blob2Object.apply(blobPart));
}
long remaining = algorithm.getRemaining();
if (remaining > 0) {
Payload chunkedPart = slicer.slice(payload, algorithm.getNextChunkOffset(), remaining);
Blob blobPart = blobBuilders.get()
.name(key + PART_SEPARATOR + part)
.payload(chunkedPart)
.contentDisposition(key + PART_SEPARATOR + part).build();
.name(key + PART_SEPARATOR + partNum)
.payload(part)
.contentDisposition(key + PART_SEPARATOR + partNum)
.build();
client.putObject(container, blob2Object.apply(blobPart));
}
return client.putObjectManifest(container, key);

View File

@ -16,12 +16,17 @@
*/
package org.jclouds.openstack.swift.blobstore.integration;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import com.google.common.io.ByteStreams;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.integration.internal.BaseBlobIntegrationTest;
@ -34,12 +39,11 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.google.common.hash.Hashing;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
/**
*
* @author James Murty
@ -128,6 +132,32 @@ public class SwiftBlobIntegrationLiveTest extends BaseBlobIntegrationTest {
}
}
// InputStreamPayloads are handled differently than File; Test InputStreams too
@Test(groups = { "integration", "live" })
public void testMultipartChunkedInputStream() throws InterruptedException, IOException {
String container = getContainerName();
try {
BlobStore blobStore = view.getBlobStore();
blobStore.createContainerInLocation(null, container);
File inFile = createFileBiggerThan(PART_SIZE);
File outFile = new File("target/lots-of-const-readback.txt");
InputStream contentToUpload = new FileInputStream(inFile);
Blob write = blobStore.blobBuilder("const.txt").payload(contentToUpload).contentLength(inFile.length()).build();
blobStore.putBlob(container, write, PutOptions.Builder.multipart());
Blob read = blobStore.getBlob(container, "const.txt");
read.getPayload().writeTo(new FileOutputStream(outFile));
assertEquals(Files.hash(outFile, Hashing.md5()), Files.hash(inFile, Hashing.md5()));
} finally {
returnContainer(container);
}
}
@Override
protected int getIncorrectContentMD5StatusCode() {
return 422;

View File

@ -41,4 +41,16 @@ public interface PayloadSlicer {
* if offset or length are negative
*/
Payload slice(Payload input, long offset, long length);
/**
* Returns an {@link Iterable} of {@link Payload} instances that are no larger than
* <code>size</code> bytes in length.
*
* @param input
* the {@link Payload} to be sliced
* @param size
* the maximum size of each slice
* @return an {@link Iterable} of {@link Payload} instances
*/
Iterable<Payload> slice(Payload input, long size);
}

View File

@ -19,15 +19,24 @@ package org.jclouds.io.internal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import javax.inject.Singleton;
import org.jclouds.io.ContentMetadata;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.io.payloads.BaseMutableContentMetadata;
import org.jclouds.io.payloads.ByteArrayPayload;
import org.jclouds.io.payloads.InputStreamPayload;
import org.jclouds.io.payloads.InputStreamSupplierPayload;
@ -42,6 +51,79 @@ import com.google.common.io.InputSupplier;
*/
@Singleton
public class BasePayloadSlicer implements PayloadSlicer {
public static class PayloadIterator implements Iterable<Payload>, Iterator<Payload> {
private final InputStream input;
private final ContentMetadata metaData;
private Payload nextPayload;
private final int readLen;
public PayloadIterator(InputStream input, ContentMetadata meta) {
this.input = checkNotNull(input, "input");
this.metaData = checkNotNull(meta, "meta");
this.readLen = checkNotNull(this.metaData.getContentLength(), "content-length").intValue();
this.nextPayload = getNextPayload();
}
@Override
public boolean hasNext() {
return (nextPayload != null);
}
@Override
public Payload next() {
Payload payload;
if (!hasNext())
throw new NoSuchElementException();
payload = nextPayload;
nextPayload = getNextPayload();
return payload;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public Iterator<Payload> iterator() {
return this;
}
private Payload getNextPayload() {
byte[] content = new byte[readLen];
int read = 0;
try {
if ((read = input.read(content)) == -1) {
return null;
}
} catch (IOException e) {
throw Throwables.propagate(e);
}
return createPayload((content.length == read) ? content : Arrays.copyOf(content, read));
}
private Payload createPayload(byte[] content) {
Payload payload = null;
if (content.length > 0) {
payload = new ByteArrayPayload(content);
ContentMetadata cm = metaData.toBuilder().contentLength((long)content.length).contentMD5(null).build();
payload.setContentMetadata(BaseMutableContentMetadata.fromContentMetadata(cm));
}
return payload;
}
}
/**
* {@inheritDoc}
*/
@ -105,4 +187,57 @@ public class BasePayloadSlicer implements PayloadSlicer {
return returnVal;
}
@Override
public Iterable<Payload> slice(Payload input, long size) {
checkNotNull(input, "input");
checkArgument(size >= 0, "size must be non-negative but was: %s", size);
ContentMetadata meta = BaseMutableContentMetadata.fromContentMetadata(input.getContentMetadata())
.toBuilder()
.contentLength(size)
.contentMD5(null)
.build();
Object rawContent = input.getRawContent();
if (rawContent instanceof File) {
return doSlice((File) rawContent, meta);
} else if (rawContent instanceof String) {
return doSlice((String) rawContent, meta);
} else if (rawContent instanceof byte[]) {
return doSlice((byte[]) rawContent, meta);
} else if (rawContent instanceof InputStream) {
return doSlice((InputStream) rawContent, meta);
} else {
return doSlice(input, meta);
}
}
protected Iterable<Payload> doSlice(Payload input, ContentMetadata meta) {
return doSlice(input.getInput(), meta);
}
protected Iterable<Payload> doSlice(String rawContent, ContentMetadata meta) {
try {
return doSlice(rawContent.getBytes("UTF-8"), meta);
} catch (UnsupportedEncodingException e) {
throw Throwables.propagate(e);
}
}
protected Iterable<Payload> doSlice(byte[] rawContent, ContentMetadata meta) {
return doSlice(new ByteArrayInputStream(rawContent), meta);
}
protected Iterable<Payload> doSlice(File rawContent, ContentMetadata meta) {
try {
return doSlice(new FileInputStream(rawContent), meta);
} catch (FileNotFoundException e) {
throw Throwables.propagate(e);
}
}
protected Iterable<Payload> doSlice(InputStream rawContent, ContentMetadata meta) {
return new PayloadIterator(rawContent, meta);
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.io.internal;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Iterator;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.io.payloads.InputStreamPayload;
import org.jclouds.util.Strings2;
import org.testng.annotations.Test;
import com.google.common.base.Charsets;
import com.google.common.io.ByteStreams;
@Test
public class BasePayloadSlicerTest {
@Test
public void testIterableSliceExpectedSingle() throws IOException {
PayloadSlicer slicer = new BasePayloadSlicer();
String contents = "aaaaaaaaaabbbbbbbbbbccccc";
Payload payload = new InputStreamPayload(new ByteArrayInputStream(contents.getBytes(Charsets.US_ASCII)));
Iterator<Payload> iter = slicer.slice(payload, 25).iterator();
assertTrue(iter.hasNext(), "Not enough results");
assertEquals(new String(ByteStreams.toByteArray(iter.next())), contents);
assertFalse(iter.hasNext());
}
@Test
public void testIterableSliceExpectedMulti() throws IOException {
PayloadSlicer slicer = new BasePayloadSlicer();
Payload payload = new InputStreamPayload(new ByteArrayInputStream("aaaaaaaaaabbbbbbbbbbccccc".getBytes(Charsets.US_ASCII)));
Iterator<Payload> iter = slicer.slice(payload, 10).iterator();
assertTrue(iter.hasNext(), "Not enough results");
assertEquals(Strings2.toStringAndClose(iter.next().getInput()), "aaaaaaaaaa");
assertTrue(iter.hasNext(), "Not enough results");
assertEquals(Strings2.toStringAndClose(iter.next().getInput()), "bbbbbbbbbb");
assertTrue(iter.hasNext(), "Not enough results");
assertEquals(Strings2.toStringAndClose(iter.next().getInput()), "ccccc");
assertFalse(iter.hasNext());
}
}