From af5d27c2f74495f11eac95eecf1bb3d45a12d9f1 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 7 Dec 2016 22:50:27 +1100 Subject: [PATCH 1/9] Issue #1146 DecryptedEndPoint deadlock --- .../eclipse/jetty/io/AbstractConnection.java | 10 ++ .../eclipse/jetty/io/AbstractEndPoint.java | 9 +- .../java/org/eclipse/jetty/io/EndPoint.java | 8 ++ .../org/eclipse/jetty/io/FillInterest.java | 39 +++++--- .../eclipse/jetty/io/ssl/SslConnection.java | 94 +++++++++++-------- .../jetty/server/ProxyConnectionFactory.java | 5 + 6 files changed, 114 insertions(+), 51 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index 8bd212c32df..215469e2969 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -142,6 +142,16 @@ public abstract class AbstractConnection implements Connection getEndPoint().fillInterested(_readCallback); } + public void tryFillInterested() + { + tryFillInterested(_readCallback); + } + + public void tryFillInterested(Callback callback) + { + getEndPoint().tryFillInterested(callback); + } + public boolean isFillInterested() { return getEndPoint().isFillInterested(); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 64d719f6852..a7e42e3c955 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -122,12 +122,19 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint } @Override - public void fillInterested(Callback callback) throws IllegalStateException + public void fillInterested(Callback callback) { notIdle(); _fillInterest.register(callback); } + @Override + public boolean tryFillInterested(Callback callback) + { + notIdle(); + return _fillInterest.tryRegister(callback); + } + @Override public boolean isFillInterested() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index 8f285c3ed02..479660c0ddd 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -205,6 +205,14 @@ public interface EndPoint extends Closeable */ void fillInterested(Callback callback) throws ReadPendingException; + /** + *

Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.

+ * + * @param callback the callback to call when an error occurs or we are readable. + * @return true if set + */ + boolean tryFillInterested(Callback callback); + /** * @return whether {@link #fillInterested(Callback)} has been called, but {@link #fill(ByteBuffer)} has not yet * been called diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java index a00a8932278..a1b2f2e299f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java @@ -53,24 +53,37 @@ public abstract class FillInterest */ public void register(Callback callback) throws ReadPendingException { - if (callback == null) - throw new IllegalArgumentException(); - - if (_interested.compareAndSet(null, callback)) - { - if (LOG.isDebugEnabled()) - { - LOG.debug("{} register {}",this,callback); - _lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName()); - } - } - else + if (!tryRegister(callback)) { LOG.warn("Read pending for {} prevented {}", _interested, callback); if (LOG.isDebugEnabled()) LOG.warn("callback set at ",_lastSet); throw new ReadPendingException(); + } + } + + /** + * Call to register interest in a callback when a read is possible. + * The callback will be called either immediately if {@link #needsFillInterest()} + * returns true or eventually once {@link #fillable()} is called. + * + * @param callback the callback to register + * @return true if the register succeeded + */ + public boolean tryRegister(Callback callback) + { + if (callback == null) + throw new IllegalArgumentException(); + + if (!_interested.compareAndSet(null, callback)) + return false; + + if (LOG.isDebugEnabled()) + { + LOG.debug("{} register {}",this,callback); + _lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName()); } + try { if (LOG.isDebugEnabled()) @@ -81,6 +94,8 @@ public abstract class FillInterest { onFail(e); } + + return true; } /** diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 8bc06cad61b..5319ae5e96f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -223,14 +223,17 @@ public class SslConnection extends AbstractConnection _decryptedEndPoint.getFillInterest().fillable(); // If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read + boolean runComplete = false; synchronized(_decryptedEndPoint) { if (_decryptedEndPoint._flushRequiresFillToProgress) { _decryptedEndPoint._flushRequiresFillToProgress = false; - _runCompletWrite.run(); + runComplete = true; } } + if (runComplete) + _runCompletWrite.run(); if (LOG.isDebugEnabled()) LOG.debug("onFillable exit {}", _decryptedEndPoint); @@ -390,6 +393,8 @@ public class SslConnection extends AbstractConnection // OR if we are handshaking we need to read some encrypted data OR // if neither then we should just try the flush again. boolean try_again = false; + boolean write = false; + boolean need_fill_interest = false; synchronized (DecryptedEndPoint.this) { if (LOG.isDebugEnabled()) @@ -399,15 +404,14 @@ public class SslConnection extends AbstractConnection { // write it _cannotAcceptMoreAppDataToFlush = true; - getEndPoint().write(_writeCallback, _encryptedOutput); + write = true; } // If we are handshaking and need to read, else if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) { // check if we are actually read blocked in order to write - _flushRequiresFillToProgress = true; - - ensureFillInterested(); + _flushRequiresFillToProgress = true; + need_fill_interest = !SslConnection.this.isFillInterested(); } else { @@ -420,8 +424,11 @@ public class SslConnection extends AbstractConnection } } - - if (try_again) + if (write) + getEndPoint().write(_writeCallback, _encryptedOutput); + else if (need_fill_interest) + ensureFillInterested(); + else if (try_again) { // If the output is closed, if (isOutputShutdown()) @@ -437,6 +444,7 @@ public class SslConnection extends AbstractConnection getExecutor().execute(_runCompletWrite); } } + } @Override @@ -446,11 +454,12 @@ public class SslConnection extends AbstractConnection // method on the DecryptedEndPoint, so we have to work out if there is // decrypted data to be filled or what callbacks to setup to be told when there // might be more encrypted data available to attempt another call to fill - + boolean fillable; + boolean write = false; synchronized (DecryptedEndPoint.this) { // Do we already have some app data, then app can fill now so return true - boolean fillable = (BufferUtil.hasContent(_decryptedInput)) + fillable = (BufferUtil.hasContent(_decryptedInput)) // or if we have encryptedInput and have not underflowed yet, the it is worth trying a fill || BufferUtil.hasContent(_encryptedInput) && !_underFlown; @@ -469,7 +478,7 @@ public class SslConnection extends AbstractConnection { // write it _cannotAcceptMoreAppDataToFlush = true; - getEndPoint().write(_writeCallback, _encryptedOutput); + write = true; } else { @@ -480,12 +489,13 @@ public class SslConnection extends AbstractConnection } } } - - if (fillable) - getExecutor().execute(_runFillable); - else - ensureFillInterested(); } + if (write) + getEndPoint().write(_writeCallback, _encryptedOutput); + else if (fillable) + getExecutor().execute(_runFillable); + else + ensureFillInterested(); } @Override @@ -892,7 +902,7 @@ public class SslConnection extends AbstractConnection _flushRequiresFillToProgress = true; fill(__FLUSH_CALLED_FILL); // Check if after the fill() we need to wrap again - if (handshakeStatus == HandshakeStatus.NEED_WRAP) + if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) continue; } return allConsumed && BufferUtil.isEmpty(_encryptedOutput); @@ -937,27 +947,37 @@ public class SslConnection extends AbstractConnection { try { - synchronized (this) + boolean flush = false; + boolean close = false; + synchronized (_decryptedEndPoint) { boolean ishut = isInputShutdown(); boolean oshut = isOutputShutdown(); if (LOG.isDebugEnabled()) LOG.debug("{} shutdownOutput: oshut={}, ishut={}", SslConnection.this, oshut, ishut); - - if (!oshut) + + if (oshut) + return; + + if (!_closedOutbound) { - if (!_closedOutbound) - { - _closedOutbound=true; // Only attempt this once - _sslEngine.closeOutbound(); - flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message. - } - if (ishut) - getEndPoint().close(); - else - ensureFillInterested(); + _closedOutbound=true; // Only attempt this once + _sslEngine.closeOutbound(); + flush = true; } + + // TODO review close logic here + if (ishut) + close = true; + } + + if (flush) + flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message. + if (close) + getEndPoint().close(); + else + ensureFillInterested(); } catch (Throwable x) { @@ -968,16 +988,13 @@ public class SslConnection extends AbstractConnection private void ensureFillInterested() { - if (!SslConnection.this.isFillInterested()) + if (getFillInterest().isCallbackNonBlocking()) { - if (getFillInterest().isCallbackNonBlocking()) - { - SslConnection.this.getEndPoint().fillInterested(_nonBlockingReadCallback); - } - else - { - SslConnection.this.fillInterested(); - } + SslConnection.this.tryFillInterested(_nonBlockingReadCallback); + } + else + { + SslConnection.this.tryFillInterested(); } } @@ -1058,4 +1075,5 @@ public class SslConnection extends AbstractConnection return super.toString()+"->"+getEndPoint().toString(); } } + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java index 9752434140b..733670ce945 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java @@ -303,6 +303,11 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory _endp.fillInterested(callback); } + public boolean tryFillInterested(Callback callback) + { + return _endp.tryFillInterested(callback); + } + @Override public boolean isFillInterested() { From 44c84ffb0928f9642b21f846f46fd618c27e6ba8 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 7 Dec 2016 22:28:01 +0100 Subject: [PATCH 2/9] Fixes #1151 - NPE in ClasspathPattern.match(). --- .../jetty/webapp/ClasspathPattern.java | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/ClasspathPattern.java b/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/ClasspathPattern.java index 32ed9526aad..9a403210bd5 100644 --- a/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/ClasspathPattern.java +++ b/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/ClasspathPattern.java @@ -19,9 +19,6 @@ package org.eclipse.jetty.webapp; -import static java.lang.Boolean.FALSE; -import static java.lang.Boolean.TRUE; - import java.io.File; import java.net.URL; import java.nio.file.Path; @@ -33,7 +30,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.Predicate; import org.eclipse.jetty.util.ArrayTernaryTrie; @@ -44,7 +40,6 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.resource.Resource; -/* ------------------------------------------------------------ */ /** * Classpath classes list performs sequential pattern matching of a class name * against an internal array of classpath pattern entries. @@ -156,7 +151,7 @@ public class ClasspathPattern extends AbstractSet @Override public Iterator iterator() { - return _entries.keySet().stream().map(k->_entries.get(k)).iterator(); + return _entries.keySet().stream().map(_entries::get).iterator(); } @Override @@ -335,8 +330,6 @@ public class ClasspathPattern extends AbstractSet Map _entries = new HashMap<>(); - Set _classes = new HashSet<>(); - IncludeExcludeSet _patterns = new IncludeExcludeSet<>(ByPackageOrName.class); IncludeExcludeSet _locations = new IncludeExcludeSet<>(ByLocation.class); @@ -522,16 +515,16 @@ public class ClasspathPattern extends AbstractSet { try { - Resource resource = TypeUtil.getLoadedFrom(clazz); - Path path = resource.getFile().toPath(); - Boolean byName = _patterns.isIncludedAndNotExcluded(clazz.getName()); - Boolean byLocation = _locations.isIncludedAndNotExcluded(path); - + Resource resource = TypeUtil.getLoadedFrom(clazz); + Boolean byLocation = resource == null || resource.getFile() == null + ? null + : _locations.isIncludedAndNotExcluded(resource.getFile().toPath()); + // Combine the tri-state match of both IncludeExclude Sets - boolean included = byName==TRUE || byLocation==TRUE + boolean included = byName==Boolean.TRUE || byLocation==Boolean.TRUE || (byName==null && !_patterns.hasIncludes() && byLocation==null && !_locations.hasIncludes()); - boolean excluded = byName==FALSE || byLocation==FALSE; + boolean excluded = byName==Boolean.FALSE || byLocation==Boolean.FALSE; return included && !excluded; } catch (Exception e) @@ -566,9 +559,9 @@ public class ClasspathPattern extends AbstractSet } // Combine the tri-state match of both IncludeExclude Sets - boolean included = byName==TRUE || byLocation==TRUE + boolean included = byName==Boolean.TRUE || byLocation==Boolean.TRUE || (byName==null && !_patterns.hasIncludes() && byLocation==null && !_locations.hasIncludes()); - boolean excluded = byName==FALSE || byLocation==FALSE; + boolean excluded = byName==Boolean.FALSE || byLocation==Boolean.FALSE; return included && !excluded; } } From 8e0725db1c6f0da90e4611e97be43e9ab6c2b536 Mon Sep 17 00:00:00 2001 From: Jan Bartel Date: Thu, 8 Dec 2016 11:29:13 +1100 Subject: [PATCH 3/9] Issue #123 Make sync on id generation more tightly scoped. --- .../session/DefaultSessionIdManager.java | 128 +++++++++--------- 1 file changed, 65 insertions(+), 63 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/DefaultSessionIdManager.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/DefaultSessionIdManager.java index c69130c9d6a..ef63c232451 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/DefaultSessionIdManager.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/DefaultSessionIdManager.java @@ -30,7 +30,6 @@ import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.SessionIdManager; import org.eclipse.jetty.server.handler.ContextHandler; -import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -209,33 +208,32 @@ public class DefaultSessionIdManager extends ContainerLifeCycle implements Sessi @Override public String newSessionId(HttpServletRequest request, long created) { - synchronized (this) + if (request==null) + return newSessionId(created); + + // A requested session ID can only be used if it is in use already. + String requested_id=request.getRequestedSessionId(); + if (requested_id!=null) { - if (request==null) - return newSessionId(created); - - // A requested session ID can only be used if it is in use already. - String requested_id=request.getRequestedSessionId(); - if (requested_id!=null) - { - String cluster_id=getId(requested_id); - if (isIdInUse(cluster_id)) - return cluster_id; - } - - - // Else reuse any new session ID already defined for this request. - String new_id=(String)request.getAttribute(__NEW_SESSION_ID); - if (new_id!=null&&isIdInUse(new_id)) - return new_id; - - // pick a new unique ID! - String id = newSessionId(request.hashCode()); - - request.setAttribute(__NEW_SESSION_ID,id); - return id; + String cluster_id=getId(requested_id); + if (isIdInUse(cluster_id)) + return cluster_id; } + + + // Else reuse any new session ID already defined for this request. + String new_id=(String)request.getAttribute(__NEW_SESSION_ID); + if (new_id!=null&&isIdInUse(new_id)) + return new_id; + + // pick a new unique ID! + String id = newSessionId(request.hashCode()); + + request.setAttribute(__NEW_SESSION_ID,id); + return id; } + + /* ------------------------------------------------------------ */ /** @@ -246,45 +244,49 @@ public class DefaultSessionIdManager extends ContainerLifeCycle implements Sessi { // pick a new unique ID! String id=null; - while (id==null||id.length()==0) - { - long r0=_weakRandom - ?(hashCode()^Runtime.getRuntime().freeMemory()^_random.nextInt()^((seedTerm)<<32)) - :_random.nextLong(); - if (r0<0) - r0=-r0; - - // random chance to reseed - if (_reseed>0 && (r0%_reseed)== 1L) - { - if (LOG.isDebugEnabled()) - LOG.debug("Reseeding {}",this); - if (_random instanceof SecureRandom) - { - SecureRandom secure = (SecureRandom)_random; - secure.setSeed(secure.generateSeed(8)); - } - else - { - _random.setSeed(_random.nextLong()^System.currentTimeMillis()^seedTerm^Runtime.getRuntime().freeMemory()); - } - } - - long r1=_weakRandom - ?(hashCode()^Runtime.getRuntime().freeMemory()^_random.nextInt()^((seedTerm)<<32)) - :_random.nextLong(); - if (r1<0) - r1=-r1; - - id=Long.toString(r0,36)+Long.toString(r1,36); - //add in the id of the node to ensure unique id across cluster - //NOTE this is different to the node suffix which denotes which node the request was received on - if (_workerName!=null) - id=_workerName + id; - - id = id+Long.toString(COUNTER.getAndIncrement()); - + synchronized (_random) + { + while (id==null||id.length()==0) + { + long r0=_weakRandom + ?(hashCode()^Runtime.getRuntime().freeMemory()^_random.nextInt()^((seedTerm)<<32)) + :_random.nextLong(); + if (r0<0) + r0=-r0; + + // random chance to reseed + if (_reseed>0 && (r0%_reseed)== 1L) + { + if (LOG.isDebugEnabled()) + LOG.debug("Reseeding {}",this); + if (_random instanceof SecureRandom) + { + SecureRandom secure = (SecureRandom)_random; + secure.setSeed(secure.generateSeed(8)); + } + else + { + _random.setSeed(_random.nextLong()^System.currentTimeMillis()^seedTerm^Runtime.getRuntime().freeMemory()); + } + } + + long r1=_weakRandom + ?(hashCode()^Runtime.getRuntime().freeMemory()^_random.nextInt()^((seedTerm)<<32)) + :_random.nextLong(); + if (r1<0) + r1=-r1; + + id=Long.toString(r0,36)+Long.toString(r1,36); + + //add in the id of the node to ensure unique id across cluster + //NOTE this is different to the node suffix which denotes which node the request was received on + if (_workerName!=null) + id=_workerName + id; + + id = id+Long.toString(COUNTER.getAndIncrement()); + + } } return id; } From 51855ba45e3618ecdc7099965a09f5db912bfc30 Mon Sep 17 00:00:00 2001 From: Jan Bartel Date: Thu, 8 Dec 2016 17:16:27 +1100 Subject: [PATCH 4/9] Issue #1153 --- .../org/eclipse/jetty/server/session/SessionData.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java index ef385042af5..bc6aa0d5687 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java @@ -53,11 +53,16 @@ public class SessionData implements Serializable protected long _accessed; // the time of the last access protected long _lastAccessed; // the time of the last access excluding this one protected long _maxInactiveMs; - protected Map _attributes = new ConcurrentHashMap(); + protected Map _attributes; protected boolean _dirty; protected long _lastSaved; //time in msec since last save public SessionData (String id, String cpath, String vhost, long created, long accessed, long lastAccessed, long maxInactiveMs) + { + this(id, cpath, vhost, created, accessed, lastAccessed, maxInactiveMs, new ConcurrentHashMap()); + } + + public SessionData (String id, String cpath, String vhost, long created, long accessed, long lastAccessed, long maxInactiveMs, Map attributes) { _id = id; setContextPath(cpath); @@ -67,8 +72,8 @@ public class SessionData implements Serializable _lastAccessed = lastAccessed; _maxInactiveMs = maxInactiveMs; calcAndSetExpiry(); + _attributes = attributes; } - /** * Copy the info from the given sessiondata @@ -355,7 +360,7 @@ public class SessionData implements Serializable _lastNode = in.readUTF(); //last managing node _expiry = in.readLong(); _maxInactiveMs = in.readLong(); - _attributes = (ConcurrentHashMap)in.readObject(); + _attributes = (Map)in.readObject(); } public boolean isExpiredAt (long time) From fe493c2c06a7ff79b5f086445fd759578e4a6d03 Mon Sep 17 00:00:00 2001 From: Jan Bartel Date: Mon, 5 Dec 2016 09:42:44 +1100 Subject: [PATCH 5/9] Issue #1124 Fix classloading of WebSocketServerFactory for osgi. --- .../jetty/websocket/servlet/WebSocketServletFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-websocket/websocket-servlet/src/main/java/org/eclipse/jetty/websocket/servlet/WebSocketServletFactory.java b/jetty-websocket/websocket-servlet/src/main/java/org/eclipse/jetty/websocket/servlet/WebSocketServletFactory.java index 72c14782346..93d8377ef7b 100644 --- a/jetty-websocket/websocket-servlet/src/main/java/org/eclipse/jetty/websocket/servlet/WebSocketServletFactory.java +++ b/jetty-websocket/websocket-servlet/src/main/java/org/eclipse/jetty/websocket/servlet/WebSocketServletFactory.java @@ -44,7 +44,7 @@ public interface WebSocketServletFactory try { Class wsClazz = - (Class) Class.forName(DEFAULT_IMPL); + (Class) Class.forName(DEFAULT_IMPL,true,Thread.currentThread().getContextClassLoader()); Constructor ctor = wsClazz.getDeclaredConstructor(new Class[]{ServletContext.class, WebSocketPolicy.class}); return ctor.newInstance(ctx, policy); } From c7e139a8a6b2ffd199f6263c5d1879669a9eec69 Mon Sep 17 00:00:00 2001 From: Ben Duffield Date: Thu, 8 Dec 2016 08:08:06 +0100 Subject: [PATCH 6/9] fix typo in PathParamServerEndpointConfig javadoc (#1145) Signed-off-by: Ben Duffield --- .../websocket/jsr356/server/PathParamServerEndpointConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-websocket/javax-websocket-server-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/server/PathParamServerEndpointConfig.java b/jetty-websocket/javax-websocket-server-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/server/PathParamServerEndpointConfig.java index 274a86a1c33..7f624fd5454 100644 --- a/jetty-websocket/javax-websocket-server-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/server/PathParamServerEndpointConfig.java +++ b/jetty-websocket/javax-websocket-server-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/server/PathParamServerEndpointConfig.java @@ -28,7 +28,7 @@ import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec; import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope; /** - * Wrapper for a {@link ServerEndpointConfig} where there PathParm information from the incoming request. + * Wrapper for a {@link ServerEndpointConfig} where there is PathParam information from the incoming request. */ public class PathParamServerEndpointConfig extends BasicServerEndpointConfig implements ServerEndpointConfig { From 4ff441a8b1cd79665625ea4f9c352fd17029a4a1 Mon Sep 17 00:00:00 2001 From: WalkerWatch Date: Thu, 8 Dec 2016 02:11:07 -0500 Subject: [PATCH 7/9] Issue #1112 - Doc update. (#1147) Signed-off-by: WalkerWatch --- .../configuring/jsp/configuring-jsp.adoc | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/jetty-documentation/src/main/asciidoc/configuring/jsp/configuring-jsp.adoc b/jetty-documentation/src/main/asciidoc/configuring/jsp/configuring-jsp.adoc index f943de97fbc..391c7d4400e 100644 --- a/jetty-documentation/src/main/asciidoc/configuring/jsp/configuring-jsp.adoc +++ b/jetty-documentation/src/main/asciidoc/configuring/jsp/configuring-jsp.adoc @@ -22,9 +22,9 @@ This document provides information about configuring Java Server Pages (JSP) for [[which-jsp-implementation]] ==== Which JSP Implementation -As of Jetty 9.2, Jetty is using Jasper from http://tomcat.apache.org/tomcat-8.0-doc/jasper-howto.html[Apache] as the default JSP container implementation. +Jetty uses Jasper from http://tomcat.apache.org/tomcat-8.0-doc/jasper-howto.html[Apache] as the default JSP container implementation. -By default the Jetty distribution enables the JSP link:#startup-modules[module], and by default, this link:#startup-modules[module] is set to Apache Jasper. +By default the Jetty distribution enables the JSP link:#startup-modules[module], and by default, this module is set to Apache Jasper. [source, plain, subs="{sub-order}"] ---- @@ -213,7 +213,7 @@ If you are using the Jetty distribution, and you want to change the JSP settings If you want to change the JSP settings for all webapps, edit the `{$jetty.home}/etc/webdefaults.xml` file directly instead. [[configuring-jsp-servlet-in-web.xml]] -===== Configuring the JSP Servlet in `web.xml` +===== Configuring the JSP Servlet in web.xml Another option is to add an entry for the JSPServlet to the `WEB-INF/web.xml` file of your webapp and change or add init-params. You may also add (but not remove) servlet-mappings. @@ -259,11 +259,25 @@ You can use the entry in link:#webdefault-xml[{$jetty.home}/etc/webdefault.xml] ... ---- +[[jsp-async-support]] +===== Configuring Async Support + +By default, Jetty does not enable async support for the JSP servlet. +Configuring the JSP servlet for async is relatively easy - simply define the `async-supported` parameter as `true` in either your `webdefault.xml` or the `web.xml` for a specific context. + +[source, xml, subs="{sub-order}"] +---- + + jsp + true + +---- + [[using-jstl-taglibs-for-jetty7-jetty8]] ==== Using JSTL Taglibs The JavaServer Pages Standlard Tag Library (JSTL) is part of the Jetty distribution and is automatically put on the classpath when you link:#which-jsp-implementation[select your flavour of JSP]. -It is also automatically on the classpath for the Jetty Maven plugin, which uses the Apache JSP engine as of Jetty 9.2. +It is also automatically on the classpath for the Jetty Maven plugin, which uses the Apache JSP engine. ===== Embedding From a721e8b25d8c8667c8fc3f51527308d34a1d0474 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 8 Dec 2016 10:08:56 +0100 Subject: [PATCH 8/9] Fixes #1148 - Support HTTP/2 HEADERS trailer. --- .../http2/client/HTTP2ClientSession.java | 35 +- .../jetty/http2/client/RawHTTP2ProxyTest.java | 709 ++++++++++++++++++ .../jetty/http2/client/TrailersTest.java | 145 ++++ .../org/eclipse/jetty/http2/HTTP2Session.java | 15 + .../http2/server/HTTP2ServerSession.java | 18 +- 5 files changed, 901 insertions(+), 21 deletions(-) create mode 100644 jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/RawHTTP2ProxyTest.java create mode 100644 jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java index 295fdf56aaf..d5c509018c2 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java @@ -20,6 +20,8 @@ package org.eclipse.jetty.http2.client; import java.util.concurrent.atomic.AtomicLong; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.IStream; @@ -78,30 +80,23 @@ public class HTTP2ClientSession extends HTTP2Session int streamId = frame.getStreamId(); IStream stream = getStream(streamId); - if (stream == null) + if (stream != null) { - if (LOG.isDebugEnabled()) - LOG.debug("Ignoring {}, stream #{} not found", frame, streamId); + MetaData metaData = frame.getMetaData(); + if (metaData.isRequest()) + { + onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_response"); + } + else + { + stream.process(frame, Callback.NOOP); + notifyHeaders(stream, frame); + } } else { - stream.process(frame, Callback.NOOP); - notifyHeaders(stream, frame); - } - } - - private void notifyHeaders(IStream stream, HeadersFrame frame) - { - Stream.Listener listener = stream.getListener(); - if (listener == null) - return; - try - { - listener.onHeaders(stream, frame); - } - catch (Throwable x) - { - LOG.info("Failure while notifying listener " + listener, x); + if (LOG.isDebugEnabled()) + LOG.debug("Ignoring {}, stream #{} not found", frame, streamId); } } diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/RawHTTP2ProxyTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/RawHTTP2ProxyTest.java new file mode 100644 index 00000000000..9896d7355ef --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/RawHTTP2ProxyTest.java @@ -0,0 +1,709 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpURI; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.Frame; +import org.eclipse.jetty.http2.frames.GoAwayFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.PushPromiseFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class RawHTTP2ProxyTest +{ + private static final Logger LOGGER = Log.getLogger(RawHTTP2ProxyTest.class); + + private final List servers = new ArrayList<>(); + private final List clients = new ArrayList<>(); + + private Server startServer(String name, ServerSessionListener listener) throws Exception + { + QueuedThreadPool serverExecutor = new QueuedThreadPool(); + serverExecutor.setName(name); + Server server = new Server(serverExecutor); + RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener); + ServerConnector connector = new ServerConnector(server, 1, 1, connectionFactory); + server.addConnector(connector); + server.setAttribute("connector", connector); + servers.add(server); + server.start(); + return server; + } + + private HTTP2Client startClient(String name) throws Exception + { + HTTP2Client client = new HTTP2Client(); + QueuedThreadPool clientExecutor = new QueuedThreadPool(); + clientExecutor.setName(name); + client.setExecutor(clientExecutor); + clients.add(client); + client.start(); + return client; + } + + @After + public void dispose() throws Exception + { + for (int i = clients.size() - 1; i >= 0; i--) + { + HTTP2Client client = clients.get(i); + client.stop(); + } + for (int i = servers.size() - 1; i >= 0; i--) + { + Server server = servers.get(i); + server.stop(); + } + } + + + @Test + public void testRawHTTP2Proxy() throws Exception + { + byte[] data1 = new byte[1024]; + new Random().nextBytes(data1); + ByteBuffer buffer1 = ByteBuffer.wrap(data1); + Server server1 = startServer("server1", new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER1 received {}", frame); + return new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER1 received {}", frame); + if (frame.isEndStream()) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + HeadersFrame reply = new HeadersFrame(stream.getId(), response, null, false); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER1 sending {}", reply); + stream.headers(reply, new Callback() + { + @Override + public void succeeded() + { + DataFrame data = new DataFrame(stream.getId(), buffer1.slice(), true); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER1 sending {}", data); + stream.data(data, NOOP); + } + }); + } + } + }; + } + }); + ServerConnector connector1 = (ServerConnector)server1.getAttribute("connector"); + Server server2 = startServer("server2", new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER2 received {}", frame); + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER2 received {}", frame); + callback.succeeded(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + Callback.Completable completable1 = new Callback.Completable(); + HeadersFrame reply = new HeadersFrame(stream.getId(), response, null, false); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER2 sending {}", reply); + stream.headers(reply, completable1); + completable1.thenCompose(ignored -> + { + Callback.Completable completable2 = new Callback.Completable(); + DataFrame data = new DataFrame(stream.getId(), buffer1.slice(), false); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER2 sending {}", data); + stream.data(data, completable2); + return completable2; + }).thenRun(() -> + { + MetaData trailer = new MetaData(HttpVersion.HTTP_2, new HttpFields()); + HeadersFrame end = new HeadersFrame(stream.getId(), trailer, null, true); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER2 sending {}", end); + stream.headers(end, Callback.NOOP); + }); + } + }; + } + }); + ServerConnector connector2 = (ServerConnector)server2.getAttribute("connector"); + HTTP2Client proxyClient = startClient("proxyClient"); + Server proxyServer = startServer("proxyServer", new ClientToProxySessionListener(proxyClient)); + ServerConnector proxyConnector = (ServerConnector)proxyServer.getAttribute("connector"); + InetSocketAddress proxyAddress = new InetSocketAddress("localhost", proxyConnector.getLocalPort()); + HTTP2Client client = startClient("client"); + + FuturePromise clientPromise = new FuturePromise<>(); + client.connect(proxyAddress, new Session.Listener.Adapter(), clientPromise); + Session clientSession = clientPromise.get(5, TimeUnit.SECONDS); + + // Send a request with trailers for server1. + HttpFields fields1 = new HttpFields(); + fields1.put("X-Target", String.valueOf(connector1.getLocalPort())); + MetaData.Request request1 = new MetaData.Request("GET", new HttpURI("http://localhost/server1"), HttpVersion.HTTP_2, fields1); + FuturePromise streamPromise1 = new FuturePromise<>(); + CountDownLatch latch1 = new CountDownLatch(1); + clientSession.newStream(new HeadersFrame(request1, null, false), streamPromise1, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CLIENT received {}", frame); + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CLIENT received {}", frame); + Assert.assertEquals(buffer1.slice(), frame.getData()); + callback.succeeded(); + latch1.countDown(); + } + }); + Stream stream1 = streamPromise1.get(5, TimeUnit.SECONDS); + stream1.headers(new HeadersFrame(stream1.getId(), new MetaData(HttpVersion.HTTP_2, new HttpFields()), null, true), Callback.NOOP); + + // Send a request for server2. + HttpFields fields2 = new HttpFields(); + fields2.put("X-Target", String.valueOf(connector2.getLocalPort())); + MetaData.Request request2 = new MetaData.Request("GET", new HttpURI("http://localhost/server1"), HttpVersion.HTTP_2, fields2); + FuturePromise streamPromise2 = new FuturePromise<>(); + CountDownLatch latch2 = new CountDownLatch(1); + clientSession.newStream(new HeadersFrame(request2, null, false), streamPromise2, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CLIENT received {}", frame); + if (frame.isEndStream()) + latch2.countDown(); + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CLIENT received {}", frame); + callback.succeeded(); + } + }); + Stream stream2 = streamPromise2.get(5, TimeUnit.SECONDS); + stream2.data(new DataFrame(stream2.getId(), buffer1.slice(), true), Callback.NOOP); + + Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS)); + } + + private static class ClientToProxySessionListener extends ServerSessionListener.Adapter + { + private final Map forwarders = new ConcurrentHashMap<>(); + private final HTTP2Client client; + + private ClientToProxySessionListener(HTTP2Client client) + { + this.client = client; + } + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Received {} for {} on {}: {}", frame, stream, stream.getSession(), frame.getMetaData()); + // Forward to the right server. + MetaData metaData = frame.getMetaData(); + HttpFields fields = metaData.getFields(); + int port = Integer.parseInt(fields.get("X-Target")); + ClientToProxyToServer clientToProxyToServer = forwarders.computeIfAbsent(port, p -> new ClientToProxyToServer("localhost", p, client)); + clientToProxyToServer.offer(stream, frame, Callback.NOOP); + return clientToProxyToServer; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Received {} on {}", frame, session); + // TODO + } + + @Override + public boolean onIdleTimeout(Session session) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Idle timeout on {}", session); + // TODO + return true; + } + + @Override + public void onFailure(Session session, Throwable failure) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Failure on " + session, failure); + // TODO + } + } + + private static class ClientToProxyToServer extends IteratingCallback implements Stream.Listener + { + private final Object lock = this; + private final Map> frames = new HashMap<>(); + private final Map streams = new HashMap<>(); + private final ServerToProxyToClient serverToProxyToClient = new ServerToProxyToClient(); + private final String host; + private final int port; + private final HTTP2Client client; + private Session proxyToServerSession; + private FrameInfo frameInfo; + private Stream clientToProxyStream; + + private ClientToProxyToServer(String host, int port, HTTP2Client client) + { + this.host = host; + this.port = port; + this.client = client; + } + + private void offer(Stream stream, Frame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS queueing {} for {} on {}", frame, stream, stream.getSession()); + boolean connected; + synchronized (lock) + { + Deque deque = frames.computeIfAbsent(stream, s -> new ArrayDeque<>()); + deque.offer(new FrameInfo(frame, callback)); + connected = proxyToServerSession != null; + } + if (connected) + iterate(); + else + connect(); + } + + private void connect() + { + InetSocketAddress address = new InetSocketAddress(host, port); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS connecting to {}", address); + client.connect(address, new ServerToProxySessionListener(), new Promise() + { + @Override + public void succeeded(Session result) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS connected to {} with {}", address, result); + synchronized (lock) + { + proxyToServerSession = result; + } + iterate(); + } + + @Override + public void failed(Throwable x) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS connect failed to {}", address); + // TODO: drain the queue and fail the streams. + } + }); + } + + @Override + protected Action process() throws Throwable + { + Stream proxyToServerStream = null; + Session proxyToServerSession = null; + synchronized (lock) + { + for (Map.Entry> entry : frames.entrySet()) + { + frameInfo = entry.getValue().poll(); + if (frameInfo != null) + { + clientToProxyStream = entry.getKey(); + proxyToServerStream = streams.get(clientToProxyStream); + proxyToServerSession = this.proxyToServerSession; + break; + } + } + } + + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS processing {} for {} to {}", frameInfo, clientToProxyStream, proxyToServerStream); + + if (frameInfo == null) + return Action.IDLE; + + if (proxyToServerStream == null) + { + HeadersFrame clientToProxyFrame = (HeadersFrame)frameInfo.frame; + HeadersFrame proxyToServerFrame = new HeadersFrame(clientToProxyFrame.getMetaData(), clientToProxyFrame.getPriority(), clientToProxyFrame.isEndStream()); + proxyToServerSession.newStream(proxyToServerFrame, new Promise() + { + @Override + public void succeeded(Stream result) + { + synchronized (lock) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS created {}", result); + streams.put(clientToProxyStream, result); + } + serverToProxyToClient.link(result, clientToProxyStream); + ClientToProxyToServer.this.succeeded(); + } + + @Override + public void failed(Throwable failure) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS create failed", failure); + // TODO: cannot open stream to server. + ClientToProxyToServer.this.failed(failure); + } + }, serverToProxyToClient); + return Action.SCHEDULED; + } + else + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS forwarding {} from {} to {}", frameInfo, clientToProxyStream, proxyToServerStream); + switch (frameInfo.frame.getType()) + { + case HEADERS: + { + HeadersFrame clientToProxyFrame = (HeadersFrame)frameInfo.frame; + HeadersFrame proxyToServerFrame = new HeadersFrame(proxyToServerStream.getId(), clientToProxyFrame.getMetaData(), clientToProxyFrame.getPriority(), clientToProxyFrame.isEndStream()); + proxyToServerStream.headers(proxyToServerFrame, this); + return Action.SCHEDULED; + } + case DATA: + { + DataFrame clientToProxyFrame = (DataFrame)frameInfo.frame; + DataFrame proxyToServerFrame = new DataFrame(proxyToServerStream.getId(), clientToProxyFrame.getData(), clientToProxyFrame.isEndStream()); + proxyToServerStream.data(proxyToServerFrame, this); + return Action.SCHEDULED; + } + default: + { + throw new IllegalStateException(); + } + } + } + } + + @Override + public void succeeded() + { + frameInfo.callback.succeeded(); + super.succeeded(); + } + + @Override + public void failed(Throwable failure) + { + frameInfo.callback.failed(failure); + super.failed(failure); + } + + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS received {} on {}", frame, stream); + offer(stream, frame, NOOP); + } + + @Override + public Stream.Listener onPush(Stream stream, PushPromiseFrame frame) + { + // Clients don't push. + return null; + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS received {} on {}", frame, stream); + offer(stream, frame, callback); + } + + @Override + public void onReset(Stream stream, ResetFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS received {} on {}", frame, stream); + // TODO: drain the queue for that stream, and notify server. + } + + @Override + public boolean onIdleTimeout(Stream stream, Throwable x) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS idle timeout for {}", stream); + // TODO: drain the queue for that stream, reset stream, and notify server. + return true; + } + } + + private static class ServerToProxySessionListener extends Session.Listener.Adapter + { + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Received {} on {}", frame, session); + // TODO + } + + @Override + public boolean onIdleTimeout(Session session) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Idle timeout on {}", session); + // TODO + return true; + } + + @Override + public void onFailure(Session session, Throwable failure) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Failure on " + session, failure); + // TODO + } + } + + private static class ServerToProxyToClient extends IteratingCallback implements Stream.Listener + { + private final Object lock = this; + private final Map> frames = new HashMap<>(); + private final Map streams = new HashMap<>(); + private FrameInfo frameInfo; + private Stream serverToProxyStream; + + @Override + protected Action process() throws Throwable + { + Stream proxyToClientStream = null; + synchronized (lock) + { + for (Map.Entry> entry : frames.entrySet()) + { + frameInfo = entry.getValue().poll(); + if (frameInfo != null) + { + serverToProxyStream = entry.getKey(); + proxyToClientStream = streams.get(serverToProxyStream); + break; + } + } + } + + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC processing {} for {} to {}", frameInfo, serverToProxyStream, proxyToClientStream); + + // It may happen that we received a frame from the server, + // but the proxyToClientStream is not linked yet. + if (proxyToClientStream == null) + return Action.IDLE; + + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC forwarding {} for {} to {}", frameInfo, serverToProxyStream, proxyToClientStream); + + switch (frameInfo.frame.getType()) + { + case HEADERS: + { + HeadersFrame serverToProxyFrame = (HeadersFrame)frameInfo.frame; + HeadersFrame proxyToClientFrame = new HeadersFrame(proxyToClientStream.getId(), serverToProxyFrame.getMetaData(), serverToProxyFrame.getPriority(), serverToProxyFrame.isEndStream()); + proxyToClientStream.headers(proxyToClientFrame, this); + return Action.SCHEDULED; + } + case DATA: + { + DataFrame clientToProxyFrame = (DataFrame)frameInfo.frame; + DataFrame proxyToServerFrame = new DataFrame(serverToProxyStream.getId(), clientToProxyFrame.getData(), clientToProxyFrame.isEndStream()); + proxyToClientStream.data(proxyToServerFrame, this); + return Action.SCHEDULED; + } + case PUSH_PROMISE: + { + // TODO + throw new UnsupportedOperationException(); + } + default: + { + throw new IllegalStateException(); + } + } + } + + @Override + public void succeeded() + { + frameInfo.callback.succeeded(); + super.succeeded(); + } + + @Override + public void failed(Throwable failure) + { + frameInfo.callback.failed(failure); + super.failed(failure); + } + + private void offer(Stream stream, Frame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC queueing {} for {} on {}", frame, stream, stream.getSession()); + synchronized (lock) + { + Deque deque = frames.computeIfAbsent(stream, s -> new ArrayDeque<>()); + deque.offer(new FrameInfo(frame, callback)); + } + iterate(); + } + + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC received {} on {}", frame, stream); + offer(stream, frame, NOOP); + } + + @Override + public Stream.Listener onPush(Stream stream, PushPromiseFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC received {} on {}", frame, stream); + // TODO + return null; + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC received {} on {}", frame, stream); + offer(stream, frame, callback); + } + + @Override + public void onReset(Stream stream, ResetFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC received {} on {}", frame, stream); + // TODO: drain queue, reset client stream. + } + + @Override + public boolean onIdleTimeout(Stream stream, Throwable x) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC idle timeout for {}", stream); + // TODO: + return false; + } + + private void link(Stream proxyToServerStream, Stream clientToProxyStream) + { + synchronized (lock) + { + streams.put(proxyToServerStream, clientToProxyStream); + } + iterate(); + } + } + + private static class FrameInfo + { + private final Frame frame; + private final Callback callback; + + private FrameInfo(Frame frame, Callback callback) + { + this.frame = frame; + this.callback = callback; + } + + @Override + public String toString() + { + return frame.toString(); + } + } +} diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java new file mode 100644 index 00000000000..de303464183 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java @@ -0,0 +1,145 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.Promise; +import org.junit.Assert; +import org.junit.Test; + +public class TrailersTest extends AbstractTest +{ + @Test + public void testTrailersSentByClient() throws Exception + { + CountDownLatch latch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Request request = (MetaData.Request)frame.getMetaData(); + Assert.assertFalse(frame.isEndStream()); + Assert.assertTrue(request.getFields().containsKey("X-Request")); + return new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData trailer = frame.getMetaData(); + Assert.assertTrue(frame.isEndStream()); + Assert.assertTrue(trailer.getFields().containsKey("X-Trailer")); + latch.countDown(); + } + }; + } + }); + + Session session = newClient(new Session.Listener.Adapter()); + + HttpFields requestFields = new HttpFields(); + requestFields.put("X-Request", "true"); + MetaData.Request request = newRequest("GET", requestFields); + HeadersFrame requestFrame = new HeadersFrame(request, null, false); + FuturePromise streamPromise = new FuturePromise<>(); + session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter()); + Stream stream = streamPromise.get(5, TimeUnit.SECONDS); + + // Send the trailers. + HttpFields trailerFields = new HttpFields(); + trailerFields.put("X-Trailer", "true"); + MetaData trailers = new MetaData(HttpVersion.HTTP_2, trailerFields); + HeadersFrame trailerFrame = new HeadersFrame(stream.getId(), trailers, null, true); + stream.headers(trailerFrame, Callback.NOOP); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testTrailersSentByServer() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + HttpFields responseFields = new HttpFields(); + responseFields.put("X-Response", "true"); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, responseFields); + HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false); + stream.headers(responseFrame, new Callback() + { + @Override + public void succeeded() + { + HttpFields trailerFields = new HttpFields(); + trailerFields.put("X-Trailer", "true"); + MetaData trailer = new MetaData(HttpVersion.HTTP_2, trailerFields); + HeadersFrame trailerFrame = new HeadersFrame(stream.getId(), trailer, null, true); + stream.headers(trailerFrame, NOOP); + } + }); + return null; + } + }); + + Session session = newClient(new Session.Listener.Adapter()); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame requestFrame = new HeadersFrame(request, null, true); + CountDownLatch latch = new CountDownLatch(1); + session.newStream(requestFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + private boolean responded; + + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (!responded) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + Assert.assertTrue(response.getFields().containsKey("X-Response")); + Assert.assertFalse(frame.isEndStream()); + responded = true; + } + else + { + MetaData trailer = frame.getMetaData(); + Assert.assertTrue(trailer.getFields().containsKey("X-Trailer")); + Assert.assertTrue(frame.isEndStream()); + latch.countDown(); + } + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } +} diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index be26e1a5bb9..b65b02ba3dc 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -1097,6 +1097,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } + protected void notifyHeaders(IStream stream, HeadersFrame frame) + { + Stream.Listener listener = stream.getListener(); + if (listener == null) + return; + try + { + listener.onHeaders(stream, frame); + } + catch (Throwable x) + { + LOG.info("Failure while notifying listener " + listener, x); + } + } + @Override public String toString() { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java index cb5de81d6c2..1ef23a1e0ce 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java @@ -95,9 +95,25 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis stream.setListener(listener); } } + else if (metaData.isResponse()) + { + onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_request"); + } else { - onConnectionFailure(ErrorCode.INTERNAL_ERROR.code, "invalid_request"); + // Trailers. + int streamId = frame.getStreamId(); + IStream stream = getStream(streamId); + if (stream != null) + { + stream.process(frame, Callback.NOOP); + notifyHeaders(stream, frame); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Ignoring {}, stream #{} not found", frame, streamId); + } } } From 4f48d7aadf0db7e9391b2fdee38d17d41ae9f4c2 Mon Sep 17 00:00:00 2001 From: Jan Bartel Date: Thu, 8 Dec 2016 20:14:56 +1100 Subject: [PATCH 9/9] Issue #1153 Refactor calcExpiry methods a little. --- .../main/java/org/eclipse/jetty/server/session/SessionData.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java index bc6aa0d5687..9150354907a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionData.java @@ -253,7 +253,7 @@ public class SessionData implements Serializable public long calcExpiry () { - return (getMaxInactiveMs() <= 0 ? 0 : (System.currentTimeMillis() + getMaxInactiveMs())); + return calcExpiry(System.currentTimeMillis()); } public long calcExpiry (long time)