From 4dd80e91284123b31db835dcc4c86bb12423ed8c Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 14 May 2013 17:52:46 +1000 Subject: [PATCH] 398467 Servlet 3.1 Non Blocking IO More refinements to avoid duplicate code and optimise common paths --- .../org/eclipse/jetty/server/HttpChannel.java | 112 +++++++----- .../eclipse/jetty/server/HttpConnection.java | 164 ++++++++++++++---- .../eclipse/jetty/server/HttpTransport.java | 3 + .../eclipse/jetty/server/ResponseTest.java | 19 ++ .../server/http/HttpTransportOverSPDY.java | 8 + .../server/proxy/ProxyHTTPSPDYConnection.java | 3 + .../server/mux/HttpTransportOverMux.java | 6 + 7 files changed, 235 insertions(+), 80 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 736ac14c58c..486ec256473 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -616,57 +616,15 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable // wrap callback to process 100 or 500 responses final int status=info.getStatus(); - final Callback committed = new Callback() - { - @Override - public void succeeded() - { - // If we are committing a 1xx response, we need to reset the commit - // status so that the "real" response can be committed again. - if (status<200 && status>=100) - _committed.set(false); - callback.succeeded(); - } - - @Override - public void failed(final Throwable x) - { - if (x instanceof EofException) - { - LOG.debug(x); - _response.getHttpOutput().closed(); - callback.failed(x); - } - else - { - LOG.warn(x); - _transport.send(HttpGenerator.RESPONSE_500_INFO,null,true,new Callback() - { - @Override - public void succeeded() - { - _response.getHttpOutput().closed(); - callback.failed(x); - } - - @Override - public void failed(Throwable th) - { - LOG.ignore(th); - _response.getHttpOutput().closed(); - callback.failed(x); - } - }); - } - } - }; + final Callback committed = (status<200&&status>=100)?new Commit100Callback(callback):new CommitCallback(callback); + // committing write _transport.send(info, content, complete, committed); } else if (info==null) { // This is a normal write - _transport.send(null, content, complete, callback); + _transport.send(content, complete, callback); } else { @@ -747,4 +705,68 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable { return getEndPoint() instanceof ChannelEndPoint; } + + private class CommitCallback implements Callback + { + private final Callback _callback; + + private CommitCallback(Callback callback) + { + _callback = callback; + } + + @Override + public void succeeded() + { + _callback.succeeded(); + } + + @Override + public void failed(final Throwable x) + { + if (x instanceof EofException) + { + LOG.debug(x); + _response.getHttpOutput().closed(); + _callback.failed(x); + } + else + { + LOG.warn(x); + _transport.send(HttpGenerator.RESPONSE_500_INFO,null,true,new Callback() + { + @Override + public void succeeded() + { + _response.getHttpOutput().closed(); + _callback.failed(x); + } + + @Override + public void failed(Throwable th) + { + LOG.ignore(th); + _response.getHttpOutput().closed(); + _callback.failed(x); + } + }); + } + } + } + + private class Commit100Callback extends CommitCallback + { + private Commit100Callback(Callback callback) + { + super(callback); + } + + @Override + public void succeeded() + { + _committed.set(false); + super.succeeded(); + } + + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 098837f6314..04085f462e1 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -305,19 +305,21 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http onFillable(); } - @Override public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException { try { - // If we are still expecting a 100 continues - if (info !=null && _channel.isExpecting100Continue()) - // then we can't be persistent - _generator.setPersistent(false); - - new Sender(info,content,lastContent,_writeBlocker).iterate(); - + if (info==null) + new ContentCallback(content,lastContent,_writeBlocker).iterate(); + else + { + // If we are still expecting a 100 continues + if (_channel.isExpecting100Continue()) + // then we can't be persistent + _generator.setPersistent(false); + new CommitCallback(info,content,lastContent,_writeBlocker).iterate(); + } _writeBlocker.block(); } catch (InterruptedException x) @@ -340,14 +342,24 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http @Override public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) { - // If we are still expecting a 100 continues - if (info !=null && _channel.isExpecting100Continue()) - // then we can't be persistent - _generator.setPersistent(false); - - new Sender(info,content,lastContent,callback).iterate(); + if (info==null) + new ContentCallback(content,lastContent,callback).iterate(); + else + { + // If we are still expecting a 100 continues + if (_channel.isExpecting100Continue()) + // then we can't be persistent + _generator.setPersistent(false); + new CommitCallback(info,content,lastContent,callback).iterate(); + } } + @Override + public void send(ByteBuffer content, boolean lastContent, Callback callback) + { + new ContentCallback(content,lastContent,callback).iterate(); + } + @Override public void completed() { @@ -588,13 +600,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } } - private class Sender extends IteratingCallback + private class CommitCallback extends IteratingCallback { final ByteBuffer _content; final boolean _lastContent; final ResponseInfo _info; + ByteBuffer _header; - Sender(ResponseInfo info, ByteBuffer content, boolean last, Callback callback) + CommitCallback(ResponseInfo info, ByteBuffer content, boolean last, Callback callback) { super(callback); _info=info; @@ -605,16 +618,15 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http @Override public boolean process() throws Exception { - ByteBuffer header = null; - ByteBuffer chunk = null; + ByteBuffer chunk = _chunk; while (true) { - HttpGenerator.Result result = _generator.generateResponse(_info, header, chunk, _content, _lastContent); + HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent); if (LOG.isDebugEnabled()) LOG.debug("{} generate: {} ({},{},{})@{}", this, result, - BufferUtil.toSummaryString(header), + BufferUtil.toSummaryString(_header), BufferUtil.toSummaryString(_content), _lastContent, _generator.getState()); @@ -630,20 +642,18 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http int l=_content.limit(); _content.position(l); _content.limit(l+_config.getResponseHeaderSize()); - header=_content.slice(); - header.limit(0); + _header=_content.slice(); + _header.limit(0); _content.position(p); _content.limit(l); } else - header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT); + _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT); continue; } case NEED_CHUNK: { - chunk = _chunk; - if (chunk==null) - chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT); + chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT); continue; } case FLUSH: @@ -656,13 +666,17 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } // If we have a header - if (BufferUtil.hasContent(header)) + if (BufferUtil.hasContent(_header)) { - // we know there will not be a chunk, so write either header+content or just the header if (BufferUtil.hasContent(_content)) - getEndPoint().write(this, header, _content); + { + if (BufferUtil.hasContent(chunk)) + getEndPoint().write(this, _header, chunk, _content); + else + getEndPoint().write(this, _header, _content); + } else - getEndPoint().write(this, header); + getEndPoint().write(this, _header); } else if (BufferUtil.hasContent(chunk)) { @@ -686,14 +700,94 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } case DONE: { - if (header!=null) + if (_header!=null) { // don't release header in spare content buffer - if (!_lastContent || _content==null || !_content.hasArray() || !header.hasArray() || _content.array()!=header.array()) - _bufferPool.release(header); + if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array()) + _bufferPool.release(_header); } - if (chunk!=null) - _bufferPool.release(chunk); + return true; + } + case CONTINUE: + { + break; + } + default: + { + throw new IllegalStateException("generateResponse="+result); + } + } + } + } + } + + private class ContentCallback extends IteratingCallback + { + final ByteBuffer _content; + final boolean _lastContent; + + ContentCallback(ByteBuffer content, boolean last, Callback callback) + { + super(callback); + _content=content; + _lastContent=last; + } + + @Override + public boolean process() throws Exception + { + ByteBuffer chunk = _chunk; + while (true) + { + HttpGenerator.Result result = _generator.generateResponse(null, null, chunk, _content, _lastContent); + if (LOG.isDebugEnabled()) + LOG.debug("{} generate: {} ({},{})@{}", + this, + result, + BufferUtil.toSummaryString(_content), + _lastContent, + _generator.getState()); + + switch (result) + { + case NEED_HEADER: + throw new IllegalStateException(); + case NEED_CHUNK: + { + chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT); + continue; + } + case FLUSH: + { + // Don't write the chunk or the content if this is a HEAD response + if (_channel.getRequest().isHead()) + { + BufferUtil.clear(chunk); + BufferUtil.clear(_content); + continue; + } + else if (BufferUtil.hasContent(chunk)) + { + if (BufferUtil.hasContent(_content)) + getEndPoint().write(this, chunk, _content); + else + getEndPoint().write(this, chunk); + } + else if (BufferUtil.hasContent(_content)) + { + getEndPoint().write(this, _content); + } + else + continue; + return false; + } + case SHUTDOWN_OUT: + { + getEndPoint().shutdownOutput(); + continue; + } + case DONE: + { return true; } case CONTINUE: diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java index 0ebcfccd697..ef62bdc86f2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java @@ -26,9 +26,12 @@ import org.eclipse.jetty.util.Callback; public interface HttpTransport { + @Deprecated void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException; void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback); + + void send(ByteBuffer content, boolean lastContent, Callback callback); void completed(); } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index 21f7427590c..7cd31b8cf5e 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -29,6 +29,8 @@ import java.util.Collections; import java.util.Enumeration; import java.util.Iterator; import java.util.Locale; +import java.util.concurrent.TimeoutException; + import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.http.Cookie; @@ -46,6 +48,7 @@ import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.session.HashSessionIdManager; import org.eclipse.jetty.server.session.HashSessionManager; import org.eclipse.jetty.server.session.HashedSession; +import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.TimerScheduler; @@ -84,6 +87,16 @@ public class ResponseTest @Override public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException { + BlockingCallback cb = new BlockingCallback(); + send(info,content,lastContent,cb); + try + { + cb.block(); + } + catch (InterruptedException | TimeoutException e) + { + throw new IOException(e); + } } @Override @@ -91,6 +104,12 @@ public class ResponseTest { callback.succeeded(); } + + @Override + public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback) + { + send(null,responseBodyContent, lastContent, callback); + } @Override public void completed() diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java index 30d443fcf2a..ac9e3892482 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java @@ -86,6 +86,14 @@ public class HttpTransportOverSPDY implements HttpTransport return requestHeaders; } + + @Override + public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback) + { + // TODO can this be more efficient? + send(null,responseBodyContent, lastContent, callback); + } + @Override public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) { diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java index 2c05ec77899..602255fa070 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java @@ -264,6 +264,8 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse long contentLength = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString()); HttpGenerator.ResponseInfo info = new HttpGenerator.ResponseInfo(httpVersion, fields, contentLength, code, reason, false); + + // TODO use the async send send(info, null, replyInfo.isClose()); if (replyInfo.isClose()) @@ -300,6 +302,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse // Data buffer must be copied, as the ByteBuffer is pooled ByteBuffer byteBuffer = dataInfo.asByteBuffer(false); + // TODO use the async send with callback! send(null, byteBuffer, dataInfo.isClose()); if (dataInfo.isClose()) diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/mux/HttpTransportOverMux.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/mux/HttpTransportOverMux.java index 0d2fef5d0b4..7ac7ba3c3fe 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/mux/HttpTransportOverMux.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/mux/HttpTransportOverMux.java @@ -87,4 +87,10 @@ public class HttpTransportOverMux implements HttpTransport // prepare the AddChannelResponse // TODO: look at HttpSender in jetty-client for generator loop logic } + + @Override + public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback) + { + send(null,responseBodyContent, lastContent, callback); + } }