Notifying the "failure" event for request and response when the failure

is detected during the processing of another event.
This commit is contained in:
Simone Bordet 2015-03-04 01:01:09 +01:00
parent da80498c56
commit 40ad8dc608
4 changed files with 40 additions and 28 deletions

View File

@ -159,10 +159,9 @@ public class HttpExchange
private int update(int code, Throwable failure)
{
int current;
while (true)
{
current = complete.get();
int current = complete.get();
boolean updateable = (current & code) == 0;
if (updateable)
{
@ -177,9 +176,8 @@ public class HttpExchange
if (LOG.isDebugEnabled())
LOG.debug("{} updated", this);
}
break;
return current;
}
return current;
}
public boolean abort(Throwable cause)

View File

@ -429,16 +429,7 @@ public abstract class HttpReceiver
dispose();
// Mark atomically the response as terminated and failed,
// with respect to concurrency between request and response.
Result result = exchange.terminateResponse(failure);
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyFailure(listeners, response, failure);
Result result = failResponse(exchange, failure);
if (fail)
{
@ -453,9 +444,25 @@ public abstract class HttpReceiver
return true;
}
private Result failResponse(HttpExchange exchange, Throwable failure)
{
// Mark atomically the response as terminated and failed,
// with respect to concurrency between request and response.
Result result = exchange.terminateResponse(failure);
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyFailure(listeners, response, failure);
return result;
}
private void terminateResponse(HttpExchange exchange, Throwable failure)
{
Result result = exchange.terminateResponse(failure);
Result result = failResponse(exchange, failure);
terminateResponse(exchange, result);
}
@ -472,7 +479,7 @@ public abstract class HttpReceiver
if (!ordered)
channel.exchangeTerminated(result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", response);
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result);

View File

@ -321,15 +321,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
dispose();
// Mark atomically the request as terminated and failed,
// with respect to concurrency between request and response.
Result result = exchange.terminateRequest(failure);
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifyFailure(request, failure);
Result result = failRequest(exchange, failure);
if (fail)
{
@ -344,11 +336,26 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return true;
}
private Result failRequest(HttpExchange exchange, Throwable failure)
{
// Mark atomically the request as terminated and failed,
// with respect to concurrency between request and response.
Result result = exchange.terminateRequest(failure);
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifyFailure(request, failure);
return result;
}
private void terminateRequest(HttpExchange exchange, Throwable failure)
{
if (exchange != null)
{
Result result = exchange.terminateRequest(failure);
Result result = failRequest(exchange, failure);
terminateRequest(exchange, failure, result);
}
}
@ -376,7 +383,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (!ordered)
channel.exchangeTerminated(result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", request);
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
HttpConversation conversation = exchange.getConversation();
destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
if (ordered)

View File

@ -148,7 +148,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
return closed.compareAndSet(false, true);
}
private boolean abort(Throwable failure)
protected boolean abort(Throwable failure)
{
HttpExchange exchange = channel.getHttpExchange();
return exchange != null && exchange.getRequest().abort(failure);