Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.
+ * + * @param callback the callback to call when an error occurs or we are readable. + * @return true if set + */ + boolean tryFillInterested(Callback callback); + /** * @return whether {@link #fillInterested(Callback)} has been called, but {@link #fill(ByteBuffer)} has not yet * been called diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java index e5f72d06075..127a98ef717 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java @@ -55,24 +55,37 @@ public abstract class FillInterest */ public void register(Callback callback) throws ReadPendingException { - if (callback == null) - throw new IllegalArgumentException(); - - if (_interested.compareAndSet(null, callback)) - { - if (LOG.isDebugEnabled()) - { - LOG.debug("{} register {}",this,callback); - _lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName()); - } - } - else + if (!tryRegister(callback)) { LOG.warn("Read pending for {} prevented {}", _interested, callback); if (LOG.isDebugEnabled()) LOG.warn("callback set at ",_lastSet); throw new ReadPendingException(); + } + } + + /** + * Call to register interest in a callback when a read is possible. + * The callback will be called either immediately if {@link #needsFillInterest()} + * returns true or eventually once {@link #fillable()} is called. + * + * @param callback the callback to register + * @return true if the register succeeded + */ + public boolean tryRegister(Callback callback) + { + if (callback == null) + throw new IllegalArgumentException(); + + if (!_interested.compareAndSet(null, callback)) + return false; + + if (LOG.isDebugEnabled()) + { + LOG.debug("{} register {}",this,callback); + _lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName()); } + try { if (LOG.isDebugEnabled()) @@ -83,6 +96,8 @@ public abstract class FillInterest { onFail(e); } + + return true; } /** diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 8eb83faa785..397932e8457 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -261,14 +261,17 @@ public class SslConnection extends AbstractConnection _decryptedEndPoint.getFillInterest().fillable(); // If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read + boolean runComplete = false; synchronized(_decryptedEndPoint) { if (_decryptedEndPoint._flushRequiresFillToProgress) { _decryptedEndPoint._flushRequiresFillToProgress = false; - _runCompleteWrite.run(); + runComplete = true; } } + if (runComplete) + _runCompleteWrite.run(); if (LOG.isDebugEnabled()) LOG.debug("onFillable exit {}", _decryptedEndPoint); @@ -455,6 +458,8 @@ public class SslConnection extends AbstractConnection // OR if we are handshaking we need to read some encrypted data OR // if neither then we should just try the flush again. boolean try_again = false; + boolean write = false; + boolean need_fill_interest = false; synchronized (DecryptedEndPoint.this) { if (LOG.isDebugEnabled()) @@ -464,15 +469,14 @@ public class SslConnection extends AbstractConnection { // write it _cannotAcceptMoreAppDataToFlush = true; - getEndPoint().write(_writeCallback, _encryptedOutput); + write = true; } // If we are handshaking and need to read, else if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) { // check if we are actually read blocked in order to write - _flushRequiresFillToProgress = true; - - ensureFillInterested(); + _flushRequiresFillToProgress = true; + need_fill_interest = !SslConnection.this.isFillInterested(); } else { @@ -485,8 +489,11 @@ public class SslConnection extends AbstractConnection } } - - if (try_again) + if (write) + getEndPoint().write(_writeCallback, _encryptedOutput); + else if (need_fill_interest) + ensureFillInterested(); + else if (try_again) { // If the output is closed, if (isOutputShutdown()) @@ -502,6 +509,7 @@ public class SslConnection extends AbstractConnection getExecutor().execute(_runCompleteWrite); } } + } @Override @@ -511,11 +519,12 @@ public class SslConnection extends AbstractConnection // method on the DecryptedEndPoint, so we have to work out if there is // decrypted data to be filled or what callbacks to setup to be told when there // might be more encrypted data available to attempt another call to fill - + boolean fillable; + boolean write = false; synchronized (DecryptedEndPoint.this) { // Do we already have some app data, then app can fill now so return true - boolean fillable = (BufferUtil.hasContent(_decryptedInput)) + fillable = (BufferUtil.hasContent(_decryptedInput)) // or if we have encryptedInput and have not underflowed yet, the it is worth trying a fill || BufferUtil.hasContent(_encryptedInput) && !_underFlown; @@ -534,7 +543,7 @@ public class SslConnection extends AbstractConnection { // write it _cannotAcceptMoreAppDataToFlush = true; - getEndPoint().write(_writeCallback, _encryptedOutput); + write = true; } else { @@ -545,12 +554,13 @@ public class SslConnection extends AbstractConnection } } } - - if (fillable) - getExecutor().execute(_runFillable); - else - ensureFillInterested(); } + if (write) + getEndPoint().write(_writeCallback, _encryptedOutput); + else if (fillable) + getExecutor().execute(_runFillable); + else + ensureFillInterested(); } @Override @@ -957,7 +967,7 @@ public class SslConnection extends AbstractConnection _flushRequiresFillToProgress = true; fill(__FLUSH_CALLED_FILL); // Check if after the fill() we need to wrap again - if (handshakeStatus == HandshakeStatus.NEED_WRAP) + if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) continue; } return allConsumed && BufferUtil.isEmpty(_encryptedOutput); @@ -1002,27 +1012,37 @@ public class SslConnection extends AbstractConnection { try { - synchronized (this) + boolean flush = false; + boolean close = false; + synchronized (_decryptedEndPoint) { boolean ishut = isInputShutdown(); boolean oshut = isOutputShutdown(); if (LOG.isDebugEnabled()) LOG.debug("{} shutdownOutput: oshut={}, ishut={}", SslConnection.this, oshut, ishut); - - if (!oshut) + + if (oshut) + return; + + if (!_closedOutbound) { - if (!_closedOutbound) - { - _closedOutbound=true; // Only attempt this once - _sslEngine.closeOutbound(); - flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message. - } - if (ishut) - getEndPoint().close(); - else - ensureFillInterested(); + _closedOutbound=true; // Only attempt this once + _sslEngine.closeOutbound(); + flush = true; } + + // TODO review close logic here + if (ishut) + close = true; + } + + if (flush) + flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message. + if (close) + getEndPoint().close(); + else + ensureFillInterested(); } catch (Throwable x) { @@ -1033,12 +1053,9 @@ public class SslConnection extends AbstractConnection private void ensureFillInterested() { - if (!SslConnection.this.isFillInterested()) - { - if (LOG.isDebugEnabled()) - LOG.debug("fillInterested SSL NB {}",SslConnection.this); - SslConnection.this.getEndPoint().fillInterested(_sslReadCallback); - } + if (LOG.isDebugEnabled()) + LOG.debug("fillInterested SSL NB {}",SslConnection.this); + SslConnection.this.tryFillInterested(_sslReadCallback); } @Override @@ -1110,4 +1127,5 @@ public class SslConnection extends AbstractConnection return super.toString()+"->"+getEndPoint().toString(); } } + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java index cdff2583333..84f16f88475 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java @@ -662,6 +662,11 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory _endp.fillInterested(callback); } + public boolean tryFillInterested(Callback callback) + { + return _endp.tryFillInterested(callback); + } + @Override public boolean isFillInterested() { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/DefaultSessionIdManager.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/DefaultSessionIdManager.java index c69130c9d6a..ef63c232451 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/DefaultSessionIdManager.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/DefaultSessionIdManager.java @@ -30,7 +30,6 @@ import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.SessionIdManager; import org.eclipse.jetty.server.handler.ContextHandler; -import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -209,33 +208,32 @@ public class DefaultSessionIdManager extends ContainerLifeCycle implements Sessi @Override public String newSessionId(HttpServletRequest request, long created) { - synchronized (this) + if (request==null) + return newSessionId(created); + + // A requested session ID can only be used if it is in use already. + String requested_id=request.getRequestedSessionId(); + if (requested_id!=null) { - if (request==null) - return newSessionId(created); - - // A requested session ID can only be used if it is in use already. - String requested_id=request.getRequestedSessionId(); - if (requested_id!=null) - { - String cluster_id=getId(requested_id); - if (isIdInUse(cluster_id)) - return cluster_id; - } - - - // Else reuse any new session ID already defined for this request. - String new_id=(String)request.getAttribute(__NEW_SESSION_ID); - if (new_id!=null&&isIdInUse(new_id)) - return new_id; - - // pick a new unique ID! - String id = newSessionId(request.hashCode()); - - request.setAttribute(__NEW_SESSION_ID,id); - return id; + String cluster_id=getId(requested_id); + if (isIdInUse(cluster_id)) + return cluster_id; } + + + // Else reuse any new session ID already defined for this request. + String new_id=(String)request.getAttribute(__NEW_SESSION_ID); + if (new_id!=null&&isIdInUse(new_id)) + return new_id; + + // pick a new unique ID! + String id = newSessionId(request.hashCode()); + + request.setAttribute(__NEW_SESSION_ID,id); + return id; } + + /* ------------------------------------------------------------ */ /** @@ -246,45 +244,49 @@ public class DefaultSessionIdManager extends ContainerLifeCycle implements Sessi { // pick a new unique ID! String id=null; - while (id==null||id.length()==0) - { - long r0=_weakRandom - ?(hashCode()^Runtime.getRuntime().freeMemory()^_random.nextInt()^((seedTerm)<<32)) - :_random.nextLong(); - if (r0<0) - r0=-r0; - - // random chance to reseed - if (_reseed>0 && (r0%_reseed)== 1L) - { - if (LOG.isDebugEnabled()) - LOG.debug("Reseeding {}",this); - if (_random instanceof SecureRandom) - { - SecureRandom secure = (SecureRandom)_random; - secure.setSeed(secure.generateSeed(8)); - } - else - { - _random.setSeed(_random.nextLong()^System.currentTimeMillis()^seedTerm^Runtime.getRuntime().freeMemory()); - } - } - - long r1=_weakRandom - ?(hashCode()^Runtime.getRuntime().freeMemory()^_random.nextInt()^((seedTerm)<<32)) - :_random.nextLong(); - if (r1<0) - r1=-r1; - - id=Long.toString(r0,36)+Long.toString(r1,36); - //add in the id of the node to ensure unique id across cluster - //NOTE this is different to the node suffix which denotes which node the request was received on - if (_workerName!=null) - id=_workerName + id; - - id = id+Long.toString(COUNTER.getAndIncrement()); - + synchronized (_random) + { + while (id==null||id.length()==0) + { + long r0=_weakRandom + ?(hashCode()^Runtime.getRuntime().freeMemory()^_random.nextInt()^((seedTerm)<<32)) + :_random.nextLong(); + if (r0<0) + r0=-r0; + + // random chance to reseed + if (_reseed>0 && (r0%_reseed)== 1L) + { + if (LOG.isDebugEnabled()) + LOG.debug("Reseeding {}",this); + if (_random instanceof SecureRandom) + { + SecureRandom secure = (SecureRandom)_random; + secure.setSeed(secure.generateSeed(8)); + } + else + { + _random.setSeed(_random.nextLong()^System.currentTimeMillis()^seedTerm^Runtime.getRuntime().freeMemory()); + } + } + + long r1=_weakRandom + ?(hashCode()^Runtime.getRuntime().freeMemory()^_random.nextInt()^((seedTerm)<<32)) + :_random.nextLong(); + if (r1<0) + r1=-r1; + + id=Long.toString(r0,36)+Long.toString(r1,36); + + //add in the id of the node to ensure unique id across cluster + //NOTE this is different to the node suffix which denotes which node the request was received on + if (_workerName!=null) + id=_workerName + id; + + id = id+Long.toString(COUNTER.getAndIncrement()); + + } } return id; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java index ef385042af5..9150354907a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java @@ -53,11 +53,16 @@ public class SessionData implements Serializable protected long _accessed; // the time of the last access protected long _lastAccessed; // the time of the last access excluding this one protected long _maxInactiveMs; - protected Map