Javadocs.

This commit is contained in:
Simone Bordet 2013-07-16 16:24:10 +02:00
parent b82444e3d2
commit af06b25538
3 changed files with 192 additions and 31 deletions

View File

@ -37,6 +37,32 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* {@link HttpReceiver} provides the abstract code to implement the various steps of the receive of HTTP responses.
* <p />
* {@link HttpReceiver} maintains a state machine that is updated when the steps of receiving a response are executed.
* <p />
* Subclasses must handle the transport-specific details, for example how to read from the raw socket and how to parse
* the bytes read from the socket. Then they have to call the methods defined in this class in the following order:
* <ol>
* <li>{@link #responseBegin(HttpExchange)}, when the HTTP response data containing the HTTP status code
* is available</li>
* <li>{@link #responseHeader(HttpExchange, HttpField)}, when a HTTP field is available</li>
* <li>{@link #responseHeaders(HttpExchange)}, when all HTTP headers are available</li>
* <li>{@link #responseContent(HttpExchange, ByteBuffer)}, when HTTP content is available; this is the only method
* that may be invoked multiple times with different buffers containing different content</li>
* <li>{@link #responseSuccess(HttpExchange)}, when the response is complete</li>
* </ol>
* At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed
* (for example, because of I/O exceptions).
* At any time, user threads may abort the response which will cause {@link #responseFailure(Throwable)} to be
* invoked.
* <p />
* The state machine maintained by this class ensures that the response steps are not executed by an I/O thread
* if the response has already been failed.
*
* @see HttpSender
*/
public abstract class HttpReceiver
{
protected static final Logger LOG = Log.getLogger(new Object(){}.getClass().getEnclosingClass());
@ -45,12 +71,12 @@ public abstract class HttpReceiver
private final HttpChannel channel;
private volatile ContentDecoder decoder;
public HttpReceiver(HttpChannel channel)
protected HttpReceiver(HttpChannel channel)
{
this.channel = channel;
}
public HttpChannel getHttpChannel()
protected HttpChannel getHttpChannel()
{
return channel;
}
@ -65,10 +91,21 @@ public abstract class HttpReceiver
return channel.getHttpDestination();
}
protected void onResponseBegin(HttpExchange exchange)
/**
* Method to be invoked when the response status code is available.
* <p />
* Subclasses must have set the response status code on the {@link Response} object of the {@link HttpExchange}
* prior invoking this method.
* <p />
* This method takes case of notifying {@link Response.BeginListener}s.
*
* @param exchange the HTTP exchange
* @return whether the processing should continue
*/
protected boolean responseBegin(HttpExchange exchange)
{
if (!updateResponseState(ResponseState.IDLE, ResponseState.BEGIN))
return;
return false;
HttpConversation conversation = exchange.getConversation();
HttpResponse response = exchange.getResponse();
@ -87,9 +124,23 @@ public abstract class HttpReceiver
LOG.debug("Response begin {}", response);
ResponseNotifier notifier = destination.getResponseNotifier();
notifier.notifyBegin(conversation.getResponseListeners(), response);
return true;
}
protected void onResponseHeader(HttpExchange exchange, HttpField field)
/**
* Method to be invoked when a response HTTP header is available.
* <p />
* Subclasses must not have added the header to the {@link Response} object of the {@link HttpExchange}
* prior invoking this method.
* <p />
* This method takes case of notifying {@link Response.HeaderListener}s and storing cookies.
*
* @param exchange the HTTP exchange
* @param field the response HTTP field
* @return whether the processing should continue
*/
protected boolean responseHeader(HttpExchange exchange, HttpField field)
{
out: while (true)
{
@ -105,7 +156,7 @@ public abstract class HttpReceiver
}
default:
{
return;
return false;
}
}
}
@ -134,9 +185,11 @@ public abstract class HttpReceiver
}
}
}
return true;
}
private void storeCookie(URI uri, HttpField field)
protected void storeCookie(URI uri, HttpField field)
{
try
{
@ -154,7 +207,15 @@ public abstract class HttpReceiver
}
}
protected void onResponseHeaders(HttpExchange exchange)
/**
* Method to be invoked after all response HTTP headers are available.
* <p />
* This method takes case of notifying {@link Response.HeadersListener}s.
*
* @param exchange the HTTP exchange
* @return whether the processing should continue
*/
protected boolean responseHeaders(HttpExchange exchange)
{
out: while (true)
{
@ -170,7 +231,7 @@ public abstract class HttpReceiver
}
default:
{
return;
return false;
}
}
}
@ -196,9 +257,20 @@ public abstract class HttpReceiver
}
}
}
return true;
}
protected void onResponseContent(HttpExchange exchange, ByteBuffer buffer)
/**
* Method to be invoked when response HTTP content is available.
* <p />
* This method takes case of decoding the content, if necessary, and notifying {@link Response.ContentListener}s.
*
* @param exchange the HTTP exchange
* @param buffer the response HTTP content buffer
* @return whether the processing should continue
*/
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer)
{
out: while (true)
{
@ -214,7 +286,7 @@ public abstract class HttpReceiver
}
default:
{
return;
return false;
}
}
}
@ -233,9 +305,20 @@ public abstract class HttpReceiver
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer);
return true;
}
protected boolean onResponseSuccess(HttpExchange exchange)
/**
* Method to be invoked when the response is successful.
* <p />
* This method takes case of notifying {@link Response.SuccessListener}s and possibly
* {@link Response.CompleteListener}s (if the exchange is completed).
*
* @param exchange the HTTP exchange
* @return whether the response was processed as successful
*/
protected boolean responseSuccess(HttpExchange exchange)
{
// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
@ -273,7 +356,15 @@ public abstract class HttpReceiver
return true;
}
protected boolean onResponseFailure(Throwable failure)
/**
* Method to be invoked when the response is failed.
* <p />
* This method takes case of notifying {@link Response.FailureListener}s.
*
* @param failure the response failure
* @return whether the response was processed as failed
*/
protected boolean responseFailure(Throwable failure)
{
HttpExchange exchange = getHttpExchange();
// In case of a response error, the failure has already been notified
@ -315,28 +406,42 @@ public abstract class HttpReceiver
return true;
}
/**
* Resets this {@link HttpReceiver} state.
* <p />
* Subclasses should override (but remember to call {@code super}) to reset their own state.
* <p />
* Either this method or {@link #dispose()} is called.
*/
protected void reset()
{
responseState.set(ResponseState.IDLE);
decoder = null;
responseState.set(ResponseState.IDLE);
}
/**
* Disposes this {@link HttpReceiver} state.
* <p />
* Subclasses should override (but remember to call {@code super}) to dispose their own state.
* <p />
* Either this method or {@link #reset()} is called.
*/
protected void dispose()
{
responseState.set(ResponseState.FAILURE);
decoder = null;
responseState.set(ResponseState.FAILURE);
}
public void idleTimeout()
{
// If we cannot fail, it means a response arrived
// just when we were timeout idling, so we don't close
onResponseFailure(new TimeoutException());
responseFailure(new TimeoutException());
}
public boolean abort(Throwable cause)
{
return onResponseFailure(cause);
return responseFailure(cause);
}
private boolean updateResponseState(ResponseState from, ResponseState to)
@ -347,8 +452,34 @@ public abstract class HttpReceiver
return updated;
}
/**
* The request states {@link HttpReceiver} goes through when receiving a response.
*/
private enum ResponseState
{
IDLE, BEGIN, HEADER, HEADERS, CONTENT, FAILURE
/**
* The response is not yet received, the initial state
*/
IDLE,
/**
* The response status code has been received
*/
BEGIN,
/**
* The response headers are being received
*/
HEADER,
/**
* All the response headers have been received
*/
HEADERS,
/**
* The response content is being received
*/
CONTENT,
/**
* The response is failed
*/
FAILURE
}
}

View File

@ -34,21 +34,26 @@ import org.eclipse.jetty.util.log.Logger;
/**
* {@link HttpSender} abstracts the algorithm to send HTTP requests, so that subclasses only implement
* the transport-specific code to send requests over the wire.
* the transport-specific code to send requests over the wire, implementing
* {@link #sendHeaders(HttpExchange, HttpContent, Callback)} and
* {@link #sendContent(HttpExchange, HttpContent, Callback)}.
* <p />
* {@link HttpSender} governs two state machines.
* <p />
* The request state machine is updated by {@link HttpSender} as the various steps of sending a request
* are executed, see {@link RequestState}.
* At any point in time, a user thread may abort the request, which may move the request state machine
* to {@link RequestState#FAILURE}. The request state machine guarantees that the request steps are
* executed only if the request has not been failed already.
* At any point in time, a user thread may abort the request, which may (if the request has not been
* completely sent yet) move the request state machine to {@link RequestState#FAILURE}.
* The request state machine guarantees that the request steps are executed (by I/O threads) only if
* the request has not been failed already.
* <p />
* The sender state machine is updated by {@link HttpSender} from three sources: deferred content notifications
* (via {@link #onContent()}), 100-continue notifications (via {@link #proceed(HttpExchange, boolean)})
* and normal request send (via {@link #sendContent(HttpExchange, HttpContent, Callback)}).
* This state machine must guarantee that the request sending is never executed concurrently: only one of
* those sources may trigger the call to {@link #sendContent(HttpExchange, HttpContent, Callback)}.
*
* @see HttpReceiver
*/
public abstract class HttpSender implements AsyncContentProvider.Listener
{
@ -332,8 +337,33 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return true;
}
/**
* Implementations should send the HTTP headers over the wire, possibly with some content,
* in a single write, and notify the given {@code callback} of the result of this operation.
* <p />
* If there is more content to send, then {@link #sendContent(HttpExchange, HttpContent, Callback)}
* will be invoked.
*
* @param exchange the exchange to send
* @param content the content to send
* @param callback the callback to notify
*/
protected abstract void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback);
/**
* Implementations should send the content at the {@link HttpContent} cursor position over the wire.
* <p />
* The {@link HttpContent} cursor is advanced by {@link HttpSender} at the right time, and if more
* content needs to be sent, this method is invoked again; subclasses need only to send the content
* at the {@link HttpContent} cursor position.
* <p />
* This method is invoked one last time when {@link HttpContent#isConsumed()} is true; subclasses
* needs to skip sending content in this case, and just complete their content generation.
*
* @param exchange the exchange to send
* @param content the content to send
* @param callback the callback to notify
*/
protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
protected void reset()
@ -494,7 +524,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
*/
CONTENT,
/**
* The request failed
* The request is failed
*/
FAILURE
}

View File

@ -111,7 +111,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
// Shutting down the parser may invoke messageComplete() or earlyEOF()
parser.atEOF();
parser.parseNext(BufferUtil.EMPTY_BUFFER);
if (!onResponseFailure(new EOFException()))
if (!responseFailure(new EOFException()))
{
// TODO: just shutdown here, or full close ?
getHttpChannel().getHttpConnection().close();
@ -135,7 +135,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
parser.setHeadResponse(exchange.getRequest().getMethod() == HttpMethod.HEAD);
exchange.getResponse().version(version).status(status).reason(reason);
onResponseBegin(exchange);
responseBegin(exchange);
return false;
}
@ -146,7 +146,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null)
return false;
onResponseHeader(exchange, field);
responseHeader(exchange, field);
return false;
}
@ -157,7 +157,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null)
return false;
onResponseHeaders(exchange);
responseHeaders(exchange);
return false;
}
@ -168,7 +168,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null)
return false;
onResponseContent(exchange, buffer);
responseContent(exchange, buffer);
return false;
}
@ -179,7 +179,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null)
return false; // TODO: is it correct to return false here ?
onResponseSuccess(exchange);
responseSuccess(exchange);
return true;
}
@ -217,7 +217,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
private void failAndClose(Throwable failure)
{
onResponseFailure(failure);
if (responseFailure(failure))
getHttpChannel().getHttpConnection().close();
}
}