From 88e32cb0214d4ea05ba1333d1aac03d8e8f15635 Mon Sep 17 00:00:00 2001 From: Thomas Becker Date: Thu, 14 Feb 2013 11:40:25 +0100 Subject: [PATCH] 401183 Handle push streams in new method StreamFrameListener.onPush() instead of SessionFrameListener.syn() --- .../java/org/eclipse/jetty/spdy/IStream.java | 3 + .../eclipse/jetty/spdy/StandardSession.java | 49 +++- .../eclipse/jetty/spdy/StandardStream.java | 1 + ...eceivedInfo.java => GoAwayResultInfo.java} | 6 +- .../jetty/spdy/api/SessionFrameListener.java | 11 +- .../jetty/spdy/api/StreamFrameListener.java | 15 ++ .../jetty/spdy/api/ClientUsageTest.java | 217 ++++++++++++------ .../http/HTTPSPDYServerConnectionFactory.java | 7 + .../server/proxy/ProxyEngineSelector.java | 4 +- .../server/proxy/ProxyHTTPSPDYConnection.java | 4 +- .../spdy/server/proxy/SPDYProxyEngine.java | 142 ++++++------ .../server/http/ReferrerPushStrategyTest.java | 139 ++++++----- .../server/proxy/ProxyHTTPToSPDYTest.java | 4 +- .../server/proxy/ProxySPDYToHTTPTest.java | 4 +- .../server/proxy/ProxySPDYToSPDYTest.java | 17 +- .../jetty/spdy/server/ClosedStreamTest.java | 4 +- .../eclipse/jetty/spdy/server/GoAwayTest.java | 24 +- .../jetty/spdy/server/IdleTimeoutTest.java | 14 +- .../jetty/spdy/server/PushStreamTest.java | 80 ++++--- .../spdy/server/SPDYClientFactoryTest.java | 4 +- .../spdy/server/SPDYServerConnectorTest.java | 4 +- 21 files changed, 463 insertions(+), 290 deletions(-) rename jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/{GoAwayReceivedInfo.java => GoAwayResultInfo.java} (87%) diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java index 53f0671d9ca..fc6c4d96bfc 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java @@ -59,6 +59,9 @@ public interface IStream extends Stream, Callback */ public void setStreamFrameListener(StreamFrameListener listener); + //TODO: javadoc thomas + public StreamFrameListener getStreamFrameListener(); + /** *

A stream can be open, {@link #isHalfClosed() half closed} or * {@link #isClosed() closed} and this method updates the close state diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index 704c595e979..dcce655edeb 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -45,9 +45,10 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.PingInfo; import org.eclipse.jetty.spdy.api.PingResultInfo; +import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.SPDYException; import org.eclipse.jetty.spdy.api.Session; @@ -498,8 +499,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable stream.process(frame); // Update the last stream id before calling the application (which may send a GO_AWAY) updateLastStreamId(stream); - SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority()); - StreamFrameListener streamListener = notifyOnSyn(listener, stream, synInfo); + StreamFrameListener streamListener; + if (stream.isUnidirectional()) + { + PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose()); + streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo); + } + else + { + SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority()); + streamListener = notifyOnSyn(listener, stream, synInfo); + } stream.setStreamFrameListener(streamListener); flush(); // The onSyn() listener may have sent a frame that closed the stream @@ -680,9 +690,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable { if (goAwayReceived.compareAndSet(false, true)) { - //TODO: Find a better name for GoAwayReceivedInfo - GoAwayReceivedInfo goAwayReceivedInfo = new GoAwayReceivedInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode())); - notifyOnGoAway(listener, goAwayReceivedInfo); + //TODO: Find a better name for GoAwayResultInfo + GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode())); + notifyOnGoAway(listener, goAwayResultInfo); flush(); // SPDY does not require to send back a response to a GO_AWAY. // We notified the application of the last good stream id and @@ -755,6 +765,27 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable } } + private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo) + { + try + { + if (listener == null) + return null; + LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener); + return listener.onPush(stream, pushInfo); + } + catch (Exception x) + { + LOG.info("Exception while notifying listener " + listener, x); + return null; + } + catch (Error x) + { + LOG.info("Exception while notifying listener " + listener, x); + throw x; + } + } + private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo) { try @@ -839,14 +870,14 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable } } - private void notifyOnGoAway(SessionFrameListener listener, GoAwayReceivedInfo goAwayReceivedInfo) + private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo) { try { if (listener != null) { - LOG.debug("Invoking callback with {} on listener {}", goAwayReceivedInfo, listener); - listener.onGoAway(this, goAwayReceivedInfo); + LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener); + listener.onGoAway(this, goAwayResultInfo); } } catch (Exception x) diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java index 78ee9d51599..7e9afac7a0e 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java @@ -148,6 +148,7 @@ public class StandardStream implements IStream this.listener = listener; } + @Override public StreamFrameListener getStreamFrameListener() { return listener; diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayReceivedInfo.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayResultInfo.java similarity index 87% rename from jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayReceivedInfo.java rename to jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayResultInfo.java index ef50de19fcf..eb75f886785 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayReceivedInfo.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/GoAwayResultInfo.java @@ -22,18 +22,18 @@ package org.eclipse.jetty.spdy.api; *

A container for GOAWAY frames metadata: the last good stream id and * the session status.

*/ -public class GoAwayReceivedInfo +public class GoAwayResultInfo { private final int lastStreamId; private final SessionStatus sessionStatus; /** - *

Creates a new {@link GoAwayReceivedInfo} with the given last good stream id and session status

+ *

Creates a new {@link GoAwayResultInfo} with the given last good stream id and session status

* * @param lastStreamId the last good stream id * @param sessionStatus the session status */ - public GoAwayReceivedInfo(int lastStreamId, SessionStatus sessionStatus) + public GoAwayResultInfo(int lastStreamId, SessionStatus sessionStatus) { this.lastStreamId = lastStreamId; this.sessionStatus = sessionStatus; diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SessionFrameListener.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SessionFrameListener.java index b950698d977..ffff51fe50d 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SessionFrameListener.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SessionFrameListener.java @@ -36,7 +36,7 @@ public interface SessionFrameListener extends EventListener *

Application code should implement this method and reply to the stream creation, eventually * sending data:

*
-     * public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
+     * public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
      * {
      *     // Do something with the metadata contained in synInfo
      *
@@ -52,7 +52,7 @@ public interface SessionFrameListener extends EventListener
      * 
*

Alternatively, if the stream creation requires reading data sent from the other peer:

*
-     * public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
+     * public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
      * {
      *     // Do something with the metadata contained in synInfo
      *
@@ -106,9 +106,9 @@ public interface SessionFrameListener extends EventListener
      * 

Callback invoked when the other peer signals that it is closing the connection.

* * @param session the session - * @param goAwayReceivedInfo the metadata sent + * @param goAwayResultInfo the metadata sent */ - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo); + public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo); /** *

Callback invoked when an exception is thrown during the processing of an event on a @@ -119,6 +119,7 @@ public interface SessionFrameListener extends EventListener */ public void onException(Throwable x); + /** *

Empty implementation of {@link SessionFrameListener}

*/ @@ -148,7 +149,7 @@ public interface SessionFrameListener extends EventListener } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo) { } diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java index 6a91fc77f67..0a4248a1f97 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java @@ -50,6 +50,15 @@ public interface StreamFrameListener extends EventListener */ public void onHeaders(Stream stream, HeadersInfo headersInfo); + /** + *

Callback invoked when a push syn has been received on a stream.

+ * + * @param stream the push stream just created + * @param pushInfo + * @return a listener for stream events or null if there is no interest in being notified of stream events + */ + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo); + /** *

Callback invoked when data bytes are received on a stream.

*

Implementers should be read or consume the content of the @@ -75,6 +84,12 @@ public interface StreamFrameListener extends EventListener { } + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + return null; + } + @Override public void onData(Stream stream, DataInfo dataInfo) { diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java index ebb98d63666..455b58e0fd7 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java @@ -60,14 +60,24 @@ public class ClientUsageTest } @Test - public void testClientRequestWithBodyResponseNoBody() throws Exception + public void testClientReceivesPush1() throws InterruptedException, ExecutionException, TimeoutException { Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null); - Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0), - new StreamFrameListener.Adapter() + session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() + { + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + return new Adapter() { @Override + public void onData(Stream stream, DataInfo dataInfo) + { + } + }; + }; + + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { // Do something with the response @@ -83,6 +93,71 @@ public class ClientUsageTest throw new IllegalStateException(e); } } + }); + } + + @Test + public void testClientReceivesPush2() throws InterruptedException, ExecutionException, TimeoutException + { + Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, new SessionFrameListener.Adapter() + { + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + return new StreamFrameListener.Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + } + }; + } + }, null, null); + + session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Do something with the response + replyInfo.getHeaders().get("host"); + + // Then issue another similar request + try + { + stream.getSession().syn(new SynInfo(new Fields(), true), this); + } + catch (ExecutionException | InterruptedException | TimeoutException e) + { + throw new IllegalStateException(e); + } + } + }); + } + + @Test + public void testClientRequestWithBodyResponseNoBody() throws Exception + { + Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null); + + Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0), + new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Do something with the response + replyInfo.getHeaders().get("host"); + + // Then issue another similar request + try + { + stream.getSession().syn(new SynInfo(new Fields(), true), this); + } + catch (ExecutionException | InterruptedException | TimeoutException e) + { + throw new IllegalStateException(e); + } + } }); // Send-and-forget the data stream.data(new StringDataInfo("data", true)); @@ -96,38 +171,39 @@ public class ClientUsageTest final String context = "context"; session.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter() { - @Override - public void onReply(Stream stream, ReplyInfo replyInfo) - { - // Do something with the response - replyInfo.getHeaders().get("host"); + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Do something with the response + replyInfo.getHeaders().get("host"); - // Then issue another similar request - try - { - stream.getSession().syn(new SynInfo(new Fields(), true), this); - } - catch (ExecutionException | InterruptedException | TimeoutException e) - { - throw new IllegalStateException(e); - } - } + // Then issue another similar request + try + { + stream.getSession().syn(new SynInfo(new Fields(), true), this); + } + catch (ExecutionException | InterruptedException | TimeoutException e) + { + throw new IllegalStateException(e); + } + } }, new Promise.Adapter() { @Override - public void succeeded(Stream stream) - { - // Differently from JDK 7 AIO, there is no need to - // have an explicit parameter for the context since - // that is captured while the handler is created anyway, - // and it is used only by the handler as parameter + public void succeeded(Stream stream) + { + // Differently from JDK 7 AIO, there is no need to + // have an explicit parameter for the context since + // that is captured while the handler is created anyway, + // and it is used only by the handler as parameter - // The style below is fire-and-forget, since - // we do not pass the handler nor we call get() - // to wait for the data to be sent - stream.data(new StringDataInfo(context, true), new Callback.Adapter()); - } - }); + // The style below is fire-and-forget, since + // we do not pass the handler nor we call get() + // to wait for the data to be sent + stream.data(new StringDataInfo(context, true), new Callback.Adapter()); + } + } + ); } @Test @@ -136,48 +212,49 @@ public class ClientUsageTest Session session = new StandardSession(SPDY.V2, null, null, null, null, null, null, 1, null, null, null); session.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter() - { - // The good of passing the listener to push() is that applications can safely - // accumulate info from the reply headers to be used in the data callback, - // e.g. content-type, charset, etc. - - @Override - public void onReply(Stream stream, ReplyInfo replyInfo) - { - // Do something with the response - Fields headers = replyInfo.getHeaders(); - int contentLength = headers.get("content-length").valueAsInt(); - stream.setAttribute("content-length", contentLength); - if (!replyInfo.isClose()) - stream.setAttribute("builder", new StringBuilder()); - - // May issue another similar request while waiting for data - try { - stream.getSession().syn(new SynInfo(new Fields(), true), this); - } - catch (ExecutionException | InterruptedException | TimeoutException e) + // The good of passing the listener to push() is that applications can safely + // accumulate info from the reply headers to be used in the data callback, + // e.g. content-type, charset, etc. + + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Do something with the response + Fields headers = replyInfo.getHeaders(); + int contentLength = headers.get("content-length").valueAsInt(); + stream.setAttribute("content-length", contentLength); + if (!replyInfo.isClose()) + stream.setAttribute("builder", new StringBuilder()); + + // May issue another similar request while waiting for data + try + { + stream.getSession().syn(new SynInfo(new Fields(), true), this); + } + catch (ExecutionException | InterruptedException | TimeoutException e) + { + throw new IllegalStateException(e); + } + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + StringBuilder builder = (StringBuilder)stream.getAttribute("builder"); + builder.append(dataInfo.asString("UTF-8", true)); + + } + }, new Promise.Adapter() { - throw new IllegalStateException(e); + @Override + public void succeeded(Stream stream) + { + stream.data(new BytesDataInfo("wee".getBytes(Charset.forName("UTF-8")), false), new Callback.Adapter()); + stream.data(new StringDataInfo("foo", false), new Callback.Adapter()); + stream.data(new ByteBufferDataInfo(Charset.forName("UTF-8").encode("bar"), true), new Callback.Adapter()); + } } - } - - @Override - public void onData(Stream stream, DataInfo dataInfo) - { - StringBuilder builder = (StringBuilder)stream.getAttribute("builder"); - builder.append(dataInfo.asString("UTF-8", true)); - - } - }, new Promise.Adapter() - { - @Override - public void succeeded(Stream stream) - { - stream.data(new BytesDataInfo("wee".getBytes(Charset.forName("UTF-8")), false), new Callback.Adapter()); - stream.data(new StringDataInfo("foo", false), new Callback.Adapter()); - stream.data(new ByteBufferDataInfo(Charset.forName("UTF-8").encode("bar"), true), new Callback.Adapter()); - } - }); + ); } } diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java index 395f9dcf3e2..1569f749150 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java @@ -24,6 +24,7 @@ import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.HeadersInfo; +import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.StreamFrameListener; @@ -131,6 +132,12 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory channel.requestHeaders(headersInfo.getHeaders(), headersInfo.isClose()); } + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + return null; + } + @Override public void onData(Stream stream, final DataInfo dataInfo) { diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyEngineSelector.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyEngineSelector.java index 75b6a4e5062..452e32b95eb 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyEngineSelector.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyEngineSelector.java @@ -23,7 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.PingResultInfo; import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.Session; @@ -104,7 +104,7 @@ public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo) { // TODO: } diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java index 0c1b9202087..7d5c8784026 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java @@ -42,7 +42,7 @@ import org.eclipse.jetty.spdy.StandardStream; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; @@ -136,7 +136,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse { assert content == null; if (headers.isEmpty()) - proxyEngineSelector.onGoAway(session, new GoAwayReceivedInfo(0, SessionStatus.OK)); + proxyEngineSelector.onGoAway(session, new GoAwayResultInfo(0, SessionStatus.OK)); else syn(true); } diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java index 874402ecc34..67db8bff185 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java @@ -29,8 +29,9 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.HeadersInfo; +import org.eclipse.jetty.spdy.api.Info; import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; @@ -50,8 +51,8 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; /** - *

{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by - * clients into SPDY events for the servers.

+ *

{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by clients into + * SPDY events for the servers.

*/ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener { @@ -131,6 +132,12 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener } } + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + throw new IllegalStateException("We shouldn't receive pushes from clients"); + } + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -222,6 +229,61 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener this.clientStream = clientStream; } + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + LOG.debug("S -> P pushed {} on {}", pushInfo, stream); + + Fields headers = new Fields(pushInfo.getHeaders(), false); + + addResponseProxyHeaders(stream, headers); + customizeResponseHeaders(stream, headers); + Stream clientStream = (Stream)stream.getAssociatedStream().getAttribute + (CLIENT_STREAM_ATTRIBUTE); + convert(stream.getSession().getVersion(), clientStream.getSession().getVersion(), + headers); + + StreamHandler handler = new StreamHandler(clientStream, pushInfo); + stream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler); + clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, + pushInfo.isClose()), + handler); + return new Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Push streams never send a reply + throw new UnsupportedOperationException(); + } + + @Override + public void onHeaders(Stream stream, HeadersInfo headersInfo) + { + throw new UnsupportedOperationException(); + } + + @Override + public void onData(Stream serverStream, final DataInfo serverDataInfo) + { + LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream); + + ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose()) + { + @Override + public void consume(int delta) + { + super.consume(delta); + serverDataInfo.consume(delta); + } + }; + + StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE); + handler.data(clientDataInfo); + } + }; + } + @Override public void onReply(final Stream stream, ReplyInfo replyInfo) { @@ -304,30 +366,30 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener } /** - *

{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.

- *

Instances of this class buffer DATA frames sent by clients and send them to the server. - * The buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive - * from the client before the SYN_STREAM has been fully sent), and between DATA frames, if the client - * is a fast producer and the server a slow consumer, or if the client is a SPDY v2 client (and hence - * without flow control) while the server is a SPDY v3 server (and hence with flow control).

+ *

{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.

Instances + * of this class buffer DATA frames sent by clients and send them to the server. The buffering happens between the + * send of the SYN_STREAM to the server (where DATA frames may arrive from the client before the SYN_STREAM has been + * fully sent), and between DATA frames, if the client is a fast producer and the server a slow consumer, or if the + * client is a SPDY v2 client (and hence without flow control) while the server is a SPDY v3 server (and hence with + * flow control).

*/ private class StreamHandler implements Promise { private final Queue queue = new LinkedList<>(); private final Stream clientStream; - private final SynInfo serverSynInfo; + private final Info info; private Stream serverStream; - private StreamHandler(Stream clientStream, SynInfo serverSynInfo) + private StreamHandler(Stream clientStream, Info info) { this.clientStream = clientStream; - this.serverSynInfo = serverSynInfo; + this.info = info; } @Override public void succeeded(Stream serverStream) { - LOG.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream); + LOG.debug("P -> S {} from {} to {}", info, clientStream, serverStream); serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream); @@ -449,26 +511,8 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener } } - private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener + private class ProxySessionFrameListener extends SessionFrameListener.Adapter { - @Override - public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo) - { - LOG.debug("S -> P pushed {} on {}", serverSynInfo, serverStream); - - Fields headers = new Fields(serverSynInfo.getHeaders(), false); - - addResponseProxyHeaders(serverStream, headers); - customizeResponseHeaders(serverStream, headers); - Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE); - convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers); - - StreamHandler handler = new StreamHandler(clientStream, serverSynInfo); - serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler); - clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, serverSynInfo.isClose()), - handler); - return this; - } @Override public void onRst(Session serverSession, RstInfo serverRstInfo) @@ -487,41 +531,9 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener } @Override - public void onGoAway(Session serverSession, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo) { serverSessions.values().remove(serverSession); } - - @Override - public void onReply(Stream stream, ReplyInfo replyInfo) - { - // Push streams never send a reply - throw new UnsupportedOperationException(); - } - - @Override - public void onHeaders(Stream stream, HeadersInfo headersInfo) - { - throw new UnsupportedOperationException(); - } - - @Override - public void onData(Stream serverStream, final DataInfo serverDataInfo) - { - LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream); - - ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose()) - { - @Override - public void consume(int delta) - { - super.consume(delta); - serverDataInfo.consume(delta); - } - }; - - StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE); - handler.data(clientDataInfo); - } } } diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java index d4c5e2a0395..7d65a6f5c17 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java @@ -18,16 +18,12 @@ package org.eclipse.jetty.spdy.server.http; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; - import java.io.IOException; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -37,6 +33,7 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.spdy.api.DataInfo; +import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.SPDY; @@ -50,10 +47,15 @@ import org.eclipse.jetty.spdy.server.NPNServerConnectionFactory; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Fields; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest { private final int referrerPushPeriod = 1000; @@ -107,39 +109,14 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest sendMainRequestAndCSSRequest(); final CountDownLatch pushDataLatch = new CountDownLatch(1); final CountDownLatch pushSynHeadersValid = new CountDownLatch(1); - Session session = startClient(version, serverAddress, new SessionFrameListener.Adapter() - { - @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) - { - validateHeaders(synInfo.getHeaders(), pushSynHeadersValid); - - assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true)); - assertThat("URI header ends with css", synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)) - .value().endsWith - ("" + - ".css"), - is(true)); - stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter()); - return new StreamFrameListener.Adapter() - { - - @Override - public void onData(Stream stream, DataInfo dataInfo) - { - dataInfo.consume(dataInfo.length()); - pushDataLatch.countDown(); - } - }; - } - }); + Session session = startClient(version, serverAddress, null); // Send main request. That should initiate the push push's which get reset by the client - sendRequest(session, mainRequestHeaders); + sendRequest(session, mainRequestHeaders, pushSynHeadersValid, pushDataLatch); assertThat("No push data is received", pushDataLatch.await(1, TimeUnit.SECONDS), is(false)); assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true)); - sendRequest(session, associatedCSSRequestHeaders); + sendRequest(session, associatedCSSRequestHeaders, pushSynHeadersValid, pushDataLatch); } @Test @@ -157,7 +134,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest // Sleep for pushPeriod This should prevent application.js from being mapped as pushResource Thread.sleep(referrerPushPeriod + 1); - sendRequest(session, associatedJSRequestHeaders); + sendRequest(session, associatedJSRequestHeaders, null, null); run2ndClientRequests(false, true); } @@ -171,7 +148,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest Session session = sendMainRequestAndCSSRequest(); - sendRequest(session, associatedJSRequestHeaders); + sendRequest(session, associatedJSRequestHeaders, null, null); run2ndClientRequests(false, true); } @@ -200,18 +177,43 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest { Session session = startClient(version, serverAddress, null); - sendRequest(session, mainRequestHeaders); - sendRequest(session, associatedCSSRequestHeaders); + sendRequest(session, mainRequestHeaders, null, null); + sendRequest(session, associatedCSSRequestHeaders, null, null); return session; } - private void sendRequest(Session session, Fields requestHeaders) throws InterruptedException + private void sendRequest(Session session, Fields requestHeaders, final CountDownLatch pushSynHeadersValid, + final CountDownLatch pushDataLatch) throws InterruptedException { final CountDownLatch dataReceivedLatch = new CountDownLatch(1); final CountDownLatch received200OKLatch = new CountDownLatch(1); session.syn(new SynInfo(requestHeaders, true), new StreamFrameListener.Adapter() { + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid); + + assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true)); + assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)) + .value().endsWith + ("" + + ".css"), + is(true)); + stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter()); + return new StreamFrameListener.Adapter() + { + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consume(dataInfo.length()); + pushDataLatch.countDown(); + } + }; + } + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -238,16 +240,17 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch pushDataLatch = new CountDownLatch(1); final CountDownLatch pushSynHeadersValid = new CountDownLatch(1); - Session session2 = startClient(version, serverAddress, new SessionFrameListener.Adapter() + Session session2 = startClient(version, serverAddress, null); + session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { if (validateHeaders) - validateHeaders(synInfo.getHeaders(), pushSynHeadersValid); + validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid); assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true)); - assertThat("URI header ends with css", synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)) + assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)) .value().endsWith ("" + ".css"), @@ -264,9 +267,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest } }; } - }); - session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() - { + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -292,6 +293,8 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true)); } + private static final Logger LOG = Log.getLogger(ReferrerPushStrategyTest.class); + @Test public void testAssociatedResourceIsPushed() throws Exception { @@ -326,16 +329,17 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest }); Assert.assertTrue(mainResourceLatch.await(5, TimeUnit.SECONDS)); - sendRequest(session1, createHeaders(cssResource)); + sendRequest(session1, createHeaders(cssResource), null, null); // Create another client, and perform the same request for the main resource, we expect the css being pushed final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch pushDataLatch = new CountDownLatch(1); - Session session2 = startClient(version, address, new SessionFrameListener.Adapter() + Session session2 = startClient(version, address, null); + session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { Assert.assertTrue(stream.isUnidirectional()); return new StreamFrameListener.Adapter() @@ -349,9 +353,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest } }; } - }); - session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() - { + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -452,13 +454,15 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch pushDataLatch = new CountDownLatch(1); - Session session2 = startClient(version, address, new SessionFrameListener.Adapter() + Session session2 = startClient(version, address, null); + session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { Assert.assertTrue(stream.isUnidirectional()); - Assert.assertTrue(synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value().endsWith(".css")); + Assert.assertTrue(pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value().endsWith("" + + ".css")); return new StreamFrameListener.Adapter() { @Override @@ -470,9 +474,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest } }; } - }); - session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() - { + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -563,14 +565,31 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch pushDataLatch = new CountDownLatch(2); - Session session2 = startClient(version, address, new SessionFrameListener.Adapter() + Session session2 = startClient(version, address, null); + LOG.warn("REQUEST FOR PUSHED RESOURCES"); + session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { Assert.assertTrue(stream.isUnidirectional()); return new StreamFrameListener.Adapter() { + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + return new Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consume(dataInfo.length()); + if (dataInfo.isClose()) + pushDataLatch.countDown(); + } + }; + } + @Override public void onData(Stream stream, DataInfo dataInfo) { @@ -580,9 +599,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest } }; } - }); - session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() - { + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java index e65cebb4051..11f44dc2a32 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java @@ -33,7 +33,7 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; @@ -164,7 +164,7 @@ public class ProxyHTTPToSPDYTest } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { closeLatch.countDown(); } diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToHTTPTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToHTTPTest.java index f3c2e9d3a28..6a6c2ef6289 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToHTTPTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToHTTPTest.java @@ -37,7 +37,7 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.spdy.api.DataInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.PingInfo; import org.eclipse.jetty.spdy.api.PingResultInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; @@ -429,7 +429,7 @@ public class ProxySPDYToHTTPTest Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayReceivedInfo) { goAwayLatch.countDown(); } diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java index 211a2c4a790..506c5ed571d 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java @@ -281,10 +281,16 @@ public class ProxySPDYToSPDYTest final CountDownLatch pushSynLatch = new CountDownLatch(1); final CountDownLatch pushDataLatch = new CountDownLatch(1); - Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter() + Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS); + + Fields headers = new Fields(); + headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort()); + final CountDownLatch replyLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { pushSynLatch.countDown(); return new StreamFrameListener.Adapter() @@ -298,14 +304,7 @@ public class ProxySPDYToSPDYTest } }; } - }).get(5, TimeUnit.SECONDS); - Fields headers = new Fields(); - headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort()); - final CountDownLatch replyLatch = new CountDownLatch(1); - final CountDownLatch dataLatch = new CountDownLatch(1); - client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter() - { @Override public void onReply(Stream stream, ReplyInfo replyInfo) { diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/ClosedStreamTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/ClosedStreamTest.java index 27f06da7c43..ca14e68ae4c 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/ClosedStreamTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/ClosedStreamTest.java @@ -35,7 +35,7 @@ import org.eclipse.jetty.spdy.StandardCompressionFactory; import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; @@ -211,7 +211,7 @@ public class ClosedStreamTest extends AbstractTest }; } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { goAwayReceivedLatch.countDown(); } diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/GoAwayTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/GoAwayTest.java index 509fb130a63..9a4b3f35d9b 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/GoAwayTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/GoAwayTest.java @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.SessionFrameListener; @@ -61,7 +61,7 @@ public class GoAwayTest extends AbstractTest } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { Assert.assertEquals(0, goAwayInfo.getLastStreamId()); Assert.assertSame(SessionStatus.OK, goAwayInfo.getSessionStatus()); @@ -90,12 +90,12 @@ public class GoAwayTest extends AbstractTest return null; } }; - final AtomicReference ref = new AtomicReference<>(); + final AtomicReference ref = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { ref.set(goAwayInfo); latch.countDown(); @@ -106,10 +106,10 @@ public class GoAwayTest extends AbstractTest Stream stream1 = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), null); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); - GoAwayReceivedInfo goAwayReceivedInfo = ref.get(); - Assert.assertNotNull(goAwayReceivedInfo); - Assert.assertEquals(stream1.getId(), goAwayReceivedInfo.getLastStreamId()); - Assert.assertSame(SessionStatus.OK, goAwayReceivedInfo.getSessionStatus()); + GoAwayResultInfo goAwayResultInfo = ref.get(); + Assert.assertNotNull(goAwayResultInfo); + Assert.assertEquals(stream1.getId(), goAwayResultInfo.getLastStreamId()); + Assert.assertSame(SessionStatus.OK, goAwayResultInfo.getSessionStatus()); } @Test @@ -139,7 +139,7 @@ public class GoAwayTest extends AbstractTest SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { session.syn(new SynInfo(new Fields(), true), null, new FuturePromise()); } @@ -184,12 +184,12 @@ public class GoAwayTest extends AbstractTest } } }; - final AtomicReference goAwayRef = new AtomicReference<>(); + final AtomicReference goAwayRef = new AtomicReference<>(); final CountDownLatch goAwayLatch = new CountDownLatch(1); SessionFrameListener clientSessionFrameListener = new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { goAwayRef.set(goAwayInfo); goAwayLatch.countDown(); @@ -228,7 +228,7 @@ public class GoAwayTest extends AbstractTest // The last good stream is the second, because it was received by the server Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS)); - GoAwayReceivedInfo goAway = goAwayRef.get(); + GoAwayResultInfo goAway = goAwayRef.get(); Assert.assertNotNull(goAway); Assert.assertEquals(stream2.getId(), goAway.getLastStreamId()); } diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/IdleTimeoutTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/IdleTimeoutTest.java index 3005b2cd99c..708e6dc9be9 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/IdleTimeoutTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/IdleTimeoutTest.java @@ -23,7 +23,7 @@ import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; @@ -63,7 +63,7 @@ public class IdleTimeoutTest extends AbstractTest Session session = startClient(startServer(null), new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { latch.countDown(); } @@ -85,7 +85,7 @@ public class IdleTimeoutTest extends AbstractTest Session session = startClient(startServer(null), new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { latch.countDown(); } @@ -125,7 +125,7 @@ public class IdleTimeoutTest extends AbstractTest Session session = startClient(startServer(null), new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { goAwayLatch.countDown(); } @@ -161,7 +161,7 @@ public class IdleTimeoutTest extends AbstractTest } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { latch.countDown(); } @@ -187,7 +187,7 @@ public class IdleTimeoutTest extends AbstractTest InetSocketAddress address = startServer(new ServerSessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { latch.countDown(); } @@ -220,7 +220,7 @@ public class IdleTimeoutTest extends AbstractTest } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { latch.countDown(); } diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/PushStreamTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/PushStreamTest.java index 0a38ae72fb7..344667e8c95 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/PushStreamTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/PushStreamTest.java @@ -19,12 +19,6 @@ package org.eclipse.jetty.spdy.server; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.sameInstance; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -44,7 +38,7 @@ import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.spdy.StandardCompressionFactory; import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; @@ -75,6 +69,12 @@ import org.eclipse.jetty.util.log.Logger; import org.junit.Assert; import org.junit.Test; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + public class PushStreamTest extends AbstractTest { private static final Logger LOG = Log.getLogger(PushStreamTest.class); @@ -94,10 +94,12 @@ public class PushStreamTest extends AbstractTest stream.push(new PushInfo(new Fields(), true), new Promise.Adapter()); return null; } - }), new SessionFrameListener.Adapter() + }), null); + + Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { assertThat("streamId is even", stream.getId() % 2, is(0)); assertThat("stream is unidirectional", stream.isUnidirectional(), is(true)); @@ -117,8 +119,6 @@ public class PushStreamTest extends AbstractTest return null; } }); - - Stream stream = clientSession.syn(new SynInfo(new Fields(), true), null); assertThat("onSyn has been called", pushStreamLatch.await(5, TimeUnit.SECONDS), is(true)); Stream pushStream = pushStreamRef.get(); assertThat("main stream and associated stream are the same", stream, sameInstance(pushStream.getAssociatedStream())); @@ -177,10 +177,12 @@ public class PushStreamTest extends AbstractTest } } - }), new SessionFrameListener.Adapter() + }), null); + + Stream stream = clientSession.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { pushStreamSynLatch.countDown(); return new StreamFrameListener.Adapter() @@ -193,10 +195,7 @@ public class PushStreamTest extends AbstractTest } }; } - }); - Stream stream = clientSession.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter() - { @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -298,10 +297,12 @@ public class PushStreamTest extends AbstractTest throw new IllegalStateException(e); } } - }), new SessionFrameListener.Adapter() + }), null); + + Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { return new StreamFrameListener.Adapter() { @@ -327,10 +328,7 @@ public class PushStreamTest extends AbstractTest } }; } - }); - Stream stream = clientSession.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() - { @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -427,7 +425,7 @@ public class PushStreamTest extends AbstractTest } @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) { goAwayReceivedLatch.countDown(); } @@ -543,20 +541,14 @@ public class PushStreamTest extends AbstractTest stream.push(new PushInfo(new Fields(), false), new Promise.Adapter()); return null; } - }), new SessionFrameListener.Adapter() - { - @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) - { - assertStreamIdIsEven(stream); - pushStreamIdIsEvenLatch.countDown(); - return super.onSyn(stream, synInfo); - } - }); + }), null); - Stream stream = clientSession.syn(new SynInfo(new Fields(), false), null); - Stream stream2 = clientSession.syn(new SynInfo(new Fields(), false), null); - Stream stream3 = clientSession.syn(new SynInfo(new Fields(), false), null); + Stream stream = clientSession.syn(new SynInfo(new Fields(), false), + new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch)); + Stream stream2 = clientSession.syn(new SynInfo(new Fields(), false), + new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch)); + Stream stream3 = clientSession.syn(new SynInfo(new Fields(), false), + new VerifyPushStreamIdIsEvenStreamFrameListener(pushStreamIdIsEvenLatch)); assertStreamIdIsOdd(stream); assertStreamIdIsOdd(stream2); assertStreamIdIsOdd(stream3); @@ -564,6 +556,24 @@ public class PushStreamTest extends AbstractTest assertThat("all pushStreams had even ids", pushStreamIdIsEvenLatch.await(5, TimeUnit.SECONDS), is(true)); } + private class VerifyPushStreamIdIsEvenStreamFrameListener extends StreamFrameListener.Adapter + { + final CountDownLatch pushStreamIdIsEvenLatch; + + private VerifyPushStreamIdIsEvenStreamFrameListener(CountDownLatch pushStreamIdIsEvenLatch) + { + this.pushStreamIdIsEvenLatch = pushStreamIdIsEvenLatch; + } + + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + assertStreamIdIsEven(stream); + pushStreamIdIsEvenLatch.countDown(); + return super.onPush(stream, pushInfo); + } + } + private void assertStreamIdIsEven(Stream stream) { assertThat("streamId is odd", stream.getId() % 2, is(0)); diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYClientFactoryTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYClientFactoryTest.java index 076c7ce7945..fc40469dbea 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYClientFactoryTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYClientFactoryTest.java @@ -23,7 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.junit.Assert; @@ -38,7 +38,7 @@ public class SPDYClientFactoryTest extends AbstractTest startClient(startServer(new ServerSessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo) { latch.countDown(); } diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYServerConnectorTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYServerConnectorTest.java index 02bc42a81a6..0dc771a150a 100644 --- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYServerConnectorTest.java +++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/SPDYServerConnectorTest.java @@ -23,7 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo; +import org.eclipse.jetty.spdy.api.GoAwayResultInfo; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.SessionFrameListener; import org.junit.Assert; @@ -38,7 +38,7 @@ public class SPDYServerConnectorTest extends AbstractTest startClient(startServer(null), new SessionFrameListener.Adapter() { @Override - public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo) + public void onGoAway(Session session, GoAwayResultInfo goAwayResultInfo) { latch.countDown(); }