460905 - Make sure TimeoutCompleteListener is cancelled if the request cannot be sent.
This commit is contained in:
parent
7b25674aca
commit
d5a6ad2345
|
@ -665,15 +665,26 @@ public class HttpRequest implements Request
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void send(Response.CompleteListener listener)
|
public void send(Response.CompleteListener listener)
|
||||||
|
{
|
||||||
|
TimeoutCompleteListener timeoutListener = null;
|
||||||
|
try
|
||||||
{
|
{
|
||||||
if (getTimeout() > 0)
|
if (getTimeout() > 0)
|
||||||
{
|
{
|
||||||
TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(this);
|
timeoutListener = new TimeoutCompleteListener(this);
|
||||||
timeoutListener.schedule(client.getScheduler());
|
timeoutListener.schedule(client.getScheduler());
|
||||||
responseListeners.add(timeoutListener);
|
responseListeners.add(timeoutListener);
|
||||||
}
|
}
|
||||||
send(this, listener);
|
send(this, listener);
|
||||||
}
|
}
|
||||||
|
catch (Throwable x)
|
||||||
|
{
|
||||||
|
// Do not leak the scheduler task if we
|
||||||
|
// can't even start sending the request.
|
||||||
|
if (timeoutListener != null)
|
||||||
|
timeoutListener.cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void send(HttpRequest request, Response.CompleteListener listener)
|
private void send(HttpRequest request, Response.CompleteListener listener)
|
||||||
{
|
{
|
||||||
|
|
|
@ -44,21 +44,20 @@ public class TimeoutCompleteListener implements Response.CompleteListener, Runna
|
||||||
@Override
|
@Override
|
||||||
public void onComplete(Result result)
|
public void onComplete(Result result)
|
||||||
{
|
{
|
||||||
Scheduler.Task task = this.task.getAndSet(null);
|
cancel();
|
||||||
if (task != null)
|
|
||||||
{
|
|
||||||
boolean cancelled = task.cancel();
|
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug("Cancelled (successfully: {}) timeout task {}", cancelled, task);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean schedule(Scheduler scheduler)
|
public boolean schedule(Scheduler scheduler)
|
||||||
{
|
{
|
||||||
long timeout = request.getTimeout();
|
long timeout = request.getTimeout();
|
||||||
Scheduler.Task task = scheduler.schedule(this, timeout, TimeUnit.MILLISECONDS);
|
Scheduler.Task task = scheduler.schedule(this, timeout, TimeUnit.MILLISECONDS);
|
||||||
if (this.task.getAndSet(task) != null)
|
Scheduler.Task existing = this.task.getAndSet(task);
|
||||||
|
if (existing != null)
|
||||||
|
{
|
||||||
|
existing.cancel();
|
||||||
|
cancel();
|
||||||
throw new IllegalStateException();
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Scheduled timeout task {} in {} ms for {}", task, timeout, request);
|
LOG.debug("Scheduled timeout task {} in {} ms for {}", task, timeout, request);
|
||||||
return true;
|
return true;
|
||||||
|
@ -71,4 +70,15 @@ public class TimeoutCompleteListener implements Response.CompleteListener, Runna
|
||||||
LOG.debug("Executing timeout task {} for {}", task, request);
|
LOG.debug("Executing timeout task {} for {}", task, request);
|
||||||
request.abort(new TimeoutException("Total timeout elapsed"));
|
request.abort(new TimeoutException("Total timeout elapsed"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void cancel()
|
||||||
|
{
|
||||||
|
Scheduler.Task task = this.task.getAndSet(null);
|
||||||
|
if (task != null)
|
||||||
|
{
|
||||||
|
boolean cancelled = task.cancel();
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Cancelled (successfully: {}) timeout task {}", cancelled, task);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -428,6 +428,28 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
|
||||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeoutCancelledWhenSendingThrowsException() throws Exception
|
||||||
|
{
|
||||||
|
start(new EmptyServerHandler());
|
||||||
|
|
||||||
|
long timeout = 1000;
|
||||||
|
Request request = client.newRequest("bad_scheme://localhost:" + connector.getLocalPort());
|
||||||
|
request.timeout(timeout, TimeUnit.MILLISECONDS)
|
||||||
|
.send(new Response.CompleteListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onComplete(Result result)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Thread.sleep(2 * timeout);
|
||||||
|
|
||||||
|
// If the task was not cancelled, it aborted the request.
|
||||||
|
Assert.assertNull(request.getAbortCause());
|
||||||
|
}
|
||||||
|
|
||||||
private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException
|
private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException
|
||||||
{
|
{
|
||||||
try (Socket socket = new Socket())
|
try (Socket socket = new Socket())
|
||||||
|
|
Loading…
Reference in New Issue