Improved HttpSender.sendHeaders() and sendContent() methods, separating HttpContent and the callbacks.

This commit is contained in:
Simone Bordet 2013-07-16 11:55:43 +02:00
parent 52b805697a
commit b82444e3d2
6 changed files with 246 additions and 156 deletions

View File

@ -130,7 +130,7 @@ public class HttpClient extends ContainerLifeCycle
private volatile long idleTimeout;
private volatile boolean tcpNoDelay = true;
private volatile boolean dispatchIO = true;
private volatile boolean strictEventOrdering = true;
private volatile boolean strictEventOrdering = false;
private volatile ProxyConfiguration proxyConfig;
private volatile HttpField encodingField;

View File

@ -23,60 +23,106 @@ import java.util.Collections;
import java.util.Iterator;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.client.util.DeferredContentProvider;
public abstract class HttpContent implements Callback
/**
* {@link HttpContent} is a stateful, linear representation of the request content provided
* by a {@link ContentProvider} that can be traversed one-way to obtain content buffers to
* send to a HTTP server.
* <p />
* {@link HttpContent} offers the notion of a one-way cursor to traverse the content.
* The cursor starts in a virtual "before" position and can be advanced using {@link #advance()}
* until it reaches a virtual "after" position where the content is fully consumed.
* <pre>
* +---+ +---+ +---+ +---+ +---+
* | | | | | | | | | |
* +---+ +---+ +---+ +---+ +---+
* ^ ^ ^ ^
* | | --> advance() | |
* | | last |
* | | |
* before | after
* |
* current
* </pre>
* At each valid (non-before and non-after) cursor position, {@link HttpContent} provides the following state:
* <ul>
* <li>the buffer containing the content to send, via {@link #getByteBuffer()}</li>
* <li>a copy of the content buffer that can be used for notifications, via {@link #getContent()}</li>
* <li>whether the buffer to write is the last one, via {@link #isLast()}</li>
* </ul>
* {@link HttpContent} may not have content, if the related {@link ContentProvider} is {@code null}, and this
* is reflected by {@link #hasContent()}.
* <p />
* {@link HttpContent} may have {@link DeferredContentProvider deferred content}, in which case {@link #advance()}
* moves the cursor to a position that provides {@code null} {@link #getByteBuffer() buffer} and
* {@link #getContent() content}. When the deferred content is available, a further call to {@link #advance()}
* will move the cursor to a position that provides non {@code null} buffer and content.
*/
public class HttpContent
{
private static final ByteBuffer AFTER = ByteBuffer.allocate(0);
private final ContentProvider provider;
private final Iterator<ByteBuffer> iterator;
private ByteBuffer buffer;
private volatile ByteBuffer content;
public HttpContent(ContentProvider provider)
{
this(provider, provider == null ? Collections.<ByteBuffer>emptyIterator() : provider.iterator());
}
public HttpContent(HttpContent that)
{
this(that.provider, that.iterator);
this.buffer = that.buffer;
this.content = that.content;
}
private HttpContent(ContentProvider provider, Iterator<ByteBuffer> iterator)
{
this.provider = provider;
this.iterator = iterator;
this.iterator = provider == null ? Collections.<ByteBuffer>emptyIterator() : provider.iterator();
}
/**
* @return whether there is any content at all
*/
public boolean hasContent()
{
return provider != null;
}
/**
* @return whether the cursor points to the last content
*/
public boolean isLast()
{
return !iterator.hasNext();
}
/**
* @return the {@link ByteBuffer} containing the content at the cursor's position
*/
public ByteBuffer getByteBuffer()
{
return buffer;
}
/**
* @return a {@link ByteBuffer#slice()} of {@link #getByteBuffer()} at the cursor's position
*/
public ByteBuffer getContent()
{
return content;
}
/**
* Advances the cursor to the next block of content.
* <p />
* The next block of content may be valid (which yields a non-null buffer
* returned by {@link #getByteBuffer()}), but may also be deferred
* (which yields a null buffer returned by {@link #getByteBuffer()}).
* <p />
* If the block of content pointed by the new cursor position is valid, this method returns true.
*
* @return true if there is content at the new cursor's position, false otherwise.
*/
public boolean advance()
{
if (isLast())
{
if (content != null)
content = buffer = BufferUtil.EMPTY_BUFFER;
if (content != AFTER)
content = buffer = AFTER;
return false;
}
else
@ -86,4 +132,12 @@ public abstract class HttpContent implements Callback
return buffer != null;
}
}
/**
* @return whether the cursor has been advanced past the {@link #isLast() last} position.
*/
public boolean isConsumed()
{
return content == AFTER;
}
}

View File

@ -323,6 +323,7 @@ public abstract class HttpReceiver
protected void dispose()
{
responseState.set(ResponseState.FAILURE);
decoder = null;
}

View File

@ -29,25 +29,45 @@ import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* {@link HttpSender} abstracts the algorithm to send HTTP requests, so that subclasses only implement
* the transport-specific code to send requests over the wire.
* <p />
* {@link HttpSender} governs two state machines.
* <p />
* The request state machine is updated by {@link HttpSender} as the various steps of sending a request
* are executed, see {@link RequestState}.
* At any point in time, a user thread may abort the request, which may move the request state machine
* to {@link RequestState#FAILURE}. The request state machine guarantees that the request steps are
* executed only if the request has not been failed already.
* <p />
* The sender state machine is updated by {@link HttpSender} from three sources: deferred content notifications
* (via {@link #onContent()}), 100-continue notifications (via {@link #proceed(HttpExchange, boolean)})
* and normal request send (via {@link #sendContent(HttpExchange, HttpContent, Callback)}).
* This state machine must guarantee that the request sending is never executed concurrently: only one of
* those sources may trigger the call to {@link #sendContent(HttpExchange, HttpContent, Callback)}.
*/
public abstract class HttpSender implements AsyncContentProvider.Listener
{
protected static final Logger LOG = Log.getLogger(new Object(){}.getClass().getEnclosingClass());
private final AtomicReference<RequestState> requestState = new AtomicReference<>(RequestState.QUEUED);
private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
private final Callback commitCallback = new CommitCallback();
private final Callback contentCallback = new ContentCallback();
private final Callback lastCallback = new LastContentCallback();
private final HttpChannel channel;
private volatile HttpContent content;
public HttpSender(HttpChannel channel)
protected HttpSender(HttpChannel channel)
{
this.channel = channel;
}
public HttpChannel getHttpChannel()
protected HttpChannel getHttpChannel()
{
return channel;
}
@ -74,8 +94,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (updateSenderState(current, SenderState.SENDING))
{
LOG.debug("Deferred content available, idle -> sending");
HttpContent content = this.content;
content.advance();
sendContent(exchange, new ContentCallback(content));
sendContent(exchange, content, contentCallback);
return;
}
break;
@ -133,7 +154,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
throw new IllegalStateException();
ContentProvider contentProvider = request.getContent();
HttpContent content = this.content = new CommitCallback(contentProvider);
HttpContent content = this.content = new HttpContent(contentProvider);
// Setting the listener may trigger calls to onContent() by other
// threads so we must set it only after the sender state has been updated
@ -143,7 +164,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (!beginToHeaders(request))
return;
sendHeaders(exchange, content);
sendHeaders(exchange, content, commitCallback);
}
}
@ -311,15 +332,15 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return true;
}
protected abstract void sendHeaders(HttpExchange exchange, HttpContent content);
protected abstract void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback);
protected abstract void sendContent(HttpExchange exchange, HttpContent content);
protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
protected void reset()
{
content = null;
requestState.set(RequestState.QUEUED);
senderState.set(SenderState.IDLE);
content = null;
}
protected RequestState dispose()
@ -355,16 +376,17 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
case WAITING:
{
// We received the 100 Continue, send the content if any
// We received the 100 Continue, send the content if any.
// First update the sender state to be sure to be the one
// to call sendContent() since we race with onContent().
if (!updateSenderState(current, SenderState.SENDING))
break;
HttpContent content = this.content;
if (content.advance())
{
// There is content to send
LOG.debug("Proceed while waiting");
sendContent(exchange, new ContentCallback(content));
sendContent(exchange, content, contentCallback);
}
else
{
@ -413,7 +435,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return updated;
}
protected boolean updateSenderState(SenderState from, SenderState to)
private boolean updateSenderState(SenderState from, SenderState to)
{
boolean updated = senderState.compareAndSet(from, to);
if (!updated)
@ -446,23 +468,66 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
}
/**
* The request states {@link HttpSender} goes through when sending a request.
*/
protected enum RequestState
{
QUEUED, BEGIN, HEADERS, COMMIT, CONTENT, FAILURE
/**
* The request is queued, the initial state
*/
QUEUED,
/**
* The request has been dequeued
*/
BEGIN,
/**
* The request headers (and possibly some content) is about to be sent
*/
HEADERS,
/**
* The request headers (and possibly some content) have been sent
*/
COMMIT,
/**
* The request content is being sent
*/
CONTENT,
/**
* The request failed
*/
FAILURE
}
protected enum SenderState
/**
* The sender states {@link HttpSender} goes through when sending a request.
*/
private enum SenderState
{
IDLE, SENDING, EXPECTING, WAITING, SCHEDULED
/**
* {@link HttpSender} is not sending the request
*/
IDLE,
/**
* {@link HttpSender} is sending the request
*/
SENDING,
/**
* {@link HttpSender} is sending the headers but will wait for 100-Continue before sending the content
*/
EXPECTING,
/**
* {@link HttpSender} is waiting for 100-Continue
*/
WAITING,
/**
* {@link HttpSender} is currently sending the request, and deferred content is available to be sent
*/
SCHEDULED
}
private class CommitCallback extends HttpContent
private class CommitCallback implements Callback
{
private CommitCallback(ContentProvider contentProvider)
{
super(contentProvider);
}
@Override
public void succeeded()
{
@ -470,6 +535,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
process();
}
// Catch-all for runtime exceptions
catch (Exception x)
{
anyToFailure(x);
@ -486,7 +552,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (!headersToCommit(request))
return;
if (!hasContent())
HttpContent content = HttpSender.this.content;
if (!content.hasContent())
{
// No content to send, we are done.
someToSuccess(exchange);
@ -494,10 +562,10 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
else
{
// Was any content sent while committing ?
ByteBuffer content = getContent();
if (content != null)
ByteBuffer contentBuffer = content.getContent();
if (contentBuffer != null)
{
if (!someToContent(request, content))
if (!someToContent(request, contentBuffer))
return;
}
@ -509,15 +577,15 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
case SENDING:
{
// We have content to send ?
if (advance())
if (content.advance())
{
sendContent(exchange, new ContentCallback(this));
sendContent(exchange, content, contentCallback);
}
else
{
if (isLast())
if (content.isLast())
{
sendContent(exchange, new LastContentCallback(this));
sendContent(exchange, content, lastCallback);
}
else
{
@ -559,19 +627,77 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
}
private class ContentCallback extends HttpContent
private class ContentCallback extends IteratingCallback
{
private final IteratingCallback delegate = new Delegate(this);
public ContentCallback(HttpContent content)
@Override
protected boolean process() throws Exception
{
super(content);
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
Request request = exchange.getRequest();
HttpContent content = HttpSender.this.content;
ByteBuffer contentBuffer = content.getContent();
if (contentBuffer != null)
{
if (!someToContent(request, contentBuffer))
return false;
}
if (content.advance())
{
// There is more content to send
sendContent(exchange, content, this);
}
else
{
if (content.isLast())
{
sendContent(exchange, content, lastCallback);
}
else
{
while (true)
{
SenderState current = senderState.get();
switch (current)
{
case SENDING:
{
if (updateSenderState(current, SenderState.IDLE))
{
LOG.debug("Waiting for deferred content for {}", request);
return false;
}
break;
}
case SCHEDULED:
{
if (updateSenderState(current, SenderState.SENDING))
{
LOG.debug("Deferred content available for {}", request);
// TODO: this case is not covered by tests
sendContent(exchange, content, this);
return false;
}
break;
}
default:
{
throw new IllegalStateException();
}
}
}
}
}
return false;
}
@Override
public void succeeded()
protected void completed()
{
delegate.succeeded();
}
@Override
@ -579,88 +705,10 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
anyToFailure(failure);
}
private class Delegate extends IteratingNestedCallback
{
private Delegate(Callback callback)
{
super(callback);
}
@Override
protected boolean process() throws Exception
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
Request request = exchange.getRequest();
ByteBuffer contentBuffer = getContent();
if (contentBuffer != null)
{
if (!someToContent(request, contentBuffer))
return false;
}
if (advance())
{
// There is more content to send
sendContent(exchange, ContentCallback.this);
}
else
{
if (isLast())
{
sendContent(exchange, new LastContentCallback(ContentCallback.this));
}
else
{
while (true)
{
SenderState current = senderState.get();
switch (current)
{
case SENDING:
{
if (updateSenderState(current, SenderState.IDLE))
{
LOG.debug("Waiting for deferred content for {}", request);
return false;
}
break;
}
case SCHEDULED:
{
if (updateSenderState(current, SenderState.SENDING))
{
LOG.debug("Deferred content available for {}", request);
// TODO: this case is not covered by tests
sendContent(exchange, ContentCallback.this);
return false;
}
break;
}
default:
{
throw new IllegalStateException();
}
}
}
}
}
return false;
}
}
}
private class LastContentCallback extends HttpContent
private class LastContentCallback implements Callback
{
private LastContentCallback(HttpContent content)
{
super(content);
}
@Override
public void succeeded()
{

View File

@ -47,7 +47,7 @@ public class HttpSenderOverHTTP extends HttpSender
}
@Override
protected void sendHeaders(HttpExchange exchange, HttpContent content)
protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback)
{
Request request = exchange.getRequest();
ContentProvider requestContent = request.getContent();
@ -104,7 +104,7 @@ public class HttpSenderOverHTTP extends HttpSender
if (hasContent)
toWrite[toWrite.length - 1] = contentBuffer;
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
endPoint.write(new ByteBufferRecyclerCallback(content, bufferPool, toRecycle), toWrite);
endPoint.write(new ByteBufferRecyclerCallback(callback, bufferPool, toRecycle), toWrite);
return;
}
default:
@ -117,12 +117,12 @@ public class HttpSenderOverHTTP extends HttpSender
catch (Exception x)
{
LOG.debug(x);
content.failed(x);
callback.failed(x);
}
}
@Override
protected void sendContent(HttpExchange exchange, HttpContent content)
protected void sendContent(HttpExchange exchange, HttpContent content, Callback callback)
{
try
{
@ -145,9 +145,9 @@ public class HttpSenderOverHTTP extends HttpSender
{
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
if (chunk != null)
endPoint.write(new ByteBufferRecyclerCallback(content, bufferPool, chunk), chunk, contentBuffer);
endPoint.write(new ByteBufferRecyclerCallback(callback, bufferPool, chunk), chunk, contentBuffer);
else
endPoint.write(content, contentBuffer);
endPoint.write(callback, contentBuffer);
return;
}
case SHUTDOWN_OUT:
@ -162,7 +162,7 @@ public class HttpSenderOverHTTP extends HttpSender
case DONE:
{
assert generator.isEnd();
content.succeeded();
callback.succeeded();
return;
}
default:
@ -175,7 +175,7 @@ public class HttpSenderOverHTTP extends HttpSender
catch (Exception x)
{
LOG.debug(x);
content.failed(x);
callback.failed(x);
}
}

View File

@ -20,8 +20,6 @@ package org.eclipse.jetty.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
@ -468,18 +466,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
{
baseRequest.setHandled(true);
// Send 100-Continue and echo the content
// IO.copy(request.getInputStream(), response.getOutputStream());
// TODO: DEBUG CODE, REMOVE WHEN FIXED
InputStream input = request.getInputStream();
OutputStream output = response.getOutputStream();
while (true)
{
int value = input.read();
Log.getLogger(getClass()).info("copy {}", value);
if (value < 0)
break;
output.write(value);
}
IO.copy(request.getInputStream(), response.getOutputStream());
}
});