401183 Handle push streams in new method StreamFrameListener.onPush() instead of SessionFrameListener.syn()

This commit is contained in:
Thomas Becker 2013-02-14 11:40:25 +01:00
parent 6d0f233c80
commit 88e32cb021
21 changed files with 463 additions and 290 deletions

View File

@ -59,6 +59,9 @@ public interface IStream extends Stream, Callback
*/ */
public void setStreamFrameListener(StreamFrameListener listener); public void setStreamFrameListener(StreamFrameListener listener);
//TODO: javadoc thomas
public StreamFrameListener getStreamFrameListener();
/** /**
* <p>A stream can be open, {@link #isHalfClosed() half closed} or * <p>A stream can be open, {@link #isHalfClosed() half closed} or
* {@link #isClosed() closed} and this method updates the close state * {@link #isClosed() closed} and this method updates the close state

View File

@ -45,9 +45,10 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PingInfo; import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.PingResultInfo; import org.eclipse.jetty.spdy.api.PingResultInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDYException; import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.Session;
@ -498,8 +499,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
stream.process(frame); stream.process(frame);
// Update the last stream id before calling the application (which may send a GO_AWAY) // Update the last stream id before calling the application (which may send a GO_AWAY)
updateLastStreamId(stream); updateLastStreamId(stream);
SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority()); StreamFrameListener streamListener;
StreamFrameListener streamListener = notifyOnSyn(listener, stream, synInfo); if (stream.isUnidirectional())
{
PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose());
streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo);
}
else
{
SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
streamListener = notifyOnSyn(listener, stream, synInfo);
}
stream.setStreamFrameListener(streamListener); stream.setStreamFrameListener(streamListener);
flush(); flush();
// The onSyn() listener may have sent a frame that closed the stream // The onSyn() listener may have sent a frame that closed the stream
@ -680,9 +690,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
{ {
if (goAwayReceived.compareAndSet(false, true)) if (goAwayReceived.compareAndSet(false, true))
{ {
//TODO: Find a better name for GoAwayReceivedInfo //TODO: Find a better name for GoAwayResultInfo
GoAwayReceivedInfo goAwayReceivedInfo = new GoAwayReceivedInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode())); GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
notifyOnGoAway(listener, goAwayReceivedInfo); notifyOnGoAway(listener, goAwayResultInfo);
flush(); flush();
// SPDY does not require to send back a response to a GO_AWAY. // SPDY does not require to send back a response to a GO_AWAY.
// We notified the application of the last good stream id and // We notified the application of the last good stream id and
@ -755,6 +765,27 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
} }
} }
private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo)
{
try
{
if (listener == null)
return null;
LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener);
return listener.onPush(stream, pushInfo);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
return null;
}
catch (Error x)
{
LOG.info("Exception while notifying listener " + listener, x);
throw x;
}
}
private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo) private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
{ {
try try
@ -839,14 +870,14 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
} }
} }
private void notifyOnGoAway(SessionFrameListener listener, GoAwayReceivedInfo goAwayReceivedInfo) private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo)
{ {
try try
{ {
if (listener != null) if (listener != null)
{ {
LOG.debug("Invoking callback with {} on listener {}", goAwayReceivedInfo, listener); LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener);
listener.onGoAway(this, goAwayReceivedInfo); listener.onGoAway(this, goAwayResultInfo);
} }
} }
catch (Exception x) catch (Exception x)

View File

@ -148,6 +148,7 @@ public class StandardStream implements IStream
this.listener = listener; this.listener = listener;
} }
@Override
public StreamFrameListener getStreamFrameListener() public StreamFrameListener getStreamFrameListener()
{ {
return listener; return listener;

View File

@ -22,18 +22,18 @@ package org.eclipse.jetty.spdy.api;
* <p>A container for GOAWAY frames metadata: the last good stream id and * <p>A container for GOAWAY frames metadata: the last good stream id and
* the session status.</p> * the session status.</p>
*/ */
public class GoAwayReceivedInfo public class GoAwayResultInfo
{ {
private final int lastStreamId; private final int lastStreamId;
private final SessionStatus sessionStatus; private final SessionStatus sessionStatus;
/** /**
* <p>Creates a new {@link GoAwayReceivedInfo} with the given last good stream id and session status</p> * <p>Creates a new {@link GoAwayResultInfo} with the given last good stream id and session status</p>
* *
* @param lastStreamId the last good stream id * @param lastStreamId the last good stream id
* @param sessionStatus the session status * @param sessionStatus the session status
*/ */
public GoAwayReceivedInfo(int lastStreamId, SessionStatus sessionStatus) public GoAwayResultInfo(int lastStreamId, SessionStatus sessionStatus)
{ {
this.lastStreamId = lastStreamId; this.lastStreamId = lastStreamId;
this.sessionStatus = sessionStatus; this.sessionStatus = sessionStatus;

View File

@ -36,7 +36,7 @@ public interface SessionFrameListener extends EventListener
* <p>Application code should implement this method and reply to the stream creation, eventually * <p>Application code should implement this method and reply to the stream creation, eventually
* sending data:</p> * sending data:</p>
* <pre> * <pre>
* public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo) * public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
* { * {
* // Do something with the metadata contained in synInfo * // Do something with the metadata contained in synInfo
* *
@ -52,7 +52,7 @@ public interface SessionFrameListener extends EventListener
* </pre> * </pre>
* <p>Alternatively, if the stream creation requires reading data sent from the other peer:</p> * <p>Alternatively, if the stream creation requires reading data sent from the other peer:</p>
* <pre> * <pre>
* public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo) * public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
* { * {
* // Do something with the metadata contained in synInfo * // Do something with the metadata contained in synInfo
* *
@ -106,9 +106,9 @@ public interface SessionFrameListener extends EventListener
* <p>Callback invoked when the other peer signals that it is closing the connection.</p> * <p>Callback invoked when the other peer signals that it is closing the connection.</p>
* *
* @param session the session * @param session the session
* @param goAwayReceivedInfo the metadata sent * @param goAwayResultInfo the metadata sent
*/ */
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo); public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo);
/** /**
* <p>Callback invoked when an exception is thrown during the processing of an event on a * <p>Callback invoked when an exception is thrown during the processing of an event on a
@ -119,6 +119,7 @@ public interface SessionFrameListener extends EventListener
*/ */
public void onException(Throwable x); public void onException(Throwable x);
/** /**
* <p>Empty implementation of {@link SessionFrameListener}</p> * <p>Empty implementation of {@link SessionFrameListener}</p>
*/ */
@ -148,7 +149,7 @@ public interface SessionFrameListener extends EventListener
} }
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{ {
} }

View File

@ -50,6 +50,15 @@ public interface StreamFrameListener extends EventListener
*/ */
public void onHeaders(Stream stream, HeadersInfo headersInfo); public void onHeaders(Stream stream, HeadersInfo headersInfo);
/**
* <p>Callback invoked when a push syn has been received on a stream.</p>
*
* @param stream the push stream just created
* @param pushInfo
* @return a listener for stream events or null if there is no interest in being notified of stream events
*/
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo);
/** /**
* <p>Callback invoked when data bytes are received on a stream.</p> * <p>Callback invoked when data bytes are received on a stream.</p>
* <p>Implementers should be read or consume the content of the * <p>Implementers should be read or consume the content of the
@ -75,6 +84,12 @@ public interface StreamFrameListener extends EventListener
{ {
} }
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return null;
}
@Override @Override
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {

View File

@ -60,14 +60,24 @@ public class ClientUsageTest
} }
@Test @Test
public void testClientRequestWithBodyResponseNoBody() throws Exception public void testClientReceivesPush1() throws InterruptedException, ExecutionException, TimeoutException
{ {
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null); Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null);
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0), session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
new StreamFrameListener.Adapter() {
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return new Adapter()
{ {
@Override @Override
public void onData(Stream stream, DataInfo dataInfo)
{
}
};
};
@Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
{ {
// Do something with the response // Do something with the response
@ -83,6 +93,71 @@ public class ClientUsageTest
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
} }
});
}
@Test
public void testClientReceivesPush2() throws InterruptedException, ExecutionException, TimeoutException
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, new SessionFrameListener.Adapter()
{
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
}
};
}
}, null, null);
session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
replyInfo.getHeaders().get("host");
// Then issue another similar request
try
{
stream.getSession().syn(new SynInfo(new Fields(), true), this);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
throw new IllegalStateException(e);
}
}
});
}
@Test
public void testClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null);
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
replyInfo.getHeaders().get("host");
// Then issue another similar request
try
{
stream.getSession().syn(new SynInfo(new Fields(), true), this);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
throw new IllegalStateException(e);
}
}
}); });
// Send-and-forget the data // Send-and-forget the data
stream.data(new StringDataInfo("data", true)); stream.data(new StringDataInfo("data", true));
@ -96,38 +171,39 @@ public class ClientUsageTest
final String context = "context"; final String context = "context";
session.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter() session.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
{ {
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
{ {
// Do something with the response // Do something with the response
replyInfo.getHeaders().get("host"); replyInfo.getHeaders().get("host");
// Then issue another similar request // Then issue another similar request
try try
{ {
stream.getSession().syn(new SynInfo(new Fields(), true), this); stream.getSession().syn(new SynInfo(new Fields(), true), this);
} }
catch (ExecutionException | InterruptedException | TimeoutException e) catch (ExecutionException | InterruptedException | TimeoutException e)
{ {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
} }
}, new Promise.Adapter<Stream>() }, new Promise.Adapter<Stream>()
{ {
@Override @Override
public void succeeded(Stream stream) public void succeeded(Stream stream)
{ {
// Differently from JDK 7 AIO, there is no need to // Differently from JDK 7 AIO, there is no need to
// have an explicit parameter for the context since // have an explicit parameter for the context since
// that is captured while the handler is created anyway, // that is captured while the handler is created anyway,
// and it is used only by the handler as parameter // and it is used only by the handler as parameter
// The style below is fire-and-forget, since // The style below is fire-and-forget, since
// we do not pass the handler nor we call get() // we do not pass the handler nor we call get()
// to wait for the data to be sent // to wait for the data to be sent
stream.data(new StringDataInfo(context, true), new Callback.Adapter()); stream.data(new StringDataInfo(context, true), new Callback.Adapter());
} }
}); }
);
} }
@Test @Test
@ -136,48 +212,49 @@ public class ClientUsageTest
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null); Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null);
session.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter() session.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
{
// The good of passing the listener to push() is that applications can safely
// accumulate info from the reply headers to be used in the data callback,
// e.g. content-type, charset, etc.
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
Fields headers = replyInfo.getHeaders();
int contentLength = headers.get("content-length").valueAsInt();
stream.setAttribute("content-length", contentLength);
if (!replyInfo.isClose())
stream.setAttribute("builder", new StringBuilder());
// May issue another similar request while waiting for data
try
{ {
stream.getSession().syn(new SynInfo(new Fields(), true), this); // The good of passing the listener to push() is that applications can safely
} // accumulate info from the reply headers to be used in the data callback,
catch (ExecutionException | InterruptedException | TimeoutException e) // e.g. content-type, charset, etc.
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
Fields headers = replyInfo.getHeaders();
int contentLength = headers.get("content-length").valueAsInt();
stream.setAttribute("content-length", contentLength);
if (!replyInfo.isClose())
stream.setAttribute("builder", new StringBuilder());
// May issue another similar request while waiting for data
try
{
stream.getSession().syn(new SynInfo(new Fields(), true), this);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
throw new IllegalStateException(e);
}
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
StringBuilder builder = (StringBuilder)stream.getAttribute("builder");
builder.append(dataInfo.asString("UTF-8", true));
}
}, new Promise.Adapter<Stream>()
{ {
throw new IllegalStateException(e); @Override
public void succeeded(Stream stream)
{
stream.data(new BytesDataInfo("wee".getBytes(Charset.forName("UTF-8")), false), new Callback.Adapter());
stream.data(new StringDataInfo("foo", false), new Callback.Adapter());
stream.data(new ByteBufferDataInfo(Charset.forName("UTF-8").encode("bar"), true), new Callback.Adapter());
}
} }
} );
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
StringBuilder builder = (StringBuilder)stream.getAttribute("builder");
builder.append(dataInfo.asString("UTF-8", true));
}
}, new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream stream)
{
stream.data(new BytesDataInfo("wee".getBytes(Charset.forName("UTF-8")), false), new Callback.Adapter());
stream.data(new StringDataInfo("foo", false), new Callback.Adapter());
stream.data(new ByteBufferDataInfo(Charset.forName("UTF-8").encode("bar"), true), new Callback.Adapter());
}
});
} }
} }

View File

@ -24,6 +24,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener; import org.eclipse.jetty.spdy.api.StreamFrameListener;
@ -131,6 +132,12 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory
channel.requestHeaders(headersInfo.getHeaders(), headersInfo.isClose()); channel.requestHeaders(headersInfo.getHeaders(), headersInfo.isClose());
} }
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return null;
}
@Override @Override
public void onData(Stream stream, final DataInfo dataInfo) public void onData(Stream stream, final DataInfo dataInfo)
{ {

View File

@ -23,7 +23,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PingResultInfo; import org.eclipse.jetty.spdy.api.PingResultInfo;
import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.Session;
@ -104,7 +104,7 @@ public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter
} }
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{ {
// TODO: // TODO:
} }

View File

@ -42,7 +42,7 @@ import org.eclipse.jetty.spdy.StandardStream;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
@ -136,7 +136,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
{ {
assert content == null; assert content == null;
if (headers.isEmpty()) if (headers.isEmpty())
proxyEngineSelector.onGoAway(session, new GoAwayReceivedInfo(0, SessionStatus.OK)); proxyEngineSelector.onGoAway(session, new GoAwayResultInfo(0, SessionStatus.OK));
else else
syn(true); syn(true);
} }

View File

@ -29,8 +29,9 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.Info;
import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.RstInfo;
@ -50,8 +51,8 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
/** /**
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by * <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by clients into
* clients into SPDY events for the servers.</p> * SPDY events for the servers.</p>
*/ */
public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
{ {
@ -131,6 +132,12 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
} }
} }
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
throw new IllegalStateException("We shouldn't receive pushes from clients");
}
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
{ {
@ -222,6 +229,61 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
this.clientStream = clientStream; this.clientStream = clientStream;
} }
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
LOG.debug("S -> P pushed {} on {}", pushInfo, stream);
Fields headers = new Fields(pushInfo.getHeaders(), false);
addResponseProxyHeaders(stream, headers);
customizeResponseHeaders(stream, headers);
Stream clientStream = (Stream)stream.getAssociatedStream().getAttribute
(CLIENT_STREAM_ATTRIBUTE);
convert(stream.getSession().getVersion(), clientStream.getSession().getVersion(),
headers);
StreamHandler handler = new StreamHandler(clientStream, pushInfo);
stream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers,
pushInfo.isClose()),
handler);
return new Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Push streams never send a reply
throw new UnsupportedOperationException();
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
throw new UnsupportedOperationException();
}
@Override
public void onData(Stream serverStream, final DataInfo serverDataInfo)
{
LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
serverDataInfo.consume(delta);
}
};
StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
handler.data(clientDataInfo);
}
};
}
@Override @Override
public void onReply(final Stream stream, ReplyInfo replyInfo) public void onReply(final Stream stream, ReplyInfo replyInfo)
{ {
@ -304,30 +366,30 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
} }
/** /**
* <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p> * <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p> <p>Instances
* <p>Instances of this class buffer DATA frames sent by clients and send them to the server. * of this class buffer DATA frames sent by clients and send them to the server. The buffering happens between the
* The buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive * send of the SYN_STREAM to the server (where DATA frames may arrive from the client before the SYN_STREAM has been
* from the client before the SYN_STREAM has been fully sent), and between DATA frames, if the client * fully sent), and between DATA frames, if the client is a fast producer and the server a slow consumer, or if the
* is a fast producer and the server a slow consumer, or if the client is a SPDY v2 client (and hence * client is a SPDY v2 client (and hence without flow control) while the server is a SPDY v3 server (and hence with
* without flow control) while the server is a SPDY v3 server (and hence with flow control).</p> * flow control).</p>
*/ */
private class StreamHandler implements Promise<Stream> private class StreamHandler implements Promise<Stream>
{ {
private final Queue<DataInfoHandler> queue = new LinkedList<>(); private final Queue<DataInfoHandler> queue = new LinkedList<>();
private final Stream clientStream; private final Stream clientStream;
private final SynInfo serverSynInfo; private final Info info;
private Stream serverStream; private Stream serverStream;
private StreamHandler(Stream clientStream, SynInfo serverSynInfo) private StreamHandler(Stream clientStream, Info info)
{ {
this.clientStream = clientStream; this.clientStream = clientStream;
this.serverSynInfo = serverSynInfo; this.info = info;
} }
@Override @Override
public void succeeded(Stream serverStream) public void succeeded(Stream serverStream)
{ {
LOG.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream); LOG.debug("P -> S {} from {} to {}", info, clientStream, serverStream);
serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream); serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
@ -449,26 +511,8 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
} }
} }
private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener private class ProxySessionFrameListener extends SessionFrameListener.Adapter
{ {
@Override
public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
{
LOG.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
Fields headers = new Fields(serverSynInfo.getHeaders(), false);
addResponseProxyHeaders(serverStream, headers);
customizeResponseHeaders(serverStream, headers);
Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE);
convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers);
StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, serverSynInfo.isClose()),
handler);
return this;
}
@Override @Override
public void onRst(Session serverSession, RstInfo serverRstInfo) public void onRst(Session serverSession, RstInfo serverRstInfo)
@ -487,41 +531,9 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
} }
@Override @Override
public void onGoAway(Session serverSession, GoAwayReceivedInfo goAwayReceivedInfo) public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
{ {
serverSessions.values().remove(serverSession); serverSessions.values().remove(serverSession);
} }
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Push streams never send a reply
throw new UnsupportedOperationException();
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
throw new UnsupportedOperationException();
}
@Override
public void onData(Stream serverStream, final DataInfo serverDataInfo)
{
LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
serverDataInfo.consume(delta);
}
};
StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
handler.data(clientDataInfo);
}
} }
} }

View File

@ -18,16 +18,12 @@
package org.eclipse.jetty.spdy.server.http; package org.eclipse.jetty.spdy.server.http;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -37,6 +33,7 @@ import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.SPDY;
@ -50,10 +47,15 @@ import org.eclipse.jetty.spdy.server.NPNServerConnectionFactory;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields; import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
{ {
private final int referrerPushPeriod = 1000; private final int referrerPushPeriod = 1000;
@ -107,39 +109,14 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
sendMainRequestAndCSSRequest(); sendMainRequestAndCSSRequest();
final CountDownLatch pushDataLatch = new CountDownLatch(1); final CountDownLatch pushDataLatch = new CountDownLatch(1);
final CountDownLatch pushSynHeadersValid = new CountDownLatch(1); final CountDownLatch pushSynHeadersValid = new CountDownLatch(1);
Session session = startClient(version, serverAddress, new SessionFrameListener.Adapter() Session session = startClient(version, serverAddress, null);
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
validateHeaders(synInfo.getHeaders(), pushSynHeadersValid);
assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("URI header ends with css", synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
.value().endsWith
("" +
".css"),
is(true));
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter());
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
pushDataLatch.countDown();
}
};
}
});
// Send main request. That should initiate the push push's which get reset by the client // Send main request. That should initiate the push push's which get reset by the client
sendRequest(session, mainRequestHeaders); sendRequest(session, mainRequestHeaders, pushSynHeadersValid, pushDataLatch);
assertThat("No push data is received", pushDataLatch.await(1, TimeUnit.SECONDS), is(false)); assertThat("No push data is received", pushDataLatch.await(1, TimeUnit.SECONDS), is(false));
assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true)); assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true));
sendRequest(session, associatedCSSRequestHeaders); sendRequest(session, associatedCSSRequestHeaders, pushSynHeadersValid, pushDataLatch);
} }
@Test @Test
@ -157,7 +134,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
// Sleep for pushPeriod This should prevent application.js from being mapped as pushResource // Sleep for pushPeriod This should prevent application.js from being mapped as pushResource
Thread.sleep(referrerPushPeriod + 1); Thread.sleep(referrerPushPeriod + 1);
sendRequest(session, associatedJSRequestHeaders); sendRequest(session, associatedJSRequestHeaders, null, null);
run2ndClientRequests(false, true); run2ndClientRequests(false, true);
} }
@ -171,7 +148,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
Session session = sendMainRequestAndCSSRequest(); Session session = sendMainRequestAndCSSRequest();
sendRequest(session, associatedJSRequestHeaders); sendRequest(session, associatedJSRequestHeaders, null, null);
run2ndClientRequests(false, true); run2ndClientRequests(false, true);
} }
@ -200,18 +177,43 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
{ {
Session session = startClient(version, serverAddress, null); Session session = startClient(version, serverAddress, null);
sendRequest(session, mainRequestHeaders); sendRequest(session, mainRequestHeaders, null, null);
sendRequest(session, associatedCSSRequestHeaders); sendRequest(session, associatedCSSRequestHeaders, null, null);
return session; return session;
} }
private void sendRequest(Session session, Fields requestHeaders) throws InterruptedException private void sendRequest(Session session, Fields requestHeaders, final CountDownLatch pushSynHeadersValid,
final CountDownLatch pushDataLatch) throws InterruptedException
{ {
final CountDownLatch dataReceivedLatch = new CountDownLatch(1); final CountDownLatch dataReceivedLatch = new CountDownLatch(1);
final CountDownLatch received200OKLatch = new CountDownLatch(1); final CountDownLatch received200OKLatch = new CountDownLatch(1);
session.syn(new SynInfo(requestHeaders, true), new StreamFrameListener.Adapter() session.syn(new SynInfo(requestHeaders, true), new StreamFrameListener.Adapter()
{ {
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid);
assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
.value().endsWith
("" +
".css"),
is(true));
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter());
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
pushDataLatch.countDown();
}
};
}
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
{ {
@ -238,16 +240,17 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(1); final CountDownLatch pushDataLatch = new CountDownLatch(1);
final CountDownLatch pushSynHeadersValid = new CountDownLatch(1); final CountDownLatch pushSynHeadersValid = new CountDownLatch(1);
Session session2 = startClient(version, serverAddress, new SessionFrameListener.Adapter() Session session2 = startClient(version, serverAddress, null);
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{ {
@Override @Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{ {
if (validateHeaders) if (validateHeaders)
validateHeaders(synInfo.getHeaders(), pushSynHeadersValid); validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid);
assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true)); assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("URI header ends with css", synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)) assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
.value().endsWith .value().endsWith
("" + ("" +
".css"), ".css"),
@ -264,9 +267,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
} }
}; };
} }
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
{ {
@ -292,6 +293,8 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true)); assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true));
} }
private static final Logger LOG = Log.getLogger(ReferrerPushStrategyTest.class);
@Test @Test
public void testAssociatedResourceIsPushed() throws Exception public void testAssociatedResourceIsPushed() throws Exception
{ {
@ -326,16 +329,17 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}); });
Assert.assertTrue(mainResourceLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(mainResourceLatch.await(5, TimeUnit.SECONDS));
sendRequest(session1, createHeaders(cssResource)); sendRequest(session1, createHeaders(cssResource), null, null);
// Create another client, and perform the same request for the main resource, we expect the css being pushed // Create another client, and perform the same request for the main resource, we expect the css being pushed
final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(1); final CountDownLatch pushDataLatch = new CountDownLatch(1);
Session session2 = startClient(version, address, new SessionFrameListener.Adapter() Session session2 = startClient(version, address, null);
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{ {
@Override @Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{ {
Assert.assertTrue(stream.isUnidirectional()); Assert.assertTrue(stream.isUnidirectional());
return new StreamFrameListener.Adapter() return new StreamFrameListener.Adapter()
@ -349,9 +353,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
} }
}; };
} }
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
{ {
@ -452,13 +454,15 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(1); final CountDownLatch pushDataLatch = new CountDownLatch(1);
Session session2 = startClient(version, address, new SessionFrameListener.Adapter() Session session2 = startClient(version, address, null);
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{ {
@Override @Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{ {
Assert.assertTrue(stream.isUnidirectional()); Assert.assertTrue(stream.isUnidirectional());
Assert.assertTrue(synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value().endsWith(".css")); Assert.assertTrue(pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value().endsWith("" +
".css"));
return new StreamFrameListener.Adapter() return new StreamFrameListener.Adapter()
{ {
@Override @Override
@ -470,9 +474,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
} }
}; };
} }
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
{ {
@ -563,14 +565,31 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(2); final CountDownLatch pushDataLatch = new CountDownLatch(2);
Session session2 = startClient(version, address, new SessionFrameListener.Adapter() Session session2 = startClient(version, address, null);
LOG.warn("REQUEST FOR PUSHED RESOURCES");
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{ {
@Override @Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{ {
Assert.assertTrue(stream.isUnidirectional()); Assert.assertTrue(stream.isUnidirectional());
return new StreamFrameListener.Adapter() return new StreamFrameListener.Adapter()
{ {
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
return new Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
pushDataLatch.countDown();
}
};
}
@Override @Override
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
@ -580,9 +599,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
} }
}; };
} }
});
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
{ {

View File

@ -33,7 +33,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.RstInfo;
@ -164,7 +164,7 @@ public class ProxyHTTPToSPDYTest
} }
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
closeLatch.countDown(); closeLatch.countDown();
} }

View File

@ -37,7 +37,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PingInfo; import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.PingResultInfo; import org.eclipse.jetty.spdy.api.PingResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
@ -429,7 +429,7 @@ public class ProxySPDYToHTTPTest
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter() Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
{ {
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayReceivedInfo)
{ {
goAwayLatch.countDown(); goAwayLatch.countDown();
} }

View File

@ -281,10 +281,16 @@ public class ProxySPDYToSPDYTest
final CountDownLatch pushSynLatch = new CountDownLatch(1); final CountDownLatch pushSynLatch = new CountDownLatch(1);
final CountDownLatch pushDataLatch = new CountDownLatch(1); final CountDownLatch pushDataLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter() Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
Fields headers = new Fields();
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{ {
@Override @Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{ {
pushSynLatch.countDown(); pushSynLatch.countDown();
return new StreamFrameListener.Adapter() return new StreamFrameListener.Adapter()
@ -298,14 +304,7 @@ public class ProxySPDYToSPDYTest
} }
}; };
} }
}).get(5, TimeUnit.SECONDS);
Fields headers = new Fields();
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
{ {

View File

@ -35,7 +35,7 @@ import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.Session;
@ -211,7 +211,7 @@ public class ClosedStreamTest extends AbstractTest
}; };
} }
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
goAwayReceivedLatch.countDown(); goAwayReceivedLatch.countDown();
} }

View File

@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener; import org.eclipse.jetty.spdy.api.SessionFrameListener;
@ -61,7 +61,7 @@ public class GoAwayTest extends AbstractTest
} }
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
Assert.assertEquals(0, goAwayInfo.getLastStreamId()); Assert.assertEquals(0, goAwayInfo.getLastStreamId());
Assert.assertSame(SessionStatus.OK, goAwayInfo.getSessionStatus()); Assert.assertSame(SessionStatus.OK, goAwayInfo.getSessionStatus());
@ -90,12 +90,12 @@ public class GoAwayTest extends AbstractTest
return null; return null;
} }
}; };
final AtomicReference<GoAwayReceivedInfo> ref = new AtomicReference<>(); final AtomicReference<GoAwayResultInfo> ref = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter() SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter()
{ {
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
ref.set(goAwayInfo); ref.set(goAwayInfo);
latch.countDown(); latch.countDown();
@ -106,10 +106,10 @@ public class GoAwayTest extends AbstractTest
Stream stream1 = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), null); Stream stream1 = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), null);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
GoAwayReceivedInfo goAwayReceivedInfo = ref.get(); GoAwayResultInfo goAwayResultInfo = ref.get();
Assert.assertNotNull(goAwayReceivedInfo); Assert.assertNotNull(goAwayResultInfo);
Assert.assertEquals(stream1.getId(), goAwayReceivedInfo.getLastStreamId()); Assert.assertEquals(stream1.getId(), goAwayResultInfo.getLastStreamId());
Assert.assertSame(SessionStatus.OK, goAwayReceivedInfo.getSessionStatus()); Assert.assertSame(SessionStatus.OK, goAwayResultInfo.getSessionStatus());
} }
@Test @Test
@ -139,7 +139,7 @@ public class GoAwayTest extends AbstractTest
SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter() SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter()
{ {
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
session.syn(new SynInfo(new Fields(), true), null, new FuturePromise<Stream>()); session.syn(new SynInfo(new Fields(), true), null, new FuturePromise<Stream>());
} }
@ -184,12 +184,12 @@ public class GoAwayTest extends AbstractTest
} }
} }
}; };
final AtomicReference<GoAwayReceivedInfo> goAwayRef = new AtomicReference<>(); final AtomicReference<GoAwayResultInfo> goAwayRef = new AtomicReference<>();
final CountDownLatch goAwayLatch = new CountDownLatch(1); final CountDownLatch goAwayLatch = new CountDownLatch(1);
SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter() SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter()
{ {
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
goAwayRef.set(goAwayInfo); goAwayRef.set(goAwayInfo);
goAwayLatch.countDown(); goAwayLatch.countDown();
@ -228,7 +228,7 @@ public class GoAwayTest extends AbstractTest
// The last good stream is the second, because it was received by the server // The last good stream is the second, because it was received by the server
Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
GoAwayReceivedInfo goAway = goAwayRef.get(); GoAwayResultInfo goAway = goAwayRef.get();
Assert.assertNotNull(goAway); Assert.assertNotNull(goAway);
Assert.assertEquals(stream2.getId(), goAway.getLastStreamId()); Assert.assertEquals(stream2.getId(), goAway.getLastStreamId());
} }

View File

@ -23,7 +23,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.Session;
@ -63,7 +63,7 @@ public class IdleTimeoutTest extends AbstractTest
Session session = startClient(startServer(null), new SessionFrameListener.Adapter() Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{ {
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
latch.countDown(); latch.countDown();
} }
@ -85,7 +85,7 @@ public class IdleTimeoutTest extends AbstractTest
Session session = startClient(startServer(null), new SessionFrameListener.Adapter() Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{ {
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
latch.countDown(); latch.countDown();
} }
@ -125,7 +125,7 @@ public class IdleTimeoutTest extends AbstractTest
Session session = startClient(startServer(null), new SessionFrameListener.Adapter() Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{ {
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
goAwayLatch.countDown(); goAwayLatch.countDown();
} }
@ -161,7 +161,7 @@ public class IdleTimeoutTest extends AbstractTest
} }
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
latch.countDown(); latch.countDown();
} }
@ -187,7 +187,7 @@ public class IdleTimeoutTest extends AbstractTest
InetSocketAddress address = startServer(new ServerSessionFrameListener.Adapter() InetSocketAddress address = startServer(new ServerSessionFrameListener.Adapter()
{ {
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
latch.countDown(); latch.countDown();
} }
@ -220,7 +220,7 @@ public class IdleTimeoutTest extends AbstractTest
} }
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
latch.countDown(); latch.countDown();
} }

View File

@ -19,12 +19,6 @@
package org.eclipse.jetty.spdy.server; package org.eclipse.jetty.spdy.server;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -44,7 +38,7 @@ import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory; import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.RstInfo;
@ -75,6 +69,12 @@ import org.eclipse.jetty.util.log.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public class PushStreamTest extends AbstractTest public class PushStreamTest extends AbstractTest
{ {
private static final Logger LOG = Log.getLogger(PushStreamTest.class); private static final Logger LOG = Log.getLogger(PushStreamTest.class);
@ -94,10 +94,12 @@ public class PushStreamTest extends AbstractTest
stream.push(new PushInfo(new Fields(), true), new Promise.Adapter<Stream>()); stream.push(new PushInfo(new Fields(), true), new Promise.Adapter<Stream>());
return null; return null;
} }
}), new SessionFrameListener.Adapter() }), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{ {
@Override @Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{ {
assertThat("streamId is even", stream.getId() % 2, is(0)); assertThat("streamId is even", stream.getId() % 2, is(0));
assertThat("stream is unidirectional", stream.isUnidirectional(), is(true)); assertThat("stream is unidirectional", stream.isUnidirectional(), is(true));
@ -117,8 +119,6 @@ public class PushStreamTest extends AbstractTest
return null; return null;
} }
}); });
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), null);
assertThat("onSyn has been called", pushStreamLatch.await(5, TimeUnit.SECONDS), is(true)); assertThat("onSyn has been called", pushStreamLatch.await(5, TimeUnit.SECONDS), is(true));
Stream pushStream = pushStreamRef.get(); Stream pushStream = pushStreamRef.get();
assertThat("main stream and associated stream are the same", stream, sameInstance(pushStream.getAssociatedStream())); assertThat("main stream and associated stream are the same", stream, sameInstance(pushStream.getAssociatedStream()));
@ -177,10 +177,12 @@ public class PushStreamTest extends AbstractTest
} }
} }
}), new SessionFrameListener.Adapter() }), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
{ {
@Override @Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{ {
pushStreamSynLatch.countDown(); pushStreamSynLatch.countDown();
return new StreamFrameListener.Adapter() return new StreamFrameListener.Adapter()
@ -193,10 +195,7 @@ public class PushStreamTest extends AbstractTest
} }
}; };
} }
});
Stream stream = clientSession.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
{
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
{ {
@ -298,10 +297,12 @@ public class PushStreamTest extends AbstractTest
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
} }
}), new SessionFrameListener.Adapter() }), null);
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{ {
@Override @Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{ {
return new StreamFrameListener.Adapter() return new StreamFrameListener.Adapter()
{ {
@ -327,10 +328,7 @@ public class PushStreamTest extends AbstractTest
} }
}; };
} }
});
Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
{ {
@ -427,7 +425,7 @@ public class PushStreamTest extends AbstractTest
} }
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
{ {
goAwayReceivedLatch.countDown(); goAwayReceivedLatch.countDown();
} }
@ -543,20 +541,14 @@ public class PushStreamTest extends AbstractTest
stream.push(new PushInfo(new Fields(), false), new Promise.Adapter<Stream>()); stream.push(new PushInfo(new Fields(), false), new Promise.Adapter<Stream>());
return null; return null;
} }
}), new SessionFrameListener.Adapter() }), null);
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
assertStreamIdIsEven(stream);
pushStreamIdIsEvenLatch.countDown();
return super.onSyn(stream, synInfo);
}
});
Stream stream = clientSession.syn(new SynInfo(new Fields(), false), null); Stream stream = clientSession.syn(new SynInfo(new Fields(), false),
Stream stream2 = clientSession.syn(new SynInfo(new Fields(), false), null); new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch));
Stream stream3 = clientSession.syn(new SynInfo(new Fields(), false), null); Stream stream2 = clientSession.syn(new SynInfo(new Fields(), false),
new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch));
Stream stream3 = clientSession.syn(new SynInfo(new Fields(), false),
new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch));
assertStreamIdIsOdd(stream); assertStreamIdIsOdd(stream);
assertStreamIdIsOdd(stream2); assertStreamIdIsOdd(stream2);
assertStreamIdIsOdd(stream3); assertStreamIdIsOdd(stream3);
@ -564,6 +556,24 @@ public class PushStreamTest extends AbstractTest
assertThat("all pushStreams had even ids", pushStreamIdIsEvenLatch.await(5, TimeUnit.SECONDS), is(true)); assertThat("all pushStreams had even ids", pushStreamIdIsEvenLatch.await(5, TimeUnit.SECONDS), is(true));
} }
private class VerifyPushStreamIdIsEvenStreamFrameListener extends StreamFrameListener.Adapter
{
final CountDownLatch pushStreamIdIsEvenLatch;
private VerifyPushStreamIdIsEvenStreamFrameListener(CountDownLatch pushStreamIdIsEvenLatch)
{
this.pushStreamIdIsEvenLatch = pushStreamIdIsEvenLatch;
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
assertStreamIdIsEven(stream);
pushStreamIdIsEvenLatch.countDown();
return super.onPush(stream, pushInfo);
}
}
private void assertStreamIdIsEven(Stream stream) private void assertStreamIdIsEven(Stream stream)
{ {
assertThat("streamId is odd", stream.getId() % 2, is(0)); assertThat("streamId is odd", stream.getId() % 2, is(0));

View File

@ -23,7 +23,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.GoAwayInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.junit.Assert; import org.junit.Assert;
@ -38,7 +38,7 @@ public class SPDYClientFactoryTest extends AbstractTest
startClient(startServer(new ServerSessionFrameListener.Adapter() startClient(startServer(new ServerSessionFrameListener.Adapter()
{ {
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{ {
latch.countDown(); latch.countDown();
} }

View File

@ -23,7 +23,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.GoAwayInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener; import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.junit.Assert; import org.junit.Assert;
@ -38,7 +38,7 @@ public class SPDYServerConnectorTest extends AbstractTest
startClient(startServer(null), new SessionFrameListener.Adapter() startClient(startServer(null), new SessionFrameListener.Adapter()
{ {
@Override @Override
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo)
{ {
latch.countDown(); latch.countDown();
} }