439895 - No event callback should be invoked after the "failure" callback.
Fixed HttpSender and HttpReceiver to use a non-blocking collaborative mechanism to notify callbacks. Only the "failed" callback can run concurrently with other callbacks. No other callback can run after the "complete" callback: a failure concurrent with another callback will notify the "failed" callback, finish the running callback and only then invoke the "complete" callback.
This commit is contained in:
parent
c72649e150
commit
816b85ea4d
|
@ -49,9 +49,8 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
* is available</li>
|
||||
* <li>{@link #responseHeader(HttpExchange, HttpField)}, when a HTTP field is available</li>
|
||||
* <li>{@link #responseHeaders(HttpExchange)}, when all HTTP headers are available</li>
|
||||
* <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available; this is the only
|
||||
* method that may be invoked multiple times with different buffers containing different content</li>
|
||||
* <li>{@link #responseSuccess(HttpExchange)}, when the response is complete</li>
|
||||
* <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available</li>
|
||||
* <li>{@link #responseSuccess(HttpExchange)}, when the response is successful</li>
|
||||
* </ol>
|
||||
* At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed
|
||||
* (for example, because of I/O exceptions).
|
||||
|
@ -69,7 +68,8 @@ public abstract class HttpReceiver
|
|||
|
||||
private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
|
||||
private final HttpChannel channel;
|
||||
private volatile ContentDecoder decoder;
|
||||
private ContentDecoder decoder;
|
||||
private Throwable failure;
|
||||
|
||||
protected HttpReceiver(HttpChannel channel)
|
||||
{
|
||||
|
@ -104,7 +104,7 @@ public abstract class HttpReceiver
|
|||
*/
|
||||
protected boolean responseBegin(HttpExchange exchange)
|
||||
{
|
||||
if (!updateResponseState(ResponseState.IDLE, ResponseState.BEGIN))
|
||||
if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT))
|
||||
return false;
|
||||
|
||||
HttpConversation conversation = exchange.getConversation();
|
||||
|
@ -127,6 +127,9 @@ public abstract class HttpReceiver
|
|||
ResponseNotifier notifier = destination.getResponseNotifier();
|
||||
notifier.notifyBegin(conversation.getResponseListeners(), response);
|
||||
|
||||
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
|
||||
terminateResponse(exchange, failure);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -152,7 +155,7 @@ public abstract class HttpReceiver
|
|||
case BEGIN:
|
||||
case HEADER:
|
||||
{
|
||||
if (updateResponseState(current, ResponseState.HEADER))
|
||||
if (updateResponseState(current, ResponseState.TRANSIENT))
|
||||
break out;
|
||||
break;
|
||||
}
|
||||
|
@ -188,6 +191,9 @@ public abstract class HttpReceiver
|
|||
}
|
||||
}
|
||||
|
||||
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
|
||||
terminateResponse(exchange, failure);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -228,7 +234,7 @@ public abstract class HttpReceiver
|
|||
case BEGIN:
|
||||
case HEADER:
|
||||
{
|
||||
if (updateResponseState(current, ResponseState.HEADERS))
|
||||
if (updateResponseState(current, ResponseState.TRANSIENT))
|
||||
break out;
|
||||
break;
|
||||
}
|
||||
|
@ -261,6 +267,9 @@ public abstract class HttpReceiver
|
|||
}
|
||||
}
|
||||
|
||||
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
|
||||
terminateResponse(exchange, failure);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -283,7 +292,7 @@ public abstract class HttpReceiver
|
|||
case HEADERS:
|
||||
case CONTENT:
|
||||
{
|
||||
if (updateResponseState(current, ResponseState.CONTENT))
|
||||
if (updateResponseState(current, ResponseState.TRANSIENT))
|
||||
break out;
|
||||
break;
|
||||
}
|
||||
|
@ -312,6 +321,9 @@ public abstract class HttpReceiver
|
|||
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
|
||||
notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer, callback);
|
||||
|
||||
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
|
||||
terminateResponse(exchange, failure);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -332,16 +344,17 @@ public abstract class HttpReceiver
|
|||
if (!completed)
|
||||
return false;
|
||||
|
||||
// Reset to be ready for another response
|
||||
responseState.set(ResponseState.IDLE);
|
||||
|
||||
// Reset to be ready for another response.
|
||||
reset();
|
||||
|
||||
// Mark atomically the response as terminated and succeeded,
|
||||
// with respect to concurrency between request and response.
|
||||
// If there is a non-null result, then both sender and
|
||||
// receiver are reset and ready to be reused, and the
|
||||
// connection closed/pooled (depending on the transport).
|
||||
Result result = exchange.terminateResponse(null);
|
||||
|
||||
// It is important to notify *after* we reset and terminate
|
||||
// because the notification may trigger another request/response.
|
||||
HttpResponse response = exchange.getResponse();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response success {}", response);
|
||||
|
@ -349,17 +362,7 @@ public abstract class HttpReceiver
|
|||
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
|
||||
notifier.notifySuccess(listeners, response);
|
||||
|
||||
if (result != null)
|
||||
{
|
||||
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
|
||||
if (!ordered)
|
||||
channel.exchangeTerminated(result);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Request/Response succeeded {}", response);
|
||||
notifier.notifyComplete(listeners, result);
|
||||
if (ordered)
|
||||
channel.exchangeTerminated(result);
|
||||
}
|
||||
terminateResponse(exchange, result);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -388,7 +391,20 @@ public abstract class HttpReceiver
|
|||
if (!completed)
|
||||
return false;
|
||||
|
||||
// Dispose to avoid further responses
|
||||
this.failure = failure;
|
||||
|
||||
// Update the state to avoid more response processing.
|
||||
boolean fail;
|
||||
while (true)
|
||||
{
|
||||
ResponseState current = responseState.get();
|
||||
if (updateResponseState(current, ResponseState.FAILURE))
|
||||
{
|
||||
fail = current != ResponseState.TRANSIENT;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
dispose();
|
||||
|
||||
// Mark atomically the response as terminated and failed,
|
||||
|
@ -402,19 +418,45 @@ public abstract class HttpReceiver
|
|||
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
|
||||
notifier.notifyFailure(listeners, response, failure);
|
||||
|
||||
if (fail)
|
||||
{
|
||||
terminateResponse(exchange, result);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void terminateResponse(HttpExchange exchange, Throwable failure)
|
||||
{
|
||||
Result result = exchange.terminateResponse(failure);
|
||||
terminateResponse(exchange, result);
|
||||
}
|
||||
|
||||
private void terminateResponse(HttpExchange exchange, Result result)
|
||||
{
|
||||
HttpResponse response = exchange.getResponse();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response complete {}", response);
|
||||
|
||||
if (result != null)
|
||||
{
|
||||
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
|
||||
if (!ordered)
|
||||
channel.exchangeTerminated(result);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Request/Response failed {}", response);
|
||||
LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", response);
|
||||
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
|
||||
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
|
||||
notifier.notifyComplete(listeners, result);
|
||||
if (ordered)
|
||||
channel.exchangeTerminated(result);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -427,7 +469,6 @@ public abstract class HttpReceiver
|
|||
protected void reset()
|
||||
{
|
||||
decoder = null;
|
||||
responseState.set(ResponseState.IDLE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -440,7 +481,6 @@ public abstract class HttpReceiver
|
|||
protected void dispose()
|
||||
{
|
||||
decoder = null;
|
||||
responseState.set(ResponseState.FAILURE);
|
||||
}
|
||||
|
||||
public boolean abort(Throwable cause)
|
||||
|
@ -464,6 +504,10 @@ public abstract class HttpReceiver
|
|||
*/
|
||||
private enum ResponseState
|
||||
{
|
||||
/**
|
||||
* One of the response*() methods is being executed.
|
||||
*/
|
||||
TRANSIENT,
|
||||
/**
|
||||
* The response is not yet received, the initial state
|
||||
*/
|
||||
|
|
|
@ -65,7 +65,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
private final IteratingCallback contentCallback = new ContentCallback();
|
||||
private final Callback lastCallback = new LastContentCallback();
|
||||
private final HttpChannel channel;
|
||||
private volatile HttpContent content;
|
||||
private HttpContent content;
|
||||
private Throwable failure;
|
||||
|
||||
protected HttpSender(HttpChannel channel)
|
||||
{
|
||||
|
@ -197,34 +198,40 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
|
||||
protected boolean queuedToBegin(Request request)
|
||||
{
|
||||
if (!updateRequestState(RequestState.QUEUED, RequestState.BEGIN))
|
||||
if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT))
|
||||
return false;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Request begin {}", request);
|
||||
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
|
||||
notifier.notifyBegin(request);
|
||||
if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
|
||||
terminateRequest(getHttpExchange(), failure, false);
|
||||
return true;
|
||||
}
|
||||
|
||||
protected boolean beginToHeaders(Request request)
|
||||
{
|
||||
if (!updateRequestState(RequestState.BEGIN, RequestState.HEADERS))
|
||||
if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
|
||||
return false;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Request headers {}{}{}", request, System.getProperty("line.separator"), request.getHeaders().toString().trim());
|
||||
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
|
||||
notifier.notifyHeaders(request);
|
||||
if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
|
||||
terminateRequest(getHttpExchange(), failure, false);
|
||||
return true;
|
||||
}
|
||||
|
||||
protected boolean headersToCommit(Request request)
|
||||
{
|
||||
if (!updateRequestState(RequestState.HEADERS, RequestState.COMMIT))
|
||||
if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT))
|
||||
return false;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Request committed {}", request);
|
||||
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
|
||||
notifier.notifyCommit(request);
|
||||
if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
|
||||
terminateRequest(getHttpExchange(), failure, true);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -236,21 +243,19 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
case COMMIT:
|
||||
case CONTENT:
|
||||
{
|
||||
if (!updateRequestState(current, RequestState.CONTENT))
|
||||
if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT))
|
||||
return false;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Request content {}{}{}", request, System.getProperty("line.separator"), BufferUtil.toDetailString(content));
|
||||
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
|
||||
notifier.notifyContent(request, content);
|
||||
if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT))
|
||||
terminateRequest(getHttpExchange(), failure, true);
|
||||
return true;
|
||||
}
|
||||
case FAILURE:
|
||||
{
|
||||
return false;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException(current.toString());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -269,43 +274,28 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
if (!completed)
|
||||
return false;
|
||||
|
||||
// Reset to be ready for another request
|
||||
requestState.set(RequestState.QUEUED);
|
||||
|
||||
// Reset to be ready for another request.
|
||||
reset();
|
||||
|
||||
// Mark atomically the request as terminated and succeeded,
|
||||
// with respect to concurrency between request and response.
|
||||
Result result = exchange.terminateRequest(null);
|
||||
|
||||
// It is important to notify completion *after* we reset because
|
||||
// the notification may trigger another request/response
|
||||
Request request = exchange.getRequest();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Request success {}", request);
|
||||
HttpDestination destination = getHttpChannel().getHttpDestination();
|
||||
destination.getRequestNotifier().notifySuccess(exchange.getRequest());
|
||||
|
||||
if (result != null)
|
||||
{
|
||||
boolean ordered = destination.getHttpClient().isStrictEventOrdering();
|
||||
if (!ordered)
|
||||
channel.exchangeTerminated(result);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Request/Response succeded {}", request);
|
||||
HttpConversation conversation = exchange.getConversation();
|
||||
destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
|
||||
if (ordered)
|
||||
channel.exchangeTerminated(result);
|
||||
}
|
||||
terminateRequest(exchange, null, true, result);
|
||||
|
||||
return true;
|
||||
}
|
||||
case FAILURE:
|
||||
{
|
||||
return false;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException(current.toString());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -322,8 +312,22 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
if (!completed)
|
||||
return false;
|
||||
|
||||
// Dispose to avoid further requests
|
||||
RequestState requestState = dispose();
|
||||
this.failure = failure;
|
||||
|
||||
// Update the state to avoid more request processing.
|
||||
RequestState current;
|
||||
boolean fail;
|
||||
while (true)
|
||||
{
|
||||
current = requestState.get();
|
||||
if (updateRequestState(current, RequestState.FAILURE))
|
||||
{
|
||||
fail = current != RequestState.TRANSIENT && current != RequestState.TRANSIENT_CONTENT;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
dispose();
|
||||
|
||||
// Mark atomically the request as terminated and failed,
|
||||
// with respect to concurrency between request and response.
|
||||
|
@ -335,8 +339,36 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
HttpDestination destination = getHttpChannel().getHttpDestination();
|
||||
destination.getRequestNotifier().notifyFailure(request, failure);
|
||||
|
||||
boolean notCommitted = isBeforeCommit(requestState);
|
||||
if (result == null && notCommitted && request.getAbortCause() == null)
|
||||
if (fail)
|
||||
{
|
||||
terminateRequest(exchange, failure, !isBeforeCommit(current), result);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed)
|
||||
{
|
||||
if (exchange != null)
|
||||
{
|
||||
Result result = exchange.terminateRequest(failure);
|
||||
terminateRequest(exchange, failure, committed, result);
|
||||
}
|
||||
}
|
||||
|
||||
private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed, Result result)
|
||||
{
|
||||
Request request = exchange.getRequest();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Terminating request {}", request);
|
||||
|
||||
if (failure != null && !committed && result == null && request.getAbortCause() == null)
|
||||
{
|
||||
// Complete the response from here
|
||||
if (exchange.responseComplete())
|
||||
|
@ -349,18 +381,17 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
|
||||
if (result != null)
|
||||
{
|
||||
HttpDestination destination = getHttpChannel().getHttpDestination();
|
||||
boolean ordered = destination.getHttpClient().isStrictEventOrdering();
|
||||
if (!ordered)
|
||||
channel.exchangeTerminated(result);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Request/Response failed {}", request);
|
||||
LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", request);
|
||||
HttpConversation conversation = exchange.getConversation();
|
||||
destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
|
||||
if (ordered)
|
||||
channel.exchangeTerminated(result);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -398,23 +429,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
{
|
||||
content.close();
|
||||
content = null;
|
||||
requestState.set(RequestState.QUEUED);
|
||||
senderState.set(SenderState.IDLE);
|
||||
}
|
||||
|
||||
protected RequestState dispose()
|
||||
protected void dispose()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
RequestState current = requestState.get();
|
||||
if (updateRequestState(current, RequestState.FAILURE))
|
||||
{
|
||||
HttpContent content = this.content;
|
||||
if (content != null)
|
||||
content.close();
|
||||
return current;
|
||||
}
|
||||
}
|
||||
HttpContent content = this.content;
|
||||
if (content != null)
|
||||
content.close();
|
||||
}
|
||||
|
||||
public void proceed(HttpExchange exchange, Throwable failure)
|
||||
|
@ -485,7 +507,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
return abortable && anyToFailure(failure);
|
||||
}
|
||||
|
||||
protected boolean updateRequestState(RequestState from, RequestState to)
|
||||
private boolean updateRequestState(RequestState from, RequestState to)
|
||||
{
|
||||
boolean updated = requestState.compareAndSet(from, to);
|
||||
if (!updated)
|
||||
|
@ -505,6 +527,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
{
|
||||
switch (requestState)
|
||||
{
|
||||
case TRANSIENT:
|
||||
case QUEUED:
|
||||
case BEGIN:
|
||||
case HEADERS:
|
||||
|
@ -518,6 +541,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
{
|
||||
switch (requestState)
|
||||
{
|
||||
case TRANSIENT_CONTENT:
|
||||
case COMMIT:
|
||||
case CONTENT:
|
||||
return true;
|
||||
|
@ -534,8 +558,16 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
/**
|
||||
* The request states {@link HttpSender} goes through when sending a request.
|
||||
*/
|
||||
protected enum RequestState
|
||||
private enum RequestState
|
||||
{
|
||||
/**
|
||||
* One of the state transition methods is being executed.
|
||||
*/
|
||||
TRANSIENT,
|
||||
/**
|
||||
* The content transition method is being executed.
|
||||
*/
|
||||
TRANSIENT_CONTENT,
|
||||
/**
|
||||
* The request is queued, the initial state
|
||||
*/
|
||||
|
|
|
@ -196,12 +196,11 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
}
|
||||
|
||||
@Override
|
||||
protected RequestState dispose()
|
||||
protected void dispose()
|
||||
{
|
||||
generator.abort();
|
||||
RequestState result = super.dispose();
|
||||
super.dispose();
|
||||
shutdownOutput();
|
||||
return result;
|
||||
}
|
||||
|
||||
private void shutdownOutput()
|
||||
|
|
|
@ -144,28 +144,41 @@ public class InputStreamResponseListener extends Listener.Adapter
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(Response response)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Queuing end of content {}{}", EOF, "");
|
||||
queue.offer(EOF);
|
||||
signal();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Response response, Throwable failure)
|
||||
{
|
||||
fail(failure);
|
||||
signal();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
if (result.isFailed() && failure == null)
|
||||
fail(result.getFailure());
|
||||
this.result = result;
|
||||
if (result.isSucceeded())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Queuing end of content {}{}", EOF, "");
|
||||
queue.offer(EOF);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Queuing failure {} {}", FAILURE, failure);
|
||||
queue.offer(FAILURE);
|
||||
this.failure = result.getFailure();
|
||||
responseLatch.countDown();
|
||||
}
|
||||
resultLatch.countDown();
|
||||
signal();
|
||||
}
|
||||
|
||||
private void fail(Throwable failure)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Queuing failure {} {}", FAILURE, failure);
|
||||
queue.offer(FAILURE);
|
||||
this.failure = failure;
|
||||
responseLatch.countDown();
|
||||
}
|
||||
|
||||
protected boolean await()
|
||||
{
|
||||
try
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class HttpResponseConcurrentAbortTest extends AbstractHttpClientServerTest
|
||||
{
|
||||
private final CountDownLatch callbackLatch = new CountDownLatch(1);
|
||||
private final CountDownLatch failureLatch = new CountDownLatch(1);
|
||||
private final CountDownLatch completeLatch = new CountDownLatch(1);
|
||||
private final AtomicBoolean success = new AtomicBoolean();
|
||||
|
||||
public HttpResponseConcurrentAbortTest(SslContextFactory sslContextFactory)
|
||||
{
|
||||
super(sslContextFactory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbortOnBegin() throws Exception
|
||||
{
|
||||
start(new EmptyServerHandler());
|
||||
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.onResponseBegin(new Response.BeginListener()
|
||||
{
|
||||
@Override
|
||||
public void onBegin(Response response)
|
||||
{
|
||||
abort(response);
|
||||
}
|
||||
})
|
||||
.send(new TestResponseListener());
|
||||
Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(completeLatch.await(6, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(success.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbortOnHeader() throws Exception
|
||||
{
|
||||
start(new EmptyServerHandler());
|
||||
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.onResponseHeader(new Response.HeaderListener()
|
||||
{
|
||||
@Override
|
||||
public boolean onHeader(Response response, HttpField field)
|
||||
{
|
||||
abort(response);
|
||||
return true;
|
||||
}
|
||||
})
|
||||
.send(new TestResponseListener());
|
||||
Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(success.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbortOnHeaders() throws Exception
|
||||
{
|
||||
start(new EmptyServerHandler());
|
||||
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.onResponseHeaders(new Response.HeadersListener()
|
||||
{
|
||||
@Override
|
||||
public void onHeaders(Response response)
|
||||
{
|
||||
abort(response);
|
||||
}
|
||||
})
|
||||
.send(new TestResponseListener());
|
||||
Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(success.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbortOnContent() throws Exception
|
||||
{
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
OutputStream output = response.getOutputStream();
|
||||
output.write(1);
|
||||
output.flush();
|
||||
}
|
||||
});
|
||||
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.onResponseContent(new Response.ContentListener()
|
||||
{
|
||||
@Override
|
||||
public void onContent(Response response, ByteBuffer content)
|
||||
{
|
||||
abort(response);
|
||||
}
|
||||
})
|
||||
.send(new TestResponseListener());
|
||||
Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(success.get());
|
||||
}
|
||||
|
||||
private void abort(final Response response)
|
||||
{
|
||||
Logger logger = Log.getLogger(getClass());
|
||||
|
||||
new Thread("abort")
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
response.abort(new Exception());
|
||||
}
|
||||
}.start();
|
||||
|
||||
try
|
||||
{
|
||||
// The failure callback must be executed asynchronously.
|
||||
boolean latched = failureLatch.await(4, TimeUnit.SECONDS);
|
||||
success.set(latched);
|
||||
logger.info("SIMON - STEP 1");
|
||||
|
||||
// The complete callback must not be executed
|
||||
// until we return from this callback.
|
||||
latched = completeLatch.await(1, TimeUnit.SECONDS);
|
||||
success.set(!latched);
|
||||
logger.info("SIMON - STEP 2");
|
||||
|
||||
callbackLatch.countDown();
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new RuntimeException(x);
|
||||
}
|
||||
}
|
||||
|
||||
private class TestResponseListener extends Response.Listener.Adapter
|
||||
{
|
||||
@Override
|
||||
public void onFailure(Response response, Throwable failure)
|
||||
{
|
||||
failureLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
Assert.assertTrue(result.isFailed());
|
||||
completeLatch.countDown();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue