Merge remote-tracking branch 'origin/master' into osgi_fix_jetty9

This commit is contained in:
Jan Bartel 2013-02-21 21:39:07 +11:00
commit 1ce132b6d3
33 changed files with 1116 additions and 294 deletions

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.security.HashLoginService;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.LowResourceMonitor;
import org.eclipse.jetty.server.NCSARequestLog;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@ -36,6 +37,7 @@ import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.TimerScheduler;
public class LikeJettyXml
{
@ -53,6 +55,8 @@ public class LikeJettyXml
server.setDumpAfterStart(false);
server.setDumpBeforeStop(false);
server.addBean(new TimerScheduler());
// Setup JMX
MBeanContainer mbContainer=new MBeanContainer(ManagementFactory.getPlatformMBeanServer());
server.addBean(mbContainer);
@ -121,6 +125,12 @@ public class LikeJettyXml
server.setStopAtShutdown(true);
LowResourceMonitor lowResourcesMonitor=new LowResourceMonitor(server);
lowResourcesMonitor.setLowResourcesIdleTimeout(1000);
lowResourcesMonitor.setMaxConnections(2);
lowResourcesMonitor.setPeriod(1200);
server.addBean(lowResourcesMonitor);
server.start();
server.join();
}

View File

@ -200,4 +200,5 @@ etc/jetty-requestlog.xml
# etc/jetty-stats.xml
# etc/jetty-debug.xml
# etc/jetty-ipaccess.xml
# etc/jetty-lowresources.xml
#===========================================================

View File

@ -25,7 +25,7 @@ import java.util.List;
import org.codehaus.plexus.util.xml.Xpp3Dom;
import edu.emory.mathcs.backport.java.util.Arrays;
import java.util.Arrays;
/**
* OverlayConfig

View File

@ -57,6 +57,20 @@
<!-- =========================================================== -->
<!-- jetty-jndi by default -->
<!-- =========================================================== -->
<Call class="org.eclipse.jetty.webapp.Configuration$ClassList" name="setServerDefault">
<Arg><Ref refid="Server" /></Arg>
<Call name="addAfter">
<Arg name="afterClass">org.eclipse.jetty.webapp.FragmentConfiguration</Arg>
<Arg>
<Array type="String">
<Item>org.eclipse.jetty.plus.webapp.EnvConfiguration</Item>
<Item>org.eclipse.jetty.plus.webapp.PlusConfiguration</Item>
<Item>org.eclipse.jetty.annotations.AnnotationConfiguration</Item>
</Array>
</Arg>
</Call>
</Call>
<Call class="java.lang.System" name="setProperty">
<Arg>java.naming.factory.initial</Arg>
<Arg><Property name="java.naming.factory.initial" default="org.eclipse.jetty.jndi.InitialContextFactory"/></Arg>

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_0.dtd">
<!-- =============================================================== -->
<!-- Mixin the Low Resources Monitor -->
<!-- =============================================================== -->
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="addBean">
<Arg>
<New class="org.eclipse.jetty.server.LowResourceMonitor">
<Arg name="server"><Ref refid='Server'/></Arg>
<Set name="period">1000</Set>
<Set name="lowResourcesIdleTimeout">200</Set>
<Set name="monitorThreads">true</Set>
<Set name="maxConnections">0</Set>
<Set name="maxMemory">0</Set>
<Set name="maxLowResourcesTime">5000</Set>
</New>
</Arg>
</Call>
</Configure>

View File

@ -23,10 +23,13 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
@ -35,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
@ -141,16 +145,19 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
private final Scheduler _scheduler;
private final ByteBufferPool _byteBufferPool;
private final Thread[] _acceptors;
private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap());
private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
private volatile CountDownLatch _stopping;
private long _idleTimeout = 30000;
private String _defaultProtocol;
private ConnectionFactory _defaultConnectionFactory;
/**
* @param server The server this connector will be added to. Must not be null.
* @param executor An executor for this connector or null to use the servers executor
* @param scheduler A scheduler for this connector or null to a new {@link TimerScheduler} instance.
* @param pool A buffer pool for this connector or null to use a default {@link ByteBufferPool}
* @param scheduler A scheduler for this connector or null to either a {@link Scheduler} set as a server bean or if none set, then a new {@link TimerScheduler} instance.
* @param pool A buffer pool for this connector or null to either a {@link ByteBufferPool} set as a server bean or none set, the new {@link ArrayByteBufferPool} instance.
* @param acceptors the number of acceptor threads to use, or 0 for a default value.
* @param factories The Connection Factories to use.
*/
@ -164,7 +171,11 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
{
_server=server;
_executor=executor!=null?executor:_server.getThreadPool();
if (scheduler==null)
scheduler=_server.getBean(Scheduler.class);
_scheduler=scheduler!=null?scheduler:new TimerScheduler();
if (pool==null)
pool=_server.getBean(ByteBufferPool.class);
_byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();
addBean(_server,false);
@ -468,6 +479,9 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
}
}
// protected void connectionOpened(Connection connection)
// {
// _stats.connectionOpened();
@ -488,6 +502,22 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
// newConnection.onOpen();
// }
@Override
public Collection<EndPoint> getConnectedEndPoints()
{
return _immutableEndPoints;
}
protected void onEndPointOpened(EndPoint endp)
{
_endpoints.add(endp);
}
protected void onEndPointClosed(EndPoint endp)
{
_endpoints.remove(endp);
}
@Override
public Scheduler getScheduler()
{

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Graceful;
@ -85,5 +86,8 @@ public interface Connector extends LifeCycle, Graceful
*/
public Object getTransport();
/**
* @return immutable collection of connected endpoints
*/
public Collection<EndPoint> getConnectedEndPoints();
}

View File

@ -167,6 +167,7 @@ public class LocalConnector extends AbstractConnector
LOG.debug("accepting {}", acceptorID);
LocalEndPoint endPoint = _connects.take();
endPoint.onOpen();
onEndPointOpened(endPoint);
Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
endPoint.setConnection(connection);
@ -209,6 +210,7 @@ public class LocalConnector extends AbstractConnector
@Override
public void onClose()
{
LocalConnector.this.onEndPointClosed(this);
super.onClose();
_closed.countDown();
}

View File

@ -0,0 +1,349 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.TimerScheduler;
/* ------------------------------------------------------------ */
/** A monitor for low resources
* <p>An instance of this class will monitor all the connectors of a server (or a set of connectors
* configured with {@link #setMonitoredConnectors(Collection)}) for a low resources state.
* Low resources can be detected by:<ul>
* <li>{@link ThreadPool#isLowOnThreads()} if {@link Connector#getExecutor()} is
* an instance of {@link ThreadPool} and {@link #setMonitorThreads(boolean)} is true.<li>
* <li>If {@link #setMaxMemory(long)} is non zero then low resources is detected if the JVMs
* {@link Runtime} instance has {@link Runtime#totalMemory()} minus {@link Runtime#freeMemory()}
* greater than {@link #getMaxMemory()}</li>
* <li>If {@link #setMaxConnections(int)} is non zero then low resources is dected if the total number
* of connections exceeds {@link #getMaxConnections()}</li>
* </ul>
* </p>
* <p>Once low resources state is detected, the cause is logged and all existing connections returned
* by {@link Connector#getConnectedEndPoints()} have {@link EndPoint#setIdleTimeout(long)} set
* to {@link #getLowResourcesIdleTimeout()}. New connections are not affected, however if the low
* resources state persists for more than {@link #getMaxLowResourcesTime()}, then the
* {@link #getLowResourcesIdleTimeout()} to all connections again. Once the low resources state is
* cleared, the idle timeout is reset to the connector default given by {@link Connector#getIdleTimeout()}.
* </p>
*/
@ManagedObject ("Monitor for low resource conditions and activate a low resource mode if detected")
public class LowResourceMonitor extends AbstractLifeCycle
{
private static final Logger LOG = Log.getLogger(LowResourceMonitor.class);
private final Server _server;
private Scheduler _scheduler;
private Connector[] _monitoredConnectors;
private int _period=1000;
private int _maxConnections;
private long _maxMemory;
private int _lowResourcesIdleTimeout=1000;
private int _maxLowResourcesTime=0;
private boolean _monitorThreads=true;
private final AtomicBoolean _low = new AtomicBoolean();
private String _cause;
private String _reasons;
private long _lowStarted;
private final Runnable _monitor = new Runnable()
{
@Override
public void run()
{
if (isRunning())
{
monitor();
_scheduler.schedule(_monitor,_period,TimeUnit.MILLISECONDS);
}
}
};
public LowResourceMonitor(@Name("server") Server server)
{
_server=server;
}
@ManagedAttribute("Are the monitored connectors low on resources?")
public boolean isLowOnResources()
{
return _low.get();
}
@ManagedAttribute("The reason(s) the monitored connectors are low on resources")
public String getLowResourcesReasons()
{
return _reasons;
}
@ManagedAttribute("Get the timestamp in ms since epoch that low resources state started")
public long getLowResourcesStarted()
{
return _lowStarted;
}
@ManagedAttribute("The monitored connectors. If null then all server connectors are monitored")
public Collection<Connector> getMonitoredConnectors()
{
if (_monitoredConnectors==null)
return Collections.emptyList();
return Arrays.asList(_monitoredConnectors);
}
/**
* @param monitoredConnectors The collections of Connectors that should be monitored for low resources.
*/
public void setMonitoredConnectors(Collection<Connector> monitoredConnectors)
{
if (monitoredConnectors==null || monitoredConnectors.size()==0)
_monitoredConnectors=null;
else
_monitoredConnectors = monitoredConnectors.toArray(new Connector[monitoredConnectors.size()]);
}
@ManagedAttribute("The monitor period in ms")
public int getPeriod()
{
return _period;
}
/**
* @param periodMS The period in ms to monitor for low resources
*/
public void setPeriod(int periodMS)
{
_period = periodMS;
}
@ManagedAttribute("True if low available threads status is monitored")
public boolean getMonitorThreads()
{
return _monitorThreads;
}
/**
* @param monitorThreads If true, check connectors executors to see if they are
* {@link ThreadPool} instances that are low on threads.
*/
public void setMonitorThreads(boolean monitorThreads)
{
_monitorThreads = monitorThreads;
}
@ManagedAttribute("The maximum connections allowed for the monitored connectors before low resource handling is activated")
public int getMaxConnections()
{
return _maxConnections;
}
/**
* @param maxConnections The maximum connections before low resources state is triggered
*/
public void setMaxConnections(int maxConnections)
{
_maxConnections = maxConnections;
}
@ManagedAttribute("The maximum memory (in bytes) that can be used before low resources is triggered. Memory used is calculated as (totalMemory-freeMemory).")
public long getMaxMemory()
{
return _maxMemory;
}
/**
* @param maxMemoryBytes The maximum memory in bytes in use before low resources is triggered.
*/
public void setMaxMemory(long maxMemoryBytes)
{
_maxMemory = maxMemoryBytes;
}
@ManagedAttribute("The idletimeout in ms to apply to all existing connections when low resources is detected")
public int getLowResourcesIdleTimeout()
{
return _lowResourcesIdleTimeout;
}
/**
* @param lowResourcesIdleTimeoutMS The timeout in ms to apply to EndPoints when in the low resources state.
*/
public void setLowResourcesIdleTimeout(int lowResourcesIdleTimeoutMS)
{
_lowResourcesIdleTimeout = lowResourcesIdleTimeoutMS;
}
@ManagedAttribute("The maximum time in ms that low resources condition can persist before lowResourcesIdleTimeout is applied to new connections as well as existing connections")
public int getMaxLowResourcesTime()
{
return _maxLowResourcesTime;
}
/**
* @param maxLowResourcesTimeMS The time in milliseconds that a low resource state can persist before the low resource idle timeout is reapplied to all connections
*/
public void setMaxLowResourcesTime(int maxLowResourcesTimeMS)
{
_maxLowResourcesTime = maxLowResourcesTimeMS;
}
@Override
protected void doStart() throws Exception
{
_scheduler = _server.getBean(Scheduler.class);
if (_scheduler==null)
{
_scheduler=new LRMScheduler();
_scheduler.start();
}
super.doStart();
_scheduler.schedule(_monitor,_period,TimeUnit.MILLISECONDS);
}
@Override
protected void doStop() throws Exception
{
if (_scheduler instanceof LRMScheduler)
_scheduler.stop();
super.doStop();
}
protected Connector[] getMonitoredOrServerConnectors()
{
if (_monitoredConnectors!=null && _monitoredConnectors.length>0)
return _monitoredConnectors;
return _server.getConnectors();
}
protected void monitor()
{
String reasons=null;
String cause="";
int connections=0;
for(Connector connector : getMonitoredOrServerConnectors())
{
connections+=connector.getConnectedEndPoints().size();
Executor executor = connector.getExecutor();
if (executor instanceof ThreadPool)
{
ThreadPool threadpool=(ThreadPool) executor;
if (_monitorThreads && threadpool.isLowOnThreads())
{
reasons=low(reasons,"Low on threads: "+threadpool);
cause+="T";
}
}
}
if (_maxConnections>0 && connections>_maxConnections)
{
reasons=low(reasons,"Max Connections exceeded: "+connections+">"+_maxConnections);
cause+="C";
}
long memory=Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory();
if (_maxMemory>0 && memory>_maxMemory)
{
reasons=low(reasons,"Max memory exceeded: "+memory+">"+_maxMemory);
cause+="M";
}
if (reasons!=null)
{
// Log the reasons if there is any change in the cause
if (!cause.equals(_cause))
{
LOG.warn("Low Resources: {}",reasons);
_cause=cause;
}
// Enter low resources state?
if (_low.compareAndSet(false,true))
{
_reasons=reasons;
_lowStarted=System.currentTimeMillis();
setLowResources();
}
// Too long in low resources state?
if (_maxLowResourcesTime>0 && (System.currentTimeMillis()-_lowStarted)>_maxLowResourcesTime)
setLowResources();
}
else
{
if (_low.compareAndSet(true,false))
{
LOG.info("Low Resources cleared");
_reasons=null;
_lowStarted=0;
clearLowResources();
}
}
}
protected void setLowResources()
{
for(Connector connector : getMonitoredOrServerConnectors())
{
for (EndPoint endPoint : connector.getConnectedEndPoints())
endPoint.setIdleTimeout(_lowResourcesIdleTimeout);
}
}
protected void clearLowResources()
{
for(Connector connector : getMonitoredOrServerConnectors())
{
for (EndPoint endPoint : connector.getConnectedEndPoints())
endPoint.setIdleTimeout(connector.getIdleTimeout());
}
}
private String low(String reasons, String newReason)
{
if (reasons==null)
return newReason;
return reasons+", "+newReason;
}
private static class LRMScheduler extends TimerScheduler
{
}
}

View File

@ -400,5 +400,21 @@ public class ServerConnector extends AbstractNetworkConnector
{
return getDefaultConnectionFactory().newConnection(ServerConnector.this, endpoint);
}
@Override
protected void endPointOpened(EndPoint endpoint)
{
super.endPointOpened(endpoint);
onEndPointOpened(endpoint);
}
@Override
protected void endPointClosed(EndPoint endpoint)
{
onEndPointClosed(endpoint);
super.endPointClosed(endpoint);
}
}
}

View File

@ -0,0 +1,197 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import java.net.Socket;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class LowResourcesMonitorTest
{
QueuedThreadPool _threadPool;
Server _server;
ServerConnector _connector;
LowResourceMonitor _lowResourcesMonitor;
@Before
public void before() throws Exception
{
_threadPool = new QueuedThreadPool();
_threadPool.setMaxThreads(50);
_server = new Server(_threadPool);
_server.manage(_threadPool);
_server.addBean(new TimerScheduler());
_connector = new ServerConnector(_server);
_connector.setPort(0);
_connector.setIdleTimeout(35000);
_server.addConnector(_connector);
_server.setHandler(new DumpHandler());
_lowResourcesMonitor=new LowResourceMonitor(_server);
_lowResourcesMonitor.setLowResourcesIdleTimeout(200);
_lowResourcesMonitor.setMaxConnections(20);
_lowResourcesMonitor.setPeriod(900);
_server.addBean(_lowResourcesMonitor);
_server.start();
}
@After
public void after() throws Exception
{
_server.stop();
}
@Test
public void testLowOnThreads() throws Exception
{
_threadPool.setMaxThreads(_threadPool.getThreads()-_threadPool.getIdleThreads()+10);
Thread.sleep(1200);
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
final CountDownLatch latch = new CountDownLatch(1);
for (int i=0;i<20;i++)
{
_threadPool.dispatch(new Runnable()
{
@Override
public void run()
{
try
{
latch.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
});
}
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
latch.countDown();
Thread.sleep(1200);
System.err.println(_threadPool.dump());
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
}
@Ignore ("not reliable")
@Test
public void testLowOnMemory() throws Exception
{
_lowResourcesMonitor.setMaxMemory(Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()+(100*1024*1024));
Thread.sleep(1200);
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
byte[] data = new byte[100*1024*1024];
Arrays.fill(data,(byte)1);
int hash = Arrays.hashCode(data);
assertThat(hash,not(equalTo(0)));
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
data=null;
System.gc();
System.gc();
Thread.sleep(1200);
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
}
@Test
public void testMaxConnectionsAndMaxIdleTime() throws Exception
{
_lowResourcesMonitor.setMaxMemory(0);
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
Socket[] socket = new Socket[_lowResourcesMonitor.getMaxConnections()+1];
for (int i=0;i<socket.length;i++)
socket[i]=new Socket("localhost",_connector.getLocalPort());
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Socket newSocket = new Socket("localhost",_connector.getLocalPort());
// wait for low idle time to close sockets, but not new Socket
Thread.sleep(1200);
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
for (int i=0;i<socket.length;i++)
Assert.assertEquals(-1,socket[i].getInputStream().read());
newSocket.getOutputStream().write("GET / HTTP/1.0\r\n\r\n".getBytes(StringUtil.__UTF8_CHARSET));
Assert.assertEquals('H',newSocket.getInputStream().read());
}
@Test
public void testMaxLowResourceTime() throws Exception
{
_lowResourcesMonitor.setMaxLowResourcesTime(2000);
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
Socket socket0 = new Socket("localhost",_connector.getLocalPort());
_lowResourcesMonitor.setMaxMemory(1);
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Socket socket1 = new Socket("localhost",_connector.getLocalPort());
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Assert.assertEquals(-1,socket0.getInputStream().read());
socket1.getOutputStream().write("G".getBytes(StringUtil.__UTF8_CHARSET));
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
socket1.getOutputStream().write("E".getBytes(StringUtil.__UTF8_CHARSET));
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Assert.assertEquals(-1,socket1.getInputStream().read());
}
}

View File

@ -59,6 +59,9 @@ public interface IStream extends Stream, Callback
*/
public void setStreamFrameListener(StreamFrameListener listener);
//TODO: javadoc thomas
public StreamFrameListener getStreamFrameListener();
/**
* <p>A stream can be open, {@link #isHalfClosed() half closed} or
* {@link #isClosed() closed} and this method updates the close state

View File

@ -45,9 +45,10 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.PingResultInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
@ -498,8 +499,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
stream.process(frame);
// Update the last stream id before calling the application (which may send a GO_AWAY)
updateLastStreamId(stream);
StreamFrameListener streamListener;
if (stream.isUnidirectional())
{
PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose());
streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo);
}
else
{
SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
StreamFrameListener streamListener = notifyOnSyn(listener, stream, synInfo);
streamListener = notifyOnSyn(listener, stream, synInfo);
}
stream.setStreamFrameListener(streamListener);
flush();
// The onSyn() listener may have sent a frame that closed the stream
@ -680,9 +690,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
{
if (goAwayReceived.compareAndSet(false, true))
{
//TODO: Find a better name for GoAwayReceivedInfo
GoAwayReceivedInfo goAwayReceivedInfo = new GoAwayReceivedInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
notifyOnGoAway(listener, goAwayReceivedInfo);
//TODO: Find a better name for GoAwayResultInfo
GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
notifyOnGoAway(listener, goAwayResultInfo);
flush();
// SPDY does not require to send back a response to a GO_AWAY.
// We notified the application of the last good stream id and
@ -755,6 +765,27 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
}
}
private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo)
{
try
{
if (listener == null)
return null;
LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener);
return listener.onPush(stream, pushInfo);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
return null;
}
catch (Error x)
{
LOG.info("Exception while notifying listener " + listener, x);
throw x;
}
}
private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
{
try
@ -839,14 +870,14 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
}
}
private void notifyOnGoAway(SessionFrameListener listener, GoAwayReceivedInfo goAwayReceivedInfo)
private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo)
{
try
{
if (listener != null)
{
LOG.debug("Invoking callback with {} on listener {}", goAwayReceivedInfo, listener);
listener.onGoAway(this, goAwayReceivedInfo);
LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener);
listener.onGoAway(this, goAwayResultInfo);
}
}
catch (Exception x)

View File

@ -148,6 +148,7 @@ public class StandardStream implements IStream
this.listener = listener;
}
@Override
public StreamFrameListener getStreamFrameListener()
{
return listener;

View File

@ -22,18 +22,18 @@ package org.eclipse.jetty.spdy.api;
* <p>A container for GOAWAY frames metadata: the last good stream id and
* the session status.</p>
*/
public class GoAwayReceivedInfo
public class GoAwayResultInfo
{
private final int lastStreamId;
private final SessionStatus sessionStatus;
/**
* <p>Creates a new {@link GoAwayReceivedInfo} with the given last good stream id and session status</p>
* <p>Creates a new {@link GoAwayResultInfo} with the given last good stream id and session status</p>
*
* @param lastStreamId the last good stream id
* @param sessionStatus the session status
*/
public GoAwayReceivedInfo(int lastStreamId, SessionStatus sessionStatus)
public GoAwayResultInfo(int lastStreamId, SessionStatus sessionStatus)
{
this.lastStreamId = lastStreamId;
this.sessionStatus = sessionStatus;

View File

@ -36,7 +36,7 @@ public interface SessionFrameListener extends EventListener
* <p>Application code should implement this method and reply to the stream creation, eventually
* sending data:</p>
* <pre>
* public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
* public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
* {
* // Do something with the metadata contained in synInfo
*
@ -52,7 +52,7 @@ public interface SessionFrameListener extends EventListener
* </pre>
* <p>Alternatively, if the stream creation requires reading data sent from the other peer:</p>
* <pre>
* public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
* public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
* {
* // Do something with the metadata contained in synInfo
*
@ -106,9 +106,9 @@ public interface SessionFrameListener extends EventListener
* <p>Callback invoked when the other peer signals that it is closing the connection.</p>
*
* @param session the session
* @param goAwayReceivedInfo the metadata sent
* @param goAwayResultInfo the metadata sent
*/
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo);
public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo);
/**
* <p>Callback invoked when an exception is thrown during the processing of an event on a
@ -119,6 +119,7 @@ public interface SessionFrameListener extends EventListener
*/
public void onException(Throwable x);
/**
* <p>Empty implementation of {@link SessionFrameListener}</p>
*/
@ -148,7 +149,7 @@ public interface SessionFrameListener extends EventListener
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{
}

View File

@ -50,6 +50,15 @@ public interface StreamFrameListener extends EventListener
*/
public void onHeaders(Stream stream, HeadersInfo headersInfo);
/**
* <p>Callback invoked when a push syn has been received on a stream.</p>
*
* @param stream the push stream just created
* @param pushInfo
* @return a listener for stream events or null if there is no interest in being notified of stream events
*/
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo);
/**
* <p>Callback invoked when data bytes are received on a stream.</p>
* <p>Implementers should be read or consume the content of the
@ -75,6 +84,12 @@ public interface StreamFrameListener extends EventListener
{
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return null;
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{

View File

@ -59,6 +59,81 @@ public class ClientUsageTest
});
}
@Test
public void testClientReceivesPush1() throws InterruptedException, ExecutionException, TimeoutException
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null);
session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return new Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
}
};
};
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
replyInfo.getHeaders().get("host");
// Then issue another similar request
try
{
stream.getSession().syn(new SynInfo(new Fields(), true), this);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
throw new IllegalStateException(e);
}
}
});
}
@Test
public void testClientReceivesPush2() throws InterruptedException, ExecutionException, TimeoutException
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, new SessionFrameListener.Adapter()
{
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
}
};
}
}, null, null);
session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
replyInfo.getHeaders().get("host");
// Then issue another similar request
try
{
stream.getSession().syn(new SynInfo(new Fields(), true), this);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
throw new IllegalStateException(e);
}
}
});
}
@Test
public void testClientRequestWithBodyResponseNoBody() throws Exception
{
@ -127,7 +202,8 @@ public class ClientUsageTest
// to wait for the data to be sent
stream.data(new StringDataInfo(context, true), new Callback.Adapter());
}
});
}
);
}
@Test
@ -178,6 +254,7 @@ public class ClientUsageTest
stream.data(new StringDataInfo("foo", false), new Callback.Adapter());
stream.data(new ByteBufferDataInfo(Charset.forName("UTF-8").encode("bar"), true), new Callback.Adapter());
}
});
}
);
}
}

View File

@ -24,6 +24,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
@ -131,6 +132,12 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory
channel.requestHeaders(headersInfo.getHeaders(), headersInfo.isClose());
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return null;
}
@Override
public void onData(Stream stream, final DataInfo dataInfo)
{

View File

@ -23,7 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PingResultInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session;
@ -104,7 +104,7 @@ public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{
// TODO:
}

View File

@ -42,7 +42,7 @@ import org.eclipse.jetty.spdy.StandardStream;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
@ -136,7 +136,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
{
assert content == null;
if (headers.isEmpty())
proxyEngineSelector.onGoAway(session, new GoAwayReceivedInfo(0, SessionStatus.OK));
proxyEngineSelector.onGoAway(session, new GoAwayResultInfo(0, SessionStatus.OK));
else
syn(true);
}

View File

@ -29,8 +29,9 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.Info;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
@ -50,8 +51,8 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
* clients into SPDY events for the servers.</p>
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by clients into
* SPDY events for the servers.</p>
*/
public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
{
@ -131,6 +132,12 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
throw new IllegalStateException("We shouldn't receive pushes from clients");
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -222,6 +229,61 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
this.clientStream = clientStream;
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
LOG.debug("S -> P pushed {} on {}", pushInfo, stream);
Fields headers = new Fields(pushInfo.getHeaders(), false);
addResponseProxyHeaders(stream, headers);
customizeResponseHeaders(stream, headers);
Stream clientStream = (Stream)stream.getAssociatedStream().getAttribute
(CLIENT_STREAM_ATTRIBUTE);
convert(stream.getSession().getVersion(), clientStream.getSession().getVersion(),
headers);
StreamHandler handler = new StreamHandler(clientStream, pushInfo);
stream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers,
pushInfo.isClose()),
handler);
return new Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Push streams never send a reply
throw new UnsupportedOperationException();
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
throw new UnsupportedOperationException();
}
@Override
public void onData(Stream serverStream, final DataInfo serverDataInfo)
{
LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
serverDataInfo.consume(delta);
}
};
StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
handler.data(clientDataInfo);
}
};
}
@Override
public void onReply(final Stream stream, ReplyInfo replyInfo)
{
@ -304,30 +366,30 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
/**
* <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p>
* <p>Instances of this class buffer DATA frames sent by clients and send them to the server.
* The buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive
* from the client before the SYN_STREAM has been fully sent), and between DATA frames, if the client
* is a fast producer and the server a slow consumer, or if the client is a SPDY v2 client (and hence
* without flow control) while the server is a SPDY v3 server (and hence with flow control).</p>
* <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p> <p>Instances
* of this class buffer DATA frames sent by clients and send them to the server. The buffering happens between the
* send of the SYN_STREAM to the server (where DATA frames may arrive from the client before the SYN_STREAM has been
* fully sent), and between DATA frames, if the client is a fast producer and the server a slow consumer, or if the
* client is a SPDY v2 client (and hence without flow control) while the server is a SPDY v3 server (and hence with
* flow control).</p>
*/
private class StreamHandler implements Promise<Stream>
{
private final Queue<DataInfoHandler> queue = new LinkedList<>();
private final Stream clientStream;
private final SynInfo serverSynInfo;
private final Info info;
private Stream serverStream;
private StreamHandler(Stream clientStream, SynInfo serverSynInfo)
private StreamHandler(Stream clientStream, Info info)
{
this.clientStream = clientStream;
this.serverSynInfo = serverSynInfo;
this.info = info;
}
@Override
public void succeeded(Stream serverStream)
{
LOG.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream);
LOG.debug("P -> S {} from {} to {}", info, clientStream, serverStream);
serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
@ -449,26 +511,8 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
}
private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener
private class ProxySessionFrameListener extends SessionFrameListener.Adapter
{
@Override
public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
{
LOG.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
Fields headers = new Fields(serverSynInfo.getHeaders(), false);
addResponseProxyHeaders(serverStream, headers);
customizeResponseHeaders(serverStream, headers);
Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE);
convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers);
StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, serverSynInfo.isClose()),
handler);
return this;
}
@Override
public void onRst(Session serverSession, RstInfo serverRstInfo)
@ -487,41 +531,9 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
@Override
public void onGoAway(Session serverSession, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
{
serverSessions.values().remove(serverSession);
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Push streams never send a reply
throw new UnsupportedOperationException();
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
throw new UnsupportedOperationException();
}
@Override
public void onData(Stream serverStream, final DataInfo serverDataInfo)
{
LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
serverDataInfo.consume(delta);
}
};
StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
handler.data(clientDataInfo);
}
}
}

View File

@ -18,16 +18,12 @@
package org.eclipse.jetty.spdy.server.http;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -37,6 +33,7 @@ import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
@ -50,10 +47,15 @@ import org.eclipse.jetty.spdy.server.NPNServerConnectionFactory;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
{
private final int referrerPushPeriod = 1000;
@ -107,39 +109,14 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
sendMainRequestAndCSSRequest();
final CountDownLatch pushDataLatch = new CountDownLatch(1);
final CountDownLatch pushSynHeadersValid = new CountDownLatch(1);
Session session = startClient(version, serverAddress, new SessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
validateHeaders(synInfo.getHeaders(), pushSynHeadersValid);
assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("URI header ends with css", synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
.value().endsWith
("" +
".css"),
is(true));
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter());
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
pushDataLatch.countDown();
}
};
}
});
Session session = startClient(version, serverAddress, null);
// Send main request. That should initiate the push push's which get reset by the client
sendRequest(session, mainRequestHeaders);
sendRequest(session, mainRequestHeaders, pushSynHeadersValid, pushDataLatch);
assertThat("No push data is received", pushDataLatch.await(1, TimeUnit.SECONDS), is(false));
assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true));
sendRequest(session, associatedCSSRequestHeaders);
sendRequest(session, associatedCSSRequestHeaders, pushSynHeadersValid, pushDataLatch);
}
@Test
@ -157,7 +134,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
// Sleep for pushPeriod This should prevent application.js from being mapped as pushResource
Thread.sleep(referrerPushPeriod + 1);
sendRequest(session, associatedJSRequestHeaders);
sendRequest(session, associatedJSRequestHeaders, null, null);
run2ndClientRequests(false, true);
}
@ -171,7 +148,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
Session session = sendMainRequestAndCSSRequest();
sendRequest(session, associatedJSRequestHeaders);
sendRequest(session, associatedJSRequestHeaders, null, null);
run2ndClientRequests(false, true);
}
@ -200,18 +177,43 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
{
Session session = startClient(version, serverAddress, null);
sendRequest(session, mainRequestHeaders);
sendRequest(session, associatedCSSRequestHeaders);
sendRequest(session, mainRequestHeaders, null, null);
sendRequest(session, associatedCSSRequestHeaders, null, null);
return session;
}
private void sendRequest(Session session, Fields requestHeaders) throws InterruptedException
private void sendRequest(Session session, Fields requestHeaders, final CountDownLatch pushSynHeadersValid,
final CountDownLatch pushDataLatch) throws InterruptedException
{
final CountDownLatch dataReceivedLatch = new CountDownLatch(1);
final CountDownLatch received200OKLatch = new CountDownLatch(1);
session.syn(new SynInfo(requestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid);
assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
.value().endsWith
("" +
".css"),
is(true));
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter());
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
pushDataLatch.countDown();
}
};
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -238,16 +240,17 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
final CountDownLatch pushSynHeadersValid = new CountDownLatch(1);
Session session2 = startClient(version, serverAddress, new SessionFrameListener.Adapter()
Session session2 = startClient(version, serverAddress, null);
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
if (validateHeaders)
validateHeaders(synInfo.getHeaders(), pushSynHeadersValid);
validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid);
assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("URI header ends with css", synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
.value().endsWith
("" +
".css"),
@ -264,9 +267,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}
};
}
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -292,6 +293,8 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true));
}
private static final Logger LOG = Log.getLogger(ReferrerPushStrategyTest.class);
@Test
public void testAssociatedResourceIsPushed() throws Exception
{
@ -326,16 +329,17 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
});
Assert.assertTrue(mainResourceLatch.await(5, TimeUnit.SECONDS));
sendRequest(session1, createHeaders(cssResource));
sendRequest(session1, createHeaders(cssResource), null, null);
// Create another client, and perform the same request for the main resource, we expect the css being pushed
final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
Session session2 = startClient(version, address, new SessionFrameListener.Adapter()
Session session2 = startClient(version, address, null);
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
Assert.assertTrue(stream.isUnidirectional());
return new StreamFrameListener.Adapter()
@ -349,9 +353,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}
};
}
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -452,13 +454,15 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
Session session2 = startClient(version, address, new SessionFrameListener.Adapter()
Session session2 = startClient(version, address, null);
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
Assert.assertTrue(stream.isUnidirectional());
Assert.assertTrue(synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value().endsWith(".css"));
Assert.assertTrue(pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value().endsWith("" +
".css"));
return new StreamFrameListener.Adapter()
{
@Override
@ -470,9 +474,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}
};
}
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -563,13 +565,20 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(2);
Session session2 = startClient(version, address, new SessionFrameListener.Adapter()
Session session2 = startClient(version, address, null);
LOG.warn("REQUEST FOR PUSHED RESOURCES");
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
Assert.assertTrue(stream.isUnidirectional());
return new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return new Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
@ -580,9 +589,17 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}
};
}
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
pushDataLatch.countDown();
}
};
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{

View File

@ -33,7 +33,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
@ -164,7 +164,7 @@ public class ProxyHTTPToSPDYTest
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
closeLatch.countDown();
}

View File

@ -37,7 +37,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.PingResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
@ -429,7 +429,7 @@ public class ProxySPDYToHTTPTest
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayReceivedInfo)
{
goAwayLatch.countDown();
}

View File

@ -281,10 +281,16 @@ public class ProxySPDYToSPDYTest
final CountDownLatch pushSynLatch = new CountDownLatch(1);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
Fields headers = new Fields();
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
pushSynLatch.countDown();
return new StreamFrameListener.Adapter()
@ -298,14 +304,7 @@ public class ProxySPDYToSPDYTest
}
};
}
}).get(5, TimeUnit.SECONDS);
Fields headers = new Fields();
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{

View File

@ -35,7 +35,7 @@ import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
@ -211,7 +211,7 @@ public class ClosedStreamTest extends AbstractTest
};
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
goAwayReceivedLatch.countDown();
}

View File

@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
@ -61,7 +61,7 @@ public class GoAwayTest extends AbstractTest
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
Assert.assertEquals(0, goAwayInfo.getLastStreamId());
Assert.assertSame(SessionStatus.OK, goAwayInfo.getSessionStatus());
@ -90,12 +90,12 @@ public class GoAwayTest extends AbstractTest
return null;
}
};
final AtomicReference<GoAwayReceivedInfo> ref = new AtomicReference<>();
final AtomicReference<GoAwayResultInfo> ref = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
ref.set(goAwayInfo);
latch.countDown();
@ -106,10 +106,10 @@ public class GoAwayTest extends AbstractTest
Stream stream1 = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), null);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
GoAwayReceivedInfo goAwayReceivedInfo = ref.get();
Assert.assertNotNull(goAwayReceivedInfo);
Assert.assertEquals(stream1.getId(), goAwayReceivedInfo.getLastStreamId());
Assert.assertSame(SessionStatus.OK, goAwayReceivedInfo.getSessionStatus());
GoAwayResultInfo goAwayResultInfo = ref.get();
Assert.assertNotNull(goAwayResultInfo);
Assert.assertEquals(stream1.getId(), goAwayResultInfo.getLastStreamId());
Assert.assertSame(SessionStatus.OK, goAwayResultInfo.getSessionStatus());
}
@Test
@ -139,7 +139,7 @@ public class GoAwayTest extends AbstractTest
SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
session.syn(new SynInfo(new Fields(), true), null, new FuturePromise<Stream>());
}
@ -184,12 +184,12 @@ public class GoAwayTest extends AbstractTest
}
}
};
final AtomicReference<GoAwayReceivedInfo> goAwayRef = new AtomicReference<>();
final AtomicReference<GoAwayResultInfo> goAwayRef = new AtomicReference<>();
final CountDownLatch goAwayLatch = new CountDownLatch(1);
SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
goAwayRef.set(goAwayInfo);
goAwayLatch.countDown();
@ -228,7 +228,7 @@ public class GoAwayTest extends AbstractTest
// The last good stream is the second, because it was received by the server
Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
GoAwayReceivedInfo goAway = goAwayRef.get();
GoAwayResultInfo goAway = goAwayRef.get();
Assert.assertNotNull(goAway);
Assert.assertEquals(stream2.getId(), goAway.getLastStreamId());
}

View File

@ -23,7 +23,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
@ -63,7 +63,7 @@ public class IdleTimeoutTest extends AbstractTest
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
latch.countDown();
}
@ -85,7 +85,7 @@ public class IdleTimeoutTest extends AbstractTest
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
latch.countDown();
}
@ -125,7 +125,7 @@ public class IdleTimeoutTest extends AbstractTest
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
goAwayLatch.countDown();
}
@ -161,7 +161,7 @@ public class IdleTimeoutTest extends AbstractTest
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
latch.countDown();
}
@ -187,7 +187,7 @@ public class IdleTimeoutTest extends AbstractTest
InetSocketAddress address = startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
latch.countDown();
}
@ -220,7 +220,7 @@ public class IdleTimeoutTest extends AbstractTest
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
latch.countDown();
}

View File

@ -19,12 +19,6 @@
package org.eclipse.jetty.spdy.server;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -44,7 +38,7 @@ import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
@ -75,6 +69,12 @@ import org.eclipse.jetty.util.log.Logger;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public class PushStreamTest extends AbstractTest
{
private static final Logger LOG = Log.getLogger(PushStreamTest.class);
@ -94,10 +94,12 @@ public class PushStreamTest extends AbstractTest
stream.push(new PushInfo(new Fields(), true), new Promise.Adapter<Stream>());
return null;
}
}), new SessionFrameListener.Adapter()
}), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
assertThat("streamId is even", stream.getId() % 2, is(0));
assertThat("stream is unidirectional", stream.isUnidirectional(), is(true));
@ -117,8 +119,6 @@ public class PushStreamTest extends AbstractTest
return null;
}
});
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), null);
assertThat("onSyn has been called", pushStreamLatch.await(5, TimeUnit.SECONDS), is(true));
Stream pushStream = pushStreamRef.get();
assertThat("main stream and associated stream are the same", stream, sameInstance(pushStream.getAssociatedStream()));
@ -177,10 +177,12 @@ public class PushStreamTest extends AbstractTest
}
}
}), new SessionFrameListener.Adapter()
}), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
pushStreamSynLatch.countDown();
return new StreamFrameListener.Adapter()
@ -193,10 +195,7 @@ public class PushStreamTest extends AbstractTest
}
};
}
});
Stream stream = clientSession.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -298,10 +297,12 @@ public class PushStreamTest extends AbstractTest
throw new IllegalStateException(e);
}
}
}), new SessionFrameListener.Adapter()
}), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return new StreamFrameListener.Adapter()
{
@ -327,10 +328,7 @@ public class PushStreamTest extends AbstractTest
}
};
}
});
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -427,7 +425,7 @@ public class PushStreamTest extends AbstractTest
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
goAwayReceivedLatch.countDown();
}
@ -543,20 +541,14 @@ public class PushStreamTest extends AbstractTest
stream.push(new PushInfo(new Fields(), false), new Promise.Adapter<Stream>());
return null;
}
}), new SessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
assertStreamIdIsEven(stream);
pushStreamIdIsEvenLatch.countDown();
return super.onSyn(stream, synInfo);
}
});
}), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), false), null);
Stream stream2 = clientSession.syn(new SynInfo(new Fields(), false), null);
Stream stream3 = clientSession.syn(new SynInfo(new Fields(), false), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), false),
new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch));
Stream stream2 = clientSession.syn(new SynInfo(new Fields(), false),
new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch));
Stream stream3 = clientSession.syn(new SynInfo(new Fields(), false),
new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch));
assertStreamIdIsOdd(stream);
assertStreamIdIsOdd(stream2);
assertStreamIdIsOdd(stream3);
@ -564,6 +556,24 @@ public class PushStreamTest extends AbstractTest
assertThat("all pushStreams had even ids", pushStreamIdIsEvenLatch.await(5, TimeUnit.SECONDS), is(true));
}
private class VerifyPushStreamIdIsEvenStreamFrameListener extends StreamFrameListener.Adapter
{
final CountDownLatch pushStreamIdIsEvenLatch;
private VerifyPushStreamIdIsEvenStreamFrameListener(CountDownLatch pushStreamIdIsEvenLatch)
{
this.pushStreamIdIsEvenLatch = pushStreamIdIsEvenLatch;
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
assertStreamIdIsEven(stream);
pushStreamIdIsEvenLatch.countDown();
return super.onPush(stream, pushInfo);
}
}
private void assertStreamIdIsEven(Stream stream)
{
assertThat("streamId is odd", stream.getId() % 2, is(0));

View File

@ -23,7 +23,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.junit.Assert;
@ -38,7 +38,7 @@ public class SPDYClientFactoryTest extends AbstractTest
startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{
latch.countDown();
}

View File

@ -23,7 +23,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.junit.Assert;
@ -38,7 +38,7 @@ public class SPDYServerConnectorTest extends AbstractTest
startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{
latch.countDown();
}

View File

@ -27,6 +27,10 @@ import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/* ------------------------------------------------------------ */
/** A scheduler based on the the JVM Timer class
*/
public class TimerScheduler extends AbstractLifeCycle implements Scheduler, Runnable
{
private static final Logger LOG = Log.getLogger(TimerScheduler.class);