diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java index 4bdd91d1068..b80eb86a2b4 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java @@ -30,6 +30,7 @@ import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -66,31 +67,18 @@ public class AuthenticationProtocolHandler implements ProtocolHandler return new AuthenticationListener(); } - private class AuthenticationListener extends Response.Listener.Adapter + private class AuthenticationListener extends BufferingResponseListener { - private byte[] buffer = new byte[0]; - - @Override - public void onContent(Response response, ByteBuffer content) + private AuthenticationListener() { - if (buffer.length == maxContentLength) - return; - - long newLength = buffer.length + content.remaining(); - if (newLength > maxContentLength) - newLength = maxContentLength; - - byte[] newBuffer = new byte[(int)newLength]; - System.arraycopy(buffer, 0, newBuffer, 0, buffer.length); - content.get(newBuffer, buffer.length, content.remaining()); - buffer = newBuffer; + super(maxContentLength); } @Override public void onComplete(Result result) { Request request = result.getRequest(); - ContentResponse response = new HttpContentResponse(result.getResponse(), buffer); + ContentResponse response = new HttpContentResponse(result.getResponse(), getContent()); if (result.isFailed()) { Throwable failure = result.getFailure(); @@ -136,7 +124,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler } authnResult.apply(request); - request.send(new Adapter() + request.send(new Response.Listener.Empty() { @Override public void onSuccess(Response response) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index 797a380841a..b4cd156c8bc 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -250,6 +250,11 @@ public class HttpConnection extends AbstractConnection implements Connection } } + public void abort(HttpResponse response) + { + receiver.fail(new HttpResponseException("Response aborted", response)); + } + @Override public void close() { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index 26024e83c96..4e6f669bc10 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -42,7 +42,7 @@ public class HttpExchange this.connection = connection; this.request = request; this.listener = listener; - this.response = new HttpResponse(listener); + this.response = new HttpResponse(this, listener); } public HttpConversation conversation() @@ -117,6 +117,12 @@ public class HttpExchange return false; } + public void abort() + { + LOG.debug("Aborting {}", response); + connection.abort(response); + } + @Override public String toString() { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java index deacc2037ae..74cb37084e8 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -31,15 +31,13 @@ import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.BlockingResponseListener; import org.eclipse.jetty.client.util.PathContentProvider; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.util.Fields; -import org.eclipse.jetty.util.FutureCallback; public class HttpRequest implements Request { @@ -59,6 +57,7 @@ public class HttpRequest implements Request private Listener listener; private ContentProvider content; private boolean followRedirects = true; + private volatile boolean aborted; public HttpRequest(HttpClient client, URI uri) { @@ -296,22 +295,9 @@ public class HttpRequest implements Request @Override public Future send() { - final FutureCallback callback = new FutureCallback<>(); - BufferingResponseListener listener = new BufferingResponseListener() - { - @Override - public void onComplete(Result result) - { - super.onComplete(result); - HttpContentResponse contentResponse = new HttpContentResponse(result.getResponse(), content()); - if (!result.isFailed()) - callback.completed(contentResponse); - else - callback.failed(contentResponse, result.getFailure()); - } - }; + BlockingResponseListener listener = new BlockingResponseListener(); send(listener); - return callback; + return listener; } @Override @@ -320,6 +306,18 @@ public class HttpRequest implements Request client.send(this, listener); } + @Override + public void abort() + { + aborted = true; + } + + @Override + public boolean aborted() + { + return aborted; + } + @Override public String toString() { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequestException.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequestException.java new file mode 100644 index 00000000000..0541eee14ca --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequestException.java @@ -0,0 +1,37 @@ +// +// ======================================================================== +// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import org.eclipse.jetty.client.api.Request; + +public class HttpRequestException extends Throwable +{ + private final Request request; + + public HttpRequestException(String message, Request request) + { + super(message); + this.request = request; + } + + public Request getRequest() + { + return request; + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponse.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponse.java index 5f9c2468574..42be83cc9c3 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponse.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponse.java @@ -25,13 +25,15 @@ import org.eclipse.jetty.http.HttpVersion; public class HttpResponse implements Response { private final HttpFields headers = new HttpFields(); + private final HttpExchange exchange; private final Listener listener; private HttpVersion version; private int status; private String reason; - public HttpResponse(Response.Listener listener) + public HttpResponse(HttpExchange exchange, Listener listener) { + this.exchange = exchange; this.listener = listener; } @@ -84,7 +86,7 @@ public class HttpResponse implements Response @Override public void abort() { -// request.abort(); + exchange.abort(); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 11f0b4518dd..e75854fc1bf 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -60,12 +60,20 @@ public class HttpSender public void send(HttpExchange exchange) { - LOG.debug("Sending {}", exchange.request()); - requestNotifier.notifyBegin(exchange.request()); - ContentProvider content = exchange.request().content(); - this.contentLength = content == null ? -1 : content.length(); - this.contentChunks = content == null ? Collections.emptyIterator() : content.iterator(); - send(); + Request request = exchange.request(); + if (request.aborted()) + { + fail(new HttpRequestException("Request aborted", request)); + } + else + { + LOG.debug("Sending {}", request); + requestNotifier.notifyBegin(request); + ContentProvider content = request.content(); + this.contentLength = content == null ? -1 : content.length(); + this.contentChunks = content == null ? Collections.emptyIterator() : content.iterator(); + send(); + } } private void send() @@ -122,43 +130,45 @@ public class HttpSender } case FLUSH: { - StatefulExecutorCallback callback = new StatefulExecutorCallback(client.getExecutor()) + if (request.aborted()) { - @Override - protected void pendingCompleted() + fail(new HttpRequestException("Request aborted", request)); + } + else + { + StatefulExecutorCallback callback = new StatefulExecutorCallback(client.getExecutor()) + { + @Override + protected void pendingCompleted() + { + if (!committed) + committed(request); + send(); + } + + @Override + protected void failed(Throwable x) + { + fail(x); + } + }; + if (header == null) + header = BufferUtil.EMPTY_BUFFER; + if (chunk == null) + chunk = BufferUtil.EMPTY_BUFFER; + endPoint.write(null, callback, header, chunk, content); + if (callback.pending()) + return; + + if (callback.completed()) { if (!committed) committed(request); - send(); + + releaseBuffers(); + content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER; + lastContent = !contentChunks.hasNext(); } - - @Override - protected void failed(Throwable x) - { - fail(x); - } - }; - if (header == null) - header = BufferUtil.EMPTY_BUFFER; - if (chunk == null) - chunk = BufferUtil.EMPTY_BUFFER; - LOG.debug("Writing {} {} {}", header, chunk, content); - endPoint.write(null, callback, header, chunk, content); - if (callback.pending()) - { - LOG.debug("Write incomplete {} {} {}", header, chunk, content); - return; - } - - if (callback.completed()) - { - LOG.debug("Write complete {} {} {}", header, chunk, content); - if (!committed) - committed(request); - - releaseBuffers(); - content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER; - lastContent = !contentChunks.hasNext(); } break; } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java index 37275590567..1d745b2f2f2 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java @@ -24,7 +24,7 @@ import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpMethod; -public class RedirectProtocolHandler extends Response.Listener.Adapter implements ProtocolHandler +public class RedirectProtocolHandler extends Response.Listener.Empty implements ProtocolHandler { private static final String ATTRIBUTE = RedirectProtocolHandler.class.getName() + ".redirect"; @@ -128,7 +128,7 @@ public class RedirectProtocolHandler extends Response.Listener.Adapter implement // Copy content redirect.content(request.content()); - redirect.send(new Adapter()); + redirect.send(new Response.Listener.Empty()); } else { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java index f8e7bf3533f..b1b45bfbf43 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java @@ -230,6 +230,18 @@ public interface Request */ void send(Response.Listener listener); + /** + * Attempts to abort the send of this request. + * + * @see #aborted() + */ + void abort(); + + /** + * @return whether {@link #abort()} was called + */ + boolean aborted(); + /** * Listener for request events */ diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java index f4caa38f08f..824bc334a01 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java @@ -51,7 +51,7 @@ public interface Response public void onComplete(Result result); - public static class Adapter implements Listener + public static class Empty implements Listener { @Override public void onBegin(Response response) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BlockingResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BlockingResponseListener.java new file mode 100644 index 00000000000..39d619782af --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BlockingResponseListener.java @@ -0,0 +1,117 @@ +// +// ======================================================================== +// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client.util; + +import java.nio.ByteBuffer; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.eclipse.jetty.client.HttpContentResponse; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; + +public class BlockingResponseListener extends BufferingResponseListener implements Future +{ + private final CountDownLatch latch = new CountDownLatch(1); + private ContentResponse response; + private Throwable failure; + private volatile boolean cancelled; + + @Override + public void onBegin(Response response) + { + super.onBegin(response); + if (cancelled) + response.abort(); + } + + @Override + public void onHeaders(Response response) + { + super.onHeaders(response); + if (cancelled) + response.abort(); + } + + @Override + public void onContent(Response response, ByteBuffer content) + { + super.onContent(response, content); + if (cancelled) + response.abort(); + } + + @Override + public void onComplete(Result result) + { + super.onComplete(result); + response = new HttpContentResponse(result.getResponse(), getContent()); + failure = result.getFailure(); + latch.countDown(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + cancelled = true; + return latch.getCount() == 0; + } + + @Override + public boolean isCancelled() + { + return cancelled; + } + + @Override + public boolean isDone() + { + return latch.getCount() == 0 || isCancelled(); + } + + @Override + public ContentResponse get() throws InterruptedException, ExecutionException + { + latch.await(); + return result(); + } + + @Override + public ContentResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + boolean expired = !latch.await(timeout, unit); + if (expired) + throw new TimeoutException(); + return result(); + } + + private ContentResponse result() throws ExecutionException + { + if (isCancelled()) + throw (CancellationException)new CancellationException().initCause(failure); + if (failure != null) + throw new ExecutionException(failure); + return response; + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java index 0db2571ff23..04e426a2cb1 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java @@ -18,38 +18,32 @@ package org.eclipse.jetty.client.util; +import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.nio.charset.UnsupportedCharsetException; import org.eclipse.jetty.client.api.Response; -public class BufferingResponseListener extends Response.Listener.Adapter +public class BufferingResponseListener extends Response.Listener.Empty { - private final CountDownLatch latch = new CountDownLatch(1); - private final int maxCapacity; - private Response response; - private Throwable failure; - private byte[] buffer = new byte[0]; + private final int maxLength; + private volatile byte[] buffer = new byte[0]; public BufferingResponseListener() { - this(16 * 1024 * 1024); + this(2 * 1024 * 1024); } - public BufferingResponseListener(int maxCapacity) + public BufferingResponseListener(int maxLength) { - this.maxCapacity = maxCapacity; + this.maxLength = maxLength; } @Override public void onContent(Response response, ByteBuffer content) { long newLength = buffer.length + content.remaining(); - if (newLength > maxCapacity) + if (newLength > maxLength) throw new IllegalStateException("Buffering capacity exceeded"); byte[] newBuffer = new byte[(int)newLength]; @@ -58,38 +52,20 @@ public class BufferingResponseListener extends Response.Listener.Adapter buffer = newBuffer; } - @Override - public void onSuccess(Response response) - { - this.response = response; - latch.countDown(); - } - - @Override - public void onFailure(Response response, Throwable failure) - { - this.response = response; - this.failure = failure; - latch.countDown(); - } - - public Response await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException - { - boolean expired = !latch.await(timeout, unit); - if (failure != null) - throw new ExecutionException(failure); - if (expired) - throw new TimeoutException(); - return response; - } - - public byte[] content() + public byte[] getContent() { return buffer; } - public String contentAsString(String encoding) + public String getContent(String encoding) { - return new String(content(), Charset.forName(encoding)); + try + { + return new String(getContent(), encoding); + } + catch (UnsupportedEncodingException x) + { + throw new UnsupportedCharsetException(encoding); + } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/StreamingResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/StreamingResponseListener.java index 1337c1d0e48..e0402b01284 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/StreamingResponseListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/StreamingResponseListener.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.client.api.Response; -public class StreamingResponseListener extends Response.Listener.Adapter +public class StreamingResponseListener extends Response.Listener.Empty { public Response get(long timeout, TimeUnit seconds) { diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java index fb5cfd8514f..7a06dca80c5 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java @@ -113,7 +113,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest // Request without Authentication causes a 401 Request request = client.newRequest("localhost", connector.getLocalPort()).path("/test"); - ContentResponse response = request.send().get(555, TimeUnit.SECONDS); + ContentResponse response = request.send().get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); Assert.assertEquals(401, response.status()); Assert.assertEquals(1, requests.get()); @@ -133,7 +133,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest client.getRequestListeners().add(requestListener); // Request with authentication causes a 401 (no previous successful authentication) + 200 - response = request.send().get(555, TimeUnit.SECONDS); + response = request.send().get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); Assert.assertEquals(200, response.status()); Assert.assertEquals(2, requests.get()); @@ -153,7 +153,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest // Further requests do not trigger 401 because there is a previous successful authentication // Remove existing header to be sure it's added by the implementation request.header(HttpHeader.AUTHORIZATION.asString(), null); - response = request.send().get(555, TimeUnit.SECONDS); + response = request.send().get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); Assert.assertEquals(200, response.status()); Assert.assertEquals(1, requests.get()); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index ac58761f4fb..fdab9f17dbb 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -234,7 +234,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest } } }) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onSuccess(Response response) @@ -253,7 +253,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest latch.countDown(); } }) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onSuccess(Response response) @@ -299,7 +299,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest latch.countDown(); } }) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onFailure(Response response, Throwable failure) @@ -309,7 +309,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest }); client.newRequest("http://localhost:" + connector.getLocalPort()) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onSuccess(Response response) @@ -354,7 +354,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest latch.countDown(); } }) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onSuccess(Response response) @@ -414,7 +414,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest return Arrays.asList(ByteBuffer.allocate(chunkSize), null).iterator(); } }) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onComplete(Result result) @@ -444,7 +444,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest destination.getActiveConnections().peek().close(); } }) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onComplete(Result result) diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java index 32a74d9cf17..5746d4a6d16 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java @@ -66,7 +66,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest successLatch.countDown(); } }) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onHeaders(Response response) @@ -129,7 +129,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest { failureLatch.countDown(); } - }).send(new Response.Listener.Adapter() + }).send(new Response.Listener.Empty() { @Override public void onComplete(Result result) @@ -181,7 +181,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest successLatch.countDown(); } }) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onSuccess(Response response) @@ -234,7 +234,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest failureLatch.countDown(); } }) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onComplete(Result result) @@ -276,7 +276,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest final CountDownLatch latch = new CountDownLatch(1); client.newRequest(host, port) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onComplete(Result result) @@ -321,7 +321,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest final CountDownLatch latch = new CountDownLatch(1); client.newRequest(host, port) .content(new ByteBufferContentProvider(ByteBuffer.allocate(16 * 1024 * 1024))) - .send(new Response.Listener.Adapter() + .send(new Response.Listener.Empty() { @Override public void onComplete(Result result) diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java index 07f300de06d..d8a8ef4b817 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java @@ -26,7 +26,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.BlockingResponseListener; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpVersion; @@ -66,6 +66,7 @@ public class HttpReceiverTest HttpExchange exchange = new HttpExchange(conversation, connection, null, listener); conversation.exchanges().offer(exchange); connection.setExchange(exchange); + exchange.requestComplete(true); return exchange; } @@ -78,7 +79,7 @@ public class HttpReceiverTest "\r\n"); final AtomicReference responseRef = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - HttpExchange exchange = newExchange(new Response.Listener.Adapter() + HttpExchange exchange = newExchange(new Response.Listener.Empty() { @Override public void onSuccess(Response response) @@ -110,11 +111,11 @@ public class HttpReceiverTest "Content-length: " + content.length() + "\r\n" + "\r\n" + content); - BufferingResponseListener listener = new BufferingResponseListener(); + BlockingResponseListener listener = new BlockingResponseListener(); HttpExchange exchange = newExchange(listener); exchange.receive(); - Response response = listener.await(5, TimeUnit.SECONDS); + Response response = listener.get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); Assert.assertEquals(200, response.status()); Assert.assertEquals("OK", response.reason()); @@ -123,7 +124,7 @@ public class HttpReceiverTest Assert.assertNotNull(headers); Assert.assertEquals(1, headers.size()); Assert.assertEquals(String.valueOf(content.length()), headers.get(HttpHeader.CONTENT_LENGTH)); - String received = listener.contentAsString("UTF-8"); + String received = listener.getContent("UTF-8"); Assert.assertEquals(content, received); } @@ -137,7 +138,7 @@ public class HttpReceiverTest "Content-length: " + (content1.length() + content2.length()) + "\r\n" + "\r\n" + content1); - BufferingResponseListener listener = new BufferingResponseListener(); + BlockingResponseListener listener = new BlockingResponseListener(); HttpExchange exchange = newExchange(listener); exchange.receive(); endPoint.setInputEOF(); @@ -145,7 +146,7 @@ public class HttpReceiverTest try { - listener.await(5, TimeUnit.SECONDS); + listener.get(5, TimeUnit.SECONDS); Assert.fail(); } catch (ExecutionException e) @@ -161,7 +162,7 @@ public class HttpReceiverTest "HTTP/1.1 200 OK\r\n" + "Content-length: 1\r\n" + "\r\n"); - BufferingResponseListener listener = new BufferingResponseListener(); + BlockingResponseListener listener = new BlockingResponseListener(); HttpExchange exchange = newExchange(listener); exchange.receive(); // Simulate an idle timeout @@ -169,7 +170,7 @@ public class HttpReceiverTest try { - listener.await(5, TimeUnit.SECONDS); + listener.get(5, TimeUnit.SECONDS); Assert.fail(); } catch (ExecutionException e) @@ -185,13 +186,13 @@ public class HttpReceiverTest "HTTP/1.1 200 OK\r\n" + "Content-length: A\r\n" + "\r\n"); - BufferingResponseListener listener = new BufferingResponseListener(); + BlockingResponseListener listener = new BlockingResponseListener(); HttpExchange exchange = newExchange(listener); exchange.receive(); try { - listener.await(5, TimeUnit.SECONDS); + listener.get(5, TimeUnit.SECONDS); Assert.fail(); } catch (ExecutionException e) diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java new file mode 100644 index 00000000000..a6c15f85246 --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java @@ -0,0 +1,188 @@ +// +// ======================================================================== +// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.util.ByteBufferContentProvider; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.IO; +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class HttpRequestAbortTest extends AbstractHttpClientServerTest +{ + @Test + public void testAbortOnQueued() throws Exception + { + start(new EmptyServerHandler()); + + final AtomicBoolean begin = new AtomicBoolean(); + try + { + client.newRequest("localhost", connector.getLocalPort()) + .listener(new Request.Listener.Empty() + { + @Override + public void onQueued(Request request) + { + request.abort(); + } + + @Override + public void onBegin(Request request) + { + begin.set(true); + } + }) + .send().get(5, TimeUnit.SECONDS); + fail(); + } + catch (ExecutionException x) + { + HttpRequestException xx = (HttpRequestException)x.getCause(); + Request request = xx.getRequest(); + Assert.assertNotNull(request); + Assert.assertFalse(begin.get()); + } + } + + @Test + public void testAbortOnBegin() throws Exception + { + start(new EmptyServerHandler()); + + final AtomicBoolean headers = new AtomicBoolean(); + try + { + client.newRequest("localhost", connector.getLocalPort()) + .listener(new Request.Listener.Empty() + { + @Override + public void onBegin(Request request) + { + request.abort(); + } + + @Override + public void onHeaders(Request request) + { + headers.set(true); + } + }) + .send().get(5, TimeUnit.SECONDS); + fail(); + } + catch (ExecutionException x) + { + HttpRequestException xx = (HttpRequestException)x.getCause(); + Request request = xx.getRequest(); + Assert.assertNotNull(request); + Assert.assertFalse(headers.get()); + } + } + + @Test + public void testAbortOnHeaders() throws Exception + { + start(new EmptyServerHandler()); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .listener(new Request.Listener.Empty() + { + @Override + public void onHeaders(Request request) + { + // Too late to abort + request.abort(); + } + }) + .send().get(5, TimeUnit.SECONDS); + assertEquals(200, response.status()); + } + + @Test + public void testAbortOnHeadersWithContent() throws Exception + { + final AtomicReference failure = new AtomicReference<>(); + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + baseRequest.setHandled(true); + IO.copy(request.getInputStream(), response.getOutputStream()); + } + catch (IOException x) + { + failure.set(x); + throw x; + } + } + }); + + // Test can behave in 2 ways: + // A) if the request is failed before the request arrived, then we get an ExecutionException + // B) if the request is failed after the request arrived, then we get a 500 + try + { + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .listener(new Request.Listener.Empty() + { + @Override + public void onHeaders(Request request) + { + request.abort(); + } + }) + .content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1})) + { + @Override + public long length() + { + return -1; + } + }) + .send().get(5, TimeUnit.SECONDS); + Assert.assertNotNull(failure.get()); + assertEquals(500, response.status()); + } + catch (ExecutionException x) + { + HttpRequestException xx = (HttpRequestException)x.getCause(); + Request request = xx.getRequest(); + Assert.assertNotNull(request); + } + } +} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java new file mode 100644 index 00000000000..30a5dbb5cca --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java @@ -0,0 +1,169 @@ +// +// ======================================================================== +// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.junit.Assert; +import org.junit.Test; + +public class HttpResponseAbortTest extends AbstractHttpClientServerTest +{ + @Test + public void testAbortOnBegin() throws Exception + { + start(new EmptyServerHandler()); + + final CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .send(new Response.Listener.Empty() + { + @Override + public void onBegin(Response response) + { + response.abort(); + } + + @Override + public void onComplete(Result result) + { + Assert.assertTrue(result.isFailed()); + latch.countDown(); + } + }); + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testAbortOnHeaders() throws Exception + { + start(new EmptyServerHandler()); + + final CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .send(new Response.Listener.Empty() + { + @Override + public void onHeaders(Response response) + { + response.abort(); + } + + @Override + public void onComplete(Result result) + { + Assert.assertTrue(result.isFailed()); + latch.countDown(); + } + }); + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testAbortOnContent() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + baseRequest.setHandled(true); + OutputStream output = response.getOutputStream(); + output.write(1); + output.flush(); + output.write(2); + output.flush(); + } + catch (IOException ignored) + { + // The client may have already closed, and we'll get an exception here, but it's expected + } + } + }); + + final CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .send(new Response.Listener.Empty() + { + @Override + public void onContent(Response response, ByteBuffer content) + { + response.abort(); + } + + @Override + public void onComplete(Result result) + { + Assert.assertTrue(result.isFailed()); + latch.countDown(); + } + }); + Assert.assertTrue(latch.await(555, TimeUnit.SECONDS)); + } + + @Test(expected = CancellationException.class) + public void testCancelFuture() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference> ref = new AtomicReference<>(); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + latch.await(5, TimeUnit.SECONDS); + baseRequest.setHandled(true); + ref.get().cancel(true); + OutputStream output = response.getOutputStream(); + output.write(new byte[]{0, 1, 2, 3, 4, 5, 6, 7}); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + }); + + Future future = client.newRequest("localhost", connector.getLocalPort()).send(); + ref.set(future); + latch.countDown(); + + future.get(5, TimeUnit.SECONDS); + } +} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java index 8f50c91fa9a..9a5c0ec891c 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java @@ -129,7 +129,7 @@ public class HttpSenderTest failureLatch.countDown(); } }); - connection.send(request, new Response.Listener.Adapter() + connection.send(request, new Response.Listener.Empty() { @Override public void onComplete(Result result) @@ -158,7 +158,7 @@ public class HttpSenderTest failureLatch.countDown(); } }); - connection.send(request, new Response.Listener.Adapter() + connection.send(request, new Response.Listener.Empty() { @Override public void onComplete(Result result) diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java b/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java index 7a83c11eaca..55dbec3e87c 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/api/Usage.java @@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.BlockingResponseListener; import org.eclipse.jetty.client.util.PathContentProvider; import org.eclipse.jetty.client.util.StreamingResponseListener; import org.eclipse.jetty.http.HttpCookie; @@ -82,7 +82,7 @@ public class Usage HttpClient client = new HttpClient(); final AtomicReference responseRef = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - client.newRequest("localhost", 8080).send(new Response.Listener.Adapter() + client.newRequest("localhost", 8080).send(new Response.Listener.Empty() { @Override public void onSuccess(Response response) @@ -119,9 +119,9 @@ public class Usage try (Connection connection = client.getDestination("http", "localhost", 8080).newConnection().get(5, TimeUnit.SECONDS)) { Request request = client.newRequest("localhost", 8080); - BufferingResponseListener listener = new BufferingResponseListener(); + BlockingResponseListener listener = new BlockingResponseListener(); connection.send(request, listener); - Response response = listener.await(5, TimeUnit.SECONDS); + Response response = listener.get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); Assert.assertEquals(200, response.status()); } diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java index 44c25974365..cc1d00be436 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java @@ -1111,6 +1111,11 @@ public class HttpParser } break; } + case CLOSED: + { + BufferUtil.clear(buffer); + return false; + } } } @@ -1159,7 +1164,7 @@ public class HttpParser case CLOSED: break; - + default: setState(State.END); if (!_headResponse)