From 816b85ea4dbc8513c71d7862161aeda5a2b9c02c Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 18 Jul 2014 15:57:23 +0200 Subject: [PATCH] 439895 - No event callback should be invoked after the "failure" callback. Fixed HttpSender and HttpReceiver to use a non-blocking collaborative mechanism to notify callbacks. Only the "failed" callback can run concurrently with other callbacks. No other callback can run after the "complete" callback: a failure concurrent with another callback will notify the "failed" callback, finish the running callback and only then invoke the "complete" callback. --- .../eclipse/jetty/client/HttpReceiver.java | 102 ++++++--- .../org/eclipse/jetty/client/HttpSender.java | 136 +++++++----- .../jetty/client/http/HttpSenderOverHTTP.java | 5 +- .../util/InputStreamResponseListener.java | 41 ++-- .../HttpResponseConcurrentAbortTest.java | 198 ++++++++++++++++++ 5 files changed, 384 insertions(+), 98 deletions(-) create mode 100644 jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseConcurrentAbortTest.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index ec902529240..356413a04ce 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -49,9 +49,8 @@ import org.eclipse.jetty.util.log.Logger; * is available *
  • {@link #responseHeader(HttpExchange, HttpField)}, when a HTTP field is available
  • *
  • {@link #responseHeaders(HttpExchange)}, when all HTTP headers are available
  • - *
  • {@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available; this is the only - * method that may be invoked multiple times with different buffers containing different content
  • - *
  • {@link #responseSuccess(HttpExchange)}, when the response is complete
  • + *
  • {@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available
  • + *
  • {@link #responseSuccess(HttpExchange)}, when the response is successful
  • * * At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed * (for example, because of I/O exceptions). @@ -69,7 +68,8 @@ public abstract class HttpReceiver private final AtomicReference responseState = new AtomicReference<>(ResponseState.IDLE); private final HttpChannel channel; - private volatile ContentDecoder decoder; + private ContentDecoder decoder; + private Throwable failure; protected HttpReceiver(HttpChannel channel) { @@ -104,7 +104,7 @@ public abstract class HttpReceiver */ protected boolean responseBegin(HttpExchange exchange) { - if (!updateResponseState(ResponseState.IDLE, ResponseState.BEGIN)) + if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT)) return false; HttpConversation conversation = exchange.getConversation(); @@ -127,6 +127,9 @@ public abstract class HttpReceiver ResponseNotifier notifier = destination.getResponseNotifier(); notifier.notifyBegin(conversation.getResponseListeners(), response); + if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN)) + terminateResponse(exchange, failure); + return true; } @@ -152,7 +155,7 @@ public abstract class HttpReceiver case BEGIN: case HEADER: { - if (updateResponseState(current, ResponseState.HEADER)) + if (updateResponseState(current, ResponseState.TRANSIENT)) break out; break; } @@ -188,6 +191,9 @@ public abstract class HttpReceiver } } + if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER)) + terminateResponse(exchange, failure); + return true; } @@ -228,7 +234,7 @@ public abstract class HttpReceiver case BEGIN: case HEADER: { - if (updateResponseState(current, ResponseState.HEADERS)) + if (updateResponseState(current, ResponseState.TRANSIENT)) break out; break; } @@ -261,6 +267,9 @@ public abstract class HttpReceiver } } + if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS)) + terminateResponse(exchange, failure); + return true; } @@ -283,7 +292,7 @@ public abstract class HttpReceiver case HEADERS: case CONTENT: { - if (updateResponseState(current, ResponseState.CONTENT)) + if (updateResponseState(current, ResponseState.TRANSIENT)) break out; break; } @@ -312,6 +321,9 @@ public abstract class HttpReceiver ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer, callback); + if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) + terminateResponse(exchange, failure); + return true; } @@ -332,16 +344,17 @@ public abstract class HttpReceiver if (!completed) return false; - // Reset to be ready for another response + responseState.set(ResponseState.IDLE); + + // Reset to be ready for another response. reset(); // Mark atomically the response as terminated and succeeded, // with respect to concurrency between request and response. - // If there is a non-null result, then both sender and - // receiver are reset and ready to be reused, and the - // connection closed/pooled (depending on the transport). Result result = exchange.terminateResponse(null); + // It is important to notify *after* we reset and terminate + // because the notification may trigger another request/response. HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) LOG.debug("Response success {}", response); @@ -349,17 +362,7 @@ public abstract class HttpReceiver ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); notifier.notifySuccess(listeners, response); - if (result != null) - { - boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering(); - if (!ordered) - channel.exchangeTerminated(result); - if (LOG.isDebugEnabled()) - LOG.debug("Request/Response succeeded {}", response); - notifier.notifyComplete(listeners, result); - if (ordered) - channel.exchangeTerminated(result); - } + terminateResponse(exchange, result); return true; } @@ -388,7 +391,20 @@ public abstract class HttpReceiver if (!completed) return false; - // Dispose to avoid further responses + this.failure = failure; + + // Update the state to avoid more response processing. + boolean fail; + while (true) + { + ResponseState current = responseState.get(); + if (updateResponseState(current, ResponseState.FAILURE)) + { + fail = current != ResponseState.TRANSIENT; + break; + } + } + dispose(); // Mark atomically the response as terminated and failed, @@ -402,19 +418,45 @@ public abstract class HttpReceiver ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); notifier.notifyFailure(listeners, response, failure); + if (fail) + { + terminateResponse(exchange, result); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Concurrent failure: response termination skipped, performed by helpers"); + } + + return true; + } + + private void terminateResponse(HttpExchange exchange, Throwable failure) + { + Result result = exchange.terminateResponse(failure); + terminateResponse(exchange, result); + } + + private void terminateResponse(HttpExchange exchange, Result result) + { + HttpResponse response = exchange.getResponse(); + + if (LOG.isDebugEnabled()) + LOG.debug("Response complete {}", response); + if (result != null) { boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering(); if (!ordered) channel.exchangeTerminated(result); if (LOG.isDebugEnabled()) - LOG.debug("Request/Response failed {}", response); + LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", response); + List listeners = exchange.getConversation().getResponseListeners(); + ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); notifier.notifyComplete(listeners, result); if (ordered) channel.exchangeTerminated(result); } - - return true; } /** @@ -427,7 +469,6 @@ public abstract class HttpReceiver protected void reset() { decoder = null; - responseState.set(ResponseState.IDLE); } /** @@ -440,7 +481,6 @@ public abstract class HttpReceiver protected void dispose() { decoder = null; - responseState.set(ResponseState.FAILURE); } public boolean abort(Throwable cause) @@ -464,6 +504,10 @@ public abstract class HttpReceiver */ private enum ResponseState { + /** + * One of the response*() methods is being executed. + */ + TRANSIENT, /** * The response is not yet received, the initial state */ diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 26302064d62..85d4d0b55ad 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -65,7 +65,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener private final IteratingCallback contentCallback = new ContentCallback(); private final Callback lastCallback = new LastContentCallback(); private final HttpChannel channel; - private volatile HttpContent content; + private HttpContent content; + private Throwable failure; protected HttpSender(HttpChannel channel) { @@ -197,34 +198,40 @@ public abstract class HttpSender implements AsyncContentProvider.Listener protected boolean queuedToBegin(Request request) { - if (!updateRequestState(RequestState.QUEUED, RequestState.BEGIN)) + if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT)) return false; if (LOG.isDebugEnabled()) LOG.debug("Request begin {}", request); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyBegin(request); + if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN)) + terminateRequest(getHttpExchange(), failure, false); return true; } protected boolean beginToHeaders(Request request) { - if (!updateRequestState(RequestState.BEGIN, RequestState.HEADERS)) + if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT)) return false; if (LOG.isDebugEnabled()) LOG.debug("Request headers {}{}{}", request, System.getProperty("line.separator"), request.getHeaders().toString().trim()); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyHeaders(request); + if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS)) + terminateRequest(getHttpExchange(), failure, false); return true; } protected boolean headersToCommit(Request request) { - if (!updateRequestState(RequestState.HEADERS, RequestState.COMMIT)) + if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT)) return false; if (LOG.isDebugEnabled()) LOG.debug("Request committed {}", request); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyCommit(request); + if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT)) + terminateRequest(getHttpExchange(), failure, true); return true; } @@ -236,21 +243,19 @@ public abstract class HttpSender implements AsyncContentProvider.Listener case COMMIT: case CONTENT: { - if (!updateRequestState(current, RequestState.CONTENT)) + if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT)) return false; if (LOG.isDebugEnabled()) LOG.debug("Request content {}{}{}", request, System.getProperty("line.separator"), BufferUtil.toDetailString(content)); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyContent(request, content); + if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT)) + terminateRequest(getHttpExchange(), failure, true); return true; } - case FAILURE: - { - return false; - } default: { - throw new IllegalStateException(current.toString()); + return false; } } } @@ -269,43 +274,28 @@ public abstract class HttpSender implements AsyncContentProvider.Listener if (!completed) return false; - // Reset to be ready for another request + requestState.set(RequestState.QUEUED); + + // Reset to be ready for another request. reset(); // Mark atomically the request as terminated and succeeded, // with respect to concurrency between request and response. Result result = exchange.terminateRequest(null); - // It is important to notify completion *after* we reset because - // the notification may trigger another request/response Request request = exchange.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("Request success {}", request); HttpDestination destination = getHttpChannel().getHttpDestination(); destination.getRequestNotifier().notifySuccess(exchange.getRequest()); - if (result != null) - { - boolean ordered = destination.getHttpClient().isStrictEventOrdering(); - if (!ordered) - channel.exchangeTerminated(result); - if (LOG.isDebugEnabled()) - LOG.debug("Request/Response succeded {}", request); - HttpConversation conversation = exchange.getConversation(); - destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result); - if (ordered) - channel.exchangeTerminated(result); - } + terminateRequest(exchange, null, true, result); return true; } - case FAILURE: - { - return false; - } default: { - throw new IllegalStateException(current.toString()); + return false; } } } @@ -322,8 +312,22 @@ public abstract class HttpSender implements AsyncContentProvider.Listener if (!completed) return false; - // Dispose to avoid further requests - RequestState requestState = dispose(); + this.failure = failure; + + // Update the state to avoid more request processing. + RequestState current; + boolean fail; + while (true) + { + current = requestState.get(); + if (updateRequestState(current, RequestState.FAILURE)) + { + fail = current != RequestState.TRANSIENT && current != RequestState.TRANSIENT_CONTENT; + break; + } + } + + dispose(); // Mark atomically the request as terminated and failed, // with respect to concurrency between request and response. @@ -335,8 +339,36 @@ public abstract class HttpSender implements AsyncContentProvider.Listener HttpDestination destination = getHttpChannel().getHttpDestination(); destination.getRequestNotifier().notifyFailure(request, failure); - boolean notCommitted = isBeforeCommit(requestState); - if (result == null && notCommitted && request.getAbortCause() == null) + if (fail) + { + terminateRequest(exchange, failure, !isBeforeCommit(current), result); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Concurrent failure: request termination skipped, performed by helpers"); + } + + return true; + } + + private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed) + { + if (exchange != null) + { + Result result = exchange.terminateRequest(failure); + terminateRequest(exchange, failure, committed, result); + } + } + + private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed, Result result) + { + Request request = exchange.getRequest(); + + if (LOG.isDebugEnabled()) + LOG.debug("Terminating request {}", request); + + if (failure != null && !committed && result == null && request.getAbortCause() == null) { // Complete the response from here if (exchange.responseComplete()) @@ -349,18 +381,17 @@ public abstract class HttpSender implements AsyncContentProvider.Listener if (result != null) { + HttpDestination destination = getHttpChannel().getHttpDestination(); boolean ordered = destination.getHttpClient().isStrictEventOrdering(); if (!ordered) channel.exchangeTerminated(result); if (LOG.isDebugEnabled()) - LOG.debug("Request/Response failed {}", request); + LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", request); HttpConversation conversation = exchange.getConversation(); destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result); if (ordered) channel.exchangeTerminated(result); } - - return true; } /** @@ -398,23 +429,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener { content.close(); content = null; - requestState.set(RequestState.QUEUED); senderState.set(SenderState.IDLE); } - protected RequestState dispose() + protected void dispose() { - while (true) - { - RequestState current = requestState.get(); - if (updateRequestState(current, RequestState.FAILURE)) - { - HttpContent content = this.content; - if (content != null) - content.close(); - return current; - } - } + HttpContent content = this.content; + if (content != null) + content.close(); } public void proceed(HttpExchange exchange, Throwable failure) @@ -485,7 +507,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener return abortable && anyToFailure(failure); } - protected boolean updateRequestState(RequestState from, RequestState to) + private boolean updateRequestState(RequestState from, RequestState to) { boolean updated = requestState.compareAndSet(from, to); if (!updated) @@ -505,6 +527,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener { switch (requestState) { + case TRANSIENT: case QUEUED: case BEGIN: case HEADERS: @@ -518,6 +541,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener { switch (requestState) { + case TRANSIENT_CONTENT: case COMMIT: case CONTENT: return true; @@ -534,8 +558,16 @@ public abstract class HttpSender implements AsyncContentProvider.Listener /** * The request states {@link HttpSender} goes through when sending a request. */ - protected enum RequestState + private enum RequestState { + /** + * One of the state transition methods is being executed. + */ + TRANSIENT, + /** + * The content transition method is being executed. + */ + TRANSIENT_CONTENT, /** * The request is queued, the initial state */ diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java index 6b0364d1803..c8ce04168ab 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java @@ -196,12 +196,11 @@ public class HttpSenderOverHTTP extends HttpSender } @Override - protected RequestState dispose() + protected void dispose() { generator.abort(); - RequestState result = super.dispose(); + super.dispose(); shutdownOutput(); - return result; } private void shutdownOutput() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java index 5a68967df37..789b525365d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java @@ -144,28 +144,41 @@ public class InputStreamResponseListener extends Listener.Adapter } } + @Override + public void onSuccess(Response response) + { + if (LOG.isDebugEnabled()) + LOG.debug("Queuing end of content {}{}", EOF, ""); + queue.offer(EOF); + signal(); + } + + @Override + public void onFailure(Response response, Throwable failure) + { + fail(failure); + signal(); + } + @Override public void onComplete(Result result) { + if (result.isFailed() && failure == null) + fail(result.getFailure()); this.result = result; - if (result.isSucceeded()) - { - if (LOG.isDebugEnabled()) - LOG.debug("Queuing end of content {}{}", EOF, ""); - queue.offer(EOF); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Queuing failure {} {}", FAILURE, failure); - queue.offer(FAILURE); - this.failure = result.getFailure(); - responseLatch.countDown(); - } resultLatch.countDown(); signal(); } + private void fail(Throwable failure) + { + if (LOG.isDebugEnabled()) + LOG.debug("Queuing failure {} {}", FAILURE, failure); + queue.offer(FAILURE); + this.failure = failure; + responseLatch.countDown(); + } + protected boolean await() { try diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseConcurrentAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseConcurrentAbortTest.java new file mode 100644 index 00000000000..a6089421150 --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseConcurrentAbortTest.java @@ -0,0 +1,198 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.junit.Assert; +import org.junit.Test; + +public class HttpResponseConcurrentAbortTest extends AbstractHttpClientServerTest +{ + private final CountDownLatch callbackLatch = new CountDownLatch(1); + private final CountDownLatch failureLatch = new CountDownLatch(1); + private final CountDownLatch completeLatch = new CountDownLatch(1); + private final AtomicBoolean success = new AtomicBoolean(); + + public HttpResponseConcurrentAbortTest(SslContextFactory sslContextFactory) + { + super(sslContextFactory); + } + + @Test + public void testAbortOnBegin() throws Exception + { + start(new EmptyServerHandler()); + + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .onResponseBegin(new Response.BeginListener() + { + @Override + public void onBegin(Response response) + { + abort(response); + } + }) + .send(new TestResponseListener()); + Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(completeLatch.await(6, TimeUnit.SECONDS)); + Assert.assertTrue(success.get()); + } + + @Test + public void testAbortOnHeader() throws Exception + { + start(new EmptyServerHandler()); + + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .onResponseHeader(new Response.HeaderListener() + { + @Override + public boolean onHeader(Response response, HttpField field) + { + abort(response); + return true; + } + }) + .send(new TestResponseListener()); + Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(success.get()); + } + + @Test + public void testAbortOnHeaders() throws Exception + { + start(new EmptyServerHandler()); + + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .onResponseHeaders(new Response.HeadersListener() + { + @Override + public void onHeaders(Response response) + { + abort(response); + } + }) + .send(new TestResponseListener()); + Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(success.get()); + } + + @Test + public void testAbortOnContent() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + OutputStream output = response.getOutputStream(); + output.write(1); + output.flush(); + } + }); + + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .onResponseContent(new Response.ContentListener() + { + @Override + public void onContent(Response response, ByteBuffer content) + { + abort(response); + } + }) + .send(new TestResponseListener()); + Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(success.get()); + } + + private void abort(final Response response) + { + Logger logger = Log.getLogger(getClass()); + + new Thread("abort") + { + @Override + public void run() + { + response.abort(new Exception()); + } + }.start(); + + try + { + // The failure callback must be executed asynchronously. + boolean latched = failureLatch.await(4, TimeUnit.SECONDS); + success.set(latched); + logger.info("SIMON - STEP 1"); + + // The complete callback must not be executed + // until we return from this callback. + latched = completeLatch.await(1, TimeUnit.SECONDS); + success.set(!latched); + logger.info("SIMON - STEP 2"); + + callbackLatch.countDown(); + } + catch (InterruptedException x) + { + throw new RuntimeException(x); + } + } + + private class TestResponseListener extends Response.Listener.Adapter + { + @Override + public void onFailure(Response response, Throwable failure) + { + failureLatch.countDown(); + } + + @Override + public void onComplete(Result result) + { + Assert.assertTrue(result.isFailed()); + completeLatch.countDown(); + } + } +}