From ccc7a71c74827ecf95a1ec76e9c326dbb34c764f Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 10 May 2013 09:02:52 +1000 Subject: [PATCH] 398467 Non Blocking IO Working towards 3.1 by exposing more of the underlying async IO operations. --- .../org/eclipse/jetty/server/HttpChannel.java | 35 ++- .../eclipse/jetty/server/HttpConnection.java | 286 ++++++++++-------- .../org/eclipse/jetty/server/Response.java | 2 +- .../jetty/server/HttpConnectionTest.java | 10 +- 4 files changed, 184 insertions(+), 149 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 278601da3c6..d1e5b501670 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -198,7 +198,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable throw new IOException("Committed before 100 Continues"); // TODO: break this dependency with HttpGenerator - boolean committed = commitResponse(HttpGenerator.CONTINUE_100_INFO, null, false); + boolean committed = sendResponse(HttpGenerator.CONTINUE_100_INFO, null, false); if (!committed) throw new IOException("Concurrent commit while trying to send 100-Continue"); } @@ -356,7 +356,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable { HttpFields fields = new HttpFields(); ResponseInfo info = new ResponseInfo(_request.getHttpVersion(), fields, 0, HttpStatus.INTERNAL_SERVER_ERROR_500, null, _request.isHead()); - boolean committed = commitResponse(info, null, true); + boolean committed = sendResponse(info, null, true); if (!committed) LOG.warn("Could not send response error 500: "+x); } @@ -584,7 +584,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable { if (_state.handling()==Next.CONTINUE) { - commitResponse(new ResponseInfo(HttpVersion.HTTP_1_1,new HttpFields(),0,status,reason,false),null,true); + sendResponse(new ResponseInfo(HttpVersion.HTTP_1_1,new HttpFields(),0,status,reason,false),null,true); } } catch (IOException e) @@ -598,11 +598,15 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable } } - protected boolean commitResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException + protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException { - boolean committed = _committed.compareAndSet(false, true); - if (committed) + boolean committing = _committed.compareAndSet(false, true); + if (committing) { + // We need an info to commit + if (info==null) + info = _response.newResponseInfo(); + try { // Try to commit with the passed info @@ -637,7 +641,12 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable _response.getHttpOutput().closed(); } } - return committed; + else if (info==null) + { + // This is a normal write + _transport.send(null, content, complete); + } + return committing; } protected boolean isCommitted() @@ -655,17 +664,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable */ protected void write(ByteBuffer content, boolean complete) throws IOException { - if (isCommitted()) - { - _transport.send(null, content, complete); - } - else - { - ResponseInfo info = _response.newResponseInfo(); - boolean committed = commitResponse(info, content, complete); - if (!committed) - throw new IOException("Concurrent commit"); - } + sendResponse(null, content, complete); } protected void execute(Runnable task) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 7029fd40b41..ab5708ddc82 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -51,7 +51,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http { public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE"; private static final boolean REQUEST_BUFFER_DIRECT=false; - private static final boolean HEADER_BUFFER_DIRECT=true; + private static final boolean HEADER_BUFFER_DIRECT=false; private static final boolean CHUNK_BUFFER_DIRECT=false; private static final Logger LOG = Log.getLogger(HttpConnection.class); private static final ThreadLocal __currentConnection = new ThreadLocal<>(); @@ -307,149 +307,46 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http @Override public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException - { - // If we are still expecting a 100 continues - if (_channel.isExpecting100Continue()) - // then we can't be persistent - _generator.setPersistent(false); - - - ByteBuffer header = null; - ByteBuffer chunk = null; - out: while (true) - { - HttpGenerator.Result result = _generator.generateResponse(info, header, chunk, content, lastContent); - if (LOG.isDebugEnabled()) - LOG.debug("{} generate: {} ({},{},{})@{}", - this, - result, - BufferUtil.toSummaryString(header), - BufferUtil.toSummaryString(content), - lastContent, - _generator.getState()); - - switch (result) - { - case NEED_HEADER: - { - if (lastContent && content!=null && BufferUtil.space(content)>_config.getResponseHeaderSize() && content.hasArray() ) - { - // use spare space in content buffer for header buffer - int p=content.position(); - int l=content.limit(); - content.position(l); - content.limit(l+_config.getResponseHeaderSize()); - header=content.slice(); - header.limit(0); - content.position(p); - content.limit(l); - } - else - header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT); - continue; - } - case NEED_CHUNK: - { - chunk = _chunk; - if (chunk==null) - chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT); - continue; - } - case FLUSH: - { - // Don't write the chunk or the content if this is a HEAD response - if (_channel.getRequest().isHead()) - { - BufferUtil.clear(chunk); - BufferUtil.clear(content); - } - - // If we have a header - if (BufferUtil.hasContent(header)) - { - // we know there will not be a chunk, so write either header+content or just the header - if (BufferUtil.hasContent(content)) - blockingWrite(header, content); - else - blockingWrite(header); - - } - else if (BufferUtil.hasContent(chunk)) - { - if (BufferUtil.hasContent(content)) - blockingWrite(chunk,content); - else - blockingWrite(chunk); - } - else if (BufferUtil.hasContent(content)) - { - blockingWrite(content); - } - continue; - } - case SHUTDOWN_OUT: - { - getEndPoint().shutdownOutput(); - continue; - } - case DONE: - { - if (header!=null) - { - // don't release header in spare content buffer - if (!lastContent || content==null || !content.hasArray() || !header.hasArray() || content.array()!=header.array()) - _bufferPool.release(header); - } - if (chunk!=null) - _bufferPool.release(chunk); - break out; - } - case CONTINUE: - { - break; - } - default: - { - throw new IllegalStateException("generateResponse="+result); - } - } - } - } - - @Override - public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) { try { - send(info,content,lastContent); - callback.succeeded(); - } - catch (IOException e) - { - callback.failed(e); - } - } + // If we are still expecting a 100 continues + if (info !=null && _channel.isExpecting100Continue()) + // then we can't be persistent + _generator.setPersistent(false); + + Sender sender = new Sender(content,lastContent,_writeBlocker); + sender.process(info); - private void blockingWrite(ByteBuffer... bytes) throws IOException - { - try - { - getEndPoint().write(_writeBlocker, bytes); _writeBlocker.block(); } catch (InterruptedException x) { + x.printStackTrace(); throw (IOException)new InterruptedIOException().initCause(x); } catch (TimeoutException e) { + e.printStackTrace(); throw new IOException(e); } catch (ClosedChannelException e) { + e.printStackTrace(); throw new EofException(e); } } + + @Override + public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) + { + // If we are still expecting a 100 continues + if (info !=null && _channel.isExpecting100Continue()) + // then we can't be persistent + _generator.setPersistent(false); + + new Sender(content,lastContent,callback).process(info); + } @Override public void completed() @@ -692,5 +589,144 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } + private class Sender implements Callback + { + final ByteBuffer _content; + final boolean _lastContent; + final Callback _callback; + + Sender(ByteBuffer content, boolean last, Callback callback) + { + _callback=callback; + _content=content; + _lastContent=last; + } + + public void process(ResponseInfo info) + { + try + { + ByteBuffer header = null; + ByteBuffer chunk = null; + while (true) + { + HttpGenerator.Result result = _generator.generateResponse(info, header, chunk, _content, _lastContent); + if (LOG.isDebugEnabled()) + LOG.debug("{} generate: {} ({},{},{})@{}", + this, + result, + BufferUtil.toSummaryString(header), + BufferUtil.toSummaryString(_content), + _lastContent, + _generator.getState()); + + switch (result) + { + case NEED_HEADER: + { + if (_lastContent && _content!=null && BufferUtil.space(_content)>_config.getResponseHeaderSize() && _content.hasArray() ) + { + // use spare space in content buffer for header buffer + int p=_content.position(); + int l=_content.limit(); + _content.position(l); + _content.limit(l+_config.getResponseHeaderSize()); + header=_content.slice(); + header.limit(0); + _content.position(p); + _content.limit(l); + } + else + header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT); + continue; + } + case NEED_CHUNK: + { + chunk = _chunk; + if (chunk==null) + chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT); + continue; + } + case FLUSH: + { + // Don't write the chunk or the content if this is a HEAD response + if (_channel.getRequest().isHead()) + { + BufferUtil.clear(chunk); + BufferUtil.clear(_content); + } + + // If we have a header + if (BufferUtil.hasContent(header)) + { + // we know there will not be a chunk, so write either header+content or just the header + if (BufferUtil.hasContent(_content)) + getEndPoint().write(this, header, _content); + else + getEndPoint().write(this, header); + } + else if (BufferUtil.hasContent(chunk)) + { + if (BufferUtil.hasContent(_content)) + getEndPoint().write(this, chunk, _content); + else + getEndPoint().write(this, chunk); + } + else if (BufferUtil.hasContent(_content)) + { + getEndPoint().write(this, _content); + } + else + continue; + return; + } + case SHUTDOWN_OUT: + { + getEndPoint().shutdownOutput(); + continue; + } + case DONE: + { + if (header!=null) + { + // don't release header in spare content buffer + if (!_lastContent || _content==null || !_content.hasArray() || !header.hasArray() || _content.array()!=header.array()) + _bufferPool.release(header); + } + if (chunk!=null) + _bufferPool.release(chunk); + _callback.succeeded(); + return; + } + case CONTINUE: + { + break; + } + default: + { + throw new IllegalStateException("generateResponse="+result); + } + } + } + } + catch(Exception e) + { + _callback.failed(e); + } + } + + @Override + public void succeeded() + { + process(null); + } + + @Override + public void failed(Throwable x) + { + _callback.failed(x); + } + + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java index d18366acec1..0ddd3270e22 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java @@ -425,7 +425,7 @@ public class Response implements HttpServletResponse { if (_channel.isExpecting102Processing() && !isCommitted()) { - _channel.commitResponse(HttpGenerator.PROGRESS_102_INFO, null, true); + _channel.sendResponse(HttpGenerator.PROGRESS_102_INFO, null, true); } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpConnectionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpConnectionTest.java index d0c8d3f2b3b..5869d352ebd 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpConnectionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpConnectionTest.java @@ -158,15 +158,15 @@ public class HttpConnectionTest @Test public void testHead() throws Exception { - String responseHEAD=connector.getResponses("HEAD /R1 HTTP/1.1\015\012"+ - "Host: localhost\015\012"+ - "Connection: close\015\012"+ - "\015\012"); - String responsePOST=connector.getResponses("POST /R1 HTTP/1.1\015\012"+ "Host: localhost\015\012"+ "Connection: close\015\012"+ "\015\012"); + + String responseHEAD=connector.getResponses("HEAD /R1 HTTP/1.1\015\012"+ + "Host: localhost\015\012"+ + "Connection: close\015\012"+ + "\015\012"); assertThat(responsePOST,startsWith(responseHEAD.substring(0,responseHEAD.length()-2))); assertThat(responsePOST.length(),greaterThan(responseHEAD.length()));