diff --git a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/LikeJettyXml.java b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/LikeJettyXml.java index ae1f1938601..bd01c93dc0b 100644 --- a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/LikeJettyXml.java +++ b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/LikeJettyXml.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.jmx.MBeanContainer; import org.eclipse.jetty.security.HashLoginService; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.LowResourceMonitor; import org.eclipse.jetty.server.NCSARequestLog; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; @@ -36,6 +37,7 @@ import org.eclipse.jetty.server.handler.RequestLogHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.TimerScheduler; public class LikeJettyXml { @@ -53,6 +55,8 @@ public class LikeJettyXml server.setDumpAfterStart(false); server.setDumpBeforeStop(false); + server.addBean(new TimerScheduler()); + // Setup JMX MBeanContainer mbContainer=new MBeanContainer(ManagementFactory.getPlatformMBeanServer()); server.addBean(mbContainer); @@ -120,6 +124,12 @@ public class LikeJettyXml requestLogHandler.setRequestLog(requestLog); server.setStopAtShutdown(true); + + LowResourceMonitor lowResourcesMonitor=new LowResourceMonitor(server); + lowResourcesMonitor.setLowResourcesIdleTimeout(1000); + lowResourcesMonitor.setMaxConnections(2); + lowResourcesMonitor.setPeriod(1200); + server.addBean(lowResourcesMonitor); server.start(); server.join(); diff --git a/jetty-distribution/src/main/resources/start.ini b/jetty-distribution/src/main/resources/start.ini index 495b098c598..b927a2bddf6 100644 --- a/jetty-distribution/src/main/resources/start.ini +++ b/jetty-distribution/src/main/resources/start.ini @@ -200,4 +200,5 @@ etc/jetty-requestlog.xml # etc/jetty-stats.xml # etc/jetty-debug.xml # etc/jetty-ipaccess.xml +# etc/jetty-lowresources.xml #=========================================================== diff --git a/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/OverlayConfig.java b/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/OverlayConfig.java index d7ce310596e..e4bd0f851a1 100644 --- a/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/OverlayConfig.java +++ b/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/OverlayConfig.java @@ -25,7 +25,7 @@ import java.util.List; import org.codehaus.plexus.util.xml.Xpp3Dom; -import edu.emory.mathcs.backport.java.util.Arrays; +import java.util.Arrays; /** * OverlayConfig diff --git a/jetty-osgi/jetty-osgi-boot/jettyhome/etc/jetty.xml b/jetty-osgi/jetty-osgi-boot/jettyhome/etc/jetty.xml index 7df9ee2d003..1c998353866 100644 --- a/jetty-osgi/jetty-osgi-boot/jettyhome/etc/jetty.xml +++ b/jetty-osgi/jetty-osgi-boot/jettyhome/etc/jetty.xml @@ -57,6 +57,20 @@ + + + + org.eclipse.jetty.webapp.FragmentConfiguration + + + org.eclipse.jetty.plus.webapp.EnvConfiguration + org.eclipse.jetty.plus.webapp.PlusConfiguration + org.eclipse.jetty.annotations.AnnotationConfiguration + + + + + java.naming.factory.initial diff --git a/jetty-server/src/main/config/etc/jetty-lowresources.xml b/jetty-server/src/main/config/etc/jetty-lowresources.xml new file mode 100644 index 00000000000..5b264f1749f --- /dev/null +++ b/jetty-server/src/main/config/etc/jetty-lowresources.xml @@ -0,0 +1,22 @@ + + + + + + + + + + + + + 1000 + 200 + true + 0 + 0 + 5000 + + + + diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 8c7945c19f7..ca40a390df5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -23,10 +23,13 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -35,6 +38,7 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.annotation.ManagedAttribute; @@ -141,16 +145,19 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co private final Scheduler _scheduler; private final ByteBufferPool _byteBufferPool; private final Thread[] _acceptors; + private final Set _endpoints = Collections.newSetFromMap(new ConcurrentHashMap()); + private final Set _immutableEndPoints = Collections.unmodifiableSet(_endpoints); private volatile CountDownLatch _stopping; private long _idleTimeout = 30000; private String _defaultProtocol; private ConnectionFactory _defaultConnectionFactory; + /** * @param server The server this connector will be added to. Must not be null. * @param executor An executor for this connector or null to use the servers executor - * @param scheduler A scheduler for this connector or null to a new {@link TimerScheduler} instance. - * @param pool A buffer pool for this connector or null to use a default {@link ByteBufferPool} + * @param scheduler A scheduler for this connector or null to either a {@link Scheduler} set as a server bean or if none set, then a new {@link TimerScheduler} instance. + * @param pool A buffer pool for this connector or null to either a {@link ByteBufferPool} set as a server bean or none set, the new {@link ArrayByteBufferPool} instance. * @param acceptors the number of acceptor threads to use, or 0 for a default value. * @param factories The Connection Factories to use. */ @@ -164,7 +171,11 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co { _server=server; _executor=executor!=null?executor:_server.getThreadPool(); + if (scheduler==null) + scheduler=_server.getBean(Scheduler.class); _scheduler=scheduler!=null?scheduler:new TimerScheduler(); + if (pool==null) + pool=_server.getBean(ByteBufferPool.class); _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool(); addBean(_server,false); @@ -468,6 +479,9 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co } } + + + // protected void connectionOpened(Connection connection) // { // _stats.connectionOpened(); @@ -488,6 +502,22 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co // newConnection.onOpen(); // } + @Override + public Collection getConnectedEndPoints() + { + return _immutableEndPoints; + } + + protected void onEndPointOpened(EndPoint endp) + { + _endpoints.add(endp); + } + + protected void onEndPointClosed(EndPoint endp) + { + _endpoints.remove(endp); + } + @Override public Scheduler getScheduler() { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java index 85e0708ff88..3454c7869ec 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.concurrent.Executor; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.Graceful; @@ -85,5 +86,8 @@ public interface Connector extends LifeCycle, Graceful */ public Object getTransport(); - + /** + * @return immutable collection of connected endpoints + */ + public Collection getConnectedEndPoints(); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index b425007c76c..6d161def3a1 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -167,6 +167,7 @@ public class LocalConnector extends AbstractConnector LOG.debug("accepting {}", acceptorID); LocalEndPoint endPoint = _connects.take(); endPoint.onOpen(); + onEndPointOpened(endPoint); Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint); endPoint.setConnection(connection); @@ -209,6 +210,7 @@ public class LocalConnector extends AbstractConnector @Override public void onClose() { + LocalConnector.this.onEndPointClosed(this); super.onClose(); _closed.countDown(); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LowResourceMonitor.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LowResourceMonitor.java new file mode 100644 index 00000000000..d4a1dbb0290 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LowResourceMonitor.java @@ -0,0 +1,349 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.Name; +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.ThreadPool; +import org.eclipse.jetty.util.thread.TimerScheduler; + + +/* ------------------------------------------------------------ */ +/** A monitor for low resources + *

An instance of this class will monitor all the connectors of a server (or a set of connectors + * configured with {@link #setMonitoredConnectors(Collection)}) for a low resources state. + * Low resources can be detected by:

    + *
  • {@link ThreadPool#isLowOnThreads()} if {@link Connector#getExecutor()} is + * an instance of {@link ThreadPool} and {@link #setMonitorThreads(boolean)} is true.
  • + *
  • If {@link #setMaxMemory(long)} is non zero then low resources is detected if the JVMs + * {@link Runtime} instance has {@link Runtime#totalMemory()} minus {@link Runtime#freeMemory()} + * greater than {@link #getMaxMemory()}
  • + *
  • If {@link #setMaxConnections(int)} is non zero then low resources is dected if the total number + * of connections exceeds {@link #getMaxConnections()}
  • + *
+ *

+ *

Once low resources state is detected, the cause is logged and all existing connections returned + * by {@link Connector#getConnectedEndPoints()} have {@link EndPoint#setIdleTimeout(long)} set + * to {@link #getLowResourcesIdleTimeout()}. New connections are not affected, however if the low + * resources state persists for more than {@link #getMaxLowResourcesTime()}, then the + * {@link #getLowResourcesIdleTimeout()} to all connections again. Once the low resources state is + * cleared, the idle timeout is reset to the connector default given by {@link Connector#getIdleTimeout()}. + *

+ */ +@ManagedObject ("Monitor for low resource conditions and activate a low resource mode if detected") +public class LowResourceMonitor extends AbstractLifeCycle +{ + private static final Logger LOG = Log.getLogger(LowResourceMonitor.class); + private final Server _server; + private Scheduler _scheduler; + private Connector[] _monitoredConnectors; + private int _period=1000; + private int _maxConnections; + private long _maxMemory; + private int _lowResourcesIdleTimeout=1000; + private int _maxLowResourcesTime=0; + private boolean _monitorThreads=true; + private final AtomicBoolean _low = new AtomicBoolean(); + private String _cause; + private String _reasons; + private long _lowStarted; + + + private final Runnable _monitor = new Runnable() + { + @Override + public void run() + { + if (isRunning()) + { + monitor(); + _scheduler.schedule(_monitor,_period,TimeUnit.MILLISECONDS); + } + } + }; + + public LowResourceMonitor(@Name("server") Server server) + { + _server=server; + } + + @ManagedAttribute("Are the monitored connectors low on resources?") + public boolean isLowOnResources() + { + return _low.get(); + } + + @ManagedAttribute("The reason(s) the monitored connectors are low on resources") + public String getLowResourcesReasons() + { + return _reasons; + } + + @ManagedAttribute("Get the timestamp in ms since epoch that low resources state started") + public long getLowResourcesStarted() + { + return _lowStarted; + } + + @ManagedAttribute("The monitored connectors. If null then all server connectors are monitored") + public Collection getMonitoredConnectors() + { + if (_monitoredConnectors==null) + return Collections.emptyList(); + return Arrays.asList(_monitoredConnectors); + } + + /** + * @param monitoredConnectors The collections of Connectors that should be monitored for low resources. + */ + public void setMonitoredConnectors(Collection monitoredConnectors) + { + if (monitoredConnectors==null || monitoredConnectors.size()==0) + _monitoredConnectors=null; + else + _monitoredConnectors = monitoredConnectors.toArray(new Connector[monitoredConnectors.size()]); + } + + @ManagedAttribute("The monitor period in ms") + public int getPeriod() + { + return _period; + } + + /** + * @param periodMS The period in ms to monitor for low resources + */ + public void setPeriod(int periodMS) + { + _period = periodMS; + } + + @ManagedAttribute("True if low available threads status is monitored") + public boolean getMonitorThreads() + { + return _monitorThreads; + } + + /** + * @param monitorThreads If true, check connectors executors to see if they are + * {@link ThreadPool} instances that are low on threads. + */ + public void setMonitorThreads(boolean monitorThreads) + { + _monitorThreads = monitorThreads; + } + + @ManagedAttribute("The maximum connections allowed for the monitored connectors before low resource handling is activated") + public int getMaxConnections() + { + return _maxConnections; + } + + /** + * @param maxConnections The maximum connections before low resources state is triggered + */ + public void setMaxConnections(int maxConnections) + { + _maxConnections = maxConnections; + } + + @ManagedAttribute("The maximum memory (in bytes) that can be used before low resources is triggered. Memory used is calculated as (totalMemory-freeMemory).") + public long getMaxMemory() + { + return _maxMemory; + } + + /** + * @param maxMemoryBytes The maximum memory in bytes in use before low resources is triggered. + */ + public void setMaxMemory(long maxMemoryBytes) + { + _maxMemory = maxMemoryBytes; + } + + @ManagedAttribute("The idletimeout in ms to apply to all existing connections when low resources is detected") + public int getLowResourcesIdleTimeout() + { + return _lowResourcesIdleTimeout; + } + + /** + * @param lowResourcesIdleTimeoutMS The timeout in ms to apply to EndPoints when in the low resources state. + */ + public void setLowResourcesIdleTimeout(int lowResourcesIdleTimeoutMS) + { + _lowResourcesIdleTimeout = lowResourcesIdleTimeoutMS; + } + + @ManagedAttribute("The maximum time in ms that low resources condition can persist before lowResourcesIdleTimeout is applied to new connections as well as existing connections") + public int getMaxLowResourcesTime() + { + return _maxLowResourcesTime; + } + + /** + * @param maxLowResourcesTimeMS The time in milliseconds that a low resource state can persist before the low resource idle timeout is reapplied to all connections + */ + public void setMaxLowResourcesTime(int maxLowResourcesTimeMS) + { + _maxLowResourcesTime = maxLowResourcesTimeMS; + } + + @Override + protected void doStart() throws Exception + { + _scheduler = _server.getBean(Scheduler.class); + + if (_scheduler==null) + { + _scheduler=new LRMScheduler(); + _scheduler.start(); + } + super.doStart(); + + _scheduler.schedule(_monitor,_period,TimeUnit.MILLISECONDS); + } + + @Override + protected void doStop() throws Exception + { + if (_scheduler instanceof LRMScheduler) + _scheduler.stop(); + super.doStop(); + } + + protected Connector[] getMonitoredOrServerConnectors() + { + if (_monitoredConnectors!=null && _monitoredConnectors.length>0) + return _monitoredConnectors; + return _server.getConnectors(); + } + + protected void monitor() + { + String reasons=null; + String cause=""; + int connections=0; + + for(Connector connector : getMonitoredOrServerConnectors()) + { + connections+=connector.getConnectedEndPoints().size(); + + Executor executor = connector.getExecutor(); + if (executor instanceof ThreadPool) + { + ThreadPool threadpool=(ThreadPool) executor; + if (_monitorThreads && threadpool.isLowOnThreads()) + { + reasons=low(reasons,"Low on threads: "+threadpool); + cause+="T"; + } + } + } + + if (_maxConnections>0 && connections>_maxConnections) + { + reasons=low(reasons,"Max Connections exceeded: "+connections+">"+_maxConnections); + cause+="C"; + } + + long memory=Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory(); + if (_maxMemory>0 && memory>_maxMemory) + { + reasons=low(reasons,"Max memory exceeded: "+memory+">"+_maxMemory); + cause+="M"; + } + + + if (reasons!=null) + { + // Log the reasons if there is any change in the cause + if (!cause.equals(_cause)) + { + LOG.warn("Low Resources: {}",reasons); + _cause=cause; + } + + // Enter low resources state? + if (_low.compareAndSet(false,true)) + { + _reasons=reasons; + _lowStarted=System.currentTimeMillis(); + setLowResources(); + } + + // Too long in low resources state? + if (_maxLowResourcesTime>0 && (System.currentTimeMillis()-_lowStarted)>_maxLowResourcesTime) + setLowResources(); + } + else + { + if (_low.compareAndSet(true,false)) + { + LOG.info("Low Resources cleared"); + _reasons=null; + _lowStarted=0; + clearLowResources(); + } + } + } + + protected void setLowResources() + { + for(Connector connector : getMonitoredOrServerConnectors()) + { + for (EndPoint endPoint : connector.getConnectedEndPoints()) + endPoint.setIdleTimeout(_lowResourcesIdleTimeout); + } + } + + protected void clearLowResources() + { + for(Connector connector : getMonitoredOrServerConnectors()) + { + for (EndPoint endPoint : connector.getConnectedEndPoints()) + endPoint.setIdleTimeout(connector.getIdleTimeout()); + } + } + + private String low(String reasons, String newReason) + { + if (reasons==null) + return newReason; + return reasons+", "+newReason; + } + + + private static class LRMScheduler extends TimerScheduler + { + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java index 8ffea32f88f..eadb2f413c9 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java @@ -400,5 +400,21 @@ public class ServerConnector extends AbstractNetworkConnector { return getDefaultConnectionFactory().newConnection(ServerConnector.this, endpoint); } + + @Override + protected void endPointOpened(EndPoint endpoint) + { + super.endPointOpened(endpoint); + onEndPointOpened(endpoint); + } + + @Override + protected void endPointClosed(EndPoint endpoint) + { + onEndPointClosed(endpoint); + super.endPointClosed(endpoint); + } + + } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/LowResourcesMonitorTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/LowResourcesMonitorTest.java new file mode 100644 index 00000000000..365a85b0798 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/LowResourcesMonitorTest.java @@ -0,0 +1,197 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server; + + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import java.net.Socket; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; + +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.TimerScheduler; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +public class LowResourcesMonitorTest +{ + QueuedThreadPool _threadPool; + Server _server; + ServerConnector _connector; + LowResourceMonitor _lowResourcesMonitor; + + @Before + public void before() throws Exception + { + _threadPool = new QueuedThreadPool(); + _threadPool.setMaxThreads(50); + + _server = new Server(_threadPool); + _server.manage(_threadPool); + + _server.addBean(new TimerScheduler()); + + _connector = new ServerConnector(_server); + _connector.setPort(0); + _connector.setIdleTimeout(35000); + _server.addConnector(_connector); + + _server.setHandler(new DumpHandler()); + + _lowResourcesMonitor=new LowResourceMonitor(_server); + _lowResourcesMonitor.setLowResourcesIdleTimeout(200); + _lowResourcesMonitor.setMaxConnections(20); + _lowResourcesMonitor.setPeriod(900); + _server.addBean(_lowResourcesMonitor); + + _server.start(); + } + + @After + public void after() throws Exception + { + _server.stop(); + } + + + @Test + public void testLowOnThreads() throws Exception + { + _threadPool.setMaxThreads(_threadPool.getThreads()-_threadPool.getIdleThreads()+10); + Thread.sleep(1200); + Assert.assertFalse(_lowResourcesMonitor.isLowOnResources()); + + final CountDownLatch latch = new CountDownLatch(1); + + for (int i=0;i<20;i++) + { + _threadPool.dispatch(new Runnable() + { + @Override + public void run() + { + try + { + latch.await(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + }); + } + + Thread.sleep(1200); + Assert.assertTrue(_lowResourcesMonitor.isLowOnResources()); + + latch.countDown(); + Thread.sleep(1200); + System.err.println(_threadPool.dump()); + Assert.assertFalse(_lowResourcesMonitor.isLowOnResources()); + } + + + @Ignore ("not reliable") + @Test + public void testLowOnMemory() throws Exception + { + _lowResourcesMonitor.setMaxMemory(Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()+(100*1024*1024)); + Thread.sleep(1200); + Assert.assertFalse(_lowResourcesMonitor.isLowOnResources()); + + byte[] data = new byte[100*1024*1024]; + Arrays.fill(data,(byte)1); + int hash = Arrays.hashCode(data); + assertThat(hash,not(equalTo(0))); + + Thread.sleep(1200); + Assert.assertTrue(_lowResourcesMonitor.isLowOnResources()); + data=null; + System.gc(); + System.gc(); + + Thread.sleep(1200); + Assert.assertFalse(_lowResourcesMonitor.isLowOnResources()); + } + + + @Test + public void testMaxConnectionsAndMaxIdleTime() throws Exception + { + _lowResourcesMonitor.setMaxMemory(0); + Assert.assertFalse(_lowResourcesMonitor.isLowOnResources()); + + Socket[] socket = new Socket[_lowResourcesMonitor.getMaxConnections()+1]; + for (int i=0;iA stream can be open, {@link #isHalfClosed() half closed} or * {@link #isClosed() closed} and this method updates the close state diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index 704c595e979..dcce655edeb 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -45,9 +45,10 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.PingInfo; import org.eclipse.jetty.spdy.api.PingResultInfo; +import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.SPDYException; import org.eclipse.jetty.spdy.api.Session; @@ -498,8 +499,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable stream.process(frame); // Update the last stream id before calling the application (which may send a GO_AWAY) updateLastStreamId(stream); - SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority()); - StreamFrameListener streamListener = notifyOnSyn(listener, stream, synInfo); + StreamFrameListener streamListener; + if (stream.isUnidirectional()) + { + PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose()); + streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo); + } + else + { + SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority()); + streamListener = notifyOnSyn(listener, stream, synInfo); + } stream.setStreamFrameListener(streamListener); flush(); // The onSyn() listener may have sent a frame that closed the stream @@ -680,9 +690,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable { if (goAwayReceived.compareAndSet(false, true)) { - //TODO: Find a better name for GoAwayReceivedInfo - GoAwayReceivedInfo goAwayReceivedInfo = new GoAwayReceivedInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode())); - notifyOnGoAway(listener, goAwayReceivedInfo); + //TODO: Find a better name for GoAwayResultInfo + GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode())); + notifyOnGoAway(listener, goAwayResultInfo); flush(); // SPDY does not require to send back a response to a GO_AWAY. // We notified the application of the last good stream id and @@ -755,6 +765,27 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable } } + private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo) + { + try + { + if (listener == null) + return null; + LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener); + return listener.onPush(stream, pushInfo); + } + catch (Exception x) + { + LOG.info("Exception while notifying listener " + listener, x); + return null; + } + catch (Error x) + { + LOG.info("Exception while notifying listener " + listener, x); + throw x; + } + } + private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo) { try @@ -839,14 +870,14 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable } } - private void notifyOnGoAway(SessionFrameListener listener, GoAwayReceivedInfo goAwayReceivedInfo) + private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo) { try { if (listener != null) { - LOG.debug("Invoking callback with {} on listener {}", goAwayReceivedInfo, listener); - listener.onGoAway(this, goAwayReceivedInfo); + LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener); + listener.onGoAway(this, goAwayResultInfo); } } catch (Exception x) diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java index 78ee9d51599..7e9afac7a0e 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java @@ -148,6 +148,7 @@ public class StandardStream implements IStream this.listener = listener; } + @Override public StreamFrameListener getStreamFrameListener() { return listener; diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayReceivedInfo.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayResultInfo.java similarity index 87% rename from jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayReceivedInfo.java rename to jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayResultInfo.java index ef50de19fcf..eb75f886785 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayReceivedInfo.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayResultInfo.java @@ -22,18 +22,18 @@ package org.eclipse.jetty.spdy.api; *

A container for GOAWAY frames metadata: the last good stream id and * the session status.

*/ -public class GoAwayReceivedInfo +public class GoAwayResultInfo { private final int lastStreamId; private final SessionStatus sessionStatus; /** - *

Creates a new {@link GoAwayReceivedInfo} with the given last good stream id and session status

+ *

Creates a new {@link GoAwayResultInfo} with the given last good stream id and session status

* * @param lastStreamId the last good stream id * @param sessionStatus the session status */ - public GoAwayReceivedInfo(int lastStreamId, SessionStatus sessionStatus) + public GoAwayResultInfo(int lastStreamId, SessionStatus sessionStatus) { this.lastStreamId = lastStreamId; this.sessionStatus = sessionStatus; diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SessionFrameListener.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SessionFrameListener.java index b950698d977..ffff51fe50d 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SessionFrameListener.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SessionFrameListener.java @@ -36,7 +36,7 @@ public interface SessionFrameListener extends EventListener *

Application code should implement this method and reply to the stream creation, eventually * sending data:

*
-     * public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
+     * public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
      * {
      *     // Do something with the metadata contained in synInfo
      *
@@ -52,7 +52,7 @@ public interface SessionFrameListener extends EventListener
      * 
*

Alternatively, if the stream creation requires reading data sent from the other peer:

*
-     * public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
+     * public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
      * {
      *     // Do something with the metadata contained in synInfo
      *
@@ -106,9 +106,9 @@ public interface SessionFrameListener extends EventListener
      * 

Callback invoked when the other peer signals that it is closing the connection.

* * @param session the session - * @param goAwayReceivedInfo the metadata sent + * @param goAwayResultInfo the metadata sent */ - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo); + public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo); /** *

Callback invoked when an exception is thrown during the processing of an event on a @@ -119,6 +119,7 @@ public interface SessionFrameListener extends EventListener */ public void onException(Throwable x); + /** *

Empty implementation of {@link SessionFrameListener}

*/ @@ -148,7 +149,7 @@ public interface SessionFrameListener extends EventListener } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo) { } diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java index 6a91fc77f67..0a4248a1f97 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java @@ -50,6 +50,15 @@ public interface StreamFrameListener extends EventListener */ public void onHeaders(Stream stream, HeadersInfo headersInfo); + /** + *

Callback invoked when a push syn has been received on a stream.

+ * + * @param stream the push stream just created + * @param pushInfo + * @return a listener for stream events or null if there is no interest in being notified of stream events + */ + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo); + /** *

Callback invoked when data bytes are received on a stream.

*

Implementers should be read or consume the content of the @@ -75,6 +84,12 @@ public interface StreamFrameListener extends EventListener { } + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + return null; + } + @Override public void onData(Stream stream, DataInfo dataInfo) { diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java index ebb98d63666..455b58e0fd7 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java @@ -60,14 +60,24 @@ public class ClientUsageTest } @Test - public void testClientRequestWithBodyResponseNoBody() throws Exception + public void testClientReceivesPush1() throws InterruptedException, ExecutionException, TimeoutException { Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null); - Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0), - new StreamFrameListener.Adapter() + session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() + { + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + return new Adapter() { @Override + public void onData(Stream stream, DataInfo dataInfo) + { + } + }; + }; + + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { // Do something with the response @@ -83,6 +93,71 @@ public class ClientUsageTest throw new IllegalStateException(e); } } + }); + } + + @Test + public void testClientReceivesPush2() throws InterruptedException, ExecutionException, TimeoutException + { + Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, new SessionFrameListener.Adapter() + { + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + return new StreamFrameListener.Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + } + }; + } + }, null, null); + + session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Do something with the response + replyInfo.getHeaders().get("host"); + + // Then issue another similar request + try + { + stream.getSession().syn(new SynInfo(new Fields(), true), this); + } + catch (ExecutionException | InterruptedException | TimeoutException e) + { + throw new IllegalStateException(e); + } + } + }); + } + + @Test + public void testClientRequestWithBodyResponseNoBody() throws Exception + { + Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null); + + Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0), + new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Do something with the response + replyInfo.getHeaders().get("host"); + + // Then issue another similar request + try + { + stream.getSession().syn(new SynInfo(new Fields(), true), this); + } + catch (ExecutionException | InterruptedException | TimeoutException e) + { + throw new IllegalStateException(e); + } + } }); // Send-and-forget the data stream.data(new StringDataInfo("data", true)); @@ -96,38 +171,39 @@ public class ClientUsageTest final String context = "context"; session.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter() { - @Override - public void onReply(Stream stream, ReplyInfo replyInfo) - { - // Do something with the response - replyInfo.getHeaders().get("host"); + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Do something with the response + replyInfo.getHeaders().get("host"); - // Then issue another similar request - try - { - stream.getSession().syn(new SynInfo(new Fields(), true), this); - } - catch (ExecutionException | InterruptedException | TimeoutException e) - { - throw new IllegalStateException(e); - } - } + // Then issue another similar request + try + { + stream.getSession().syn(new SynInfo(new Fields(), true), this); + } + catch (ExecutionException | InterruptedException | TimeoutException e) + { + throw new IllegalStateException(e); + } + } }, new Promise.Adapter() { @Override - public void succeeded(Stream stream) - { - // Differently from JDK 7 AIO, there is no need to - // have an explicit parameter for the context since - // that is captured while the handler is created anyway, - // and it is used only by the handler as parameter + public void succeeded(Stream stream) + { + // Differently from JDK 7 AIO, there is no need to + // have an explicit parameter for the context since + // that is captured while the handler is created anyway, + // and it is used only by the handler as parameter - // The style below is fire-and-forget, since - // we do not pass the handler nor we call get() - // to wait for the data to be sent - stream.data(new StringDataInfo(context, true), new Callback.Adapter()); - } - }); + // The style below is fire-and-forget, since + // we do not pass the handler nor we call get() + // to wait for the data to be sent + stream.data(new StringDataInfo(context, true), new Callback.Adapter()); + } + } + ); } @Test @@ -136,48 +212,49 @@ public class ClientUsageTest Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null); session.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter() - { - // The good of passing the listener to push() is that applications can safely - // accumulate info from the reply headers to be used in the data callback, - // e.g. content-type, charset, etc. - - @Override - public void onReply(Stream stream, ReplyInfo replyInfo) - { - // Do something with the response - Fields headers = replyInfo.getHeaders(); - int contentLength = headers.get("content-length").valueAsInt(); - stream.setAttribute("content-length", contentLength); - if (!replyInfo.isClose()) - stream.setAttribute("builder", new StringBuilder()); - - // May issue another similar request while waiting for data - try { - stream.getSession().syn(new SynInfo(new Fields(), true), this); - } - catch (ExecutionException | InterruptedException | TimeoutException e) + // The good of passing the listener to push() is that applications can safely + // accumulate info from the reply headers to be used in the data callback, + // e.g. content-type, charset, etc. + + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Do something with the response + Fields headers = replyInfo.getHeaders(); + int contentLength = headers.get("content-length").valueAsInt(); + stream.setAttribute("content-length", contentLength); + if (!replyInfo.isClose()) + stream.setAttribute("builder", new StringBuilder()); + + // May issue another similar request while waiting for data + try + { + stream.getSession().syn(new SynInfo(new Fields(), true), this); + } + catch (ExecutionException | InterruptedException | TimeoutException e) + { + throw new IllegalStateException(e); + } + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + StringBuilder builder = (StringBuilder)stream.getAttribute("builder"); + builder.append(dataInfo.asString("UTF-8", true)); + + } + }, new Promise.Adapter() { - throw new IllegalStateException(e); + @Override + public void succeeded(Stream stream) + { + stream.data(new BytesDataInfo("wee".getBytes(Charset.forName("UTF-8")), false), new Callback.Adapter()); + stream.data(new StringDataInfo("foo", false), new Callback.Adapter()); + stream.data(new ByteBufferDataInfo(Charset.forName("UTF-8").encode("bar"), true), new Callback.Adapter()); + } } - } - - @Override - public void onData(Stream stream, DataInfo dataInfo) - { - StringBuilder builder = (StringBuilder)stream.getAttribute("builder"); - builder.append(dataInfo.asString("UTF-8", true)); - - } - }, new Promise.Adapter() - { - @Override - public void succeeded(Stream stream) - { - stream.data(new BytesDataInfo("wee".getBytes(Charset.forName("UTF-8")), false), new Callback.Adapter()); - stream.data(new StringDataInfo("foo", false), new Callback.Adapter()); - stream.data(new ByteBufferDataInfo(Charset.forName("UTF-8").encode("bar"), true), new Callback.Adapter()); - } - }); + ); } } diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java index 395f9dcf3e2..1569f749150 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java @@ -24,6 +24,7 @@ import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.HeadersInfo; +import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.StreamFrameListener; @@ -131,6 +132,12 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory channel.requestHeaders(headersInfo.getHeaders(), headersInfo.isClose()); } + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + return null; + } + @Override public void onData(Stream stream, final DataInfo dataInfo) { diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyEngineSelector.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyEngineSelector.java index 75b6a4e5062..452e32b95eb 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyEngineSelector.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyEngineSelector.java @@ -23,7 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.PingResultInfo; import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.Session; @@ -104,7 +104,7 @@ public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo) { // TODO: } diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java index 0c1b9202087..7d5c8784026 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java @@ -42,7 +42,7 @@ import org.eclipse.jetty.spdy.StandardStream; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; @@ -136,7 +136,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse { assert content == null; if (headers.isEmpty()) - proxyEngineSelector.onGoAway(session, new GoAwayReceivedInfo(0, SessionStatus.OK)); + proxyEngineSelector.onGoAway(session, new GoAwayResultInfo(0, SessionStatus.OK)); else syn(true); } diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java index 874402ecc34..67db8bff185 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java @@ -29,8 +29,9 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.HeadersInfo; +import org.eclipse.jetty.spdy.api.Info; import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; @@ -50,8 +51,8 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; /** - *

{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by - * clients into SPDY events for the servers.

+ *

{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by clients into + * SPDY events for the servers.

*/ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener { @@ -131,6 +132,12 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener } } + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + throw new IllegalStateException("We shouldn't receive pushes from clients"); + } + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -222,6 +229,61 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener this.clientStream = clientStream; } + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + LOG.debug("S -> P pushed {} on {}", pushInfo, stream); + + Fields headers = new Fields(pushInfo.getHeaders(), false); + + addResponseProxyHeaders(stream, headers); + customizeResponseHeaders(stream, headers); + Stream clientStream = (Stream)stream.getAssociatedStream().getAttribute + (CLIENT_STREAM_ATTRIBUTE); + convert(stream.getSession().getVersion(), clientStream.getSession().getVersion(), + headers); + + StreamHandler handler = new StreamHandler(clientStream, pushInfo); + stream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler); + clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, + pushInfo.isClose()), + handler); + return new Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Push streams never send a reply + throw new UnsupportedOperationException(); + } + + @Override + public void onHeaders(Stream stream, HeadersInfo headersInfo) + { + throw new UnsupportedOperationException(); + } + + @Override + public void onData(Stream serverStream, final DataInfo serverDataInfo) + { + LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream); + + ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose()) + { + @Override + public void consume(int delta) + { + super.consume(delta); + serverDataInfo.consume(delta); + } + }; + + StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE); + handler.data(clientDataInfo); + } + }; + } + @Override public void onReply(final Stream stream, ReplyInfo replyInfo) { @@ -304,30 +366,30 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener } /** - *

{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.

- *

Instances of this class buffer DATA frames sent by clients and send them to the server. - * The buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive - * from the client before the SYN_STREAM has been fully sent), and between DATA frames, if the client - * is a fast producer and the server a slow consumer, or if the client is a SPDY v2 client (and hence - * without flow control) while the server is a SPDY v3 server (and hence with flow control).

+ *

{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.

Instances + * of this class buffer DATA frames sent by clients and send them to the server. The buffering happens between the + * send of the SYN_STREAM to the server (where DATA frames may arrive from the client before the SYN_STREAM has been + * fully sent), and between DATA frames, if the client is a fast producer and the server a slow consumer, or if the + * client is a SPDY v2 client (and hence without flow control) while the server is a SPDY v3 server (and hence with + * flow control).

*/ private class StreamHandler implements Promise { private final Queue queue = new LinkedList<>(); private final Stream clientStream; - private final SynInfo serverSynInfo; + private final Info info; private Stream serverStream; - private StreamHandler(Stream clientStream, SynInfo serverSynInfo) + private StreamHandler(Stream clientStream, Info info) { this.clientStream = clientStream; - this.serverSynInfo = serverSynInfo; + this.info = info; } @Override public void succeeded(Stream serverStream) { - LOG.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream); + LOG.debug("P -> S {} from {} to {}", info, clientStream, serverStream); serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream); @@ -449,26 +511,8 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener } } - private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener + private class ProxySessionFrameListener extends SessionFrameListener.Adapter { - @Override - public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo) - { - LOG.debug("S -> P pushed {} on {}", serverSynInfo, serverStream); - - Fields headers = new Fields(serverSynInfo.getHeaders(), false); - - addResponseProxyHeaders(serverStream, headers); - customizeResponseHeaders(serverStream, headers); - Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE); - convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers); - - StreamHandler handler = new StreamHandler(clientStream, serverSynInfo); - serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler); - clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, serverSynInfo.isClose()), - handler); - return this; - } @Override public void onRst(Session serverSession, RstInfo serverRstInfo) @@ -487,41 +531,9 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener } @Override - public void onGoAway(Session serverSession, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo) { serverSessions.values().remove(serverSession); } - - @Override - public void onReply(Stream stream, ReplyInfo replyInfo) - { - // Push streams never send a reply - throw new UnsupportedOperationException(); - } - - @Override - public void onHeaders(Stream stream, HeadersInfo headersInfo) - { - throw new UnsupportedOperationException(); - } - - @Override - public void onData(Stream serverStream, final DataInfo serverDataInfo) - { - LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream); - - ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose()) - { - @Override - public void consume(int delta) - { - super.consume(delta); - serverDataInfo.consume(delta); - } - }; - - StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE); - handler.data(clientDataInfo); - } } } diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java index d4c5e2a0395..7d65a6f5c17 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java @@ -18,16 +18,12 @@ package org.eclipse.jetty.spdy.server.http; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; - import java.io.IOException; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -37,6 +33,7 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.spdy.api.DataInfo; +import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.SPDY; @@ -50,10 +47,15 @@ import org.eclipse.jetty.spdy.server.NPNServerConnectionFactory; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Fields; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest { private final int referrerPushPeriod = 1000; @@ -107,39 +109,14 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest sendMainRequestAndCSSRequest(); final CountDownLatch pushDataLatch = new CountDownLatch(1); final CountDownLatch pushSynHeadersValid = new CountDownLatch(1); - Session session = startClient(version, serverAddress, new SessionFrameListener.Adapter() - { - @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) - { - validateHeaders(synInfo.getHeaders(), pushSynHeadersValid); - - assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true)); - assertThat("URI header ends with css", synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)) - .value().endsWith - ("" + - ".css"), - is(true)); - stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter()); - return new StreamFrameListener.Adapter() - { - - @Override - public void onData(Stream stream, DataInfo dataInfo) - { - dataInfo.consume(dataInfo.length()); - pushDataLatch.countDown(); - } - }; - } - }); + Session session = startClient(version, serverAddress, null); // Send main request. That should initiate the push push's which get reset by the client - sendRequest(session, mainRequestHeaders); + sendRequest(session, mainRequestHeaders, pushSynHeadersValid, pushDataLatch); assertThat("No push data is received", pushDataLatch.await(1, TimeUnit.SECONDS), is(false)); assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true)); - sendRequest(session, associatedCSSRequestHeaders); + sendRequest(session, associatedCSSRequestHeaders, pushSynHeadersValid, pushDataLatch); } @Test @@ -157,7 +134,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest // Sleep for pushPeriod This should prevent application.js from being mapped as pushResource Thread.sleep(referrerPushPeriod + 1); - sendRequest(session, associatedJSRequestHeaders); + sendRequest(session, associatedJSRequestHeaders, null, null); run2ndClientRequests(false, true); } @@ -171,7 +148,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest Session session = sendMainRequestAndCSSRequest(); - sendRequest(session, associatedJSRequestHeaders); + sendRequest(session, associatedJSRequestHeaders, null, null); run2ndClientRequests(false, true); } @@ -200,18 +177,43 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest { Session session = startClient(version, serverAddress, null); - sendRequest(session, mainRequestHeaders); - sendRequest(session, associatedCSSRequestHeaders); + sendRequest(session, mainRequestHeaders, null, null); + sendRequest(session, associatedCSSRequestHeaders, null, null); return session; } - private void sendRequest(Session session, Fields requestHeaders) throws InterruptedException + private void sendRequest(Session session, Fields requestHeaders, final CountDownLatch pushSynHeadersValid, + final CountDownLatch pushDataLatch) throws InterruptedException { final CountDownLatch dataReceivedLatch = new CountDownLatch(1); final CountDownLatch received200OKLatch = new CountDownLatch(1); session.syn(new SynInfo(requestHeaders, true), new StreamFrameListener.Adapter() { + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid); + + assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true)); + assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)) + .value().endsWith + ("" + + ".css"), + is(true)); + stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter()); + return new StreamFrameListener.Adapter() + { + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consume(dataInfo.length()); + pushDataLatch.countDown(); + } + }; + } + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -238,16 +240,17 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch pushDataLatch = new CountDownLatch(1); final CountDownLatch pushSynHeadersValid = new CountDownLatch(1); - Session session2 = startClient(version, serverAddress, new SessionFrameListener.Adapter() + Session session2 = startClient(version, serverAddress, null); + session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { if (validateHeaders) - validateHeaders(synInfo.getHeaders(), pushSynHeadersValid); + validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid); assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true)); - assertThat("URI header ends with css", synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)) + assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)) .value().endsWith ("" + ".css"), @@ -264,9 +267,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest } }; } - }); - session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() - { + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -292,6 +293,8 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true)); } + private static final Logger LOG = Log.getLogger(ReferrerPushStrategyTest.class); + @Test public void testAssociatedResourceIsPushed() throws Exception { @@ -326,16 +329,17 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest }); Assert.assertTrue(mainResourceLatch.await(5, TimeUnit.SECONDS)); - sendRequest(session1, createHeaders(cssResource)); + sendRequest(session1, createHeaders(cssResource), null, null); // Create another client, and perform the same request for the main resource, we expect the css being pushed final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch pushDataLatch = new CountDownLatch(1); - Session session2 = startClient(version, address, new SessionFrameListener.Adapter() + Session session2 = startClient(version, address, null); + session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { Assert.assertTrue(stream.isUnidirectional()); return new StreamFrameListener.Adapter() @@ -349,9 +353,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest } }; } - }); - session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() - { + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -452,13 +454,15 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch pushDataLatch = new CountDownLatch(1); - Session session2 = startClient(version, address, new SessionFrameListener.Adapter() + Session session2 = startClient(version, address, null); + session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { Assert.assertTrue(stream.isUnidirectional()); - Assert.assertTrue(synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value().endsWith(".css")); + Assert.assertTrue(pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value().endsWith("" + + ".css")); return new StreamFrameListener.Adapter() { @Override @@ -470,9 +474,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest } }; } - }); - session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() - { + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -563,14 +565,31 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch pushDataLatch = new CountDownLatch(2); - Session session2 = startClient(version, address, new SessionFrameListener.Adapter() + Session session2 = startClient(version, address, null); + LOG.warn("REQUEST FOR PUSHED RESOURCES"); + session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { Assert.assertTrue(stream.isUnidirectional()); return new StreamFrameListener.Adapter() { + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + return new Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consume(dataInfo.length()); + if (dataInfo.isClose()) + pushDataLatch.countDown(); + } + }; + } + @Override public void onData(Stream stream, DataInfo dataInfo) { @@ -580,9 +599,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest } }; } - }); - session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() - { + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java index e65cebb4051..11f44dc2a32 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java @@ -33,7 +33,7 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; @@ -164,7 +164,7 @@ public class ProxyHTTPToSPDYTest } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { closeLatch.countDown(); } diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToHTTPTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToHTTPTest.java index f3c2e9d3a28..6a6c2ef6289 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToHTTPTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToHTTPTest.java @@ -37,7 +37,7 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.spdy.api.DataInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.PingInfo; import org.eclipse.jetty.spdy.api.PingResultInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; @@ -429,7 +429,7 @@ public class ProxySPDYToHTTPTest Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayReceivedInfo) { goAwayLatch.countDown(); } diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java index 211a2c4a790..506c5ed571d 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java @@ -281,10 +281,16 @@ public class ProxySPDYToSPDYTest final CountDownLatch pushSynLatch = new CountDownLatch(1); final CountDownLatch pushDataLatch = new CountDownLatch(1); - Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter() + Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS); + + Fields headers = new Fields(); + headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort()); + final CountDownLatch replyLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { pushSynLatch.countDown(); return new StreamFrameListener.Adapter() @@ -298,14 +304,7 @@ public class ProxySPDYToSPDYTest } }; } - }).get(5, TimeUnit.SECONDS); - Fields headers = new Fields(); - headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort()); - final CountDownLatch replyLatch = new CountDownLatch(1); - final CountDownLatch dataLatch = new CountDownLatch(1); - client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter() - { @Override public void onReply(Stream stream, ReplyInfo replyInfo) { diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/ClosedStreamTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/ClosedStreamTest.java index 27f06da7c43..ca14e68ae4c 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/ClosedStreamTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/ClosedStreamTest.java @@ -35,7 +35,7 @@ import org.eclipse.jetty.spdy.StandardCompressionFactory; import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; @@ -211,7 +211,7 @@ public class ClosedStreamTest extends AbstractTest }; } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { goAwayReceivedLatch.countDown(); } diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/GoAwayTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/GoAwayTest.java index 509fb130a63..9a4b3f35d9b 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/GoAwayTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/GoAwayTest.java @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.SessionFrameListener; @@ -61,7 +61,7 @@ public class GoAwayTest extends AbstractTest } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { Assert.assertEquals(0, goAwayInfo.getLastStreamId()); Assert.assertSame(SessionStatus.OK, goAwayInfo.getSessionStatus()); @@ -90,12 +90,12 @@ public class GoAwayTest extends AbstractTest return null; } }; - final AtomicReference ref = new AtomicReference<>(); + final AtomicReference ref = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { ref.set(goAwayInfo); latch.countDown(); @@ -106,10 +106,10 @@ public class GoAwayTest extends AbstractTest Stream stream1 = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), null); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); - GoAwayReceivedInfo goAwayReceivedInfo = ref.get(); - Assert.assertNotNull(goAwayReceivedInfo); - Assert.assertEquals(stream1.getId(), goAwayReceivedInfo.getLastStreamId()); - Assert.assertSame(SessionStatus.OK, goAwayReceivedInfo.getSessionStatus()); + GoAwayResultInfo goAwayResultInfo = ref.get(); + Assert.assertNotNull(goAwayResultInfo); + Assert.assertEquals(stream1.getId(), goAwayResultInfo.getLastStreamId()); + Assert.assertSame(SessionStatus.OK, goAwayResultInfo.getSessionStatus()); } @Test @@ -139,7 +139,7 @@ public class GoAwayTest extends AbstractTest SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { session.syn(new SynInfo(new Fields(), true), null, new FuturePromise()); } @@ -184,12 +184,12 @@ public class GoAwayTest extends AbstractTest } } }; - final AtomicReference goAwayRef = new AtomicReference<>(); + final AtomicReference goAwayRef = new AtomicReference<>(); final CountDownLatch goAwayLatch = new CountDownLatch(1); SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { goAwayRef.set(goAwayInfo); goAwayLatch.countDown(); @@ -228,7 +228,7 @@ public class GoAwayTest extends AbstractTest // The last good stream is the second, because it was received by the server Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS)); - GoAwayReceivedInfo goAway = goAwayRef.get(); + GoAwayResultInfo goAway = goAwayRef.get(); Assert.assertNotNull(goAway); Assert.assertEquals(stream2.getId(), goAway.getLastStreamId()); } diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/IdleTimeoutTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/IdleTimeoutTest.java index 3005b2cd99c..708e6dc9be9 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/IdleTimeoutTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/IdleTimeoutTest.java @@ -23,7 +23,7 @@ import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; @@ -63,7 +63,7 @@ public class IdleTimeoutTest extends AbstractTest Session session = startClient(startServer(null), new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { latch.countDown(); } @@ -85,7 +85,7 @@ public class IdleTimeoutTest extends AbstractTest Session session = startClient(startServer(null), new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { latch.countDown(); } @@ -125,7 +125,7 @@ public class IdleTimeoutTest extends AbstractTest Session session = startClient(startServer(null), new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { goAwayLatch.countDown(); } @@ -161,7 +161,7 @@ public class IdleTimeoutTest extends AbstractTest } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { latch.countDown(); } @@ -187,7 +187,7 @@ public class IdleTimeoutTest extends AbstractTest InetSocketAddress address = startServer(new ServerSessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { latch.countDown(); } @@ -220,7 +220,7 @@ public class IdleTimeoutTest extends AbstractTest } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { latch.countDown(); } diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/PushStreamTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/PushStreamTest.java index 0a38ae72fb7..344667e8c95 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/PushStreamTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/PushStreamTest.java @@ -19,12 +19,6 @@ package org.eclipse.jetty.spdy.server; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.sameInstance; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -44,7 +38,7 @@ import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.spdy.StandardCompressionFactory; import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; @@ -75,6 +69,12 @@ import org.eclipse.jetty.util.log.Logger; import org.junit.Assert; import org.junit.Test; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + public class PushStreamTest extends AbstractTest { private static final Logger LOG = Log.getLogger(PushStreamTest.class); @@ -94,10 +94,12 @@ public class PushStreamTest extends AbstractTest stream.push(new PushInfo(new Fields(), true), new Promise.Adapter()); return null; } - }), new SessionFrameListener.Adapter() + }), null); + + Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { assertThat("streamId is even", stream.getId() % 2, is(0)); assertThat("stream is unidirectional", stream.isUnidirectional(), is(true)); @@ -117,8 +119,6 @@ public class PushStreamTest extends AbstractTest return null; } }); - - Stream stream = clientSession.syn(new SynInfo(new Fields(), true), null); assertThat("onSyn has been called", pushStreamLatch.await(5, TimeUnit.SECONDS), is(true)); Stream pushStream = pushStreamRef.get(); assertThat("main stream and associated stream are the same", stream, sameInstance(pushStream.getAssociatedStream())); @@ -177,10 +177,12 @@ public class PushStreamTest extends AbstractTest } } - }), new SessionFrameListener.Adapter() + }), null); + + Stream stream = clientSession.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { pushStreamSynLatch.countDown(); return new StreamFrameListener.Adapter() @@ -193,10 +195,7 @@ public class PushStreamTest extends AbstractTest } }; } - }); - Stream stream = clientSession.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter() - { @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -298,10 +297,12 @@ public class PushStreamTest extends AbstractTest throw new IllegalStateException(e); } } - }), new SessionFrameListener.Adapter() + }), null); + + Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { return new StreamFrameListener.Adapter() { @@ -327,10 +328,7 @@ public class PushStreamTest extends AbstractTest } }; } - }); - Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() - { @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -427,7 +425,7 @@ public class PushStreamTest extends AbstractTest } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { goAwayReceivedLatch.countDown(); } @@ -543,20 +541,14 @@ public class PushStreamTest extends AbstractTest stream.push(new PushInfo(new Fields(), false), new Promise.Adapter()); return null; } - }), new SessionFrameListener.Adapter() - { - @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) - { - assertStreamIdIsEven(stream); - pushStreamIdIsEvenLatch.countDown(); - return super.onSyn(stream, synInfo); - } - }); + }), null); - Stream stream = clientSession.syn(new SynInfo(new Fields(), false), null); - Stream stream2 = clientSession.syn(new SynInfo(new Fields(), false), null); - Stream stream3 = clientSession.syn(new SynInfo(new Fields(), false), null); + Stream stream = clientSession.syn(new SynInfo(new Fields(), false), + new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch)); + Stream stream2 = clientSession.syn(new SynInfo(new Fields(), false), + new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch)); + Stream stream3 = clientSession.syn(new SynInfo(new Fields(), false), + new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch)); assertStreamIdIsOdd(stream); assertStreamIdIsOdd(stream2); assertStreamIdIsOdd(stream3); @@ -564,6 +556,24 @@ public class PushStreamTest extends AbstractTest assertThat("all pushStreams had even ids", pushStreamIdIsEvenLatch.await(5, TimeUnit.SECONDS), is(true)); } + private class VerifyPushStreamIdIsEvenStreamFrameListener extends StreamFrameListener.Adapter + { + final CountDownLatch pushStreamIdIsEvenLatch; + + private VerifyPushStreamIdIsEvenStreamFrameListener(CountDownLatch pushStreamIdIsEvenLatch) + { + this.pushStreamIdIsEvenLatch = pushStreamIdIsEvenLatch; + } + + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + assertStreamIdIsEven(stream); + pushStreamIdIsEvenLatch.countDown(); + return super.onPush(stream, pushInfo); + } + } + private void assertStreamIdIsEven(Stream stream) { assertThat("streamId is odd", stream.getId() % 2, is(0)); diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYClientFactoryTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYClientFactoryTest.java index 076c7ce7945..fc40469dbea 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYClientFactoryTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYClientFactoryTest.java @@ -23,7 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.junit.Assert; @@ -38,7 +38,7 @@ public class SPDYClientFactoryTest extends AbstractTest startClient(startServer(new ServerSessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo) { latch.countDown(); } diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYServerConnectorTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYServerConnectorTest.java index 02bc42a81a6..0dc771a150a 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYServerConnectorTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYServerConnectorTest.java @@ -23,7 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.SessionFrameListener; import org.junit.Assert; @@ -38,7 +38,7 @@ public class SPDYServerConnectorTest extends AbstractTest startClient(startServer(null), new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo) { latch.countDown(); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TimerScheduler.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TimerScheduler.java index 4f20739562d..eef62659cf2 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TimerScheduler.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TimerScheduler.java @@ -27,6 +27,10 @@ import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; + +/* ------------------------------------------------------------ */ +/** A scheduler based on the the JVM Timer class + */ public class TimerScheduler extends AbstractLifeCycle implements Scheduler, Runnable { private static final Logger LOG = Log.getLogger(TimerScheduler.class);