diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index cac0e124cbd..c9bc7b44129 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -134,6 +134,16 @@ public abstract class AbstractConnection implements Connection getEndPoint().fillInterested(_readCallback); } + public void tryFillInterested() + { + tryFillInterested(_readCallback); + } + + public void tryFillInterested(Callback callback) + { + getEndPoint().tryFillInterested(callback); + } + public boolean isFillInterested() { return getEndPoint().isFillInterested(); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 324ecfa8d55..210867bed84 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -347,12 +347,19 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint } @Override - public void fillInterested(Callback callback) throws IllegalStateException + public void fillInterested(Callback callback) { notIdle(); _fillInterest.register(callback); } + @Override + public boolean tryFillInterested(Callback callback) + { + notIdle(); + return _fillInterest.tryRegister(callback); + } + @Override public boolean isFillInterested() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index 68f795c9f21..73139c73ce0 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -205,6 +205,14 @@ public interface EndPoint extends Closeable */ void fillInterested(Callback callback) throws ReadPendingException; + /** + *

Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.

+ * + * @param callback the callback to call when an error occurs or we are readable. + * @return true if set + */ + boolean tryFillInterested(Callback callback); + /** * @return whether {@link #fillInterested(Callback)} has been called, but {@link #fill(ByteBuffer)} has not yet * been called diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java index e5f72d06075..127a98ef717 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java @@ -55,24 +55,37 @@ public abstract class FillInterest */ public void register(Callback callback) throws ReadPendingException { - if (callback == null) - throw new IllegalArgumentException(); - - if (_interested.compareAndSet(null, callback)) - { - if (LOG.isDebugEnabled()) - { - LOG.debug("{} register {}",this,callback); - _lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName()); - } - } - else + if (!tryRegister(callback)) { LOG.warn("Read pending for {} prevented {}", _interested, callback); if (LOG.isDebugEnabled()) LOG.warn("callback set at ",_lastSet); throw new ReadPendingException(); + } + } + + /** + * Call to register interest in a callback when a read is possible. + * The callback will be called either immediately if {@link #needsFillInterest()} + * returns true or eventually once {@link #fillable()} is called. + * + * @param callback the callback to register + * @return true if the register succeeded + */ + public boolean tryRegister(Callback callback) + { + if (callback == null) + throw new IllegalArgumentException(); + + if (!_interested.compareAndSet(null, callback)) + return false; + + if (LOG.isDebugEnabled()) + { + LOG.debug("{} register {}",this,callback); + _lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName()); } + try { if (LOG.isDebugEnabled()) @@ -83,6 +96,8 @@ public abstract class FillInterest { onFail(e); } + + return true; } /** diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 8eb83faa785..397932e8457 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -261,14 +261,17 @@ public class SslConnection extends AbstractConnection _decryptedEndPoint.getFillInterest().fillable(); // If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read + boolean runComplete = false; synchronized(_decryptedEndPoint) { if (_decryptedEndPoint._flushRequiresFillToProgress) { _decryptedEndPoint._flushRequiresFillToProgress = false; - _runCompleteWrite.run(); + runComplete = true; } } + if (runComplete) + _runCompleteWrite.run(); if (LOG.isDebugEnabled()) LOG.debug("onFillable exit {}", _decryptedEndPoint); @@ -455,6 +458,8 @@ public class SslConnection extends AbstractConnection // OR if we are handshaking we need to read some encrypted data OR // if neither then we should just try the flush again. boolean try_again = false; + boolean write = false; + boolean need_fill_interest = false; synchronized (DecryptedEndPoint.this) { if (LOG.isDebugEnabled()) @@ -464,15 +469,14 @@ public class SslConnection extends AbstractConnection { // write it _cannotAcceptMoreAppDataToFlush = true; - getEndPoint().write(_writeCallback, _encryptedOutput); + write = true; } // If we are handshaking and need to read, else if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) { // check if we are actually read blocked in order to write - _flushRequiresFillToProgress = true; - - ensureFillInterested(); + _flushRequiresFillToProgress = true; + need_fill_interest = !SslConnection.this.isFillInterested(); } else { @@ -485,8 +489,11 @@ public class SslConnection extends AbstractConnection } } - - if (try_again) + if (write) + getEndPoint().write(_writeCallback, _encryptedOutput); + else if (need_fill_interest) + ensureFillInterested(); + else if (try_again) { // If the output is closed, if (isOutputShutdown()) @@ -502,6 +509,7 @@ public class SslConnection extends AbstractConnection getExecutor().execute(_runCompleteWrite); } } + } @Override @@ -511,11 +519,12 @@ public class SslConnection extends AbstractConnection // method on the DecryptedEndPoint, so we have to work out if there is // decrypted data to be filled or what callbacks to setup to be told when there // might be more encrypted data available to attempt another call to fill - + boolean fillable; + boolean write = false; synchronized (DecryptedEndPoint.this) { // Do we already have some app data, then app can fill now so return true - boolean fillable = (BufferUtil.hasContent(_decryptedInput)) + fillable = (BufferUtil.hasContent(_decryptedInput)) // or if we have encryptedInput and have not underflowed yet, the it is worth trying a fill || BufferUtil.hasContent(_encryptedInput) && !_underFlown; @@ -534,7 +543,7 @@ public class SslConnection extends AbstractConnection { // write it _cannotAcceptMoreAppDataToFlush = true; - getEndPoint().write(_writeCallback, _encryptedOutput); + write = true; } else { @@ -545,12 +554,13 @@ public class SslConnection extends AbstractConnection } } } - - if (fillable) - getExecutor().execute(_runFillable); - else - ensureFillInterested(); } + if (write) + getEndPoint().write(_writeCallback, _encryptedOutput); + else if (fillable) + getExecutor().execute(_runFillable); + else + ensureFillInterested(); } @Override @@ -957,7 +967,7 @@ public class SslConnection extends AbstractConnection _flushRequiresFillToProgress = true; fill(__FLUSH_CALLED_FILL); // Check if after the fill() we need to wrap again - if (handshakeStatus == HandshakeStatus.NEED_WRAP) + if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) continue; } return allConsumed && BufferUtil.isEmpty(_encryptedOutput); @@ -1002,27 +1012,37 @@ public class SslConnection extends AbstractConnection { try { - synchronized (this) + boolean flush = false; + boolean close = false; + synchronized (_decryptedEndPoint) { boolean ishut = isInputShutdown(); boolean oshut = isOutputShutdown(); if (LOG.isDebugEnabled()) LOG.debug("{} shutdownOutput: oshut={}, ishut={}", SslConnection.this, oshut, ishut); - - if (!oshut) + + if (oshut) + return; + + if (!_closedOutbound) { - if (!_closedOutbound) - { - _closedOutbound=true; // Only attempt this once - _sslEngine.closeOutbound(); - flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message. - } - if (ishut) - getEndPoint().close(); - else - ensureFillInterested(); + _closedOutbound=true; // Only attempt this once + _sslEngine.closeOutbound(); + flush = true; } + + // TODO review close logic here + if (ishut) + close = true; + } + + if (flush) + flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message. + if (close) + getEndPoint().close(); + else + ensureFillInterested(); } catch (Throwable x) { @@ -1033,12 +1053,9 @@ public class SslConnection extends AbstractConnection private void ensureFillInterested() { - if (!SslConnection.this.isFillInterested()) - { - if (LOG.isDebugEnabled()) - LOG.debug("fillInterested SSL NB {}",SslConnection.this); - SslConnection.this.getEndPoint().fillInterested(_sslReadCallback); - } + if (LOG.isDebugEnabled()) + LOG.debug("fillInterested SSL NB {}",SslConnection.this); + SslConnection.this.tryFillInterested(_sslReadCallback); } @Override @@ -1110,4 +1127,5 @@ public class SslConnection extends AbstractConnection return super.toString()+"->"+getEndPoint().toString(); } } + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java index cdff2583333..84f16f88475 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java @@ -662,6 +662,11 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory _endp.fillInterested(callback); } + public boolean tryFillInterested(Callback callback) + { + return _endp.tryFillInterested(callback); + } + @Override public boolean isFillInterested() { diff --git a/jetty-websocket/javax-websocket-server-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/server/PathParamServerEndpointConfig.java b/jetty-websocket/javax-websocket-server-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/server/PathParamServerEndpointConfig.java index 274a86a1c33..7f624fd5454 100644 --- a/jetty-websocket/javax-websocket-server-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/server/PathParamServerEndpointConfig.java +++ b/jetty-websocket/javax-websocket-server-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/server/PathParamServerEndpointConfig.java @@ -28,7 +28,7 @@ import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec; import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope; /** - * Wrapper for a {@link ServerEndpointConfig} where there PathParm information from the incoming request. + * Wrapper for a {@link ServerEndpointConfig} where there is PathParam information from the incoming request. */ public class PathParamServerEndpointConfig extends BasicServerEndpointConfig implements ServerEndpointConfig {