From 9355e066034458cbb92d9cb37ed8f5245c213aa3 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 17 Sep 2012 15:11:31 +0200 Subject: [PATCH] Intermediate commit. --- .../org/eclipse/jetty/client/api/Request.java | 4 +- .../util/InputStreamResponseListener.java | 106 ++++++++++++++++++ .../util/StreamingResponseListener.java | 42 ------- .../jetty/client/HttpClientStreamTest.java | 45 ++++++++ .../org/eclipse/jetty/client/api/Usage.java | 37 ++++-- 5 files changed, 180 insertions(+), 54 deletions(-) create mode 100644 jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java delete mode 100644 jetty-client/src/main/java/org/eclipse/jetty/client/util/StreamingResponseListener.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java index 9fbb1373883..1c1899e3754 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java @@ -23,7 +23,7 @@ import java.nio.file.Path; import java.util.concurrent.Future; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.util.StreamingResponseListener; +import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpVersion; @@ -213,7 +213,7 @@ public interface Request * that the response content can be buffered without exceeding memory constraints. * For example, this method is not appropriate to download big files from a server; consider using * {@link #send(Response.Listener)} instead, passing your own {@link Response.Listener} or a utility - * listener such as {@link StreamingResponseListener}. + * listener such as {@link InputStreamResponseListener}. *

* The future will return when {@link Response.Listener#onComplete(Result)} is invoked. * diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java new file mode 100644 index 00000000000..1ef3a202c53 --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java @@ -0,0 +1,106 @@ +// +// ======================================================================== +// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client.util; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; + +public class InputStreamResponseListener extends Response.Listener.Empty +{ + private final AtomicLong length = new AtomicLong(); + private final CountDownLatch responseLatch = new CountDownLatch(1); + private final long capacity; + private Response response; + private volatile Throwable failure; + + public InputStreamResponseListener() + { + this(1024 * 1024L); + } + + public InputStreamResponseListener(long capacity) + { + this.capacity = capacity; + } + + @Override + public void onHeaders(Response response) + { + this.response = response; + responseLatch.countDown(); + } + + @Override + public void onContent(Response response, ByteBuffer content) + { + int remaining = content.remaining(); + byte[] bytes = new byte[remaining]; + content.get(bytes); + + long newLength = length.addAndGet(remaining); + if (newLength > capacity) + // wait + ; + } + + @Override + public void onFailure(Response response, Throwable failure) + { + this.failure = failure; + responseLatch.countDown(); + } + + public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException + { + boolean expired = !responseLatch.await(timeout, unit); + if (expired) + throw new TimeoutException(); + if (failure != null) + throw new ExecutionException(failure); + return response; + } + + public Result await(long timeout, TimeUnit seconds) + { + return null; + } + + public InputStream getInputStream() + { + return new Input(); + } + + private class Input extends InputStream + { + @Override + public int read() throws IOException + { + return 0; + } + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/StreamingResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/StreamingResponseListener.java deleted file mode 100644 index e0402b01284..00000000000 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/StreamingResponseListener.java +++ /dev/null @@ -1,42 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.client.util; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.client.api.Response; - -public class StreamingResponseListener extends Response.Listener.Empty -{ - public Response get(long timeout, TimeUnit seconds) - { - return null; - } - - public InputStream getInputStream() - { - return null; - } - - public void writeTo(OutputStream outputStream) - { - } -} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java index 8e8a0f928ea..f148f3d0e88 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java @@ -18,15 +18,24 @@ package org.eclipse.jetty.client; +import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.InputStreamResponseListener; +import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.Assert; @@ -80,4 +89,40 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest // This is just to avoid exception traces in the test output Thread.sleep(1000); } + + @Test + public void testDownload() throws Exception + { + final byte[] data = new byte[128 * 1024]; + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + response.getOutputStream().write(data); + } + }); + + InputStreamResponseListener listener = new InputStreamResponseListener(); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .send(listener); + Response response = listener.get(5, TimeUnit.SECONDS); + Assert.assertNotNull(response); + Assert.assertEquals(200, response.status()); + + InputStream input = listener.getInputStream(); + Assert.assertNotNull(input); + + int length = 0; + while (input.read() >= 0) + ++length; + + Assert.assertEquals(data.length, length); + + Result result = listener.await(5, TimeUnit.SECONDS); + Assert.assertNotNull(result); + Assert.assertSame(response, result.getResponse()); + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java b/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java index 55dbec3e87c..a704dbebc59 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.client.api; -import java.io.FileOutputStream; import java.io.InputStream; import java.nio.file.Paths; import java.util.concurrent.CountDownLatch; @@ -28,8 +27,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.util.BlockingResponseListener; +import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.client.util.PathContentProvider; -import org.eclipse.jetty.client.util.StreamingResponseListener; import org.eclipse.jetty.http.HttpCookie; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpVersion; @@ -161,10 +160,10 @@ public class Usage } @Test - public void testResponseStream() throws Exception + public void testResponseInputStream() throws Exception { HttpClient client = new HttpClient(); - StreamingResponseListener listener = new StreamingResponseListener(); + InputStreamResponseListener listener = new InputStreamResponseListener(); client.newRequest("localhost", 8080).send(listener); // Call to get() blocks until the headers arrived Response response = listener.get(5, TimeUnit.SECONDS); @@ -182,16 +181,34 @@ public class Usage // No need for output stream; for example, parse bytes } } - - // Solution 2: write to output stream - try (FileOutputStream output = new FileOutputStream("")) - { - listener.writeTo(output); - } } else { response.abort(); } } + +// @Test +// public void testResponseOutputStream() throws Exception +// { +// HttpClient client = new HttpClient(); +// +// try (FileOutputStream output = new FileOutputStream("")) +// { +// OutputStreamResponseListener listener = new OutputStreamResponseListener(output); +// client.newRequest("localhost", 8080).send(listener); +// // Call to get() blocks until the headers arrived +// Response response = listener.get(5, TimeUnit.SECONDS); +// if (response.status() == 200) +// { +// // Solution 2: write to output stream +// { +// listener.writeTo(output); +// } +// } +// else +// { +// response.abort(); +// } +// } }