From 7a593a1630a5015beeb5098d312e6f1762aa6e36 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 11 Aug 2010 02:13:28 -0700 Subject: [PATCH] added WriteTo interface for streaming puts --- .../blobstore/TransientAsyncBlobStore.java | 107 +++++------ .../internal/BaseBlobIntegrationTest.java | 81 +++++--- .../src/main/java/org/jclouds/io/Payload.java | 21 +-- .../src/main/java/org/jclouds/io/WriteTo.java | 36 ++++ .../jclouds/io/payloads/StreamingPayload.java | 174 ++++++++++++++++++ .../org/jclouds/gae/ConvertToGaeRequest.java | 16 +- rackspace/src/test/resources/log4j.xml | 4 +- 7 files changed, 335 insertions(+), 104 deletions(-) create mode 100644 core/src/main/java/org/jclouds/io/WriteTo.java create mode 100644 core/src/main/java/org/jclouds/io/payloads/StreamingPayload.java diff --git a/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java index 4a0e0498fd..1686154042 100755 --- a/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java @@ -38,7 +38,6 @@ import static com.google.common.util.concurrent.Futures.immediateFuture; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; import java.io.ObjectInput; import java.io.ObjectInputStream; import java.io.ObjectOutput; @@ -95,13 +94,13 @@ import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Multimaps; -import com.google.common.io.Closeables; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.internal.Nullable; /** - * Implementation of {@link BaseAsyncBlobStore} which keeps all data in a local Map object. + * Implementation of {@link BaseAsyncBlobStore} which keeps all data in a local + * Map object. * * @author Adrian Cole * @author James Murty @@ -118,12 +117,11 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore { @Inject protected TransientAsyncBlobStore(BlobStoreContext context, DateService dateService, Crypto crypto, - ConcurrentMap> containerToBlobs, - ConcurrentMap containerToLocation, - HttpGetOptionsListToGetOptions httpGetOptionsConverter, - IfDirectoryReturnNameStrategy ifDirectoryReturnName, Blob.Factory blobFactory, BlobUtils blobUtils, - @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Location defaultLocation, - Set locations) { + ConcurrentMap> containerToBlobs, + ConcurrentMap containerToLocation, HttpGetOptionsListToGetOptions httpGetOptionsConverter, + IfDirectoryReturnNameStrategy ifDirectoryReturnName, Blob.Factory blobFactory, BlobUtils blobUtils, + @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Location defaultLocation, + Set locations) { super(context, blobUtils, service, defaultLocation, locations); this.blobFactory = blobFactory; this.dateService = dateService; @@ -147,21 +145,21 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore { return immediateFailedFuture(cnfe(container)); SortedSet contents = newTreeSet(transform(realContents.keySet(), - new Function() { - public StorageMetadata apply(String key) { - Blob oldBlob = realContents.get(key); - checkState(oldBlob != null, "blob " + key + " is not present although it was in the list of " - + container); - checkState(oldBlob.getMetadata() != null, "blob " + container + "/" + key + " has no metadata"); - MutableBlobMetadata md = copy(oldBlob.getMetadata()); - String directoryName = ifDirectoryReturnName.execute(md); - if (directoryName != null) { - md.setName(directoryName); - md.setType(StorageType.RELATIVE_PATH); - } - return md; + new Function() { + public StorageMetadata apply(String key) { + Blob oldBlob = realContents.get(key); + checkState(oldBlob != null, "blob " + key + " is not present although it was in the list of " + + container); + checkState(oldBlob.getMetadata() != null, "blob " + container + "/" + key + " has no metadata"); + MutableBlobMetadata md = copy(oldBlob.getMetadata()); + String directoryName = ifDirectoryReturnName.execute(md); + if (directoryName != null) { + md.setName(directoryName); + md.setType(StorageType.RELATIVE_PATH); } - })); + return md; + } + })); if (options.getMarker() != null) { final String finalMarker = options.getMarker(); @@ -206,14 +204,14 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore { contents = newTreeSet(filter(contents, new DelimiterFilter(prefix != null ? prefix : null, delimiter))); Iterables. addAll(contents, transform(commonPrefixes, - new Function() { - public StorageMetadata apply(String o) { - MutableStorageMetadata md = new MutableStorageMetadataImpl(); - md.setType(StorageType.RELATIVE_PATH); - md.setName(o); - return md; - } - })); + new Function() { + public StorageMetadata apply(String o) { + MutableStorageMetadata md = new MutableStorageMetadataImpl(); + md.setType(StorageType.RELATIVE_PATH); + md.setName(o); + return md; + } + })); } // trim metadata, if the response isn't supposed to be detailed. @@ -224,13 +222,13 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore { } return Futures.> immediateFuture(new PageSetImpl(contents, - marker)); + marker)); } private ContainerNotFoundException cnfe(final String name) { return new ContainerNotFoundException(name, String.format("container %s not in %s", name, getContainerToBlobs() - .keySet())); + .keySet())); } public static MutableBlobMetadata copy(MutableBlobMetadata in) { @@ -320,15 +318,15 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture> list() { return Futures.> immediateFuture(new PageSetImpl(transform( - getContainerToBlobs().keySet(), new Function() { - public StorageMetadata apply(String name) { - MutableStorageMetadata cmd = create(); - cmd.setName(name); - cmd.setType(StorageType.CONTAINER); - cmd.setLocation(getContainerToLocation().get(name)); - return cmd; - } - }), null)); + getContainerToBlobs().keySet(), new Function() { + public StorageMetadata apply(String name) { + MutableStorageMetadata cmd = create(); + cmd.setName(name); + cmd.setType(StorageType.CONTAINER); + cmd.setLocation(getContainerToLocation().get(name)); + return cmd; + } + }), null)); } protected MutableStorageMetadata create() { @@ -462,21 +460,18 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore { } ByteArrayPayload payload = (object.getPayload() instanceof ByteArrayPayload) ? ByteArrayPayload.class.cast(object - .getPayload()) : null; + .getPayload()) : null; if (payload == null) payload = (object.getPayload() instanceof DelegatingPayload) ? (DelegatingPayload.class.cast( - object.getPayload()).getDelegate() instanceof ByteArrayPayload) ? ByteArrayPayload.class - .cast(DelegatingPayload.class.cast(object.getPayload()).getDelegate()) : null : null; + object.getPayload()).getDelegate() instanceof ByteArrayPayload) ? ByteArrayPayload.class + .cast(DelegatingPayload.class.cast(object.getPayload()).getDelegate()) : null : null; try { if (payload == null || !(payload instanceof ByteArrayPayload)) { - InputStream input = object.getPayload().getInput(); - try { - String oldContentType = object.getPayload().getContentType(); - payload = (ByteArrayPayload) Payloads.calculateMD5(Payloads.newPayload(object.getPayload().getInput())); - payload.setContentType(oldContentType); - } finally { - Closeables.closeQuietly(input); - } + String oldContentType = object.getPayload().getContentType(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + object.getPayload().writeTo(out); + payload = (ByteArrayPayload) Payloads.calculateMD5(Payloads.newPayload(out.toByteArray())); + payload.setContentType(oldContentType); } else { if (payload.getContentMD5() == null) Payloads.calculateMD5(object, crypto.md5()); @@ -497,7 +492,7 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore { // Set HTTP headers to match metadata blob.getAllHeaders().put(HttpHeaders.LAST_MODIFIED, - dateService.rfc822DateFormat(blob.getMetadata().getLastModified())); + dateService.rfc822DateFormat(blob.getMetadata().getLastModified())); blob.getAllHeaders().put(HttpHeaders.ETAG, eTag); blob.getAllHeaders().put(HttpHeaders.CONTENT_TYPE, payload.getContentType()); blob.getAllHeaders().put(HttpHeaders.CONTENT_LENGTH, payload.getContentLength() + ""); @@ -544,7 +539,7 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore { if (object.getMetadata().getLastModified().before(modifiedSince)) { HttpResponse response = new HttpResponse(304, null, null); return immediateFailedFuture(new HttpResponseException(String.format("%1$s is before %2$s", object - .getMetadata().getLastModified(), modifiedSince), null, response)); + .getMetadata().getLastModified(), modifiedSince), null, response)); } } @@ -553,7 +548,7 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore { if (object.getMetadata().getLastModified().after(unmodifiedSince)) { HttpResponse response = new HttpResponse(412, null, null); return immediateFailedFuture(new HttpResponseException(String.format("%1$s is after %2$s", object - .getMetadata().getLastModified(), unmodifiedSince), null, response)); + .getMetadata().getLastModified(), unmodifiedSince), null, response)); } } Blob returnVal = copyBlob(object); diff --git a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java index 5ba835329e..d14e9d8ed5 100755 --- a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java +++ b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java @@ -34,11 +34,13 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.Date; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.zip.GZIPInputStream; @@ -58,6 +60,8 @@ import org.jclouds.http.BaseJettyTest; import org.jclouds.http.HttpResponseException; import org.jclouds.io.InputSuppliers; import org.jclouds.io.Payloads; +import org.jclouds.io.WriteTo; +import org.jclouds.io.payloads.StreamingPayload; import org.jclouds.logging.Logger; import org.jclouds.util.Utils; import org.testng.ITestContext; @@ -91,7 +95,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest { @SuppressWarnings("unchecked") public static InputSupplier getTestDataSupplier() throws IOException { byte[] oneConstitution = ByteStreams.toByteArray(new GZIPInputStream(BaseJettyTest.class - .getResourceAsStream("/const.txt.gz"))); + .getResourceAsStream("/const.txt.gz"))); InputSupplier constitutionSupplier = ByteStreams.newInputStreamSupplier(oneConstitution); InputSupplier temp = ByteStreams.join(constitutionSupplier); @@ -113,22 +117,22 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest { for (int i = 0; i < 10; i++) { responses.put(i, Futures.compose(context.getAsyncBlobStore().getBlob(containerName, key), - new Function() { + new Function() { - @Override - public Void apply(Blob from) { - try { - assertEquals(CryptoStreams.md5(from.getPayload()), oneHundredOneConstitutionsMD5); - } catch (IOException e) { - Throwables.propagate(e); - } - return null; + @Override + public Void apply(Blob from) { + try { + assertEquals(CryptoStreams.md5(from.getPayload()), oneHundredOneConstitutionsMD5); + } catch (IOException e) { + Throwables.propagate(e); } + return null; + } - }, this.exec)); + }, this.exec)); } Map exceptions = awaitCompletion(responses, exec, 30000l, Logger.CONSOLE, - "get constitution"); + "get constitution"); assert exceptions.size() == 0 : exceptions; } finally { @@ -353,8 +357,8 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest { @DataProvider(name = "delete") public Object[][] createData() { - return new Object[][] { { "normal" }, { "sp ace" }, { "qu?stion" }, { "unic₪de" }, { "path/foo" }, { "colon:" }, - { "asteri*k" }, { "quote\"" }, { "{greaten" }, { "p|pe" } }; + return new Object[][] { { "normal" }, { "sp ace" }, { "qu?stion" }, { "unic₪de" }, { "path/foo" }, + { "colon:" }, { "asteri*k" }, { "quote\"" }, { "{greaten" }, { "p|pe" } }; } @Test(groups = { "integration", "live" }, dataProvider = "delete") @@ -371,17 +375,17 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest { private void assertContainerEmptyDeleting(String containerName, String key) { Iterable listing = Iterables.filter(context.getBlobStore().list(containerName), - new Predicate() { + new Predicate() { - @Override - public boolean apply(StorageMetadata input) { - return input.getType() == StorageType.BLOB; - } + @Override + public boolean apply(StorageMetadata input) { + return input.getType() == StorageType.BLOB; + } - }); + }); assertEquals(Iterables.size(listing), 0, String.format( - "deleting %s, we still have %s blobs left in container %s, using encoding %s", key, Iterables - .size(listing), containerName, LOCAL_ENCODING)); + "deleting %s, we still have %s blobs left in container %s, using encoding %s", key, + Iterables.size(listing), containerName, LOCAL_ENCODING)); } @Test(groups = { "integration", "live" }) @@ -401,13 +405,13 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest { String realObject = Utils.toStringAndClose(new FileInputStream("pom.xml")); return new Object[][] { { "file", "text/xml", new File("pom.xml"), realObject }, - { "string", "text/xml", realObject, realObject }, - { "bytes", "application/octet-stream", realObject.getBytes(), realObject } }; + { "string", "text/xml", realObject, realObject }, + { "bytes", "application/octet-stream", realObject.getBytes(), realObject } }; } @Test(groups = { "integration", "live" }, dataProvider = "putTests") public void testPutObject(String key, String type, Object content, Object realObject) throws InterruptedException, - IOException { + IOException { Blob blob = context.getBlobStore().newBlob(key); blob.getMetadata().setContentType(type); blob.setPayload(Payloads.newPayload(content)); @@ -427,6 +431,33 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest { } } + @Test(groups = { "integration", "live" }) + public void testPutObjectStream() throws InterruptedException, IOException, ExecutionException { + Blob blob = context.getBlobStore().newBlob("streaming"); + blob.setPayload(new StreamingPayload(new WriteTo() { + @Override + public void writeTo(OutputStream outstream) throws IOException { + outstream.write("foo".getBytes()); + } + })); + blob.getMetadata().setContentType("text/csv"); + + String containerName = getContainerName(); + try { + + assertNotNull(context.getBlobStore().putBlob(containerName, blob)); + + blob = context.getBlobStore().getBlob(containerName, blob.getMetadata().getName()); + String returnedString = getContentAsStringOrNullAndClose(blob); + assertEquals(returnedString, "foo"); + assertEquals(blob.getPayload().getContentType(), "text/csv"); + PageSet set = context.getBlobStore().list(containerName); + assert set.size() == 1 : set; + } finally { + returnContainer(containerName); + } + } + protected volatile static Crypto crypto; static { try { diff --git a/core/src/main/java/org/jclouds/io/Payload.java b/core/src/main/java/org/jclouds/io/Payload.java index 401830cb12..a44d6ff8f2 100644 --- a/core/src/main/java/org/jclouds/io/Payload.java +++ b/core/src/main/java/org/jclouds/io/Payload.java @@ -19,9 +19,7 @@ package org.jclouds.io; import java.io.Closeable; -import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import javax.annotation.Nullable; @@ -30,7 +28,7 @@ import com.google.common.io.InputSupplier; /** * @author Adrian Cole */ -public interface Payload extends InputSupplier, Closeable{ +public interface Payload extends InputSupplier, WriteTo, Closeable { /** * Creates a new InputStream object of the payload. @@ -47,22 +45,16 @@ public interface Payload extends InputSupplier, Closeable{ */ boolean isRepeatable(); - /** - * Writes the payload content to the output stream. - * - * @throws IOException - */ - void writeTo(OutputStream outstream) throws IOException; - void setContentLength(@Nullable Long contentLength); /** * Returns the total size of the payload, or the chunk that's available. *

- * Chunking is only used when {@link org.jclouds.http.GetOptions} is called with options like - * tail, range, or startAt. + * Chunking is only used when {@link org.jclouds.http.GetOptions} is called + * with options like tail, range, or startAt. * - * @return the length in bytes that can be be obtained from {@link #getInput()} + * @return the length in bytes that can be be obtained from + * {@link #getInput()} * @see javax.ws.rs.core.HttpHeaders#CONTENT_LENGTH * @see org.jclouds.http.options.GetOptions */ @@ -80,7 +72,8 @@ public interface Payload extends InputSupplier, Closeable{ String getContentType(); /** - * release resources used by this entity. This should be called when data is discarded. + * release resources used by this entity. This should be called when data is + * discarded. */ void release(); } \ No newline at end of file diff --git a/core/src/main/java/org/jclouds/io/WriteTo.java b/core/src/main/java/org/jclouds/io/WriteTo.java new file mode 100644 index 0000000000..2de7a29609 --- /dev/null +++ b/core/src/main/java/org/jclouds/io/WriteTo.java @@ -0,0 +1,36 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ +package org.jclouds.io; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * @author Adrian Cole + */ +public interface WriteTo { + + /** + * Writes the payload content to the output stream. + * + * @throws IOException + */ + void writeTo(OutputStream outstream) throws IOException; + +} \ No newline at end of file diff --git a/core/src/main/java/org/jclouds/io/payloads/StreamingPayload.java b/core/src/main/java/org/jclouds/io/payloads/StreamingPayload.java new file mode 100644 index 0000000000..2e807a585e --- /dev/null +++ b/core/src/main/java/org/jclouds/io/payloads/StreamingPayload.java @@ -0,0 +1,174 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ +package org.jclouds.io.payloads; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import javax.annotation.Nullable; + +import org.jclouds.io.Payload; +import org.jclouds.io.WriteTo; + +/** + * Note that not all services accept streaming payloads. For example, Rackspace + * CloudFiles accepts streaming while Amazon S3 does not. + * + * @author Adrian Cole + */ +public class StreamingPayload implements Payload { + protected String contentType; + protected transient volatile boolean written; + protected final WriteTo writeTo; + + public StreamingPayload(WriteTo writeTo) { + this.writeTo = checkNotNull(writeTo, "writeTo"); + this.contentType = "application/unknown"; + } + + /** + * @throws UnsupportedOperationException + * this payload is for streaming writes only + */ + @Override + public Object getRawContent() { + throw new UnsupportedOperationException("this payload is for streaming writes only"); + } + + /** + * @throws UnsupportedOperationException + * this payload is for streaming writes only + */ + @Override + public InputStream getInput() { + throw new UnsupportedOperationException("this payload is for streaming writes only"); + } + + /** + * {@inheritDoc} + */ + @Override + public Long getContentLength() { + return null; + } + + /** + * {@inheritDoc} + */ + @Override + public void setContentLength(@Nullable Long contentLength) { + throw new UnsupportedOperationException("this payload is for streaming writes only"); + } + + /** + * {@inheritDoc} + */ + @Override + public byte[] getContentMD5() { + return null; + } + + /** + * {@inheritDoc} + */ + @Override + public void setContentMD5(byte[] md5) { + throw new UnsupportedOperationException("this payload is for streaming writes only"); + } + + /** + * {@inheritDoc} + */ + @Override + public String getContentType() { + return contentType; + } + + /** + * {@inheritDoc} + */ + @Override + public void setContentType(@Nullable String contentType) { + this.contentType = contentType; + } + + /** + * {@inheritDoc} + */ + @Override + public void writeTo(OutputStream outstream) throws IOException { + writeTo.writeTo(outstream); + } + + @Override + public String toString() { + return "[contentType=" + contentType + ", written=" + written + "]"; + } + + /** + * By default we are not repeatable. + */ + @Override + public boolean isRepeatable() { + return true; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((contentType == null) ? 0 : contentType.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + StreamingPayload other = (StreamingPayload) obj; + if (contentType == null) { + if (other.contentType != null) + return false; + } else if (!contentType.equals(other.contentType)) + return false; + return true; + } + + /** + * By default there are no resources to release. + */ + @Override + public void release() { + } + + /** + * Delegates to release() + */ + @Override + public void close() { + release(); + } +} \ No newline at end of file diff --git a/extensions/gae/src/main/java/org/jclouds/gae/ConvertToGaeRequest.java b/extensions/gae/src/main/java/org/jclouds/gae/ConvertToGaeRequest.java index bfcb7ad9e1..a66af54cf6 100644 --- a/extensions/gae/src/main/java/org/jclouds/gae/ConvertToGaeRequest.java +++ b/extensions/gae/src/main/java/org/jclouds/gae/ConvertToGaeRequest.java @@ -20,9 +20,9 @@ package org.jclouds.gae; import static com.google.appengine.api.urlfetch.FetchOptions.Builder.disallowTruncate; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.io.ByteStreams.toByteArray; import static com.google.common.io.Closeables.closeQuietly; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; @@ -51,8 +51,8 @@ public class ConvertToGaeRequest implements Function { public static final String USER_AGENT = "jclouds/1.0 urlfetch/1.3.5"; /** - * byte [] content is replayable and the only content type supportable by GAE. As such, we - * convert the original request content to a byte array. + * byte [] content is replayable and the only content type supportable by + * GAE. As such, we convert the original request content to a byte array. */ @Override public HTTPRequest apply(HttpRequest request) { @@ -76,13 +76,15 @@ public class ConvertToGaeRequest implements Function { } gaeRequest.addHeader(new HTTPHeader(HttpHeaders.USER_AGENT, USER_AGENT)); /** - * byte [] content is replayable and the only content type supportable by GAE. As such, we - * convert the original request content to a byte array. + * byte [] content is replayable and the only content type supportable by + * GAE. As such, we convert the original request content to a byte array. */ if (request.getPayload() != null) { InputStream input = request.getPayload().getInput(); try { - byte[] array = toByteArray(input); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + request.getPayload().writeTo(out); + byte[] array = out.toByteArray(); if (!request.getPayload().isRepeatable()) { Payload oldPayload = request.getPayload(); request.setPayload(array); @@ -99,7 +101,7 @@ public class ConvertToGaeRequest implements Function { } if (request.getPayload().getContentMD5() != null) gaeRequest.setHeader(new HTTPHeader("Content-MD5", CryptoStreams.base64(request.getPayload() - .getContentMD5()))); + .getContentMD5()))); if (request.getPayload().getContentType() != null) gaeRequest.setHeader(new HTTPHeader(HttpHeaders.CONTENT_TYPE, request.getPayload().getContentType())); Long length = checkNotNull(request.getPayload().getContentLength(), "payload.getContentLength"); diff --git a/rackspace/src/test/resources/log4j.xml b/rackspace/src/test/resources/log4j.xml index aee54728e8..825de7fcc8 100755 --- a/rackspace/src/test/resources/log4j.xml +++ b/rackspace/src/test/resources/log4j.xml @@ -161,13 +161,13 @@ - +