diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index 8bd212c32df..215469e2969 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -142,6 +142,16 @@ public abstract class AbstractConnection implements Connection getEndPoint().fillInterested(_readCallback); } + public void tryFillInterested() + { + tryFillInterested(_readCallback); + } + + public void tryFillInterested(Callback callback) + { + getEndPoint().tryFillInterested(callback); + } + public boolean isFillInterested() { return getEndPoint().isFillInterested(); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 64d719f6852..a7e42e3c955 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -122,12 +122,19 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint } @Override - public void fillInterested(Callback callback) throws IllegalStateException + public void fillInterested(Callback callback) { notIdle(); _fillInterest.register(callback); } + @Override + public boolean tryFillInterested(Callback callback) + { + notIdle(); + return _fillInterest.tryRegister(callback); + } + @Override public boolean isFillInterested() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index 8f285c3ed02..479660c0ddd 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -205,6 +205,14 @@ public interface EndPoint extends Closeable */ void fillInterested(Callback callback) throws ReadPendingException; + /** + *

Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.

+ * + * @param callback the callback to call when an error occurs or we are readable. + * @return true if set + */ + boolean tryFillInterested(Callback callback); + /** * @return whether {@link #fillInterested(Callback)} has been called, but {@link #fill(ByteBuffer)} has not yet * been called diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java index a00a8932278..a1b2f2e299f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java @@ -53,24 +53,37 @@ public abstract class FillInterest */ public void register(Callback callback) throws ReadPendingException { - if (callback == null) - throw new IllegalArgumentException(); - - if (_interested.compareAndSet(null, callback)) - { - if (LOG.isDebugEnabled()) - { - LOG.debug("{} register {}",this,callback); - _lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName()); - } - } - else + if (!tryRegister(callback)) { LOG.warn("Read pending for {} prevented {}", _interested, callback); if (LOG.isDebugEnabled()) LOG.warn("callback set at ",_lastSet); throw new ReadPendingException(); + } + } + + /** + * Call to register interest in a callback when a read is possible. + * The callback will be called either immediately if {@link #needsFillInterest()} + * returns true or eventually once {@link #fillable()} is called. + * + * @param callback the callback to register + * @return true if the register succeeded + */ + public boolean tryRegister(Callback callback) + { + if (callback == null) + throw new IllegalArgumentException(); + + if (!_interested.compareAndSet(null, callback)) + return false; + + if (LOG.isDebugEnabled()) + { + LOG.debug("{} register {}",this,callback); + _lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName()); } + try { if (LOG.isDebugEnabled()) @@ -81,6 +94,8 @@ public abstract class FillInterest { onFail(e); } + + return true; } /** diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 8bc06cad61b..5319ae5e96f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -223,14 +223,17 @@ public class SslConnection extends AbstractConnection _decryptedEndPoint.getFillInterest().fillable(); // If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read + boolean runComplete = false; synchronized(_decryptedEndPoint) { if (_decryptedEndPoint._flushRequiresFillToProgress) { _decryptedEndPoint._flushRequiresFillToProgress = false; - _runCompletWrite.run(); + runComplete = true; } } + if (runComplete) + _runCompletWrite.run(); if (LOG.isDebugEnabled()) LOG.debug("onFillable exit {}", _decryptedEndPoint); @@ -390,6 +393,8 @@ public class SslConnection extends AbstractConnection // OR if we are handshaking we need to read some encrypted data OR // if neither then we should just try the flush again. boolean try_again = false; + boolean write = false; + boolean need_fill_interest = false; synchronized (DecryptedEndPoint.this) { if (LOG.isDebugEnabled()) @@ -399,15 +404,14 @@ public class SslConnection extends AbstractConnection { // write it _cannotAcceptMoreAppDataToFlush = true; - getEndPoint().write(_writeCallback, _encryptedOutput); + write = true; } // If we are handshaking and need to read, else if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) { // check if we are actually read blocked in order to write - _flushRequiresFillToProgress = true; - - ensureFillInterested(); + _flushRequiresFillToProgress = true; + need_fill_interest = !SslConnection.this.isFillInterested(); } else { @@ -420,8 +424,11 @@ public class SslConnection extends AbstractConnection } } - - if (try_again) + if (write) + getEndPoint().write(_writeCallback, _encryptedOutput); + else if (need_fill_interest) + ensureFillInterested(); + else if (try_again) { // If the output is closed, if (isOutputShutdown()) @@ -437,6 +444,7 @@ public class SslConnection extends AbstractConnection getExecutor().execute(_runCompletWrite); } } + } @Override @@ -446,11 +454,12 @@ public class SslConnection extends AbstractConnection // method on the DecryptedEndPoint, so we have to work out if there is // decrypted data to be filled or what callbacks to setup to be told when there // might be more encrypted data available to attempt another call to fill - + boolean fillable; + boolean write = false; synchronized (DecryptedEndPoint.this) { // Do we already have some app data, then app can fill now so return true - boolean fillable = (BufferUtil.hasContent(_decryptedInput)) + fillable = (BufferUtil.hasContent(_decryptedInput)) // or if we have encryptedInput and have not underflowed yet, the it is worth trying a fill || BufferUtil.hasContent(_encryptedInput) && !_underFlown; @@ -469,7 +478,7 @@ public class SslConnection extends AbstractConnection { // write it _cannotAcceptMoreAppDataToFlush = true; - getEndPoint().write(_writeCallback, _encryptedOutput); + write = true; } else { @@ -480,12 +489,13 @@ public class SslConnection extends AbstractConnection } } } - - if (fillable) - getExecutor().execute(_runFillable); - else - ensureFillInterested(); } + if (write) + getEndPoint().write(_writeCallback, _encryptedOutput); + else if (fillable) + getExecutor().execute(_runFillable); + else + ensureFillInterested(); } @Override @@ -892,7 +902,7 @@ public class SslConnection extends AbstractConnection _flushRequiresFillToProgress = true; fill(__FLUSH_CALLED_FILL); // Check if after the fill() we need to wrap again - if (handshakeStatus == HandshakeStatus.NEED_WRAP) + if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) continue; } return allConsumed && BufferUtil.isEmpty(_encryptedOutput); @@ -937,27 +947,37 @@ public class SslConnection extends AbstractConnection { try { - synchronized (this) + boolean flush = false; + boolean close = false; + synchronized (_decryptedEndPoint) { boolean ishut = isInputShutdown(); boolean oshut = isOutputShutdown(); if (LOG.isDebugEnabled()) LOG.debug("{} shutdownOutput: oshut={}, ishut={}", SslConnection.this, oshut, ishut); - - if (!oshut) + + if (oshut) + return; + + if (!_closedOutbound) { - if (!_closedOutbound) - { - _closedOutbound=true; // Only attempt this once - _sslEngine.closeOutbound(); - flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message. - } - if (ishut) - getEndPoint().close(); - else - ensureFillInterested(); + _closedOutbound=true; // Only attempt this once + _sslEngine.closeOutbound(); + flush = true; } + + // TODO review close logic here + if (ishut) + close = true; + } + + if (flush) + flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message. + if (close) + getEndPoint().close(); + else + ensureFillInterested(); } catch (Throwable x) { @@ -968,16 +988,13 @@ public class SslConnection extends AbstractConnection private void ensureFillInterested() { - if (!SslConnection.this.isFillInterested()) + if (getFillInterest().isCallbackNonBlocking()) { - if (getFillInterest().isCallbackNonBlocking()) - { - SslConnection.this.getEndPoint().fillInterested(_nonBlockingReadCallback); - } - else - { - SslConnection.this.fillInterested(); - } + SslConnection.this.tryFillInterested(_nonBlockingReadCallback); + } + else + { + SslConnection.this.tryFillInterested(); } } @@ -1058,4 +1075,5 @@ public class SslConnection extends AbstractConnection return super.toString()+"->"+getEndPoint().toString(); } } + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java index 9752434140b..733670ce945 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java @@ -303,6 +303,11 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory _endp.fillInterested(callback); } + public boolean tryFillInterested(Callback callback) + { + return _endp.tryFillInterested(callback); + } + @Override public boolean isFillInterested() {