398467 Servlet 3.1 Non Blocking IO

More refinements to avoid duplicate code and optimise common paths
This commit is contained in:
Greg Wilkins 2013-05-14 17:52:46 +10:00
parent 19d9febfbc
commit 4dd80e9128
7 changed files with 235 additions and 80 deletions

View File

@ -616,57 +616,15 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
// wrap callback to process 100 or 500 responses
final int status=info.getStatus();
final Callback committed = new Callback()
{
@Override
public void succeeded()
{
// If we are committing a 1xx response, we need to reset the commit
// status so that the "real" response can be committed again.
if (status<200 && status>=100)
_committed.set(false);
callback.succeeded();
}
@Override
public void failed(final Throwable x)
{
if (x instanceof EofException)
{
LOG.debug(x);
_response.getHttpOutput().closed();
callback.failed(x);
}
else
{
LOG.warn(x);
_transport.send(HttpGenerator.RESPONSE_500_INFO,null,true,new Callback()
{
@Override
public void succeeded()
{
_response.getHttpOutput().closed();
callback.failed(x);
}
@Override
public void failed(Throwable th)
{
LOG.ignore(th);
_response.getHttpOutput().closed();
callback.failed(x);
}
});
}
}
};
final Callback committed = (status<200&&status>=100)?new Commit100Callback(callback):new CommitCallback(callback);
// committing write
_transport.send(info, content, complete, committed);
}
else if (info==null)
{
// This is a normal write
_transport.send(null, content, complete, callback);
_transport.send(content, complete, callback);
}
else
{
@ -747,4 +705,68 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
{
return getEndPoint() instanceof ChannelEndPoint;
}
private class CommitCallback implements Callback
{
private final Callback _callback;
private CommitCallback(Callback callback)
{
_callback = callback;
}
@Override
public void succeeded()
{
_callback.succeeded();
}
@Override
public void failed(final Throwable x)
{
if (x instanceof EofException)
{
LOG.debug(x);
_response.getHttpOutput().closed();
_callback.failed(x);
}
else
{
LOG.warn(x);
_transport.send(HttpGenerator.RESPONSE_500_INFO,null,true,new Callback()
{
@Override
public void succeeded()
{
_response.getHttpOutput().closed();
_callback.failed(x);
}
@Override
public void failed(Throwable th)
{
LOG.ignore(th);
_response.getHttpOutput().closed();
_callback.failed(x);
}
});
}
}
}
private class Commit100Callback extends CommitCallback
{
private Commit100Callback(Callback callback)
{
super(callback);
}
@Override
public void succeeded()
{
_committed.set(false);
super.succeeded();
}
}
}

View File

@ -305,19 +305,21 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
onFillable();
}
@Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
{
try
{
// If we are still expecting a 100 continues
if (info !=null && _channel.isExpecting100Continue())
// then we can't be persistent
_generator.setPersistent(false);
new Sender(info,content,lastContent,_writeBlocker).iterate();
if (info==null)
new ContentCallback(content,lastContent,_writeBlocker).iterate();
else
{
// If we are still expecting a 100 continues
if (_channel.isExpecting100Continue())
// then we can't be persistent
_generator.setPersistent(false);
new CommitCallback(info,content,lastContent,_writeBlocker).iterate();
}
_writeBlocker.block();
}
catch (InterruptedException x)
@ -340,14 +342,24 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override
public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
{
// If we are still expecting a 100 continues
if (info !=null && _channel.isExpecting100Continue())
// then we can't be persistent
_generator.setPersistent(false);
new Sender(info,content,lastContent,callback).iterate();
if (info==null)
new ContentCallback(content,lastContent,callback).iterate();
else
{
// If we are still expecting a 100 continues
if (_channel.isExpecting100Continue())
// then we can't be persistent
_generator.setPersistent(false);
new CommitCallback(info,content,lastContent,callback).iterate();
}
}
@Override
public void send(ByteBuffer content, boolean lastContent, Callback callback)
{
new ContentCallback(content,lastContent,callback).iterate();
}
@Override
public void completed()
{
@ -588,13 +600,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
private class Sender extends IteratingCallback
private class CommitCallback extends IteratingCallback
{
final ByteBuffer _content;
final boolean _lastContent;
final ResponseInfo _info;
ByteBuffer _header;
Sender(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
CommitCallback(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
{
super(callback);
_info=info;
@ -605,16 +618,15 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override
public boolean process() throws Exception
{
ByteBuffer header = null;
ByteBuffer chunk = null;
ByteBuffer chunk = _chunk;
while (true)
{
HttpGenerator.Result result = _generator.generateResponse(_info, header, chunk, _content, _lastContent);
HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(header),
BufferUtil.toSummaryString(_header),
BufferUtil.toSummaryString(_content),
_lastContent,
_generator.getState());
@ -630,20 +642,18 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
int l=_content.limit();
_content.position(l);
_content.limit(l+_config.getResponseHeaderSize());
header=_content.slice();
header.limit(0);
_header=_content.slice();
_header.limit(0);
_content.position(p);
_content.limit(l);
}
else
header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
continue;
}
case NEED_CHUNK:
{
chunk = _chunk;
if (chunk==null)
chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
continue;
}
case FLUSH:
@ -656,13 +666,17 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
// If we have a header
if (BufferUtil.hasContent(header))
if (BufferUtil.hasContent(_header))
{
// we know there will not be a chunk, so write either header+content or just the header
if (BufferUtil.hasContent(_content))
getEndPoint().write(this, header, _content);
{
if (BufferUtil.hasContent(chunk))
getEndPoint().write(this, _header, chunk, _content);
else
getEndPoint().write(this, _header, _content);
}
else
getEndPoint().write(this, header);
getEndPoint().write(this, _header);
}
else if (BufferUtil.hasContent(chunk))
{
@ -686,14 +700,94 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
case DONE:
{
if (header!=null)
if (_header!=null)
{
// don't release header in spare content buffer
if (!_lastContent || _content==null || !_content.hasArray() || !header.hasArray() || _content.array()!=header.array())
_bufferPool.release(header);
if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array())
_bufferPool.release(_header);
}
if (chunk!=null)
_bufferPool.release(chunk);
return true;
}
case CONTINUE:
{
break;
}
default:
{
throw new IllegalStateException("generateResponse="+result);
}
}
}
}
}
private class ContentCallback extends IteratingCallback
{
final ByteBuffer _content;
final boolean _lastContent;
ContentCallback(ByteBuffer content, boolean last, Callback callback)
{
super(callback);
_content=content;
_lastContent=last;
}
@Override
public boolean process() throws Exception
{
ByteBuffer chunk = _chunk;
while (true)
{
HttpGenerator.Result result = _generator.generateResponse(null, null, chunk, _content, _lastContent);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{})@{}",
this,
result,
BufferUtil.toSummaryString(_content),
_lastContent,
_generator.getState());
switch (result)
{
case NEED_HEADER:
throw new IllegalStateException();
case NEED_CHUNK:
{
chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
continue;
}
case FLUSH:
{
// Don't write the chunk or the content if this is a HEAD response
if (_channel.getRequest().isHead())
{
BufferUtil.clear(chunk);
BufferUtil.clear(_content);
continue;
}
else if (BufferUtil.hasContent(chunk))
{
if (BufferUtil.hasContent(_content))
getEndPoint().write(this, chunk, _content);
else
getEndPoint().write(this, chunk);
}
else if (BufferUtil.hasContent(_content))
{
getEndPoint().write(this, _content);
}
else
continue;
return false;
}
case SHUTDOWN_OUT:
{
getEndPoint().shutdownOutput();
continue;
}
case DONE:
{
return true;
}
case CONTINUE:

View File

@ -26,9 +26,12 @@ import org.eclipse.jetty.util.Callback;
public interface HttpTransport
{
@Deprecated
void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException;
void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback);
void send(ByteBuffer content, boolean lastContent, Callback callback);
void completed();
}

View File

@ -29,6 +29,8 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Locale;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.Cookie;
@ -46,6 +48,7 @@ import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.session.HashSessionIdManager;
import org.eclipse.jetty.server.session.HashSessionManager;
import org.eclipse.jetty.server.session.HashedSession;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.TimerScheduler;
@ -84,6 +87,16 @@ public class ResponseTest
@Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
{
BlockingCallback cb = new BlockingCallback();
send(info,content,lastContent,cb);
try
{
cb.block();
}
catch (InterruptedException | TimeoutException e)
{
throw new IOException(e);
}
}
@Override
@ -91,6 +104,12 @@ public class ResponseTest
{
callback.succeeded();
}
@Override
public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback)
{
send(null,responseBodyContent, lastContent, callback);
}
@Override
public void completed()

View File

@ -86,6 +86,14 @@ public class HttpTransportOverSPDY implements HttpTransport
return requestHeaders;
}
@Override
public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback)
{
// TODO can this be more efficient?
send(null,responseBodyContent, lastContent, callback);
}
@Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
{

View File

@ -264,6 +264,8 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
long contentLength = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString());
HttpGenerator.ResponseInfo info = new HttpGenerator.ResponseInfo(httpVersion, fields, contentLength, code,
reason, false);
// TODO use the async send
send(info, null, replyInfo.isClose());
if (replyInfo.isClose())
@ -300,6 +302,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
// Data buffer must be copied, as the ByteBuffer is pooled
ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
// TODO use the async send with callback!
send(null, byteBuffer, dataInfo.isClose());
if (dataInfo.isClose())

View File

@ -87,4 +87,10 @@ public class HttpTransportOverMux implements HttpTransport
// prepare the AddChannelResponse
// TODO: look at HttpSender in jetty-client for generator loop logic
}
@Override
public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback)
{
send(null,responseBodyContent, lastContent, callback);
}
}