From 6649b890a7f6fc6ce37ac9081b7821288386c745 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 11 Feb 2013 19:03:10 +0100 Subject: [PATCH] 400434 - Add support for an OutputStream ContentProvider. --- .../client/util/DeferredContentProvider.java | 13 +- .../util/OutputStreamContentProvider.java | 132 ++++++++++++++++++ .../jetty/client/HttpClientStreamTest.java | 44 ++++++ .../org/eclipse/jetty/client/api/Usage.java | 29 ++++ 4 files changed, 216 insertions(+), 2 deletions(-) create mode 100644 jetty-client/src/main/java/org/eclipse/jetty/client/util/OutputStreamContentProvider.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java index 64d0e77c504..e168e038392 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.AsyncContentProvider; @@ -34,6 +35,10 @@ import org.eclipse.jetty.client.api.Response; * A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)} * has been called, therefore providing the request content at a later time. *

+ * {@link DeferredContentProvider} can only be used in conjunction with + * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()}) + * because it provides content asynchronously. + *

* The deferred content is provided once and then fully consumed. * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator * because the stream has been consumed on the first invocation. @@ -79,6 +84,7 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea private final Queue chunks = new ConcurrentLinkedQueue<>(); private final AtomicReference listener = new AtomicReference<>(); private final Iterator iterator = new DeferredContentProviderIterator(); + private final AtomicBoolean closed = new AtomicBoolean(); /** * Creates a new {@link DeferredContentProvider} with the given initial content @@ -124,8 +130,11 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea */ public void close() { - chunks.offer(CLOSE); - notifyListener(); + if (closed.compareAndSet(false, true)) + { + chunks.offer(CLOSE); + notifyListener(); + } } private void notifyListener() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/OutputStreamContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/OutputStreamContentProvider.java new file mode 100644 index 00000000000..5becf700319 --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/OutputStreamContentProvider.java @@ -0,0 +1,132 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Iterator; + +import org.eclipse.jetty.client.AsyncContentProvider; +import org.eclipse.jetty.client.api.ContentProvider; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; + +/** + * A {@link ContentProvider} that provides content asynchronously through an {@link OutputStream} + * similar to {@link DeferredContentProvider}. + *

+ * {@link OutputStreamContentProvider} can only be used in conjunction with + * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()}) + * because it provides content asynchronously. + *

+ * The deferred content is provided once by writing to the {@link #getOutputStream() output stream} + * and then fully consumed. + * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator + * because the stream has been consumed on the first invocation. + * However, it is possible for subclasses to support multiple invocations of {@link #iterator()} + * by overriding {@link #write(ByteBuffer)} and {@link #close()}, copying the bytes and making them + * available for subsequent invocations. + *

+ * Content must be provided by writing to the {@link #getOutputStream() output stream}, that must be + * {@link OutputStream#close() closed} when all content has been provided. + *

+ * Example usage: + *

+ * HttpClient httpClient = ...;
+ *
+ * // Use try-with-resources to autoclose the output stream
+ * OutputStreamContentProvider content = new OutputStreamContentProvider();
+ * try (OutputStream output = content.getOutputStream())
+ * {
+ *     httpClient.newRequest("localhost", 8080)
+ *             .content(content)
+ *             .send(new Response.CompleteListener()
+ *             {
+ *                 @Override
+ *                 public void onComplete(Result result)
+ *                 {
+ *                     // Your logic here
+ *                 }
+ *             });
+ *
+ *     // At a later time...
+ *     output.write("some content".getBytes());
+ * }
+ * 
+ */ +public class OutputStreamContentProvider implements AsyncContentProvider +{ + private final DeferredContentProvider deferred = new DeferredContentProvider(); + private final OutputStream output = new DeferredOutputStream(); + + @Override + public long getLength() + { + return deferred.getLength(); + } + + @Override + public Iterator iterator() + { + return deferred.iterator(); + } + + @Override + public void setListener(Listener listener) + { + deferred.setListener(listener); + } + + public OutputStream getOutputStream() + { + return output; + } + + protected void write(ByteBuffer buffer) + { + deferred.offer(buffer); + } + + protected void close() + { + deferred.close(); + } + + private class DeferredOutputStream extends OutputStream + { + @Override + public void write(int b) throws IOException + { + write(new byte[]{(byte)b}, 0, 1); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + OutputStreamContentProvider.this.write(ByteBuffer.wrap(b, off, len)); + } + + @Override + public void close() throws IOException + { + OutputStreamContentProvider.this.close(); + } + } +} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java index 856172a20b8..ab7aaf13814 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java @@ -47,6 +47,7 @@ import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.InputStreamResponseListener; +import org.eclipse.jetty.client.util.OutputStreamContentProvider; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.annotation.Slow; @@ -366,6 +367,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest } }); + // Make sure we provide the content *after* the request has been "sent". Thread.sleep(1000); try (ByteArrayInputStream input = new ByteArrayInputStream(new byte[1024])) @@ -505,4 +507,46 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testUploadWithOutputStream() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + IO.copy(request.getInputStream(), response.getOutputStream()); + } + }); + + final byte[] data = new byte[512]; + final CountDownLatch latch = new CountDownLatch(1); + OutputStreamContentProvider content = new OutputStreamContentProvider(); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .content(content) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded() && + result.getResponse().getStatus() == 200 && + Arrays.equals(data, getContent())) + latch.countDown(); + } + }); + + // Make sure we provide the content *after* the request has been "sent". + Thread.sleep(1000); + + try (OutputStream output = content.getOutputStream()) + { + output.write(data); + } + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java b/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java index eeb93dcac3d..f5caedbef42 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.client.api; import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.io.OutputStream; import java.net.HttpCookie; import java.net.URI; import java.nio.ByteBuffer; @@ -35,6 +36,7 @@ import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.FutureResponseListener; import org.eclipse.jetty.client.util.InputStreamContentProvider; import org.eclipse.jetty.client.util.InputStreamResponseListener; +import org.eclipse.jetty.client.util.OutputStreamContentProvider; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpVersion; import org.junit.Assert; @@ -274,6 +276,33 @@ public class Usage Assert.assertEquals(200, response.getStatus()); } + @Test + public void testRequestOutputStream() throws Exception + { + HttpClient client = new HttpClient(); + client.start(); + + OutputStreamContentProvider content = new OutputStreamContentProvider(); + try (OutputStream output = content.getOutputStream()) + { + client.newRequest("localhost", 8080) + .content(content) + .send(new Response.CompleteListener() + { + @Override + public void onComplete(Result result) + { + Assert.assertEquals(200, result.getResponse().getStatus()); + } + }); + + output.write(new byte[1024]); + output.write(new byte[512]); + output.write(new byte[256]); + output.write(new byte[128]); + } + } + @Test public void testProxyUsage() throws Exception {