From 6bfc19be1babd9e62338ab538dac0ff9684f51e0 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 14 Dec 2012 09:50:22 +1100 Subject: [PATCH] jetty-9 optimisation to dispatch before parsing so that handling is done in same thread --- .../eclipse/jetty/io/AbstractConnection.java | 146 +++++++++--------- .../eclipse/jetty/io/ssl/SslConnection.java | 4 +- .../eclipse/jetty/io/SslConnectionTest.java | 2 +- .../eclipse/jetty/server/HttpConnection.java | 35 ++--- .../jetty/spdy/client/SPDYConnection.java | 2 +- .../io/AbstractWebSocketConnection.java | 2 +- 6 files changed, 89 insertions(+), 102 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index fac16b45121..33a8a6ad349 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.ExecutorCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -39,6 +38,8 @@ import org.eclipse.jetty.util.log.Logger; public abstract class AbstractConnection implements Connection { private static final Logger LOG = Log.getLogger(AbstractConnection.class); + + public static final boolean EXECUTE_ONFILLABLE=true; private final List listeners = new CopyOnWriteArrayList<>(); private final AtomicReference _state = new AtomicReference<>(State.IDLE); @@ -46,82 +47,22 @@ public abstract class AbstractConnection implements Connection private final EndPoint _endPoint; private final Executor _executor; private final Callback _readCallback; + private final boolean _executeOnfillable; private int _inputBufferSize=2048; - public AbstractConnection(EndPoint endp, Executor executor) + protected AbstractConnection(EndPoint endp, Executor executor) { - this(endp,executor,true); + this(endp,executor,EXECUTE_ONFILLABLE); } - public AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable) + protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable) { if (executor == null) throw new IllegalArgumentException("Executor must not be null!"); _endPoint = endp; _executor = executor; - _readCallback = new ExecutorCallback(executor,0) - { - @Override - public void succeeded() - { - if (executeOnfillable) - super.succeeded(); - else - onCompleted(); - } - - @Override - protected void onCompleted() - { - if (_state.compareAndSet(State.INTERESTED,State.FILLING)) - { - try - { - onFillable(); - } - finally - { - loop:while(true) - { - switch(_state.get()) - { - case IDLE: - case INTERESTED: - throw new IllegalStateException(); - - case FILLING: - if (_state.compareAndSet(State.FILLING,State.IDLE)) - break loop; - break; - - case FILLING_INTERESTED: - if (_state.compareAndSet(State.FILLING_INTERESTED,State.INTERESTED)) - { - getEndPoint().fillInterested(_readCallback); - break loop; - } - break; - } - } - - } - } - else - LOG.warn(new Throwable()); - } - - @Override - protected void onFailed(Throwable x) - { - onFillInterestedFailed(x); - } - - @Override - public String toString() - { - return String.format("AC.ExReadCB@%x", AbstractConnection.this.hashCode()); - } - }; + _readCallback = new ReadCallback(); + _executeOnfillable=executeOnfillable; } @Override @@ -216,8 +157,6 @@ public abstract class AbstractConnection implements Connection return true; } - // TODO remove this when open/close refactored - final AtomicReference _opened = new AtomicReference<>(null); @Override public void onOpen() { @@ -225,12 +164,6 @@ public abstract class AbstractConnection implements Connection for (Listener listener : listeners) listener.onOpened(this); - - if (!_opened.compareAndSet(null,new Throwable())) - { - LOG.warn("ALREADY OPENED ", _opened.get()); - LOG.warn("EXTRA OPEN AT ",new Throwable()); - } } @Override @@ -294,4 +227,67 @@ public abstract class AbstractConnection implements Connection { IDLE, INTERESTED, FILLING, FILLING_INTERESTED } + + private class ReadCallback implements Callback, Runnable + { + @Override + public void run() + { + if (_state.compareAndSet(State.INTERESTED,State.FILLING)) + { + try + { + onFillable(); + } + finally + { + loop:while(true) + { + switch(_state.get()) + { + case IDLE: + case INTERESTED: + throw new IllegalStateException(); + + case FILLING: + if (_state.compareAndSet(State.FILLING,State.IDLE)) + break loop; + break; + + case FILLING_INTERESTED: + if (_state.compareAndSet(State.FILLING_INTERESTED,State.INTERESTED)) + { + getEndPoint().fillInterested(_readCallback); + break loop; + } + break; + } + } + } + } + else + LOG.warn(new Throwable()); + } + + @Override + public void succeeded() + { + if (_executeOnfillable) + _executor.execute(this); + else + run(); + } + + @Override + public void failed(Throwable x) + { + onFillInterestedFailed(x); + } + + @Override + public String toString() + { + return String.format("AC.ExReadCB@%x", AbstractConnection.this.hashCode()); + } + }; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 6bf63c6fe8a..53f1bcee9bd 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -103,7 +103,9 @@ public class SslConnection extends AbstractConnection public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine) { - super(endPoint, executor); + // This connection does not execute calls to onfillable, so they will be called by the selector thread. + // onfillable does not block and will only wakeup another thread to do the actual reading and handling. + super(endPoint, executor, !EXECUTE_ONFILLABLE); this._bufferPool = byteBufferPool; this._sslEngine = sslEngine; this._decryptedEndPoint = newDecryptedEndPoint(); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java index e1a176f1e36..b0f82c4757f 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java @@ -246,7 +246,7 @@ public class SslConnectionTest len=5; while(len>0) len-=client.getInputStream().read(buffer); - Assert.assertEquals(1, _dispatches.get()); + Assert.assertEquals(0, _dispatches.get()); client.close(); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index a30e4b95d1c..28c5788749c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -66,24 +66,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http private BlockingCallback _readBlocker = new BlockingCallback(); private BlockingCallback _writeBlocker = new BlockingCallback(); - // TODO get rid of this - private final Runnable _channelRunner = new Runnable() - { - @Override - public void run() - { - try - { - setCurrentConnection(HttpConnection.this); - _channel.run(); - } - finally - { - setCurrentConnection(null); - } - - } - }; public static HttpConnection getCurrentConnection() { @@ -102,10 +84,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint) { - // Tell AbstractConnector executeOnFillable==false because we are guaranteeing that onfillable - // will never block nor take an excessive amount of CPU. ie it is OK for the selector thread to - // be used. In this case the thread that calls onfillable will be asked to do some IO and parsing. - super(endPoint, connector.getExecutor(),false); + // Tell AbstractConnector executeOnFillable==true because we want the same thread that + // does the HTTP parsing to handle the request so its cache is hot + super(endPoint, connector.getExecutor(),true); _config = config; _connector = connector; @@ -280,7 +261,15 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http // The parser returned true, which indicates the channel is ready to handle a request. // Call the channel and this will either handle the request/response to completion OR, // if the request suspends, the request/response will be incomplete so the outer loop will exit. - getExecutor().execute(_channelRunner); + try + { + setCurrentConnection(HttpConnection.this); + _channel.run(); + } + finally + { + setCurrentConnection(null); + } return; } } diff --git a/jetty-spdy/spdy-client/src/main/java/org/eclipse/jetty/spdy/client/SPDYConnection.java b/jetty-spdy/spdy-client/src/main/java/org/eclipse/jetty/spdy/client/SPDYConnection.java index aecdd74b283..3bf3ac2d01e 100644 --- a/jetty-spdy/spdy-client/src/main/java/org/eclipse/jetty/spdy/client/SPDYConnection.java +++ b/jetty-spdy/spdy-client/src/main/java/org/eclipse/jetty/spdy/client/SPDYConnection.java @@ -56,7 +56,7 @@ public class SPDYConnection extends AbstractConnection implements Controller, Id // always dispatches to a new thread when calling application // code, so here we can safely pass false as last parameter, // and avoid to dispatch to onFillable(). - super(endPoint, executor, false); + super(endPoint, executor, !EXECUTE_ONFILLABLE); this.bufferPool = bufferPool; this.parser = parser; onIdle(true); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index 8973e532438..712a7f41fd1 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -318,7 +318,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool) { - super(endp,executor); + super(endp,executor,EXECUTE_ONFILLABLE); // TODO review if this is best. Specially with MUX this.policy = policy; this.bufferPool = bufferPool; this.generator = new Generator(policy,bufferPool);