From 9c075ff85ca4e754df6a81f49143b68c09ba4953 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 11 Feb 2016 09:37:46 +0100 Subject: [PATCH 1/2] Converted anonymous inner classes to lambdas. --- .../jetty/client/HttpClientStreamTest.java | 97 ++++++------------- 1 file changed, 27 insertions(+), 70 deletions(-) diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java index 9ee92a07b9d..0c03f991d05 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java @@ -18,9 +18,6 @@ package org.eclipse.jetty.client; -import static java.nio.file.StandardOpenOption.CREATE; -import static org.junit.Assert.fail; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -51,7 +48,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BufferingResponseListener; @@ -70,6 +66,9 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.Assert; import org.junit.Test; +import static java.nio.file.StandardOpenOption.CREATE; +import static org.junit.Assert.fail; + public class HttpClientStreamTest extends AbstractHttpClientServerTest { public HttpClientStreamTest(SslContextFactory sslContextFactory) @@ -97,14 +96,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .file(upload) - .onRequestSuccess(new Request.SuccessListener() - { - @Override - public void onSuccess(Request request) - { - requestTime.set(System.nanoTime()); - } - }) + .onRequestSuccess(request -> requestTime.set(System.nanoTime())) .timeout(10, TimeUnit.SECONDS) .send(); long responseTime = System.nanoTime(); @@ -649,14 +641,10 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .content(content) - .send(new Response.CompleteListener() + .send(result -> { - @Override - public void onComplete(Result result) - { - if (result.isSucceeded() && result.getResponse().getStatus() == 200) - latch.countDown(); - } + if (result.isSucceeded() && result.getResponse().getStatus() == 200) + latch.countDown(); }); // Make sure we provide the content *after* the request has been "sent". @@ -703,14 +691,10 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .content(content) - .send(new Response.CompleteListener() + .send(result -> { - @Override - public void onComplete(Result result) - { - if (result.isSucceeded() && result.getResponse().getStatus() == 200) - latch.countDown(); - } + if (result.isSucceeded() && result.getResponse().getStatus() == 200) + latch.countDown(); }); } Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); @@ -947,14 +931,10 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest client.newRequest("0.0.0.1", connector.getLocalPort()) .scheme(scheme) .content(content) - .send(new Response.CompleteListener() + .send(result -> { - @Override - public void onComplete(Result result) - { - if (result.isFailed()) - latch.countDown(); - } + if (result.isFailed()) + latch.countDown(); }); try (OutputStream output = content.getOutputStream()) @@ -990,24 +970,16 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .content(content) - .onRequestBegin(new Request.BeginListener() + .onRequestBegin(request -> { - @Override - public void onBegin(Request request) - { - content.offer(ByteBuffer.wrap(new byte[256]), callback); - content.offer(ByteBuffer.wrap(new byte[256]), callback); - request.abort(new Exception("explicitly_thrown_by_test")); - } + content.offer(ByteBuffer.wrap(new byte[256]), callback); + content.offer(ByteBuffer.wrap(new byte[256]), callback); + request.abort(new Exception("explicitly_thrown_by_test")); }) - .send(new Response.CompleteListener() + .send(result -> { - @Override - public void onComplete(Result result) - { - if (result.isFailed()) - completeLatch.countDown(); - } + if (result.isFailed()) + completeLatch.countDown(); }); Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(failLatch.await(5, TimeUnit.SECONDS)); @@ -1046,14 +1018,10 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest client.newRequest("0.0.0.1", connector.getLocalPort()) .scheme(scheme) .content(content) - .send(new Response.CompleteListener() + .send(result -> { - @Override - public void onComplete(Result result) - { - Assert.assertTrue(result.isFailed()); - completeLatch.countDown(); - } + Assert.assertTrue(result.isFailed()); + completeLatch.countDown(); }); Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); @@ -1122,22 +1090,11 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .content(provider) - .onRequestCommit(new Request.CommitListener() + .onRequestCommit(request -> commit.set(true)) + .send(result -> { - @Override - public void onCommit(Request request) - { - commit.set(true); - } - }) - .send(new Response.CompleteListener() - { - @Override - public void onComplete(Result result) - { - Assert.assertTrue(result.isFailed()); - completeLatch.countDown(); - } + Assert.assertTrue(result.isFailed()); + completeLatch.countDown(); }); Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); From 7c7c49f06b5783482e973d41f1eb3ec97d617eab Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 12 Feb 2016 11:19:31 +0100 Subject: [PATCH 2/2] 484446 - InputStreamResponseListener's InputStream uses default read (3) and blocks early on never-ending response. Implemented read(byte[],int.int) to fix the reported issue. Reworked InputStreamResponseListener to use a callback approach rather than blocking waiting for content. --- .../util/InputStreamResponseListener.java | 294 +++++++++--------- .../jetty/client/HttpClientStreamTest.java | 284 ++++++++++------- 2 files changed, 312 insertions(+), 266 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java index 258309e73c0..f74f2958387 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java @@ -23,19 +23,18 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response.Listener; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -63,8 +62,6 @@ import org.eclipse.jetty.util.log.Logger; *

* The {@link HttpClient} implementation (the producer) will feed the input stream * asynchronously while the application (the consumer) is reading from it. - * Chunks of content are maintained in a queue, and it is possible to specify a - * maximum buffer size for the bytes held in the queue, by default 16384 bytes. *

* If the consumer is faster than the producer, then the consumer will block * with the typical {@link InputStream#read()} semantic. @@ -74,137 +71,133 @@ import org.eclipse.jetty.util.log.Logger; public class InputStreamResponseListener extends Listener.Adapter { private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class); - private static final byte[] EOF = new byte[0]; - private static final byte[] CLOSED = new byte[0]; - private static final byte[] FAILURE = new byte[0]; - private final BlockingQueue queue = new LinkedBlockingQueue<>(); - private final AtomicLong length = new AtomicLong(); + private static final DeferredContentProvider.Chunk EOF = new DeferredContentProvider.Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP); + private final Object lock = this; private final CountDownLatch responseLatch = new CountDownLatch(1); private final CountDownLatch resultLatch = new CountDownLatch(1); private final AtomicReference stream = new AtomicReference<>(); - private final long maxBufferSize; private Response response; private Result result; - private volatile Throwable failure; - private volatile boolean closed; + private Throwable failure; + private boolean closed; + private DeferredContentProvider.Chunk chunk; public InputStreamResponseListener() { - this(16 * 1024L); } + /** + * @deprecated response content is not buffered anymore, but handled asynchronously. + */ + @Deprecated public InputStreamResponseListener(long maxBufferSize) { - this.maxBufferSize = maxBufferSize; } @Override public void onHeaders(Response response) { - this.response = response; - responseLatch.countDown(); + synchronized (lock) + { + this.response = response; + responseLatch.countDown(); + } } @Override - public void onContent(Response response, ByteBuffer content) + public void onContent(Response response, ByteBuffer content, Callback callback) { - if (!closed) + if (content.remaining() == 0) { - int remaining = content.remaining(); - if (remaining > 0) - { + if (LOG.isDebugEnabled()) + LOG.debug("Skipped empty content {}", content); + callback.succeeded(); + return; + } - byte[] bytes = new byte[remaining]; - content.get(bytes); - if (LOG.isDebugEnabled()) - LOG.debug("Queuing {}/{} bytes", bytes, remaining); - queue.offer(bytes); - - long newLength = length.addAndGet(remaining); - while (newLength >= maxBufferSize) - { - if (LOG.isDebugEnabled()) - LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize); - // Block to avoid infinite buffering - if (!await()) - break; - newLength = length.get(); - if (LOG.isDebugEnabled()) - LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize); - } - } - else + boolean closed; + synchronized (lock) + { + closed = this.closed; + if (!closed) { - if (LOG.isDebugEnabled()) - LOG.debug("Queuing skipped, empty content {}", content); + chunk = new DeferredContentProvider.Chunk(content, callback); + lock.notifyAll(); } } - else + + if (closed) { - LOG.debug("Queuing skipped, stream already closed"); + if (LOG.isDebugEnabled()) + LOG.debug("InputStream closed, ignored content {}", content); + callback.failed(new AsynchronousCloseException()); } } @Override public void onSuccess(Response response) { + synchronized (lock) + { + chunk = EOF; + lock.notifyAll(); + } + if (LOG.isDebugEnabled()) - LOG.debug("Queuing end of content {}{}", EOF, ""); - queue.offer(EOF); - signal(); + LOG.debug("End of content"); } @Override public void onFailure(Response response, Throwable failure) { - fail(failure); - signal(); + Callback callback = null; + synchronized (lock) + { + if (this.failure != null) + return; + this.failure = failure; + if (chunk != null) + callback = chunk.callback; + lock.notifyAll(); + } + + if (LOG.isDebugEnabled()) + LOG.debug("Content failure", failure); + + if (callback != null) + callback.failed(failure); } @Override public void onComplete(Result result) { - if (result.isFailed() && failure == null) - fail(result.getFailure()); - this.result = result; - resultLatch.countDown(); - signal(); - } - - private void fail(Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug("Queuing failure {} {}", FAILURE, failure); - queue.offer(FAILURE); - this.failure = failure; - responseLatch.countDown(); - } - - protected boolean await() - { - try + Throwable failure = result.getFailure(); + Callback callback = null; + synchronized (lock) { - synchronized (this) + this.result = result; + if (result.isFailed() && this.failure == null) { - while (length.get() >= maxBufferSize && failure == null && !closed) - wait(); - // Re-read the values as they may have changed while waiting. - return failure == null && !closed; + this.failure = failure; + if (chunk != null) + callback = chunk.callback; } + // Notify the response latch in case of request failures. + responseLatch.countDown(); + resultLatch.countDown(); + lock.notifyAll(); } - catch (InterruptedException x) - { - Thread.currentThread().interrupt(); - return false; - } - } - protected void signal() - { - synchronized (this) + if (LOG.isDebugEnabled()) { - notifyAll(); + if (failure == null) + LOG.debug("Result success"); + else + LOG.debug("Result failure", failure); } + + if (callback != null) + callback.failed(failure); } /** @@ -225,9 +218,13 @@ public class InputStreamResponseListener extends Listener.Adapter boolean expired = !responseLatch.await(timeout, unit); if (expired) throw new TimeoutException(); - if (failure != null) - throw new ExecutionException(failure); - return response; + synchronized (lock) + { + // If the request failed there is no response. + if (response == null) + throw new ExecutionException(failure); + return response; + } } /** @@ -247,7 +244,10 @@ public class InputStreamResponseListener extends Listener.Adapter boolean expired = !resultLatch.await(timeout, unit); if (expired) throw new TimeoutException(); - return result; + synchronized (lock) + { + return result; + } } /** @@ -267,65 +267,50 @@ public class InputStreamResponseListener extends Listener.Adapter private class Input extends InputStream { - private byte[] bytes; - private int index; - @Override public int read() throws IOException { - while (true) - { - if (bytes == EOF) - { - // Mark the fact that we saw -1, - // so that in the close case we don't throw - index = -1; - return -1; - } - else if (bytes == FAILURE) - { - throw failure(); - } - else if (bytes == CLOSED) - { - if (index < 0) - return -1; - throw new AsynchronousCloseException(); - } - else if (bytes != null) - { - int result = bytes[index] & 0xFF; - if (++index == bytes.length) - { - length.addAndGet(-index); - bytes = null; - index = 0; - signal(); - } - return result; - } - else - { - bytes = take(); - if (LOG.isDebugEnabled()) - LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length); - } - } + byte[] tmp = new byte[1]; + int read = read(tmp); + if (read < 0) + return read; + return tmp[0] & 0xFF; } - private IOException failure() - { - if (failure instanceof IOException) - return (IOException)failure; - else - return new IOException(failure); - } - - private byte[] take() throws IOException + @Override + public int read(byte[] b, int offset, int length) throws IOException { try { - return queue.take(); + int result; + Callback callback = null; + synchronized (lock) + { + while (true) + { + if (failure != null) + throw toIOException(failure); + if (chunk == EOF) + return -1; + if (closed) + throw new AsynchronousCloseException(); + if (chunk != null) + break; + lock.wait(); + } + + ByteBuffer buffer = chunk.buffer; + result = Math.min(buffer.remaining(), length); + buffer.get(b, offset, result); + if (!buffer.hasRemaining()) + { + callback = chunk.callback; + chunk = null; + } + } + if (callback != null) + callback.succeeded(); + return result; } catch (InterruptedException x) { @@ -333,18 +318,35 @@ public class InputStreamResponseListener extends Listener.Adapter } } + private IOException toIOException(Throwable failure) + { + if (failure instanceof IOException) + return (IOException)failure; + else + return new IOException(failure); + } + @Override public void close() throws IOException { - if (!closed) + Callback callback = null; + synchronized (lock) { - super.close(); - if (LOG.isDebugEnabled()) - LOG.debug("Queuing close {}{}", CLOSED, ""); - queue.offer(CLOSED); + if (closed) + return; closed = true; - signal(); + if (chunk != null) + callback = chunk.callback; + lock.notifyAll(); } + + if (LOG.isDebugEnabled()) + LOG.debug("InputStream close"); + + if (callback != null) + callback.failed(new AsynchronousCloseException()); + + super.close(); } } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java index 0c03f991d05..dad45e1c498 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java @@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Iterator; import java.util.Random; @@ -56,6 +57,8 @@ import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.InputStreamContentProvider; import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.client.util.OutputStreamContentProvider; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.annotation.Slow; @@ -66,9 +69,6 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.Assert; import org.junit.Test; -import static java.nio.file.StandardOpenOption.CREATE; -import static org.junit.Assert.fail; - public class HttpClientStreamTest extends AbstractHttpClientServerTest { public HttpClientStreamTest(SslContextFactory sslContextFactory) @@ -83,7 +83,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath(); Files.createDirectories(targetTestsDir); Path upload = Paths.get(targetTestsDir.toString(), "http_client_upload.big"); - try (OutputStream output = Files.newOutputStream(upload, CREATE)) + try (OutputStream output = Files.newOutputStream(upload, StandardOpenOption.CREATE)) { byte[] kb = new byte[1024]; for (int i = 0; i < 10 * 1024; ++i) @@ -234,10 +234,11 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest Thread.sleep(1); ++length; } - fail(); + Assert.fail(); } - catch (IOException expected) + catch (IOException x) { + // Expected. } Assert.assertEquals(data.length, length); @@ -262,7 +263,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest InputStreamResponseListener listener = new InputStreamResponseListener(); InputStream stream = listener.getInputStream(); - // Close the stream immediately + // Close the stream immediately. stream.close(); client.newRequest("localhost", connector.getLocalPort()) @@ -275,171 +276,161 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest stream.read(); // Throws } - @Test - public void testInputStreamResponseListenerClosedWhileWaiting() throws Exception + @Test(expected = AsynchronousCloseException.class) + public void testInputStreamResponseListenerClosedBeforeContent() throws Exception { - final byte[] chunk1 = new byte[]{0, 1}; - final byte[] chunk2 = new byte[]{2, 3}; - final CountDownLatch closeLatch = new CountDownLatch(1); + AtomicReference contextRef = new AtomicReference<>(); start(new AbstractHandler() { @Override - public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + contextRef.set(request.startAsync()); + response.flushBuffer(); + } + }); + + CountDownLatch latch = new CountDownLatch(1); + InputStreamResponseListener listener = new InputStreamResponseListener() + { + @Override + public void onContent(Response response, ByteBuffer content, Callback callback) + { + super.onContent(response, content, new Callback() + { + @Override + public void failed(Throwable x) + { + latch.countDown(); + callback.failed(x); + } + }); + } + }; + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .send(listener); + + Response response = listener.get(5, TimeUnit.SECONDS); + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + + InputStream input = listener.getInputStream(); + input.close(); + + AsyncContext asyncContext = contextRef.get(); + asyncContext.getResponse().getOutputStream().write(new byte[1024]); + asyncContext.complete(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + + input.read(); // Throws + } + + @Test + public void testInputStreamResponseListenerClosedWhileWaiting() throws Exception + { + byte[] chunk1 = new byte[]{0, 1}; + byte[] chunk2 = new byte[]{2, 3}; + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { baseRequest.setHandled(true); response.setContentLength(chunk1.length + chunk2.length); ServletOutputStream output = response.getOutputStream(); output.write(chunk1); output.flush(); - try - { - closeLatch.await(5, TimeUnit.SECONDS); - output.write(chunk2); - output.flush(); - } - catch (InterruptedException x) - { - throw new InterruptedIOException(); - } + output.write(chunk2); } }); - final CountDownLatch waitLatch = new CountDownLatch(1); - final CountDownLatch waitedLatch = new CountDownLatch(1); - InputStreamResponseListener listener = new InputStreamResponseListener(1) + CountDownLatch failedLatch = new CountDownLatch(1); + CountDownLatch contentLatch = new CountDownLatch(1); + InputStreamResponseListener listener = new InputStreamResponseListener() { @Override - protected boolean await() + public void onContent(Response response, ByteBuffer content, Callback callback) { - waitLatch.countDown(); - boolean result = super.await(); - waitedLatch.countDown(); - return result; + super.onContent(response, content, new Callback() + { + @Override + public void failed(Throwable x) + { + failedLatch.countDown(); + callback.failed(x); + } + }); + contentLatch.countDown(); } }; client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); - Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + // Wait until we get some content. + Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS)); + + // Close the stream. InputStream stream = listener.getInputStream(); - // Wait until we block - Assert.assertTrue(waitLatch.await(5, TimeUnit.SECONDS)); - // Close the stream stream.close(); - closeLatch.countDown(); - // Be sure we're not stuck waiting - Assert.assertTrue(waitedLatch.await(5, TimeUnit.SECONDS)); + // Make sure that the callback has been invoked. + Assert.assertTrue(failedLatch.await(5, TimeUnit.SECONDS)); } @Test public void testInputStreamResponseListenerFailedWhileWaiting() throws Exception { - final byte[] chunk1 = new byte[]{0, 1}; - final byte[] chunk2 = new byte[]{2, 3}; - final CountDownLatch closeLatch = new CountDownLatch(1); start(new AbstractHandler() { @Override - public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { baseRequest.setHandled(true); - response.setContentLength(chunk1.length + chunk2.length); + byte[] data = new byte[1024]; + response.setContentLength(data.length); ServletOutputStream output = response.getOutputStream(); - output.write(chunk1); - output.flush(); - try - { - closeLatch.await(5, TimeUnit.SECONDS); - output.write(chunk2); - output.flush(); - } - catch (InterruptedException x) - { - throw new InterruptedIOException(); - } + output.write(data); } }); - final CountDownLatch waitLatch = new CountDownLatch(1); - final CountDownLatch waitedLatch = new CountDownLatch(1); - InputStreamResponseListener listener = new InputStreamResponseListener(1) + CountDownLatch failedLatch = new CountDownLatch(1); + CountDownLatch contentLatch = new CountDownLatch(1); + InputStreamResponseListener listener = new InputStreamResponseListener() { @Override - protected boolean await() + public void onContent(Response response, ByteBuffer content, Callback callback) { - waitLatch.countDown(); - boolean result = super.await(); - waitedLatch.countDown(); - return result; + super.onContent(response, content, new Callback() + { + @Override + public void failed(Throwable x) + { + failedLatch.countDown(); + callback.failed(x); + } + }); + contentLatch.countDown(); } }; client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); - Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); - // Wait until we block - Assert.assertTrue(waitLatch.await(5, TimeUnit.SECONDS)); - // Fail the response + // Wait until we get some content. + Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS)); + + // Abort the response. response.abort(new Exception()); - closeLatch.countDown(); - // Be sure we're not stuck waiting - Assert.assertTrue(waitedLatch.await(5, TimeUnit.SECONDS)); - } - - @Test - public void testInputStreamResponseListenerConsumingBeforeWaiting() throws Exception - { - final byte[] data = new byte[]{0, 1}; - start(new AbstractHandler() - { - @Override - public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException - { - baseRequest.setHandled(true); - response.setContentLength(data.length); - ServletOutputStream output = response.getOutputStream(); - output.write(data); - output.flush(); - } - }); - - final AtomicReference failure = new AtomicReference<>(); - InputStreamResponseListener listener = new InputStreamResponseListener(1) - { - @Override - protected boolean await() - { - // Consume everything just before waiting - InputStream stream = getInputStream(); - consume(stream, data); - return super.await(); - } - - private void consume(InputStream stream, byte[] data) - { - try - { - for (byte datum : data) - Assert.assertEquals(datum, stream.read()); - } - catch (IOException x) - { - failure.compareAndSet(null, x); - } - } - }; - client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) - .send(listener); - Result result = listener.await(5, TimeUnit.SECONDS); - Assert.assertEquals(200, result.getResponse().getStatus()); - Assert.assertNull(failure.get()); + // Make sure that the callback has been invoked. + Assert.assertTrue(failedLatch.await(5, TimeUnit.SECONDS)); } @Test @@ -1100,4 +1091,57 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testInputStreamResponseListenerBufferedRead() throws Exception + { + AtomicReference asyncContextRef = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + asyncContextRef.set(request.startAsync()); + latch.countDown(); + } + }); + + InputStreamResponseListener listener = new InputStreamResponseListener(); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .timeout(5, TimeUnit.SECONDS) + .send(listener); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + + AsyncContext asyncContext = asyncContextRef.get(); + Assert.assertNotNull(asyncContext); + + Random random = new Random(); + + byte[] chunk = new byte[64]; + random.nextBytes(chunk); + ServletOutputStream output = asyncContext.getResponse().getOutputStream(); + output.write(chunk); + output.flush(); + + // Use a buffer larger than the data + // written to test that the read returns. + byte[] buffer = new byte[2 * chunk.length]; + InputStream stream = listener.getInputStream(); + int totalRead = 0; + while (totalRead < chunk.length) + { + int read = stream.read(buffer); + Assert.assertTrue(read > 0); + totalRead += read; + } + + asyncContext.complete(); + + Response response = listener.get(5, TimeUnit.SECONDS); + Assert.assertEquals(200, response.getStatus()); + } }