diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java index 0656abbc639..2e0008aa6f3 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -279,6 +279,13 @@ public class HttpRequest implements Request return this; } + @Override + public Request onRequestCommit(CommitListener listener) + { + this.requestListeners.add(listener); + return this; + } + @Override public Request onRequestSuccess(SuccessListener listener) { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 220d96a0fa5..228fdc647e9 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -64,7 +64,7 @@ public class HttpSender public void send(HttpExchange exchange) { - if (!updateState(State.IDLE, State.SEND)) + if (!updateState(State.IDLE, State.BEGIN)) throw new IllegalStateException(); // Arrange the listeners, so that if there is a request failure the proper listeners are notified @@ -170,13 +170,35 @@ public class HttpSender } case FLUSH: { - switch (state.get()) + out: while (true) { - case SEND: - case COMMIT: - break; - default: - return; + State current = state.get(); + switch (current) + { + case BEGIN: + { + if (!updateState(current, State.SEND)) + continue; + requestNotifier.notifyHeaders(request); + break out; + } + case SEND: + case COMMIT: + { + // State update is performed after the write in commit() + break out; + } + case FAILURE: + { + // Failed concurrently, avoid the write since + // the connection is probably already closed + return; + } + default: + { + throw new IllegalStateException(); + } + } } StatefulExecutorCallback callback = new StatefulExecutorCallback(client.getExecutor()) @@ -327,12 +349,16 @@ public class HttpSender if (!updateState(current, State.COMMIT)) continue; LOG.debug("Committed {}", request); - requestNotifier.notifyHeaders(request); + requestNotifier.notifyCommit(request); return true; case COMMIT: - return updateState(current, State.COMMIT); - default: + if (!updateState(current, State.COMMIT)) + continue; + return true; + case FAILURE: return false; + default: + throw new IllegalStateException(); } } } @@ -400,7 +426,7 @@ public class HttpSender LOG.debug("Failed {} {}", request, failure); Result result = completion.getReference(); - boolean notCommitted = current == State.IDLE || current == State.SEND; + boolean notCommitted = current == State.IDLE || current == State.BEGIN || current == State.SEND; if (result == null && notCommitted && request.getAbortCause() == null) { result = exchange.responseComplete(failure).getReference(); @@ -422,7 +448,7 @@ public class HttpSender public boolean abort(HttpExchange exchange, Throwable cause) { State current = state.get(); - boolean abortable = current == State.IDLE || current == State.SEND || + boolean abortable = current == State.IDLE || current == State.BEGIN || current == State.SEND || current == State.COMMIT && contentIterator.hasNext(); return abortable && fail(cause); } @@ -445,7 +471,7 @@ public class HttpSender private enum State { - IDLE, SEND, COMMIT, FAILURE + IDLE, BEGIN, SEND, COMMIT, FAILURE } private static abstract class StatefulExecutorCallback implements Callback, Runnable diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java index b04949b4852..db2d88668ee 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java @@ -96,6 +96,27 @@ public class RequestNotifier } } + public void notifyCommit(Request request) + { + for (Request.CommitListener listener : request.getRequestListeners(Request.CommitListener.class)) + notifyCommit(listener, request); + for (Request.Listener listener : client.getRequestListeners()) + notifyCommit(listener, request); + } + + private void notifyCommit(Request.CommitListener listener, Request request) + { + try + { + if (listener != null) + listener.onCommit(request); + } + catch (Exception x) + { + LOG.info("Exception while notifying listener " + listener, x); + } + } + public void notifySuccess(Request request) { for (Request.SuccessListener listener : request.getRequestListeners(Request.SuccessListener.class)) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java index f52c00fdb5b..f6888486a89 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java @@ -243,6 +243,12 @@ public interface Request */ Request onRequestHeaders(HeadersListener listener); + /** + * @param listener a listener for request commit event + * @return this request object + */ + Request onRequestCommit(CommitListener listener); + /** * @param listener a listener for request success event * @return this request object @@ -367,9 +373,23 @@ public interface Request } /** - * Listener for the request committed event. + * Listener for the request headers event. */ public interface HeadersListener extends RequestListener + { + /** + * Callback method invoked when the request headers (and perhaps small content) are ready to be sent. + * The request has been converted into bytes, but not yet sent to the server, and further modifications + * to the request may have no effect. + * @param request the request that is about to be committed + */ + public void onHeaders(Request request); + } + + /** + * Listener for the request committed event. + */ + public interface CommitListener extends RequestListener { /** * Callback method invoked when the request headers (and perhaps small content) have been sent. @@ -377,7 +397,7 @@ public interface Request * request may have no effect. * @param request the request that has been committed */ - public void onHeaders(Request request); + public void onCommit(Request request); } /** @@ -409,7 +429,7 @@ public interface Request /** * Listener for all request events. */ - public interface Listener extends QueuedListener, BeginListener, HeadersListener, SuccessListener, FailureListener + public interface Listener extends QueuedListener, BeginListener, HeadersListener, CommitListener, SuccessListener, FailureListener { /** * An empty implementation of {@link Listener} @@ -431,6 +451,11 @@ public interface Request { } + @Override + public void onCommit(Request request) + { + } + @Override public void onSuccess(Request request) { diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java index 350cfd1466c..8853b881aa4 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java @@ -94,7 +94,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest final Throwable cause = new Exception(); final CountDownLatch aborted = new CountDownLatch(1); - final CountDownLatch headers = new CountDownLatch(1); + final CountDownLatch committed = new CountDownLatch(1); try { client.newRequest("localhost", connector.getLocalPort()) @@ -109,9 +109,9 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest } @Override - public void onHeaders(Request request) + public void onCommit(Request request) { - headers.countDown(); + committed.countDown(); } }) .send().get(5, TimeUnit.SECONDS); @@ -121,12 +121,51 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest { Assert.assertSame(cause, x.getCause()); Assert.assertTrue(aborted.await(5, TimeUnit.SECONDS)); - Assert.assertFalse(headers.await(1, TimeUnit.SECONDS)); + Assert.assertFalse(committed.await(1, TimeUnit.SECONDS)); + } + } + + @Slow + @Test + public void testAbortOnHeaders() throws Exception + { + start(new EmptyServerHandler()); + + final Throwable cause = new Exception(); + final CountDownLatch aborted = new CountDownLatch(1); + final CountDownLatch committed = new CountDownLatch(1); + try + { + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .listener(new Request.Listener.Empty() + { + @Override + public void onHeaders(Request request) + { + if (request.abort(cause)) + aborted.countDown(); + } + + @Override + public void onCommit(Request request) + { + committed.countDown(); + } + }) + .send().get(5, TimeUnit.SECONDS); + Assert.fail(); + } + catch (ExecutionException x) + { + Assert.assertSame(cause, x.getCause()); + Assert.assertTrue(aborted.await(5, TimeUnit.SECONDS)); + Assert.assertFalse(committed.await(1, TimeUnit.SECONDS)); } } @Test - public void testAbortOnHeaders() throws Exception + public void testAbortOnCommit() throws Exception { start(new EmptyServerHandler()); @@ -140,10 +179,10 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest { ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .onRequestHeaders(new Request.HeadersListener() + .onRequestCommit(new Request.CommitListener() { @Override - public void onHeaders(Request request) + public void onCommit(Request request) { if (request.abort(cause)) aborted.countDown(); @@ -161,7 +200,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest } @Test - public void testAbortOnHeadersWithContent() throws Exception + public void testAbortOnCommitWithContent() throws Exception { final AtomicReference failure = new AtomicReference<>(); start(new AbstractHandler() @@ -188,10 +227,10 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest { client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .onRequestHeaders(new Request.HeadersListener() + .onRequestCommit(new Request.CommitListener() { @Override - public void onHeaders(Request request) + public void onCommit(Request request) { request.abort(cause); }