Merged branch 'jetty-9.2.x' into 'master'.
This commit is contained in:
commit
27beeff164
|
@ -86,6 +86,6 @@ public abstract class HttpChannel
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return String.format("%s@%h", getClass().getSimpleName(), this);
|
return String.format("%s@%x(exchange=%s)", getClass().getSimpleName(), hashCode(), exchange);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -435,7 +435,7 @@ public abstract class HttpReceiver
|
||||||
|
|
||||||
HttpResponse response = exchange.getResponse();
|
HttpResponse response = exchange.getResponse();
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Response failure {} {}", response, failure);
|
LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
|
||||||
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
|
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
|
||||||
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
|
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
|
||||||
notifier.notifyFailure(listeners, response, failure);
|
notifier.notifyFailure(listeners, response, failure);
|
||||||
|
@ -524,10 +524,11 @@ public abstract class HttpReceiver
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return String.format("%s@%x(rcv=%s)",
|
return String.format("%s@%x(rsp=%s,failure=%s)",
|
||||||
getClass().getSimpleName(),
|
getClass().getSimpleName(),
|
||||||
hashCode(),
|
hashCode(),
|
||||||
responseState);
|
responseState,
|
||||||
|
failure);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -666,13 +666,25 @@ public class HttpRequest implements Request
|
||||||
@Override
|
@Override
|
||||||
public void send(Response.CompleteListener listener)
|
public void send(Response.CompleteListener listener)
|
||||||
{
|
{
|
||||||
if (getTimeout() > 0)
|
TimeoutCompleteListener timeoutListener = null;
|
||||||
|
try
|
||||||
{
|
{
|
||||||
TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(this);
|
if (getTimeout() > 0)
|
||||||
timeoutListener.schedule(client.getScheduler());
|
{
|
||||||
responseListeners.add(timeoutListener);
|
timeoutListener = new TimeoutCompleteListener(this);
|
||||||
|
timeoutListener.schedule(client.getScheduler());
|
||||||
|
responseListeners.add(timeoutListener);
|
||||||
|
}
|
||||||
|
send(this, listener);
|
||||||
|
}
|
||||||
|
catch (Throwable x)
|
||||||
|
{
|
||||||
|
// Do not leak the scheduler task if we
|
||||||
|
// can't even start sending the request.
|
||||||
|
if (timeoutListener != null)
|
||||||
|
timeoutListener.cancel();
|
||||||
|
throw x;
|
||||||
}
|
}
|
||||||
send(this, listener);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void send(HttpRequest request, Response.CompleteListener listener)
|
private void send(HttpRequest request, Response.CompleteListener listener)
|
||||||
|
|
|
@ -206,7 +206,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
|
if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
|
||||||
return false;
|
return false;
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Request headers {}{}{}", request, System.getProperty("line.separator"), request.getHeaders().toString().trim());
|
LOG.debug("Request headers {}{}{}", request, System.lineSeparator(), request.getHeaders().toString().trim());
|
||||||
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
|
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
|
||||||
notifier.notifyHeaders(request);
|
notifier.notifyHeaders(request);
|
||||||
if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
|
if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
|
||||||
|
@ -238,7 +238,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT))
|
if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT))
|
||||||
return false;
|
return false;
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Request content {}{}{}", request, System.getProperty("line.separator"), BufferUtil.toDetailString(content));
|
LOG.debug("Request content {}{}{}", request, System.lineSeparator(), BufferUtil.toDetailString(content));
|
||||||
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
|
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
|
||||||
notifier.notifyContent(request, content);
|
notifier.notifyContent(request, content);
|
||||||
if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT))
|
if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT))
|
||||||
|
@ -327,7 +327,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
|
|
||||||
Request request = exchange.getRequest();
|
Request request = exchange.getRequest();
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Request failure {} {}", exchange, failure);
|
LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
|
||||||
HttpDestination destination = getHttpChannel().getHttpDestination();
|
HttpDestination destination = getHttpChannel().getHttpDestination();
|
||||||
destination.getRequestNotifier().notifyFailure(request, failure);
|
destination.getRequestNotifier().notifyFailure(request, failure);
|
||||||
|
|
||||||
|
@ -365,7 +365,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
if (failure != null)
|
if (failure != null)
|
||||||
{
|
{
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Response failure from request {}", exchange);
|
LOG.debug("Response failure from request {} {}", request, exchange);
|
||||||
getHttpChannel().abortResponse(failure);
|
getHttpChannel().abortResponse(failure);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -492,15 +492,13 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
|
|
||||||
public boolean abort(Throwable failure)
|
public boolean abort(Throwable failure)
|
||||||
{
|
{
|
||||||
RequestState current = requestState.get();
|
return anyToFailure(failure);
|
||||||
boolean abortable = isBeforeCommit(current) || isSending(current);
|
|
||||||
return abortable && anyToFailure(failure);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean updateRequestState(RequestState from, RequestState to)
|
private boolean updateRequestState(RequestState from, RequestState to)
|
||||||
{
|
{
|
||||||
boolean updated = requestState.compareAndSet(from, to);
|
boolean updated = requestState.compareAndSet(from, to);
|
||||||
if (!updated)
|
if (!updated && LOG.isDebugEnabled())
|
||||||
LOG.debug("RequestState update failed: {} -> {}: {}", from, to, requestState.get());
|
LOG.debug("RequestState update failed: {} -> {}: {}", from, to, requestState.get());
|
||||||
return updated;
|
return updated;
|
||||||
}
|
}
|
||||||
|
@ -508,38 +506,11 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
private boolean updateSenderState(SenderState from, SenderState to)
|
private boolean updateSenderState(SenderState from, SenderState to)
|
||||||
{
|
{
|
||||||
boolean updated = senderState.compareAndSet(from, to);
|
boolean updated = senderState.compareAndSet(from, to);
|
||||||
if (!updated)
|
if (!updated && LOG.isDebugEnabled())
|
||||||
LOG.debug("SenderState update failed: {} -> {}: {}", from, to, senderState.get());
|
LOG.debug("SenderState update failed: {} -> {}: {}", from, to, senderState.get());
|
||||||
return updated;
|
return updated;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isBeforeCommit(RequestState requestState)
|
|
||||||
{
|
|
||||||
switch (requestState)
|
|
||||||
{
|
|
||||||
case TRANSIENT:
|
|
||||||
case QUEUED:
|
|
||||||
case BEGIN:
|
|
||||||
case HEADERS:
|
|
||||||
return true;
|
|
||||||
default:
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isSending(RequestState requestState)
|
|
||||||
{
|
|
||||||
switch (requestState)
|
|
||||||
{
|
|
||||||
case TRANSIENT_CONTENT:
|
|
||||||
case COMMIT:
|
|
||||||
case CONTENT:
|
|
||||||
return true;
|
|
||||||
default:
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private RuntimeException illegalSenderState(SenderState current)
|
private RuntimeException illegalSenderState(SenderState current)
|
||||||
{
|
{
|
||||||
return new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead");
|
return new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead");
|
||||||
|
@ -548,11 +519,12 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return String.format("%s@%x(req=%s,snd=%s)",
|
return String.format("%s@%x(req=%s,snd=%s,failure=%s)",
|
||||||
getClass().getSimpleName(),
|
getClass().getSimpleName(),
|
||||||
hashCode(),
|
hashCode(),
|
||||||
requestState,
|
requestState,
|
||||||
senderState);
|
senderState,
|
||||||
|
failure);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -44,21 +44,20 @@ public class TimeoutCompleteListener implements Response.CompleteListener, Runna
|
||||||
@Override
|
@Override
|
||||||
public void onComplete(Result result)
|
public void onComplete(Result result)
|
||||||
{
|
{
|
||||||
Scheduler.Task task = this.task.getAndSet(null);
|
cancel();
|
||||||
if (task != null)
|
|
||||||
{
|
|
||||||
boolean cancelled = task.cancel();
|
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug("Cancelled (successfully: {}) timeout task {}", cancelled, task);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean schedule(Scheduler scheduler)
|
public boolean schedule(Scheduler scheduler)
|
||||||
{
|
{
|
||||||
long timeout = request.getTimeout();
|
long timeout = request.getTimeout();
|
||||||
Scheduler.Task task = scheduler.schedule(this, timeout, TimeUnit.MILLISECONDS);
|
Scheduler.Task task = scheduler.schedule(this, timeout, TimeUnit.MILLISECONDS);
|
||||||
if (this.task.getAndSet(task) != null)
|
Scheduler.Task existing = this.task.getAndSet(task);
|
||||||
|
if (existing != null)
|
||||||
|
{
|
||||||
|
existing.cancel();
|
||||||
|
cancel();
|
||||||
throw new IllegalStateException();
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Scheduled timeout task {} in {} ms for {}", task, timeout, request);
|
LOG.debug("Scheduled timeout task {} in {} ms for {}", task, timeout, request);
|
||||||
return true;
|
return true;
|
||||||
|
@ -71,4 +70,15 @@ public class TimeoutCompleteListener implements Response.CompleteListener, Runna
|
||||||
LOG.debug("Executing timeout task {} for {}", task, request);
|
LOG.debug("Executing timeout task {} for {}", task, request);
|
||||||
request.abort(new TimeoutException("Total timeout elapsed"));
|
request.abort(new TimeoutException("Total timeout elapsed"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void cancel()
|
||||||
|
{
|
||||||
|
Scheduler.Task task = this.task.getAndSet(null);
|
||||||
|
if (task != null)
|
||||||
|
{
|
||||||
|
boolean cancelled = task.cancel();
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Cancelled (successfully: {}) timeout task {}", cancelled, task);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,9 +119,8 @@ public class HttpChannelOverHTTP extends HttpChannel
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return String.format("%s@%x(send=%s,recv=%s)",
|
return String.format("%s[send=%s,recv=%s]",
|
||||||
getClass().getSimpleName(),
|
super.toString(),
|
||||||
hashCode(),
|
|
||||||
sender,
|
sender,
|
||||||
receiver);
|
receiver);
|
||||||
}
|
}
|
||||||
|
|
|
@ -319,6 +319,36 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
|
||||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeoutCancelledWhenSendingThrowsException() throws Exception
|
||||||
|
{
|
||||||
|
start(new EmptyServerHandler());
|
||||||
|
|
||||||
|
long timeout = 1000;
|
||||||
|
Request request = client.newRequest("badscheme://localhost:" + connector.getLocalPort());
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
request.timeout(timeout, TimeUnit.MILLISECONDS)
|
||||||
|
.send(new Response.CompleteListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onComplete(Result result)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
catch (Exception expected)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(2 * timeout);
|
||||||
|
|
||||||
|
// If the task was not cancelled, it aborted the request.
|
||||||
|
Assert.assertNull(request.getAbortCause());
|
||||||
|
}
|
||||||
|
|
||||||
private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException
|
private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException
|
||||||
{
|
{
|
||||||
try (Socket socket = new Socket())
|
try (Socket socket = new Socket())
|
||||||
|
|
Loading…
Reference in New Issue