diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index dcce655edeb..114ffc1ed53 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -1350,6 +1350,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable { bufferPool.release(buffer); IStream stream = getStream(); + dataInfo.consume(size); flowControlStrategy.updateWindow(StandardSession.this, stream, -size); if (dataInfo.available() > 0) { diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java index 7e9afac7a0e..e0b65c7b793 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java @@ -403,7 +403,7 @@ public class StandardStream implements IStream if (isLocallyClosed()) { session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); - throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream"); + throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream"); } // Cannot update the close state here, because the data that we send may @@ -481,7 +481,7 @@ public class StandardStream implements IStream @Override public String toString() { - return String.format("stream=%d v%d windowSize=%db reset=%s prio=%d %s %s", getId(), session.getVersion(), + return String.format("stream=%d v%d windowSize=%d reset=%s prio=%d %s %s", getId(), session.getVersion(), getWindowSize(), isReset(), priority, openState, closeState); } diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java index c8ca8a7bec9..abc1b888473 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java @@ -97,13 +97,13 @@ public interface Stream /** *

Initiate a unidirectional spdy pushstream associated to this stream asynchronously

Callers may pass a - * non-null completion callback to be notified of when the pushstream has been established.

+ * non-null completion promise to be notified of when the pushstream has been established.

* * @param pushInfo the metadata to send on stream creation - * @param callback the completion callback that gets notified once the pushstream is established + * @param promise the completion promise that gets notified once the pushstream is established * @see #push(PushInfo) */ - public void push(PushInfo pushInfo, Promise callback); + public void push(PushInfo pushInfo, Promise promise); /** *

Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.

Callers may use the returned diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java index 67db8bff185..5fe79092b4b 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java @@ -58,7 +58,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener { private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class); - private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamHandler"; + private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise"; private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream"; private final ConcurrentMap serverSessions = new ConcurrentHashMap<>(); @@ -113,9 +113,9 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose()); StreamFrameListener listener = new ProxyStreamFrameListener(clientStream); - StreamHandler handler = new StreamHandler(clientStream, serverSynInfo); - clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler); - serverSession.syn(serverSynInfo, listener, handler); + StreamPromise promise = new StreamPromise(clientStream, serverSynInfo); + clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise); + serverSession.syn(serverSynInfo, listener, promise); return this; } @@ -166,8 +166,8 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener } }; - StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE); - streamHandler.data(serverDataInfo); + StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE); + streamPromise.data(serverDataInfo); } private Session produceSession(String host, short version, InetSocketAddress address) @@ -219,87 +219,101 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener stream.getSession().rst(rstInfo, new Callback.Adapter()); } - private class ProxyStreamFrameListener extends StreamFrameListener.Adapter + private class ProxyPushStreamFrameListener implements StreamFrameListener { - private final Stream clientStream; - private volatile ReplyInfo replyInfo; + private PushStreamPromise pushStreamPromise; - public ProxyStreamFrameListener(Stream clientStream) + private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise) { - this.clientStream = clientStream; + this.pushStreamPromise = pushStreamPromise; } @Override public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { - LOG.debug("S -> P pushed {} on {}", pushInfo, stream); + LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream); + PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo); + this.pushStreamPromise.push(newPushStreamPromise); + return new ProxyPushStreamFrameListener(newPushStreamPromise); + } - Fields headers = new Fields(pushInfo.getHeaders(), false); + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Push streams never send a reply + throw new UnsupportedOperationException(); + } - addResponseProxyHeaders(stream, headers); - customizeResponseHeaders(stream, headers); - Stream clientStream = (Stream)stream.getAssociatedStream().getAttribute - (CLIENT_STREAM_ATTRIBUTE); - convert(stream.getSession().getVersion(), clientStream.getSession().getVersion(), - headers); + @Override + public void onHeaders(Stream stream, HeadersInfo headersInfo) + { + throw new UnsupportedOperationException(); + } - StreamHandler handler = new StreamHandler(clientStream, pushInfo); - stream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler); - clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, - pushInfo.isClose()), - handler); - return new Adapter() + @Override + public void onData(Stream serverStream, final DataInfo serverDataInfo) + { + LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream); + + ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose()) { @Override - public void onReply(Stream stream, ReplyInfo replyInfo) + public void consume(int delta) { - // Push streams never send a reply - throw new UnsupportedOperationException(); - } - - @Override - public void onHeaders(Stream stream, HeadersInfo headersInfo) - { - throw new UnsupportedOperationException(); - } - - @Override - public void onData(Stream serverStream, final DataInfo serverDataInfo) - { - LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream); - - ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose()) - { - @Override - public void consume(int delta) - { - super.consume(delta); - serverDataInfo.consume(delta); - } - }; - - StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE); - handler.data(clientDataInfo); + super.consume(delta); + serverDataInfo.consume(delta); } }; + + pushStreamPromise.data(clientDataInfo); + } + } + + private class ProxyStreamFrameListener extends StreamFrameListener.Adapter + { + private final Stream receiverStream; + + public ProxyStreamFrameListener(Stream receiverStream) + { + this.receiverStream = receiverStream; + } + + @Override + public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo) + { + LOG.debug("S -> P {} on {}"); + PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream); + PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo); + receiverStream.push(newPushInfo, pushStreamPromise); + return new ProxyPushStreamFrameListener(pushStreamPromise); } @Override public void onReply(final Stream stream, ReplyInfo replyInfo) { LOG.debug("S -> P {} on {}", replyInfo, stream); + final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()), + replyInfo.isClose()); + reply(stream, clientReplyInfo); + } - short serverVersion = stream.getSession().getVersion(); - Fields headers = new Fields(replyInfo.getHeaders(), false); + private void reply(final Stream stream, final ReplyInfo clientReplyInfo) + { + receiverStream.reply(clientReplyInfo, new Callback() + { + @Override + public void succeeded() + { + LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream); + } - addResponseProxyHeaders(stream, headers); - customizeResponseHeaders(stream, headers); - short clientVersion = this.clientStream.getSession().getVersion(); - convert(serverVersion, clientVersion, headers); - - this.replyInfo = new ReplyInfo(headers, replyInfo.isClose()); - if (replyInfo.isClose()) - reply(stream); + @Override + public void failed(Throwable x) + { + LOG.debug(x); + rst(receiverStream); + } + }); } @Override @@ -313,101 +327,82 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener public void onData(final Stream stream, final DataInfo dataInfo) { LOG.debug("S -> P {} on {}", dataInfo, stream); - - if (replyInfo != null) - { - if (dataInfo.isClose()) - replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available())); - reply(stream); - } data(stream, dataInfo); } - private void reply(final Stream stream) + private void data(final Stream stream, final DataInfo serverDataInfo) { - final ReplyInfo replyInfo = this.replyInfo; - this.replyInfo = null; - clientStream.reply(replyInfo, new Callback() + final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose()) + { + @Override + public void consume(int delta) + { + super.consume(delta); + serverDataInfo.consume(delta); + } + }; + + receiverStream.data(clientDataInfo, new Callback() //TODO: timeout??? { @Override public void succeeded() { - LOG.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream); + LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream); } @Override public void failed(Throwable x) { LOG.debug(x); - rst(clientStream); - } - }); - } - - private void data(final Stream stream, final DataInfo dataInfo) - { - clientStream.data(dataInfo, new Callback() //TODO: timeout??? - { - @Override - public void succeeded() - { - dataInfo.consume(dataInfo.length()); - LOG.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream); - } - - @Override - public void failed(Throwable x) - { - LOG.debug(x); - rst(clientStream); + rst(receiverStream); } }); } } /** - *

{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.

Instances - * of this class buffer DATA frames sent by clients and send them to the server. The buffering happens between the - * send of the SYN_STREAM to the server (where DATA frames may arrive from the client before the SYN_STREAM has been - * fully sent), and between DATA frames, if the client is a fast producer and the server a slow consumer, or if the - * client is a SPDY v2 client (and hence without flow control) while the server is a SPDY v3 server (and hence with - * flow control).

+ *

{@link StreamPromise} implements the forwarding of DATA frames from the client to the server and vice + * versa.

Instances of this class buffer DATA frames sent by clients and send them to the server. The + * buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive from the client + * before the SYN_STREAM has been fully sent), and between DATA frames, if the client is a fast producer and the + * server a slow consumer, or if the client is a SPDY v2 client (and hence without flow control) while the server is + * a SPDY v3 server (and hence with flow control).

*/ - private class StreamHandler implements Promise + private class StreamPromise implements Promise { - private final Queue queue = new LinkedList<>(); - private final Stream clientStream; + private final Queue queue = new LinkedList<>(); + private final Stream senderStream; private final Info info; - private Stream serverStream; + private Stream receiverStream; - private StreamHandler(Stream clientStream, Info info) + private StreamPromise(Stream senderStream, Info info) { - this.clientStream = clientStream; + this.senderStream = senderStream; this.info = info; } @Override - public void succeeded(Stream serverStream) + public void succeeded(Stream stream) { - LOG.debug("P -> S {} from {} to {}", info, clientStream, serverStream); + LOG.debug("P -> S {} from {} to {}", info, senderStream, stream); - serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream); + stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream); - DataInfoHandler dataInfoHandler; + DataInfoCallback dataInfoCallback; synchronized (queue) { - this.serverStream = serverStream; - dataInfoHandler = queue.peek(); - if (dataInfoHandler != null) + this.receiverStream = stream; + dataInfoCallback = queue.peek(); + if (dataInfoCallback != null) { - if (dataInfoHandler.flushing) + if (dataInfoCallback.flushing) { - LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size()); - dataInfoHandler = null; + LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size()); + dataInfoCallback = null; } else { - dataInfoHandler.flushing = true; + dataInfoCallback.flushing = true; LOG.debug("SYN completed, queue size {}", queue.size()); } } @@ -416,37 +411,37 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener LOG.debug("SYN completed, queue empty"); } } - if (dataInfoHandler != null) - flush(serverStream, dataInfoHandler); + if (dataInfoCallback != null) + flush(stream, dataInfoCallback); } @Override public void failed(Throwable x) { LOG.debug(x); - rst(clientStream); + rst(senderStream); } public void data(DataInfo dataInfo) { - Stream serverStream; - DataInfoHandler dataInfoHandler = null; - DataInfoHandler item = new DataInfoHandler(dataInfo); + Stream receiverStream; + DataInfoCallback dataInfoCallbackToFlush = null; + DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo); synchronized (queue) { - queue.offer(item); - serverStream = this.serverStream; - if (serverStream != null) + queue.offer(dataInfoCallBackToQueue); + receiverStream = this.receiverStream; + if (receiverStream != null) { - dataInfoHandler = queue.peek(); - if (dataInfoHandler.flushing) + dataInfoCallbackToFlush = queue.peek(); + if (dataInfoCallbackToFlush.flushing) { - LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size()); - serverStream = null; + LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size()); + receiverStream = null; } else { - dataInfoHandler.flushing = true; + dataInfoCallbackToFlush.flushing = true; LOG.debug("Queued {}, queue size {}", dataInfo, queue.size()); } } @@ -455,22 +450,22 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size()); } } - if (serverStream != null) - flush(serverStream, dataInfoHandler); + if (receiverStream != null) + flush(receiverStream, dataInfoCallbackToFlush); } - private void flush(Stream serverStream, DataInfoHandler dataInfoHandler) + private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback) { - LOG.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream); - serverStream.data(dataInfoHandler.dataInfo, dataInfoHandler); //TODO: timeout??? + LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream); + receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback); //TODO: timeout??? } - private class DataInfoHandler implements Callback + private class DataInfoCallback implements Callback { private final DataInfo dataInfo; private boolean flushing; - private DataInfoHandler(DataInfo dataInfo) + private DataInfoCallback(DataInfo dataInfo) { this.dataInfo = dataInfo; } @@ -479,18 +474,18 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener public void succeeded() { Stream serverStream; - DataInfoHandler dataInfoHandler; + DataInfoCallback dataInfoCallback; synchronized (queue) { - serverStream = StreamHandler.this.serverStream; + serverStream = StreamPromise.this.receiverStream; assert serverStream != null; - dataInfoHandler = queue.poll(); - assert dataInfoHandler == this; - dataInfoHandler = queue.peek(); - if (dataInfoHandler != null) + dataInfoCallback = queue.poll(); + assert dataInfoCallback == this; + dataInfoCallback = queue.peek(); + if (dataInfoCallback != null) { - assert !dataInfoHandler.flushing; - dataInfoHandler.flushing = true; + assert !dataInfoCallback.flushing; + dataInfoCallback.flushing = true; LOG.debug("Completed {}, queue size {}", dataInfo, queue.size()); } else @@ -498,22 +493,71 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener LOG.debug("Completed {}, queue empty", dataInfo); } } - if (dataInfoHandler != null) - flush(serverStream, dataInfoHandler); + if (dataInfoCallback != null) + flush(serverStream, dataInfoCallback); } @Override public void failed(Throwable x) { LOG.debug(x); - rst(clientStream); + rst(senderStream); } } + + public Stream getSenderStream() + { + return senderStream; + } + + public Info getInfo() + { + return info; + } + + public Stream getReceiverStream() + { + synchronized (queue) + { + return receiverStream; + } + } + } + + private class PushStreamPromise extends StreamPromise + { + private volatile PushStreamPromise pushStreamPromise; + + private PushStreamPromise(Stream senderStream, PushInfo pushInfo) + { + super(senderStream, pushInfo); + } + + @Override + public void succeeded(Stream receiverStream) + { + super.succeeded(receiverStream); + + LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise); + + PushStreamPromise promise = pushStreamPromise; + if (promise != null) + receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise); + } + + public void push(PushStreamPromise pushStreamPromise) + { + Stream receiverStream = getReceiverStream(); + + if (receiverStream != null) + receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise); + else + this.pushStreamPromise = pushStreamPromise; + } } private class ProxySessionFrameListener extends SessionFrameListener.Adapter { - @Override public void onRst(Session serverSession, RstInfo serverRstInfo) { @@ -536,4 +580,20 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener serverSessions.values().remove(serverSession); } } + + private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to) + { + Fields headersToConvert = pushInfo.getHeaders(); + Fields headers = convertHeaders(from, to, headersToConvert); + return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose()); + } + + private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert) + { + Fields headers = new Fields(headersToConvert, false); + addResponseProxyHeaders(from, headers); + customizeResponseHeaders(from, headers); + convert(from.getSession().getVersion(), to.getSession().getVersion(), headers); + return headers; + } } diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java index 7d65a6f5c17..1bf46ac039e 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java @@ -566,7 +566,6 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest final CountDownLatch mainStreamLatch = new CountDownLatch(2); final CountDownLatch pushDataLatch = new CountDownLatch(2); Session session2 = startClient(version, address, null); - LOG.warn("REQUEST FOR PUSHED RESOURCES"); session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() { @Override diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java index 11f44dc2a32..bcb46a9b6cb 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java @@ -311,6 +311,8 @@ public class ProxyHTTPToSPDYTest Fields responseHeaders = new Fields(); responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1"); responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK"); + responseHeaders.put("content-length", String.valueOf(data.length)); + ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false); stream.reply(replyInfo, new Callback.Adapter()); stream.data(new BytesDataInfo(data, true), new Callback.Adapter()); @@ -437,6 +439,7 @@ public class ProxyHTTPToSPDYTest Fields responseHeaders = new Fields(); responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1"); responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK"); + responseHeaders.put("content-length", String.valueOf(data.length)); ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false); stream.reply(replyInfo, new Callback.Adapter()); stream.data(new BytesDataInfo(data, true), new Callback.Adapter()); diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java index 506c5ed571d..2739d1e0c5e 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java @@ -328,6 +328,140 @@ public class ProxySPDYToSPDYTest client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS)); } + @Test + public void testSYNThenSPDYNestedPushIsReceived() throws Exception + { + final byte[] data = "0123456789ABCDEF".getBytes("UTF-8"); + InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + Fields responseHeaders = new Fields(); + responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1"); + responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK"); + stream.reply(new ReplyInfo(responseHeaders, false), new Callback.Adapter()); + + final Fields pushHeaders = new Fields(); + pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/push"); + stream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Promise.Adapter() + { + @Override + public void succeeded(Stream pushStream) + { + pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/nestedpush"); + pushStream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Adapter() + { + @Override + public void succeeded(Stream pushStream) + { + pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/anothernestedpush"); + pushStream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Adapter() + { + @Override + public void succeeded(Stream pushStream) + { + pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter()); + } + }); + pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter()); + } + }); + pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter()); + } + }); + + stream.data(new BytesDataInfo(data, true), new Callback.Adapter()); + + return null; + } + })); + proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version)); + + final CountDownLatch pushSynLatch = new CountDownLatch(3); + final CountDownLatch pushDataLatch = new CountDownLatch(3); + Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS); + + Fields headers = new Fields(); + headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort()); + final CountDownLatch replyLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter() + { + // onPush for 1st push stream + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + pushSynLatch.countDown(); + return new StreamFrameListener.Adapter() + { + // onPush for 2nd nested push stream + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + pushSynLatch.countDown(); + return new Adapter() + { + // onPush for 3rd nested push stream + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + pushSynLatch.countDown(); + return new Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consume(dataInfo.length()); + if (dataInfo.isClose()) + pushDataLatch.countDown(); + } + }; + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consume(dataInfo.length()); + if (dataInfo.isClose()) + pushDataLatch.countDown(); + } + }; + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consume(dataInfo.length()); + if (dataInfo.isClose()) + pushDataLatch.countDown(); + } + }; + } + + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + replyLatch.countDown(); + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consume(dataInfo.length()); + if (dataInfo.isClose()) + dataLatch.countDown(); + } + }); + + Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(pushSynLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(pushDataLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + + client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS)); + } + @Test public void testPING() throws Exception {