added WriteTo interface for streaming puts

This commit is contained in:
Adrian Cole 2010-08-11 02:13:28 -07:00
parent e99baf92a7
commit 7a593a1630
7 changed files with 335 additions and 104 deletions

View File

@ -38,7 +38,6 @@ import static com.google.common.util.concurrent.Futures.immediateFuture;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput; import java.io.ObjectInput;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.io.ObjectOutput; import java.io.ObjectOutput;
@ -95,13 +94,13 @@ import com.google.common.base.Predicate;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Multimaps; import com.google.common.collect.Multimaps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.internal.Nullable; import com.google.inject.internal.Nullable;
/** /**
* Implementation of {@link BaseAsyncBlobStore} which keeps all data in a local Map object. * Implementation of {@link BaseAsyncBlobStore} which keeps all data in a local
* Map object.
* *
* @author Adrian Cole * @author Adrian Cole
* @author James Murty * @author James Murty
@ -118,12 +117,11 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
@Inject @Inject
protected TransientAsyncBlobStore(BlobStoreContext context, DateService dateService, Crypto crypto, protected TransientAsyncBlobStore(BlobStoreContext context, DateService dateService, Crypto crypto,
ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs, ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs,
ConcurrentMap<String, Location> containerToLocation, ConcurrentMap<String, Location> containerToLocation, HttpGetOptionsListToGetOptions httpGetOptionsConverter,
HttpGetOptionsListToGetOptions httpGetOptionsConverter, IfDirectoryReturnNameStrategy ifDirectoryReturnName, Blob.Factory blobFactory, BlobUtils blobUtils,
IfDirectoryReturnNameStrategy ifDirectoryReturnName, Blob.Factory blobFactory, BlobUtils blobUtils, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Location defaultLocation,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Location defaultLocation, Set<Location> locations) {
Set<Location> locations) {
super(context, blobUtils, service, defaultLocation, locations); super(context, blobUtils, service, defaultLocation, locations);
this.blobFactory = blobFactory; this.blobFactory = blobFactory;
this.dateService = dateService; this.dateService = dateService;
@ -147,21 +145,21 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
return immediateFailedFuture(cnfe(container)); return immediateFailedFuture(cnfe(container));
SortedSet<StorageMetadata> contents = newTreeSet(transform(realContents.keySet(), SortedSet<StorageMetadata> contents = newTreeSet(transform(realContents.keySet(),
new Function<String, StorageMetadata>() { new Function<String, StorageMetadata>() {
public StorageMetadata apply(String key) { public StorageMetadata apply(String key) {
Blob oldBlob = realContents.get(key); Blob oldBlob = realContents.get(key);
checkState(oldBlob != null, "blob " + key + " is not present although it was in the list of " checkState(oldBlob != null, "blob " + key + " is not present although it was in the list of "
+ container); + container);
checkState(oldBlob.getMetadata() != null, "blob " + container + "/" + key + " has no metadata"); checkState(oldBlob.getMetadata() != null, "blob " + container + "/" + key + " has no metadata");
MutableBlobMetadata md = copy(oldBlob.getMetadata()); MutableBlobMetadata md = copy(oldBlob.getMetadata());
String directoryName = ifDirectoryReturnName.execute(md); String directoryName = ifDirectoryReturnName.execute(md);
if (directoryName != null) { if (directoryName != null) {
md.setName(directoryName); md.setName(directoryName);
md.setType(StorageType.RELATIVE_PATH); md.setType(StorageType.RELATIVE_PATH);
}
return md;
} }
})); return md;
}
}));
if (options.getMarker() != null) { if (options.getMarker() != null) {
final String finalMarker = options.getMarker(); final String finalMarker = options.getMarker();
@ -206,14 +204,14 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
contents = newTreeSet(filter(contents, new DelimiterFilter(prefix != null ? prefix : null, delimiter))); contents = newTreeSet(filter(contents, new DelimiterFilter(prefix != null ? prefix : null, delimiter)));
Iterables.<StorageMetadata> addAll(contents, transform(commonPrefixes, Iterables.<StorageMetadata> addAll(contents, transform(commonPrefixes,
new Function<String, StorageMetadata>() { new Function<String, StorageMetadata>() {
public StorageMetadata apply(String o) { public StorageMetadata apply(String o) {
MutableStorageMetadata md = new MutableStorageMetadataImpl(); MutableStorageMetadata md = new MutableStorageMetadataImpl();
md.setType(StorageType.RELATIVE_PATH); md.setType(StorageType.RELATIVE_PATH);
md.setName(o); md.setName(o);
return md; return md;
} }
})); }));
} }
// trim metadata, if the response isn't supposed to be detailed. // trim metadata, if the response isn't supposed to be detailed.
@ -224,13 +222,13 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
} }
return Futures.<PageSet<? extends StorageMetadata>> immediateFuture(new PageSetImpl<StorageMetadata>(contents, return Futures.<PageSet<? extends StorageMetadata>> immediateFuture(new PageSetImpl<StorageMetadata>(contents,
marker)); marker));
} }
private ContainerNotFoundException cnfe(final String name) { private ContainerNotFoundException cnfe(final String name) {
return new ContainerNotFoundException(name, String.format("container %s not in %s", name, getContainerToBlobs() return new ContainerNotFoundException(name, String.format("container %s not in %s", name, getContainerToBlobs()
.keySet())); .keySet()));
} }
public static MutableBlobMetadata copy(MutableBlobMetadata in) { public static MutableBlobMetadata copy(MutableBlobMetadata in) {
@ -320,15 +318,15 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
@Override @Override
public ListenableFuture<PageSet<? extends StorageMetadata>> list() { public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
return Futures.<PageSet<? extends StorageMetadata>> immediateFuture(new PageSetImpl<StorageMetadata>(transform( return Futures.<PageSet<? extends StorageMetadata>> immediateFuture(new PageSetImpl<StorageMetadata>(transform(
getContainerToBlobs().keySet(), new Function<String, StorageMetadata>() { getContainerToBlobs().keySet(), new Function<String, StorageMetadata>() {
public StorageMetadata apply(String name) { public StorageMetadata apply(String name) {
MutableStorageMetadata cmd = create(); MutableStorageMetadata cmd = create();
cmd.setName(name); cmd.setName(name);
cmd.setType(StorageType.CONTAINER); cmd.setType(StorageType.CONTAINER);
cmd.setLocation(getContainerToLocation().get(name)); cmd.setLocation(getContainerToLocation().get(name));
return cmd; return cmd;
} }
}), null)); }), null));
} }
protected MutableStorageMetadata create() { protected MutableStorageMetadata create() {
@ -462,21 +460,18 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
} }
ByteArrayPayload payload = (object.getPayload() instanceof ByteArrayPayload) ? ByteArrayPayload.class.cast(object ByteArrayPayload payload = (object.getPayload() instanceof ByteArrayPayload) ? ByteArrayPayload.class.cast(object
.getPayload()) : null; .getPayload()) : null;
if (payload == null) if (payload == null)
payload = (object.getPayload() instanceof DelegatingPayload) ? (DelegatingPayload.class.cast( payload = (object.getPayload() instanceof DelegatingPayload) ? (DelegatingPayload.class.cast(
object.getPayload()).getDelegate() instanceof ByteArrayPayload) ? ByteArrayPayload.class object.getPayload()).getDelegate() instanceof ByteArrayPayload) ? ByteArrayPayload.class
.cast(DelegatingPayload.class.cast(object.getPayload()).getDelegate()) : null : null; .cast(DelegatingPayload.class.cast(object.getPayload()).getDelegate()) : null : null;
try { try {
if (payload == null || !(payload instanceof ByteArrayPayload)) { if (payload == null || !(payload instanceof ByteArrayPayload)) {
InputStream input = object.getPayload().getInput(); String oldContentType = object.getPayload().getContentType();
try { ByteArrayOutputStream out = new ByteArrayOutputStream();
String oldContentType = object.getPayload().getContentType(); object.getPayload().writeTo(out);
payload = (ByteArrayPayload) Payloads.calculateMD5(Payloads.newPayload(object.getPayload().getInput())); payload = (ByteArrayPayload) Payloads.calculateMD5(Payloads.newPayload(out.toByteArray()));
payload.setContentType(oldContentType); payload.setContentType(oldContentType);
} finally {
Closeables.closeQuietly(input);
}
} else { } else {
if (payload.getContentMD5() == null) if (payload.getContentMD5() == null)
Payloads.calculateMD5(object, crypto.md5()); Payloads.calculateMD5(object, crypto.md5());
@ -497,7 +492,7 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
// Set HTTP headers to match metadata // Set HTTP headers to match metadata
blob.getAllHeaders().put(HttpHeaders.LAST_MODIFIED, blob.getAllHeaders().put(HttpHeaders.LAST_MODIFIED,
dateService.rfc822DateFormat(blob.getMetadata().getLastModified())); dateService.rfc822DateFormat(blob.getMetadata().getLastModified()));
blob.getAllHeaders().put(HttpHeaders.ETAG, eTag); blob.getAllHeaders().put(HttpHeaders.ETAG, eTag);
blob.getAllHeaders().put(HttpHeaders.CONTENT_TYPE, payload.getContentType()); blob.getAllHeaders().put(HttpHeaders.CONTENT_TYPE, payload.getContentType());
blob.getAllHeaders().put(HttpHeaders.CONTENT_LENGTH, payload.getContentLength() + ""); blob.getAllHeaders().put(HttpHeaders.CONTENT_LENGTH, payload.getContentLength() + "");
@ -544,7 +539,7 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
if (object.getMetadata().getLastModified().before(modifiedSince)) { if (object.getMetadata().getLastModified().before(modifiedSince)) {
HttpResponse response = new HttpResponse(304, null, null); HttpResponse response = new HttpResponse(304, null, null);
return immediateFailedFuture(new HttpResponseException(String.format("%1$s is before %2$s", object return immediateFailedFuture(new HttpResponseException(String.format("%1$s is before %2$s", object
.getMetadata().getLastModified(), modifiedSince), null, response)); .getMetadata().getLastModified(), modifiedSince), null, response));
} }
} }
@ -553,7 +548,7 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
if (object.getMetadata().getLastModified().after(unmodifiedSince)) { if (object.getMetadata().getLastModified().after(unmodifiedSince)) {
HttpResponse response = new HttpResponse(412, null, null); HttpResponse response = new HttpResponse(412, null, null);
return immediateFailedFuture(new HttpResponseException(String.format("%1$s is after %2$s", object return immediateFailedFuture(new HttpResponseException(String.format("%1$s is after %2$s", object
.getMetadata().getLastModified(), unmodifiedSince), null, response)); .getMetadata().getLastModified(), unmodifiedSince), null, response));
} }
} }
Blob returnVal = copyBlob(object); Blob returnVal = copyBlob(object);

View File

@ -34,11 +34,13 @@ import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
@ -58,6 +60,8 @@ import org.jclouds.http.BaseJettyTest;
import org.jclouds.http.HttpResponseException; import org.jclouds.http.HttpResponseException;
import org.jclouds.io.InputSuppliers; import org.jclouds.io.InputSuppliers;
import org.jclouds.io.Payloads; import org.jclouds.io.Payloads;
import org.jclouds.io.WriteTo;
import org.jclouds.io.payloads.StreamingPayload;
import org.jclouds.logging.Logger; import org.jclouds.logging.Logger;
import org.jclouds.util.Utils; import org.jclouds.util.Utils;
import org.testng.ITestContext; import org.testng.ITestContext;
@ -91,7 +95,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static InputSupplier<InputStream> getTestDataSupplier() throws IOException { public static InputSupplier<InputStream> getTestDataSupplier() throws IOException {
byte[] oneConstitution = ByteStreams.toByteArray(new GZIPInputStream(BaseJettyTest.class byte[] oneConstitution = ByteStreams.toByteArray(new GZIPInputStream(BaseJettyTest.class
.getResourceAsStream("/const.txt.gz"))); .getResourceAsStream("/const.txt.gz")));
InputSupplier<ByteArrayInputStream> constitutionSupplier = ByteStreams.newInputStreamSupplier(oneConstitution); InputSupplier<ByteArrayInputStream> constitutionSupplier = ByteStreams.newInputStreamSupplier(oneConstitution);
InputSupplier<InputStream> temp = ByteStreams.join(constitutionSupplier); InputSupplier<InputStream> temp = ByteStreams.join(constitutionSupplier);
@ -113,22 +117,22 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
responses.put(i, Futures.compose(context.getAsyncBlobStore().getBlob(containerName, key), responses.put(i, Futures.compose(context.getAsyncBlobStore().getBlob(containerName, key),
new Function<Blob, Void>() { new Function<Blob, Void>() {
@Override @Override
public Void apply(Blob from) { public Void apply(Blob from) {
try { try {
assertEquals(CryptoStreams.md5(from.getPayload()), oneHundredOneConstitutionsMD5); assertEquals(CryptoStreams.md5(from.getPayload()), oneHundredOneConstitutionsMD5);
} catch (IOException e) { } catch (IOException e) {
Throwables.propagate(e); Throwables.propagate(e);
}
return null;
} }
return null;
}
}, this.exec)); }, this.exec));
} }
Map<Integer, Exception> exceptions = awaitCompletion(responses, exec, 30000l, Logger.CONSOLE, Map<Integer, Exception> exceptions = awaitCompletion(responses, exec, 30000l, Logger.CONSOLE,
"get constitution"); "get constitution");
assert exceptions.size() == 0 : exceptions; assert exceptions.size() == 0 : exceptions;
} finally { } finally {
@ -353,8 +357,8 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
@DataProvider(name = "delete") @DataProvider(name = "delete")
public Object[][] createData() { public Object[][] createData() {
return new Object[][] { { "normal" }, { "sp ace" }, { "qu?stion" }, { "unic₪de" }, { "path/foo" }, { "colon:" }, return new Object[][] { { "normal" }, { "sp ace" }, { "qu?stion" }, { "unic₪de" }, { "path/foo" },
{ "asteri*k" }, { "quote\"" }, { "{great<r}" }, { "lesst>en" }, { "p|pe" } }; { "colon:" }, { "asteri*k" }, { "quote\"" }, { "{great<r}" }, { "lesst>en" }, { "p|pe" } };
} }
@Test(groups = { "integration", "live" }, dataProvider = "delete") @Test(groups = { "integration", "live" }, dataProvider = "delete")
@ -371,17 +375,17 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
private void assertContainerEmptyDeleting(String containerName, String key) { private void assertContainerEmptyDeleting(String containerName, String key) {
Iterable<? extends StorageMetadata> listing = Iterables.filter(context.getBlobStore().list(containerName), Iterable<? extends StorageMetadata> listing = Iterables.filter(context.getBlobStore().list(containerName),
new Predicate<StorageMetadata>() { new Predicate<StorageMetadata>() {
@Override @Override
public boolean apply(StorageMetadata input) { public boolean apply(StorageMetadata input) {
return input.getType() == StorageType.BLOB; return input.getType() == StorageType.BLOB;
} }
}); });
assertEquals(Iterables.size(listing), 0, String.format( assertEquals(Iterables.size(listing), 0, String.format(
"deleting %s, we still have %s blobs left in container %s, using encoding %s", key, Iterables "deleting %s, we still have %s blobs left in container %s, using encoding %s", key,
.size(listing), containerName, LOCAL_ENCODING)); Iterables.size(listing), containerName, LOCAL_ENCODING));
} }
@Test(groups = { "integration", "live" }) @Test(groups = { "integration", "live" })
@ -401,13 +405,13 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
String realObject = Utils.toStringAndClose(new FileInputStream("pom.xml")); String realObject = Utils.toStringAndClose(new FileInputStream("pom.xml"));
return new Object[][] { { "file", "text/xml", new File("pom.xml"), realObject }, return new Object[][] { { "file", "text/xml", new File("pom.xml"), realObject },
{ "string", "text/xml", realObject, realObject }, { "string", "text/xml", realObject, realObject },
{ "bytes", "application/octet-stream", realObject.getBytes(), realObject } }; { "bytes", "application/octet-stream", realObject.getBytes(), realObject } };
} }
@Test(groups = { "integration", "live" }, dataProvider = "putTests") @Test(groups = { "integration", "live" }, dataProvider = "putTests")
public void testPutObject(String key, String type, Object content, Object realObject) throws InterruptedException, public void testPutObject(String key, String type, Object content, Object realObject) throws InterruptedException,
IOException { IOException {
Blob blob = context.getBlobStore().newBlob(key); Blob blob = context.getBlobStore().newBlob(key);
blob.getMetadata().setContentType(type); blob.getMetadata().setContentType(type);
blob.setPayload(Payloads.newPayload(content)); blob.setPayload(Payloads.newPayload(content));
@ -427,6 +431,33 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
} }
} }
@Test(groups = { "integration", "live" })
public void testPutObjectStream() throws InterruptedException, IOException, ExecutionException {
Blob blob = context.getBlobStore().newBlob("streaming");
blob.setPayload(new StreamingPayload(new WriteTo() {
@Override
public void writeTo(OutputStream outstream) throws IOException {
outstream.write("foo".getBytes());
}
}));
blob.getMetadata().setContentType("text/csv");
String containerName = getContainerName();
try {
assertNotNull(context.getBlobStore().putBlob(containerName, blob));
blob = context.getBlobStore().getBlob(containerName, blob.getMetadata().getName());
String returnedString = getContentAsStringOrNullAndClose(blob);
assertEquals(returnedString, "foo");
assertEquals(blob.getPayload().getContentType(), "text/csv");
PageSet<? extends StorageMetadata> set = context.getBlobStore().list(containerName);
assert set.size() == 1 : set;
} finally {
returnContainer(containerName);
}
}
protected volatile static Crypto crypto; protected volatile static Crypto crypto;
static { static {
try { try {

View File

@ -19,9 +19,7 @@
package org.jclouds.io; package org.jclouds.io;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -30,7 +28,7 @@ import com.google.common.io.InputSupplier;
/** /**
* @author Adrian Cole * @author Adrian Cole
*/ */
public interface Payload extends InputSupplier<InputStream>, Closeable{ public interface Payload extends InputSupplier<InputStream>, WriteTo, Closeable {
/** /**
* Creates a new InputStream object of the payload. * Creates a new InputStream object of the payload.
@ -47,22 +45,16 @@ public interface Payload extends InputSupplier<InputStream>, Closeable{
*/ */
boolean isRepeatable(); boolean isRepeatable();
/**
* Writes the payload content to the output stream.
*
* @throws IOException
*/
void writeTo(OutputStream outstream) throws IOException;
void setContentLength(@Nullable Long contentLength); void setContentLength(@Nullable Long contentLength);
/** /**
* Returns the total size of the payload, or the chunk that's available. * Returns the total size of the payload, or the chunk that's available.
* <p/> * <p/>
* Chunking is only used when {@link org.jclouds.http.GetOptions} is called with options like * Chunking is only used when {@link org.jclouds.http.GetOptions} is called
* tail, range, or startAt. * with options like tail, range, or startAt.
* *
* @return the length in bytes that can be be obtained from {@link #getInput()} * @return the length in bytes that can be be obtained from
* {@link #getInput()}
* @see javax.ws.rs.core.HttpHeaders#CONTENT_LENGTH * @see javax.ws.rs.core.HttpHeaders#CONTENT_LENGTH
* @see org.jclouds.http.options.GetOptions * @see org.jclouds.http.options.GetOptions
*/ */
@ -80,7 +72,8 @@ public interface Payload extends InputSupplier<InputStream>, Closeable{
String getContentType(); String getContentType();
/** /**
* release resources used by this entity. This should be called when data is discarded. * release resources used by this entity. This should be called when data is
* discarded.
*/ */
void release(); void release();
} }

View File

@ -0,0 +1,36 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io;
import java.io.IOException;
import java.io.OutputStream;
/**
* @author Adrian Cole
*/
public interface WriteTo {
/**
* Writes the payload content to the output stream.
*
* @throws IOException
*/
void writeTo(OutputStream outstream) throws IOException;
}

View File

@ -0,0 +1,174 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io.payloads;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.annotation.Nullable;
import org.jclouds.io.Payload;
import org.jclouds.io.WriteTo;
/**
* Note that not all services accept streaming payloads. For example, Rackspace
* CloudFiles accepts streaming while Amazon S3 does not.
*
* @author Adrian Cole
*/
public class StreamingPayload implements Payload {
protected String contentType;
protected transient volatile boolean written;
protected final WriteTo writeTo;
public StreamingPayload(WriteTo writeTo) {
this.writeTo = checkNotNull(writeTo, "writeTo");
this.contentType = "application/unknown";
}
/**
* @throws UnsupportedOperationException
* this payload is for streaming writes only
*/
@Override
public Object getRawContent() {
throw new UnsupportedOperationException("this payload is for streaming writes only");
}
/**
* @throws UnsupportedOperationException
* this payload is for streaming writes only
*/
@Override
public InputStream getInput() {
throw new UnsupportedOperationException("this payload is for streaming writes only");
}
/**
* {@inheritDoc}
*/
@Override
public Long getContentLength() {
return null;
}
/**
* {@inheritDoc}
*/
@Override
public void setContentLength(@Nullable Long contentLength) {
throw new UnsupportedOperationException("this payload is for streaming writes only");
}
/**
* {@inheritDoc}
*/
@Override
public byte[] getContentMD5() {
return null;
}
/**
* {@inheritDoc}
*/
@Override
public void setContentMD5(byte[] md5) {
throw new UnsupportedOperationException("this payload is for streaming writes only");
}
/**
* {@inheritDoc}
*/
@Override
public String getContentType() {
return contentType;
}
/**
* {@inheritDoc}
*/
@Override
public void setContentType(@Nullable String contentType) {
this.contentType = contentType;
}
/**
* {@inheritDoc}
*/
@Override
public void writeTo(OutputStream outstream) throws IOException {
writeTo.writeTo(outstream);
}
@Override
public String toString() {
return "[contentType=" + contentType + ", written=" + written + "]";
}
/**
* By default we are not repeatable.
*/
@Override
public boolean isRepeatable() {
return true;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((contentType == null) ? 0 : contentType.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
StreamingPayload other = (StreamingPayload) obj;
if (contentType == null) {
if (other.contentType != null)
return false;
} else if (!contentType.equals(other.contentType))
return false;
return true;
}
/**
* By default there are no resources to release.
*/
@Override
public void release() {
}
/**
* Delegates to release()
*/
@Override
public void close() {
release();
}
}

View File

@ -20,9 +20,9 @@ package org.jclouds.gae;
import static com.google.appengine.api.urlfetch.FetchOptions.Builder.disallowTruncate; import static com.google.appengine.api.urlfetch.FetchOptions.Builder.disallowTruncate;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.io.ByteStreams.toByteArray;
import static com.google.common.io.Closeables.closeQuietly; import static com.google.common.io.Closeables.closeQuietly;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
@ -51,8 +51,8 @@ public class ConvertToGaeRequest implements Function<HttpRequest, HTTPRequest> {
public static final String USER_AGENT = "jclouds/1.0 urlfetch/1.3.5"; public static final String USER_AGENT = "jclouds/1.0 urlfetch/1.3.5";
/** /**
* byte [] content is replayable and the only content type supportable by GAE. As such, we * byte [] content is replayable and the only content type supportable by
* convert the original request content to a byte array. * GAE. As such, we convert the original request content to a byte array.
*/ */
@Override @Override
public HTTPRequest apply(HttpRequest request) { public HTTPRequest apply(HttpRequest request) {
@ -76,13 +76,15 @@ public class ConvertToGaeRequest implements Function<HttpRequest, HTTPRequest> {
} }
gaeRequest.addHeader(new HTTPHeader(HttpHeaders.USER_AGENT, USER_AGENT)); gaeRequest.addHeader(new HTTPHeader(HttpHeaders.USER_AGENT, USER_AGENT));
/** /**
* byte [] content is replayable and the only content type supportable by GAE. As such, we * byte [] content is replayable and the only content type supportable by
* convert the original request content to a byte array. * GAE. As such, we convert the original request content to a byte array.
*/ */
if (request.getPayload() != null) { if (request.getPayload() != null) {
InputStream input = request.getPayload().getInput(); InputStream input = request.getPayload().getInput();
try { try {
byte[] array = toByteArray(input); ByteArrayOutputStream out = new ByteArrayOutputStream();
request.getPayload().writeTo(out);
byte[] array = out.toByteArray();
if (!request.getPayload().isRepeatable()) { if (!request.getPayload().isRepeatable()) {
Payload oldPayload = request.getPayload(); Payload oldPayload = request.getPayload();
request.setPayload(array); request.setPayload(array);
@ -99,7 +101,7 @@ public class ConvertToGaeRequest implements Function<HttpRequest, HTTPRequest> {
} }
if (request.getPayload().getContentMD5() != null) if (request.getPayload().getContentMD5() != null)
gaeRequest.setHeader(new HTTPHeader("Content-MD5", CryptoStreams.base64(request.getPayload() gaeRequest.setHeader(new HTTPHeader("Content-MD5", CryptoStreams.base64(request.getPayload()
.getContentMD5()))); .getContentMD5())));
if (request.getPayload().getContentType() != null) if (request.getPayload().getContentType() != null)
gaeRequest.setHeader(new HTTPHeader(HttpHeaders.CONTENT_TYPE, request.getPayload().getContentType())); gaeRequest.setHeader(new HTTPHeader(HttpHeaders.CONTENT_TYPE, request.getPayload().getContentType()));
Long length = checkNotNull(request.getPayload().getContentLength(), "payload.getContentLength"); Long length = checkNotNull(request.getPayload().getContentLength(), "payload.getContentLength");

View File

@ -161,13 +161,13 @@
<category name="jclouds.ssh"> <category name="jclouds.ssh">
<priority value="DEBUG" /> <priority value="DEBUG" />
<appender-ref ref="ASYNCSSH" /> <appender-ref ref="ASYNCSSH" />
</category> </category><!--
<category name="jclouds.wire"> <category name="jclouds.wire">
<priority value="DEBUG" /> <priority value="DEBUG" />
<appender-ref ref="ASYNCWIRE" /> <appender-ref ref="ASYNCWIRE" />
</category> </category>
<category name="jclouds.blobstore"> --><category name="jclouds.blobstore">
<priority value="DEBUG" /> <priority value="DEBUG" />
<appender-ref ref="ASYNCBLOBSTORE" /> <appender-ref ref="ASYNCBLOBSTORE" />
</category> </category>