diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java index f74f2958387..d1da7e10f2b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java @@ -23,6 +23,11 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -76,11 +81,11 @@ public class InputStreamResponseListener extends Listener.Adapter private final CountDownLatch responseLatch = new CountDownLatch(1); private final CountDownLatch resultLatch = new CountDownLatch(1); private final AtomicReference stream = new AtomicReference<>(); + private final Queue chunks = new ArrayDeque<>(); private Response response; private Result result; private Throwable failure; private boolean closed; - private DeferredContentProvider.Chunk chunk; public InputStreamResponseListener() { @@ -121,7 +126,9 @@ public class InputStreamResponseListener extends Listener.Adapter closed = this.closed; if (!closed) { - chunk = new DeferredContentProvider.Chunk(content, callback); + if (LOG.isDebugEnabled()) + LOG.debug("Queueing content {}", content); + chunks.add(new DeferredContentProvider.Chunk(content, callback)); lock.notifyAll(); } } @@ -139,7 +146,8 @@ public class InputStreamResponseListener extends Listener.Adapter { synchronized (lock) { - chunk = EOF; + if (!closed) + chunks.add(EOF); lock.notifyAll(); } @@ -150,37 +158,34 @@ public class InputStreamResponseListener extends Listener.Adapter @Override public void onFailure(Response response, Throwable failure) { - Callback callback = null; + List callbacks; synchronized (lock) { if (this.failure != null) return; this.failure = failure; - if (chunk != null) - callback = chunk.callback; + callbacks = drain(); lock.notifyAll(); } if (LOG.isDebugEnabled()) LOG.debug("Content failure", failure); - if (callback != null) - callback.failed(failure); + callbacks.forEach(callback -> callback.failed(failure)); } @Override public void onComplete(Result result) { Throwable failure = result.getFailure(); - Callback callback = null; + List callbacks = Collections.emptyList(); synchronized (lock) { this.result = result; if (result.isFailed() && this.failure == null) { this.failure = failure; - if (chunk != null) - callback = chunk.callback; + callbacks = drain(); } // Notify the response latch in case of request failures. responseLatch.countDown(); @@ -196,8 +201,7 @@ public class InputStreamResponseListener extends Listener.Adapter LOG.debug("Result failure", failure); } - if (callback != null) - callback.failed(failure); + callbacks.forEach(callback -> callback.failed(failure)); } /** @@ -265,6 +269,23 @@ public class InputStreamResponseListener extends Listener.Adapter return IO.getClosedStream(); } + private List drain() + { + List callbacks = new ArrayList<>(); + synchronized (lock) + { + while (true) + { + DeferredContentProvider.Chunk chunk = chunks.peek(); + if (chunk == null || chunk == EOF) + break; + callbacks.add(chunk.callback); + chunks.poll(); + } + } + return callbacks; + } + private class Input extends InputStream { @Override @@ -286,16 +307,22 @@ public class InputStreamResponseListener extends Listener.Adapter Callback callback = null; synchronized (lock) { + DeferredContentProvider.Chunk chunk; while (true) { - if (failure != null) - throw toIOException(failure); + chunk = chunks.peek(); if (chunk == EOF) return -1; - if (closed) - throw new AsynchronousCloseException(); + if (chunk != null) break; + + if (failure != null) + throw toIOException(failure); + + if (closed) + throw new AsynchronousCloseException(); + lock.wait(); } @@ -305,7 +332,7 @@ public class InputStreamResponseListener extends Listener.Adapter if (!buffer.hasRemaining()) { callback = chunk.callback; - chunk = null; + chunks.poll(); } } if (callback != null) @@ -329,22 +356,21 @@ public class InputStreamResponseListener extends Listener.Adapter @Override public void close() throws IOException { - Callback callback = null; + List callbacks; synchronized (lock) { if (closed) return; closed = true; - if (chunk != null) - callback = chunk.callback; + callbacks = drain(); lock.notifyAll(); } if (LOG.isDebugEnabled()) LOG.debug("InputStream close"); - if (callback != null) - callback.failed(new AsynchronousCloseException()); + Throwable failure = new AsynchronousCloseException(); + callbacks.forEach(callback -> callback.failed(failure)); super.close(); } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/EmptyServerHandler.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/EmptyServerHandler.java new file mode 100644 index 00000000000..e93f4cedc6c --- /dev/null +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/EmptyServerHandler.java @@ -0,0 +1,37 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http.client; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; + +public class EmptyServerHandler extends AbstractHandler +{ + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + } +} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientStreamTest.java similarity index 95% rename from jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java rename to tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientStreamTest.java index 7be86646b59..d4934b65dfa 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientStreamTest.java @@ -16,7 +16,7 @@ // ======================================================================== // -package org.eclipse.jetty.client; +package org.eclipse.jetty.http.client; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -65,15 +65,15 @@ import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; -import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; -public class HttpClientStreamTest extends AbstractHttpClientServerTest +public class HttpClientStreamTest extends AbstractTest { - public HttpClientStreamTest(SslContextFactory sslContextFactory) + public HttpClientStreamTest(Transport transport) { - super(sslContextFactory); + super(transport); } @Test @@ -90,11 +90,30 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest output.write(kb); } - start(new RespondThenConsumeHandler()); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + response.setStatus(200); + response.setContentLength(0); + response.flushBuffer(); + + InputStream in = request.getInputStream(); + byte[] buffer = new byte[1024]; + while (true) + { + int read = in.read(buffer); + if (read < 0) + break; + } + } + }); final AtomicLong requestTime = new AtomicLong(); ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .file(upload) .onRequestSuccess(request -> requestTime.set(System.nanoTime())) .timeout(30, TimeUnit.SECONDS) @@ -127,7 +146,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest InputStreamResponseListener listener = new InputStreamResponseListener(); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); @@ -168,7 +187,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest InputStreamResponseListener listener = new InputStreamResponseListener(); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); @@ -215,7 +234,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest InputStreamResponseListener listener = new InputStreamResponseListener(); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); @@ -241,7 +260,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest // Expected. } - Assert.assertEquals(data.length, length); + Assert.assertThat(length, Matchers.lessThanOrEqualTo(data.length)); Result result = listener.await(5, TimeUnit.SECONDS); Assert.assertNotNull(result); @@ -267,7 +286,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest stream.close(); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(new BytesContentProvider(new byte[]{0, 1, 2, 3})) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); @@ -309,7 +328,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest } }; client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); @@ -366,7 +385,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest } }; client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); @@ -418,7 +437,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest } }; client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); @@ -443,7 +462,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest InputStreamResponseListener listener = new InputStreamResponseListener(); // Connect to the wrong port client.newRequest("localhost", port) - .scheme(scheme) + .scheme(getScheme()) .send(listener); Result result = listener.await(5, TimeUnit.SECONDS); Assert.assertNotNull(result); @@ -464,7 +483,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest final byte[] data = new byte[]{0, 1, 2, 3}; client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(new InputStreamContentProvider(new InputStream() { private int index = 0; @@ -510,7 +529,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest InputStreamResponseListener listener = new InputStreamResponseListener(); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); @@ -555,7 +574,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest InputStreamResponseListener listener = new InputStreamResponseListener(); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); @@ -591,7 +610,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest InputStreamResponseListener listener = new InputStreamResponseListener(); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .send(listener); Response response = listener.get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); @@ -630,7 +649,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest try (DeferredContentProvider content = new DeferredContentProvider()) { client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(content) .send(result -> { @@ -680,7 +699,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest }); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(content) .send(result -> { @@ -720,7 +739,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest }; client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(content) .send(new BufferingResponseListener() { @@ -784,7 +803,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest { byte[] chunk = iteratorData[index.getAndIncrement()]; ByteBuffer result = chunk == null ? null : ByteBuffer.wrap(chunk); - if (index.get() == 2) + if (index.get() < iteratorData.length) { contentRef.get().offer(result == null ? BufferUtil.EMPTY_BUFFER : result); contentRef.get().close(); @@ -802,7 +821,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest contentRef.set(content); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(content) .send(new BufferingResponseListener() { @@ -836,7 +855,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest final CountDownLatch latch = new CountDownLatch(1); OutputStreamContentProvider content = new OutputStreamContentProvider(); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(content) .send(new BufferingResponseListener() { @@ -879,7 +898,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest final CountDownLatch latch = new CountDownLatch(1); OutputStreamContentProvider content = new OutputStreamContentProvider(); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(content) .send(new BufferingResponseListener(data.length) { @@ -920,7 +939,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest final CountDownLatch latch = new CountDownLatch(1); OutputStreamContentProvider content = new OutputStreamContentProvider(); client.newRequest("0.0.0.1", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(content) .send(result -> { @@ -959,7 +978,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest final CountDownLatch completeLatch = new CountDownLatch(1); final DeferredContentProvider content = new DeferredContentProvider(); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(content) .onRequestBegin(request -> { @@ -1007,7 +1026,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest final CountDownLatch completeLatch = new CountDownLatch(1); client.newRequest("0.0.0.1", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(content) .send(result -> { @@ -1079,7 +1098,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest final CountDownLatch completeLatch = new CountDownLatch(1); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .content(provider) .onRequestCommit(request -> commit.set(true)) .send(result -> @@ -1110,7 +1129,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest InputStreamResponseListener listener = new InputStreamResponseListener(); client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) + .scheme(getScheme()) .timeout(5, TimeUnit.SECONDS) .send(listener);