Merge branch 'spdy_onPushSynApiChange'

This commit is contained in:
Thomas Becker 2013-02-19 15:23:46 +01:00
commit 60dfbffbce
21 changed files with 463 additions and 290 deletions

View File

@ -59,6 +59,9 @@ public interface IStream extends Stream, Callback
*/
public void setStreamFrameListener(StreamFrameListener listener);
//TODO: javadoc thomas
public StreamFrameListener getStreamFrameListener();
/**
* <p>A stream can be open, {@link #isHalfClosed() half closed} or
* {@link #isClosed() closed} and this method updates the close state

View File

@ -45,9 +45,10 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.PingResultInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
@ -498,8 +499,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
stream.process(frame);
// Update the last stream id before calling the application (which may send a GO_AWAY)
updateLastStreamId(stream);
SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
StreamFrameListener streamListener = notifyOnSyn(listener, stream, synInfo);
StreamFrameListener streamListener;
if (stream.isUnidirectional())
{
PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose());
streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo);
}
else
{
SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
streamListener = notifyOnSyn(listener, stream, synInfo);
}
stream.setStreamFrameListener(streamListener);
flush();
// The onSyn() listener may have sent a frame that closed the stream
@ -680,9 +690,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
{
if (goAwayReceived.compareAndSet(false, true))
{
//TODO: Find a better name for GoAwayReceivedInfo
GoAwayReceivedInfo goAwayReceivedInfo = new GoAwayReceivedInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
notifyOnGoAway(listener, goAwayReceivedInfo);
//TODO: Find a better name for GoAwayResultInfo
GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
notifyOnGoAway(listener, goAwayResultInfo);
flush();
// SPDY does not require to send back a response to a GO_AWAY.
// We notified the application of the last good stream id and
@ -755,6 +765,27 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
}
}
private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo)
{
try
{
if (listener == null)
return null;
LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener);
return listener.onPush(stream, pushInfo);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
return null;
}
catch (Error x)
{
LOG.info("Exception while notifying listener " + listener, x);
throw x;
}
}
private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
{
try
@ -839,14 +870,14 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
}
}
private void notifyOnGoAway(SessionFrameListener listener, GoAwayReceivedInfo goAwayReceivedInfo)
private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo)
{
try
{
if (listener != null)
{
LOG.debug("Invoking callback with {} on listener {}", goAwayReceivedInfo, listener);
listener.onGoAway(this, goAwayReceivedInfo);
LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener);
listener.onGoAway(this, goAwayResultInfo);
}
}
catch (Exception x)

View File

@ -148,6 +148,7 @@ public class StandardStream implements IStream
this.listener = listener;
}
@Override
public StreamFrameListener getStreamFrameListener()
{
return listener;

View File

@ -22,18 +22,18 @@ package org.eclipse.jetty.spdy.api;
* <p>A container for GOAWAY frames metadata: the last good stream id and
* the session status.</p>
*/
public class GoAwayReceivedInfo
public class GoAwayResultInfo
{
private final int lastStreamId;
private final SessionStatus sessionStatus;
/**
* <p>Creates a new {@link GoAwayReceivedInfo} with the given last good stream id and session status</p>
* <p>Creates a new {@link GoAwayResultInfo} with the given last good stream id and session status</p>
*
* @param lastStreamId the last good stream id
* @param sessionStatus the session status
*/
public GoAwayReceivedInfo(int lastStreamId, SessionStatus sessionStatus)
public GoAwayResultInfo(int lastStreamId, SessionStatus sessionStatus)
{
this.lastStreamId = lastStreamId;
this.sessionStatus = sessionStatus;

View File

@ -36,7 +36,7 @@ public interface SessionFrameListener extends EventListener
* <p>Application code should implement this method and reply to the stream creation, eventually
* sending data:</p>
* <pre>
* public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
* public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
* {
* // Do something with the metadata contained in synInfo
*
@ -52,7 +52,7 @@ public interface SessionFrameListener extends EventListener
* </pre>
* <p>Alternatively, if the stream creation requires reading data sent from the other peer:</p>
* <pre>
* public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
* public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
* {
* // Do something with the metadata contained in synInfo
*
@ -106,9 +106,9 @@ public interface SessionFrameListener extends EventListener
* <p>Callback invoked when the other peer signals that it is closing the connection.</p>
*
* @param session the session
* @param goAwayReceivedInfo the metadata sent
* @param goAwayResultInfo the metadata sent
*/
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo);
public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo);
/**
* <p>Callback invoked when an exception is thrown during the processing of an event on a
@ -119,6 +119,7 @@ public interface SessionFrameListener extends EventListener
*/
public void onException(Throwable x);
/**
* <p>Empty implementation of {@link SessionFrameListener}</p>
*/
@ -148,7 +149,7 @@ public interface SessionFrameListener extends EventListener
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{
}

View File

@ -50,6 +50,15 @@ public interface StreamFrameListener extends EventListener
*/
public void onHeaders(Stream stream, HeadersInfo headersInfo);
/**
* <p>Callback invoked when a push syn has been received on a stream.</p>
*
* @param stream the push stream just created
* @param pushInfo
* @return a listener for stream events or null if there is no interest in being notified of stream events
*/
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo);
/**
* <p>Callback invoked when data bytes are received on a stream.</p>
* <p>Implementers should be read or consume the content of the
@ -75,6 +84,12 @@ public interface StreamFrameListener extends EventListener
{
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return null;
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{

View File

@ -60,14 +60,24 @@ public class ClientUsageTest
}
@Test
public void testClientRequestWithBodyResponseNoBody() throws Exception
public void testClientReceivesPush1() throws InterruptedException, ExecutionException, TimeoutException
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null);
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0),
new StreamFrameListener.Adapter()
session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return new Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
}
};
};
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
@ -83,6 +93,71 @@ public class ClientUsageTest
throw new IllegalStateException(e);
}
}
});
}
@Test
public void testClientReceivesPush2() throws InterruptedException, ExecutionException, TimeoutException
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, new SessionFrameListener.Adapter()
{
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
}
};
}
}, null, null);
session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
replyInfo.getHeaders().get("host");
// Then issue another similar request
try
{
stream.getSession().syn(new SynInfo(new Fields(), true), this);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
throw new IllegalStateException(e);
}
}
});
}
@Test
public void testClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null);
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
replyInfo.getHeaders().get("host");
// Then issue another similar request
try
{
stream.getSession().syn(new SynInfo(new Fields(), true), this);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
throw new IllegalStateException(e);
}
}
});
// Send-and-forget the data
stream.data(new StringDataInfo("data", true));
@ -96,38 +171,39 @@ public class ClientUsageTest
final String context = "context";
session.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
replyInfo.getHeaders().get("host");
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
replyInfo.getHeaders().get("host");
// Then issue another similar request
try
{
stream.getSession().syn(new SynInfo(new Fields(), true), this);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
throw new IllegalStateException(e);
}
}
// Then issue another similar request
try
{
stream.getSession().syn(new SynInfo(new Fields(), true), this);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
throw new IllegalStateException(e);
}
}
}, new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream stream)
{
// Differently from JDK 7 AIO, there is no need to
// have an explicit parameter for the context since
// that is captured while the handler is created anyway,
// and it is used only by the handler as parameter
public void succeeded(Stream stream)
{
// Differently from JDK 7 AIO, there is no need to
// have an explicit parameter for the context since
// that is captured while the handler is created anyway,
// and it is used only by the handler as parameter
// The style below is fire-and-forget, since
// we do not pass the handler nor we call get()
// to wait for the data to be sent
stream.data(new StringDataInfo(context, true), new Callback.Adapter());
}
});
// The style below is fire-and-forget, since
// we do not pass the handler nor we call get()
// to wait for the data to be sent
stream.data(new StringDataInfo(context, true), new Callback.Adapter());
}
}
);
}
@Test
@ -136,48 +212,49 @@ public class ClientUsageTest
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null);
session.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
{
// The good of passing the listener to push() is that applications can safely
// accumulate info from the reply headers to be used in the data callback,
// e.g. content-type, charset, etc.
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
Fields headers = replyInfo.getHeaders();
int contentLength = headers.get("content-length").valueAsInt();
stream.setAttribute("content-length", contentLength);
if (!replyInfo.isClose())
stream.setAttribute("builder", new StringBuilder());
// May issue another similar request while waiting for data
try
{
stream.getSession().syn(new SynInfo(new Fields(), true), this);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
// The good of passing the listener to push() is that applications can safely
// accumulate info from the reply headers to be used in the data callback,
// e.g. content-type, charset, etc.
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
Fields headers = replyInfo.getHeaders();
int contentLength = headers.get("content-length").valueAsInt();
stream.setAttribute("content-length", contentLength);
if (!replyInfo.isClose())
stream.setAttribute("builder", new StringBuilder());
// May issue another similar request while waiting for data
try
{
stream.getSession().syn(new SynInfo(new Fields(), true), this);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
throw new IllegalStateException(e);
}
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
StringBuilder builder = (StringBuilder)stream.getAttribute("builder");
builder.append(dataInfo.asString("UTF-8", true));
}
}, new Promise.Adapter<Stream>()
{
throw new IllegalStateException(e);
@Override
public void succeeded(Stream stream)
{
stream.data(new BytesDataInfo("wee".getBytes(Charset.forName("UTF-8")), false), new Callback.Adapter());
stream.data(new StringDataInfo("foo", false), new Callback.Adapter());
stream.data(new ByteBufferDataInfo(Charset.forName("UTF-8").encode("bar"), true), new Callback.Adapter());
}
}
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
StringBuilder builder = (StringBuilder)stream.getAttribute("builder");
builder.append(dataInfo.asString("UTF-8", true));
}
}, new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream stream)
{
stream.data(new BytesDataInfo("wee".getBytes(Charset.forName("UTF-8")), false), new Callback.Adapter());
stream.data(new StringDataInfo("foo", false), new Callback.Adapter());
stream.data(new ByteBufferDataInfo(Charset.forName("UTF-8").encode("bar"), true), new Callback.Adapter());
}
});
);
}
}

View File

@ -24,6 +24,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
@ -131,6 +132,12 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory
channel.requestHeaders(headersInfo.getHeaders(), headersInfo.isClose());
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return null;
}
@Override
public void onData(Stream stream, final DataInfo dataInfo)
{

View File

@ -23,7 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PingResultInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session;
@ -104,7 +104,7 @@ public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{
// TODO:
}

View File

@ -42,7 +42,7 @@ import org.eclipse.jetty.spdy.StandardStream;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
@ -136,7 +136,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
{
assert content == null;
if (headers.isEmpty())
proxyEngineSelector.onGoAway(session, new GoAwayReceivedInfo(0, SessionStatus.OK));
proxyEngineSelector.onGoAway(session, new GoAwayResultInfo(0, SessionStatus.OK));
else
syn(true);
}

View File

@ -29,8 +29,9 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.Info;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
@ -50,8 +51,8 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
* clients into SPDY events for the servers.</p>
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by clients into
* SPDY events for the servers.</p>
*/
public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
{
@ -131,6 +132,12 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
throw new IllegalStateException("We shouldn't receive pushes from clients");
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -222,6 +229,61 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
this.clientStream = clientStream;
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
LOG.debug("S -> P pushed {} on {}", pushInfo, stream);
Fields headers = new Fields(pushInfo.getHeaders(), false);
addResponseProxyHeaders(stream, headers);
customizeResponseHeaders(stream, headers);
Stream clientStream = (Stream)stream.getAssociatedStream().getAttribute
(CLIENT_STREAM_ATTRIBUTE);
convert(stream.getSession().getVersion(), clientStream.getSession().getVersion(),
headers);
StreamHandler handler = new StreamHandler(clientStream, pushInfo);
stream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers,
pushInfo.isClose()),
handler);
return new Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Push streams never send a reply
throw new UnsupportedOperationException();
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
throw new UnsupportedOperationException();
}
@Override
public void onData(Stream serverStream, final DataInfo serverDataInfo)
{
LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
serverDataInfo.consume(delta);
}
};
StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
handler.data(clientDataInfo);
}
};
}
@Override
public void onReply(final Stream stream, ReplyInfo replyInfo)
{
@ -304,30 +366,30 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
/**
* <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p>
* <p>Instances of this class buffer DATA frames sent by clients and send them to the server.
* The buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive
* from the client before the SYN_STREAM has been fully sent), and between DATA frames, if the client
* is a fast producer and the server a slow consumer, or if the client is a SPDY v2 client (and hence
* without flow control) while the server is a SPDY v3 server (and hence with flow control).</p>
* <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p> <p>Instances
* of this class buffer DATA frames sent by clients and send them to the server. The buffering happens between the
* send of the SYN_STREAM to the server (where DATA frames may arrive from the client before the SYN_STREAM has been
* fully sent), and between DATA frames, if the client is a fast producer and the server a slow consumer, or if the
* client is a SPDY v2 client (and hence without flow control) while the server is a SPDY v3 server (and hence with
* flow control).</p>
*/
private class StreamHandler implements Promise<Stream>
{
private final Queue<DataInfoHandler> queue = new LinkedList<>();
private final Stream clientStream;
private final SynInfo serverSynInfo;
private final Info info;
private Stream serverStream;
private StreamHandler(Stream clientStream, SynInfo serverSynInfo)
private StreamHandler(Stream clientStream, Info info)
{
this.clientStream = clientStream;
this.serverSynInfo = serverSynInfo;
this.info = info;
}
@Override
public void succeeded(Stream serverStream)
{
LOG.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream);
LOG.debug("P -> S {} from {} to {}", info, clientStream, serverStream);
serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
@ -449,26 +511,8 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
}
private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener
private class ProxySessionFrameListener extends SessionFrameListener.Adapter
{
@Override
public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
{
LOG.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
Fields headers = new Fields(serverSynInfo.getHeaders(), false);
addResponseProxyHeaders(serverStream, headers);
customizeResponseHeaders(serverStream, headers);
Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE);
convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers);
StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, serverSynInfo.isClose()),
handler);
return this;
}
@Override
public void onRst(Session serverSession, RstInfo serverRstInfo)
@ -487,41 +531,9 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
@Override
public void onGoAway(Session serverSession, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
{
serverSessions.values().remove(serverSession);
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Push streams never send a reply
throw new UnsupportedOperationException();
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
throw new UnsupportedOperationException();
}
@Override
public void onData(Stream serverStream, final DataInfo serverDataInfo)
{
LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
serverDataInfo.consume(delta);
}
};
StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
handler.data(clientDataInfo);
}
}
}

View File

@ -18,16 +18,12 @@
package org.eclipse.jetty.spdy.server.http;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -37,6 +33,7 @@ import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
@ -50,10 +47,15 @@ import org.eclipse.jetty.spdy.server.NPNServerConnectionFactory;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
{
private final int referrerPushPeriod = 1000;
@ -107,39 +109,14 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
sendMainRequestAndCSSRequest();
final CountDownLatch pushDataLatch = new CountDownLatch(1);
final CountDownLatch pushSynHeadersValid = new CountDownLatch(1);
Session session = startClient(version, serverAddress, new SessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
validateHeaders(synInfo.getHeaders(), pushSynHeadersValid);
assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("URI header ends with css", synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
.value().endsWith
("" +
".css"),
is(true));
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter());
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
pushDataLatch.countDown();
}
};
}
});
Session session = startClient(version, serverAddress, null);
// Send main request. That should initiate the push push's which get reset by the client
sendRequest(session, mainRequestHeaders);
sendRequest(session, mainRequestHeaders, pushSynHeadersValid, pushDataLatch);
assertThat("No push data is received", pushDataLatch.await(1, TimeUnit.SECONDS), is(false));
assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true));
sendRequest(session, associatedCSSRequestHeaders);
sendRequest(session, associatedCSSRequestHeaders, pushSynHeadersValid, pushDataLatch);
}
@Test
@ -157,7 +134,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
// Sleep for pushPeriod This should prevent application.js from being mapped as pushResource
Thread.sleep(referrerPushPeriod + 1);
sendRequest(session, associatedJSRequestHeaders);
sendRequest(session, associatedJSRequestHeaders, null, null);
run2ndClientRequests(false, true);
}
@ -171,7 +148,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
Session session = sendMainRequestAndCSSRequest();
sendRequest(session, associatedJSRequestHeaders);
sendRequest(session, associatedJSRequestHeaders, null, null);
run2ndClientRequests(false, true);
}
@ -200,18 +177,43 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
{
Session session = startClient(version, serverAddress, null);
sendRequest(session, mainRequestHeaders);
sendRequest(session, associatedCSSRequestHeaders);
sendRequest(session, mainRequestHeaders, null, null);
sendRequest(session, associatedCSSRequestHeaders, null, null);
return session;
}
private void sendRequest(Session session, Fields requestHeaders) throws InterruptedException
private void sendRequest(Session session, Fields requestHeaders, final CountDownLatch pushSynHeadersValid,
final CountDownLatch pushDataLatch) throws InterruptedException
{
final CountDownLatch dataReceivedLatch = new CountDownLatch(1);
final CountDownLatch received200OKLatch = new CountDownLatch(1);
session.syn(new SynInfo(requestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid);
assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
.value().endsWith
("" +
".css"),
is(true));
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter());
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
pushDataLatch.countDown();
}
};
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -238,16 +240,17 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
final CountDownLatch pushSynHeadersValid = new CountDownLatch(1);
Session session2 = startClient(version, serverAddress, new SessionFrameListener.Adapter()
Session session2 = startClient(version, serverAddress, null);
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
if (validateHeaders)
validateHeaders(synInfo.getHeaders(), pushSynHeadersValid);
validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid);
assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("URI header ends with css", synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
.value().endsWith
("" +
".css"),
@ -264,9 +267,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}
};
}
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -292,6 +293,8 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true));
}
private static final Logger LOG = Log.getLogger(ReferrerPushStrategyTest.class);
@Test
public void testAssociatedResourceIsPushed() throws Exception
{
@ -326,16 +329,17 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
});
Assert.assertTrue(mainResourceLatch.await(5, TimeUnit.SECONDS));
sendRequest(session1, createHeaders(cssResource));
sendRequest(session1, createHeaders(cssResource), null, null);
// Create another client, and perform the same request for the main resource, we expect the css being pushed
final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
Session session2 = startClient(version, address, new SessionFrameListener.Adapter()
Session session2 = startClient(version, address, null);
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
Assert.assertTrue(stream.isUnidirectional());
return new StreamFrameListener.Adapter()
@ -349,9 +353,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}
};
}
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -452,13 +454,15 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
Session session2 = startClient(version, address, new SessionFrameListener.Adapter()
Session session2 = startClient(version, address, null);
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
Assert.assertTrue(stream.isUnidirectional());
Assert.assertTrue(synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value().endsWith(".css"));
Assert.assertTrue(pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value().endsWith("" +
".css"));
return new StreamFrameListener.Adapter()
{
@Override
@ -470,9 +474,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}
};
}
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -563,14 +565,31 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(2);
Session session2 = startClient(version, address, new SessionFrameListener.Adapter()
Session session2 = startClient(version, address, null);
LOG.warn("REQUEST FOR PUSHED RESOURCES");
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
Assert.assertTrue(stream.isUnidirectional());
return new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return new Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
pushDataLatch.countDown();
}
};
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
@ -580,9 +599,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}
};
}
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{

View File

@ -33,7 +33,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
@ -164,7 +164,7 @@ public class ProxyHTTPToSPDYTest
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
closeLatch.countDown();
}

View File

@ -37,7 +37,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.PingResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
@ -429,7 +429,7 @@ public class ProxySPDYToHTTPTest
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayReceivedInfo)
{
goAwayLatch.countDown();
}

View File

@ -281,10 +281,16 @@ public class ProxySPDYToSPDYTest
final CountDownLatch pushSynLatch = new CountDownLatch(1);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
Fields headers = new Fields();
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
pushSynLatch.countDown();
return new StreamFrameListener.Adapter()
@ -298,14 +304,7 @@ public class ProxySPDYToSPDYTest
}
};
}
}).get(5, TimeUnit.SECONDS);
Fields headers = new Fields();
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{

View File

@ -35,7 +35,7 @@ import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
@ -211,7 +211,7 @@ public class ClosedStreamTest extends AbstractTest
};
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
goAwayReceivedLatch.countDown();
}

View File

@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
@ -61,7 +61,7 @@ public class GoAwayTest extends AbstractTest
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
Assert.assertEquals(0, goAwayInfo.getLastStreamId());
Assert.assertSame(SessionStatus.OK, goAwayInfo.getSessionStatus());
@ -90,12 +90,12 @@ public class GoAwayTest extends AbstractTest
return null;
}
};
final AtomicReference<GoAwayReceivedInfo> ref = new AtomicReference<>();
final AtomicReference<GoAwayResultInfo> ref = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
ref.set(goAwayInfo);
latch.countDown();
@ -106,10 +106,10 @@ public class GoAwayTest extends AbstractTest
Stream stream1 = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), null);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
GoAwayReceivedInfo goAwayReceivedInfo = ref.get();
Assert.assertNotNull(goAwayReceivedInfo);
Assert.assertEquals(stream1.getId(), goAwayReceivedInfo.getLastStreamId());
Assert.assertSame(SessionStatus.OK, goAwayReceivedInfo.getSessionStatus());
GoAwayResultInfo goAwayResultInfo = ref.get();
Assert.assertNotNull(goAwayResultInfo);
Assert.assertEquals(stream1.getId(), goAwayResultInfo.getLastStreamId());
Assert.assertSame(SessionStatus.OK, goAwayResultInfo.getSessionStatus());
}
@Test
@ -139,7 +139,7 @@ public class GoAwayTest extends AbstractTest
SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
session.syn(new SynInfo(new Fields(), true), null, new FuturePromise<Stream>());
}
@ -184,12 +184,12 @@ public class GoAwayTest extends AbstractTest
}
}
};
final AtomicReference<GoAwayReceivedInfo> goAwayRef = new AtomicReference<>();
final AtomicReference<GoAwayResultInfo> goAwayRef = new AtomicReference<>();
final CountDownLatch goAwayLatch = new CountDownLatch(1);
SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
goAwayRef.set(goAwayInfo);
goAwayLatch.countDown();
@ -228,7 +228,7 @@ public class GoAwayTest extends AbstractTest
// The last good stream is the second, because it was received by the server
Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
GoAwayReceivedInfo goAway = goAwayRef.get();
GoAwayResultInfo goAway = goAwayRef.get();
Assert.assertNotNull(goAway);
Assert.assertEquals(stream2.getId(), goAway.getLastStreamId());
}

View File

@ -23,7 +23,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
@ -63,7 +63,7 @@ public class IdleTimeoutTest extends AbstractTest
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
latch.countDown();
}
@ -85,7 +85,7 @@ public class IdleTimeoutTest extends AbstractTest
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
latch.countDown();
}
@ -125,7 +125,7 @@ public class IdleTimeoutTest extends AbstractTest
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
goAwayLatch.countDown();
}
@ -161,7 +161,7 @@ public class IdleTimeoutTest extends AbstractTest
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
latch.countDown();
}
@ -187,7 +187,7 @@ public class IdleTimeoutTest extends AbstractTest
InetSocketAddress address = startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
latch.countDown();
}
@ -220,7 +220,7 @@ public class IdleTimeoutTest extends AbstractTest
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
latch.countDown();
}

View File

@ -19,12 +19,6 @@
package org.eclipse.jetty.spdy.server;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -44,7 +38,7 @@ import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
@ -75,6 +69,12 @@ import org.eclipse.jetty.util.log.Logger;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public class PushStreamTest extends AbstractTest
{
private static final Logger LOG = Log.getLogger(PushStreamTest.class);
@ -94,10 +94,12 @@ public class PushStreamTest extends AbstractTest
stream.push(new PushInfo(new Fields(), true), new Promise.Adapter<Stream>());
return null;
}
}), new SessionFrameListener.Adapter()
}), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
assertThat("streamId is even", stream.getId() % 2, is(0));
assertThat("stream is unidirectional", stream.isUnidirectional(), is(true));
@ -117,8 +119,6 @@ public class PushStreamTest extends AbstractTest
return null;
}
});
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), null);
assertThat("onSyn has been called", pushStreamLatch.await(5, TimeUnit.SECONDS), is(true));
Stream pushStream = pushStreamRef.get();
assertThat("main stream and associated stream are the same", stream, sameInstance(pushStream.getAssociatedStream()));
@ -177,10 +177,12 @@ public class PushStreamTest extends AbstractTest
}
}
}), new SessionFrameListener.Adapter()
}), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
pushStreamSynLatch.countDown();
return new StreamFrameListener.Adapter()
@ -193,10 +195,7 @@ public class PushStreamTest extends AbstractTest
}
};
}
});
Stream stream = clientSession.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -298,10 +297,12 @@ public class PushStreamTest extends AbstractTest
throw new IllegalStateException(e);
}
}
}), new SessionFrameListener.Adapter()
}), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return new StreamFrameListener.Adapter()
{
@ -327,10 +328,7 @@ public class PushStreamTest extends AbstractTest
}
};
}
});
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -427,7 +425,7 @@ public class PushStreamTest extends AbstractTest
}
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{
goAwayReceivedLatch.countDown();
}
@ -543,20 +541,14 @@ public class PushStreamTest extends AbstractTest
stream.push(new PushInfo(new Fields(), false), new Promise.Adapter<Stream>());
return null;
}
}), new SessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
assertStreamIdIsEven(stream);
pushStreamIdIsEvenLatch.countDown();
return super.onSyn(stream, synInfo);
}
});
}), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), false), null);
Stream stream2 = clientSession.syn(new SynInfo(new Fields(), false), null);
Stream stream3 = clientSession.syn(new SynInfo(new Fields(), false), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), false),
new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch));
Stream stream2 = clientSession.syn(new SynInfo(new Fields(), false),
new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch));
Stream stream3 = clientSession.syn(new SynInfo(new Fields(), false),
new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch));
assertStreamIdIsOdd(stream);
assertStreamIdIsOdd(stream2);
assertStreamIdIsOdd(stream3);
@ -564,6 +556,24 @@ public class PushStreamTest extends AbstractTest
assertThat("all pushStreams had even ids", pushStreamIdIsEvenLatch.await(5, TimeUnit.SECONDS), is(true));
}
private class VerifyPushStreamIdIsEvenStreamFrameListener extends StreamFrameListener.Adapter
{
final CountDownLatch pushStreamIdIsEvenLatch;
private VerifyPushStreamIdIsEvenStreamFrameListener(CountDownLatch pushStreamIdIsEvenLatch)
{
this.pushStreamIdIsEvenLatch = pushStreamIdIsEvenLatch;
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
assertStreamIdIsEven(stream);
pushStreamIdIsEvenLatch.countDown();
return super.onPush(stream, pushInfo);
}
}
private void assertStreamIdIsEven(Stream stream)
{
assertThat("streamId is odd", stream.getId() % 2, is(0));

View File

@ -23,7 +23,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.junit.Assert;
@ -38,7 +38,7 @@ public class SPDYClientFactoryTest extends AbstractTest
startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{
latch.countDown();
}

View File

@ -23,7 +23,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.junit.Assert;
@ -38,7 +38,7 @@ public class SPDYServerConnectorTest extends AbstractTest
startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo)
public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{
latch.countDown();
}