431103 - Complete listener not called if request times out before processing exchange.

Fixed by forcing the abort of the exchange in [Pooling|Multiplex]HttpDestination.
This commit is contained in:
Simone Bordet 2014-03-25 18:18:14 +01:00
parent 5c188e02b7
commit cda4af3ec9
6 changed files with 32 additions and 11 deletions

View File

@ -105,7 +105,6 @@ public class HttpClient extends ContainerLifeCycle
private static final Logger LOG = Log.getLogger(HttpClient.class);
private final ConcurrentMap<Origin, HttpDestination> destinations = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, HttpConversation> conversations = new ConcurrentHashMap<>();
private final List<ProtocolHandler> handlers = new ArrayList<>();
private final List<Request.Listener> requestListeners = new ArrayList<>();
private final AuthenticationStore authenticationStore = new HttpAuthenticationStore();
@ -237,7 +236,6 @@ public class HttpClient extends ContainerLifeCycle
destination.close();
destinations.clear();
conversations.clear();
requestListeners.clear();
authenticationStore.clearAuthentications();
authenticationStore.clearAuthenticationResults();

View File

@ -43,7 +43,6 @@ public class HttpExchange
private final HttpResponse response;
private volatile Throwable requestFailure;
private volatile Throwable responseFailure;
public HttpExchange(HttpDestination destination, HttpRequest request, List<Response.ResponseListener> listeners)
{
@ -205,6 +204,7 @@ public class HttpExchange
{
if (update(0b0101, cause) == 0b0101)
{
LOG.debug("Failing {}: {}", this, cause);
destination.getRequestNotifier().notifyFailure(request, cause);
List<Response.ResponseListener> listeners = getConversation().getResponseListeners();
ResponseNotifier responseNotifier = destination.getResponseNotifier();

View File

@ -102,9 +102,11 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
Throwable cause = request.getAbortCause();
if (cause != null)
{
// If we have a non-null abort cause, it means that someone
// else has already aborted and notified, nothing do to here.
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
}
else
{

View File

@ -96,9 +96,6 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
LOG.debug("Processing exchange {} on connection {}", exchange, connection);
if (exchange == null)
{
// TODO: review this part... may not be 100% correct
// TODO: e.g. is client is not running, there should be no need to close the connection
if (!connectionPool.release(connection))
connection.close();
@ -114,9 +111,11 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
Throwable cause = request.getAbortCause();
if (cause != null)
{
// If we have a non-null abort cause, it means that someone
// else has already aborted and notified, nothing do to here.
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
}
else
{

View File

@ -58,13 +58,14 @@ public class TimeoutCompleteListener implements Response.CompleteListener, Runna
Scheduler.Task task = scheduler.schedule(this, timeout, TimeUnit.MILLISECONDS);
if (this.task.getAndSet(task) != null)
throw new IllegalStateException();
LOG.debug("Scheduled timeout task {} in {} ms", task, timeout);
LOG.debug("Scheduled timeout task {} in {} ms for {}", task, timeout, request);
return true;
}
@Override
public void run()
{
LOG.debug("Executing timeout task {} for {}", task, request);
request.abort(new TimeoutException("Total timeout elapsed"));
}
}

View File

@ -364,6 +364,27 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
Assert.assertNotNull(request.getAbortCause());
}
@Test
public void testVeryShortTimeout() throws Exception
{
start(new EmptyServerHandler());
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(1, TimeUnit.MILLISECONDS) // Very short timeout
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException
{
try (Socket socket = new Socket())