398467 Non Blocking IO

Working towards 3.1 by exposing more of the underlying async IO operations.
This commit is contained in:
Greg Wilkins 2013-05-10 09:02:52 +10:00
parent bec2fc576d
commit ccc7a71c74
4 changed files with 184 additions and 149 deletions

View File

@ -198,7 +198,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
throw new IOException("Committed before 100 Continues"); throw new IOException("Committed before 100 Continues");
// TODO: break this dependency with HttpGenerator // TODO: break this dependency with HttpGenerator
boolean committed = commitResponse(HttpGenerator.CONTINUE_100_INFO, null, false); boolean committed = sendResponse(HttpGenerator.CONTINUE_100_INFO, null, false);
if (!committed) if (!committed)
throw new IOException("Concurrent commit while trying to send 100-Continue"); throw new IOException("Concurrent commit while trying to send 100-Continue");
} }
@ -356,7 +356,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
{ {
HttpFields fields = new HttpFields(); HttpFields fields = new HttpFields();
ResponseInfo info = new ResponseInfo(_request.getHttpVersion(), fields, 0, HttpStatus.INTERNAL_SERVER_ERROR_500, null, _request.isHead()); ResponseInfo info = new ResponseInfo(_request.getHttpVersion(), fields, 0, HttpStatus.INTERNAL_SERVER_ERROR_500, null, _request.isHead());
boolean committed = commitResponse(info, null, true); boolean committed = sendResponse(info, null, true);
if (!committed) if (!committed)
LOG.warn("Could not send response error 500: "+x); LOG.warn("Could not send response error 500: "+x);
} }
@ -584,7 +584,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
{ {
if (_state.handling()==Next.CONTINUE) if (_state.handling()==Next.CONTINUE)
{ {
commitResponse(new ResponseInfo(HttpVersion.HTTP_1_1,new HttpFields(),0,status,reason,false),null,true); sendResponse(new ResponseInfo(HttpVersion.HTTP_1_1,new HttpFields(),0,status,reason,false),null,true);
} }
} }
catch (IOException e) catch (IOException e)
@ -598,11 +598,15 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
} }
} }
protected boolean commitResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException
{ {
boolean committed = _committed.compareAndSet(false, true); boolean committing = _committed.compareAndSet(false, true);
if (committed) if (committing)
{ {
// We need an info to commit
if (info==null)
info = _response.newResponseInfo();
try try
{ {
// Try to commit with the passed info // Try to commit with the passed info
@ -637,7 +641,12 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
_response.getHttpOutput().closed(); _response.getHttpOutput().closed();
} }
} }
return committed; else if (info==null)
{
// This is a normal write
_transport.send(null, content, complete);
}
return committing;
} }
protected boolean isCommitted() protected boolean isCommitted()
@ -655,17 +664,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
*/ */
protected void write(ByteBuffer content, boolean complete) throws IOException protected void write(ByteBuffer content, boolean complete) throws IOException
{ {
if (isCommitted()) sendResponse(null, content, complete);
{
_transport.send(null, content, complete);
}
else
{
ResponseInfo info = _response.newResponseInfo();
boolean committed = commitResponse(info, content, complete);
if (!committed)
throw new IOException("Concurrent commit");
}
} }
protected void execute(Runnable task) protected void execute(Runnable task)

View File

@ -51,7 +51,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{ {
public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE"; public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
private static final boolean REQUEST_BUFFER_DIRECT=false; private static final boolean REQUEST_BUFFER_DIRECT=false;
private static final boolean HEADER_BUFFER_DIRECT=true; private static final boolean HEADER_BUFFER_DIRECT=false;
private static final boolean CHUNK_BUFFER_DIRECT=false; private static final boolean CHUNK_BUFFER_DIRECT=false;
private static final Logger LOG = Log.getLogger(HttpConnection.class); private static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>(); private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
@ -307,148 +307,45 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override @Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
{
try
{ {
// If we are still expecting a 100 continues // If we are still expecting a 100 continues
if (_channel.isExpecting100Continue()) if (info !=null && _channel.isExpecting100Continue())
// then we can't be persistent // then we can't be persistent
_generator.setPersistent(false); _generator.setPersistent(false);
Sender sender = new Sender(content,lastContent,_writeBlocker);
sender.process(info);
ByteBuffer header = null; _writeBlocker.block();
ByteBuffer chunk = null;
out: while (true)
{
HttpGenerator.Result result = _generator.generateResponse(info, header, chunk, content, lastContent);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(header),
BufferUtil.toSummaryString(content),
lastContent,
_generator.getState());
switch (result)
{
case NEED_HEADER:
{
if (lastContent && content!=null && BufferUtil.space(content)>_config.getResponseHeaderSize() && content.hasArray() )
{
// use spare space in content buffer for header buffer
int p=content.position();
int l=content.limit();
content.position(l);
content.limit(l+_config.getResponseHeaderSize());
header=content.slice();
header.limit(0);
content.position(p);
content.limit(l);
} }
else catch (InterruptedException x)
header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
continue;
}
case NEED_CHUNK:
{ {
chunk = _chunk; x.printStackTrace();
if (chunk==null) throw (IOException)new InterruptedIOException().initCause(x);
chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
continue;
} }
case FLUSH: catch (TimeoutException e)
{ {
// Don't write the chunk or the content if this is a HEAD response e.printStackTrace();
if (_channel.getRequest().isHead()) throw new IOException(e);
}
catch (ClosedChannelException e)
{ {
BufferUtil.clear(chunk); e.printStackTrace();
BufferUtil.clear(content); throw new EofException(e);
}
// If we have a header
if (BufferUtil.hasContent(header))
{
// we know there will not be a chunk, so write either header+content or just the header
if (BufferUtil.hasContent(content))
blockingWrite(header, content);
else
blockingWrite(header);
}
else if (BufferUtil.hasContent(chunk))
{
if (BufferUtil.hasContent(content))
blockingWrite(chunk,content);
else
blockingWrite(chunk);
}
else if (BufferUtil.hasContent(content))
{
blockingWrite(content);
}
continue;
}
case SHUTDOWN_OUT:
{
getEndPoint().shutdownOutput();
continue;
}
case DONE:
{
if (header!=null)
{
// don't release header in spare content buffer
if (!lastContent || content==null || !content.hasArray() || !header.hasArray() || content.array()!=header.array())
_bufferPool.release(header);
}
if (chunk!=null)
_bufferPool.release(chunk);
break out;
}
case CONTINUE:
{
break;
}
default:
{
throw new IllegalStateException("generateResponse="+result);
}
}
} }
} }
@Override @Override
public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
{ {
try // If we are still expecting a 100 continues
{ if (info !=null && _channel.isExpecting100Continue())
send(info,content,lastContent); // then we can't be persistent
callback.succeeded(); _generator.setPersistent(false);
}
catch (IOException e)
{
callback.failed(e);
}
}
private void blockingWrite(ByteBuffer... bytes) throws IOException new Sender(content,lastContent,callback).process(info);
{
try
{
getEndPoint().write(_writeBlocker, bytes);
_writeBlocker.block();
}
catch (InterruptedException x)
{
throw (IOException)new InterruptedIOException().initCause(x);
}
catch (TimeoutException e)
{
throw new IOException(e);
}
catch (ClosedChannelException e)
{
throw new EofException(e);
}
} }
@Override @Override
@ -692,5 +589,144 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
private class Sender implements Callback
{
final ByteBuffer _content;
final boolean _lastContent;
final Callback _callback;
Sender(ByteBuffer content, boolean last, Callback callback)
{
_callback=callback;
_content=content;
_lastContent=last;
}
public void process(ResponseInfo info)
{
try
{
ByteBuffer header = null;
ByteBuffer chunk = null;
while (true)
{
HttpGenerator.Result result = _generator.generateResponse(info, header, chunk, _content, _lastContent);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(header),
BufferUtil.toSummaryString(_content),
_lastContent,
_generator.getState());
switch (result)
{
case NEED_HEADER:
{
if (_lastContent && _content!=null && BufferUtil.space(_content)>_config.getResponseHeaderSize() && _content.hasArray() )
{
// use spare space in content buffer for header buffer
int p=_content.position();
int l=_content.limit();
_content.position(l);
_content.limit(l+_config.getResponseHeaderSize());
header=_content.slice();
header.limit(0);
_content.position(p);
_content.limit(l);
}
else
header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
continue;
}
case NEED_CHUNK:
{
chunk = _chunk;
if (chunk==null)
chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
continue;
}
case FLUSH:
{
// Don't write the chunk or the content if this is a HEAD response
if (_channel.getRequest().isHead())
{
BufferUtil.clear(chunk);
BufferUtil.clear(_content);
}
// If we have a header
if (BufferUtil.hasContent(header))
{
// we know there will not be a chunk, so write either header+content or just the header
if (BufferUtil.hasContent(_content))
getEndPoint().write(this, header, _content);
else
getEndPoint().write(this, header);
}
else if (BufferUtil.hasContent(chunk))
{
if (BufferUtil.hasContent(_content))
getEndPoint().write(this, chunk, _content);
else
getEndPoint().write(this, chunk);
}
else if (BufferUtil.hasContent(_content))
{
getEndPoint().write(this, _content);
}
else
continue;
return;
}
case SHUTDOWN_OUT:
{
getEndPoint().shutdownOutput();
continue;
}
case DONE:
{
if (header!=null)
{
// don't release header in spare content buffer
if (!_lastContent || _content==null || !_content.hasArray() || !header.hasArray() || _content.array()!=header.array())
_bufferPool.release(header);
}
if (chunk!=null)
_bufferPool.release(chunk);
_callback.succeeded();
return;
}
case CONTINUE:
{
break;
}
default:
{
throw new IllegalStateException("generateResponse="+result);
}
}
}
}
catch(Exception e)
{
_callback.failed(e);
}
}
@Override
public void succeeded()
{
process(null);
}
@Override
public void failed(Throwable x)
{
_callback.failed(x);
}
}
} }

View File

@ -425,7 +425,7 @@ public class Response implements HttpServletResponse
{ {
if (_channel.isExpecting102Processing() && !isCommitted()) if (_channel.isExpecting102Processing() && !isCommitted())
{ {
_channel.commitResponse(HttpGenerator.PROGRESS_102_INFO, null, true); _channel.sendResponse(HttpGenerator.PROGRESS_102_INFO, null, true);
} }
} }

View File

@ -158,12 +158,12 @@ public class HttpConnectionTest
@Test @Test
public void testHead() throws Exception public void testHead() throws Exception
{ {
String responseHEAD=connector.getResponses("HEAD /R1 HTTP/1.1\015\012"+ String responsePOST=connector.getResponses("POST /R1 HTTP/1.1\015\012"+
"Host: localhost\015\012"+ "Host: localhost\015\012"+
"Connection: close\015\012"+ "Connection: close\015\012"+
"\015\012"); "\015\012");
String responsePOST=connector.getResponses("POST /R1 HTTP/1.1\015\012"+ String responseHEAD=connector.getResponses("HEAD /R1 HTTP/1.1\015\012"+
"Host: localhost\015\012"+ "Host: localhost\015\012"+
"Connection: close\015\012"+ "Connection: close\015\012"+
"\015\012"); "\015\012");