From 8f356ea9224de08384ef59c64a09a13baed93d3a Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 11 Jun 2012 20:46:43 +0200 Subject: [PATCH] Second pass at the implementation of a reverse SPDY proxy. --- .../eclipse/jetty/spdy/StandardSession.java | 30 +- .../eclipse/jetty/spdy/StandardStream.java | 3 +- .../org/eclipse/jetty/spdy/api/Session.java | 36 ++- .../proxy/ProxyHTTPSPDYAsyncConnection.java | 45 ++- .../jetty/spdy/proxy/SPDYProxyEngine.java | 155 +++++---- .../jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java | 299 ++++++++++++++++++ 6 files changed, 498 insertions(+), 70 deletions(-) diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index e99ab8e8606..446a9103e98 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -80,6 +81,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler attributes = new ConcurrentHashMap<>(); private final List listeners = new CopyOnWriteArrayList<>(); private final ConcurrentMap streams = new ConcurrentHashMap<>(); private final LinkedList queue = new LinkedList<>(); @@ -209,7 +211,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler handler) { SettingsFrame frame = new SettingsFrame(version,settingsInfo.getFlags(),settingsInfo.getSettings()); - control(null,frame,timeout,unit,handler,null); + control(null, frame, timeout, unit, handler, null); } @Override @@ -245,7 +247,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler handler) { - goAway(SessionStatus.OK,timeout,unit,handler); + goAway(SessionStatus.OK, timeout, unit, handler); } private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Handler handler) @@ -270,6 +272,30 @@ public class StandardSession implements ISession, Parser.Listener, Handler syn(SynInfo synInfo, StreamFrameListener listener); - + /** *

Sends asynchronously a SYN_FRAME to create a new {@link Stream SPDY stream}.

*

Callers may pass a non-null completion handler to be notified of when the @@ -90,7 +90,7 @@ public interface Session */ public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler handler); - + /** *

Sends asynchronously a RST_STREAM to abort a stream.

*

Callers may use the returned future to wait for the reset to be sent.

@@ -180,10 +180,40 @@ public interface Session public void goAway(long timeout, TimeUnit unit, Handler handler); /** - * @return the streams currently active in this session + * @return a snapshot of the streams currently active in this session + * @see #getStream(int) */ public Set getStreams(); + /** + * @param streamId the id of the stream to retrieve + * @return the stream with the given stream id + * @see #getStreams() + */ + public Stream getStream(int streamId); + + /** + * @param key the attribute key + * @return an arbitrary object associated with the given key to this session + * @see #setAttribute(String, Object) + */ + public Object getAttribute(String key); + + /** + * @param key the attribute key + * @param value an arbitrary object to associate with the given key to this session + * @see #getAttribute(String) + * @see #removeAttribute(String) + */ + public void setAttribute(String key, Object value); + + /** + * @param key the attribute key + * @return the arbitrary object associated with the given key to this session + * @see #setAttribute(String, Object) + */ + public Object removeAttribute(String key); + /** *

Super interface for listeners with callbacks that are invoked on specific session events.

*/ diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java index 5d5b6719928..e7253094e0c 100644 --- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java +++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java @@ -33,6 +33,7 @@ import org.eclipse.jetty.io.nio.IndirectNIOBuffer; import org.eclipse.jetty.io.nio.NIOBuffer; import org.eclipse.jetty.server.AsyncHttpConnection; import org.eclipse.jetty.spdy.ISession; +import org.eclipse.jetty.spdy.IStream; import org.eclipse.jetty.spdy.SPDYServerConnector; import org.eclipse.jetty.spdy.StandardSession; import org.eclipse.jetty.spdy.StandardStream; @@ -44,6 +45,7 @@ import org.eclipse.jetty.spdy.api.Handler; import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; +import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.SessionStatus; import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.SynInfo; @@ -142,8 +144,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection private Stream syn(boolean close) { - // TODO: stream id uniqueness - Stream stream = new HTTPStream(1, (byte)0, session); + Stream stream = new HTTPStream(1, (byte)0, session, null); proxyEngine.onSyn(stream, new SynInfo(headers, close)); return stream; } @@ -171,6 +172,13 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngine, null, null); } + @Override + public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler handler) + { + // Not much we can do in HTTP land: just close the connection + goAway(timeout, unit, handler); + } + @Override public void goAway(long timeout, TimeUnit unit, Handler handler) { @@ -193,24 +201,23 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection { private final Pattern statusRegexp = Pattern.compile("(\\d{3})\\s*(.*)"); - private HTTPStream(int id, byte priority, ISession session) + private HTTPStream(int id, byte priority, ISession session, IStream associatedStream) { - super(id, priority, session, null); + super(id, priority, session, associatedStream); } @Override public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler handler) { - // No support for pushed stream in HTTP, but we need to return a non-null stream anyway - // TODO - throw new UnsupportedOperationException(); + // HTTP does not support pushed streams + handler.completed(new HTTPPushStream(2, getPriority(), getSession(), this)); } @Override public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler handler) { // TODO - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Not Yet Implemented"); } @Override @@ -304,4 +311,26 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection getEndPoint().asyncDispatch(); } } + + private class HTTPPushStream extends StandardStream + { + private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream) + { + super(id, priority, session, associatedStream); + } + + @Override + public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler handler) + { + // Ignore pushed headers + handler.completed(null); + } + + @Override + public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler handler) + { + // Ignore pushed data + handler.completed(null); + } + } } diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java index 24b806fb1fc..b4413b8f323 100644 --- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java +++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java @@ -23,7 +23,6 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.spdy.SPDYClient; @@ -33,6 +32,7 @@ import org.eclipse.jetty.spdy.api.GoAwayInfo; import org.eclipse.jetty.spdy.api.Handler; import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.HeadersInfo; +import org.eclipse.jetty.spdy.api.PingInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.Session; @@ -50,9 +50,10 @@ import org.eclipse.jetty.spdy.http.HTTPSPDYHeader; public class SPDYProxyEngine extends ProxyEngine { private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler"; + private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream"; + private static final String CLIENT_SESSIONS_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientSessions"; - private final ConcurrentMap> serverSessions = new ConcurrentHashMap<>(); - private final ConcurrentMap> clientSessions = new ConcurrentHashMap<>(); + private final ConcurrentMap serverSessions = new ConcurrentHashMap<>(); private final SessionFrameListener sessionListener = new ProxySessionFrameListener(); private final SPDYClient.Factory factory; private volatile long connectTimeout = 15000; @@ -83,28 +84,28 @@ public class SPDYProxyEngine extends ProxyEngine this.timeout = timeout; } + @Override + public void onPing(Session clientSession, PingInfo pingInfo) + { + // We do not know to which upstream server + // to send the PING so we just ignore it + } + @Override public void onGoAway(Session clientSession, GoAwayInfo goAwayInfo) { - Set target = null; - for (Set sessions : clientSessions.values()) + for (Session serverSession : serverSessions.values()) { + @SuppressWarnings("unchecked") + Set sessions = (Set)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE); for (Session session : sessions) { if (session == clientSession) { - target = sessions; - break; + sessions.remove(session); + return; } } - if (target != null) - break; - } - if (target != null) - { - target.remove(clientSession); - // Do not remove the Set if it's empty: there is one Set per proxied - // host, so we can afford this small leak and avoid synchronization } } @@ -146,14 +147,8 @@ public class SPDYProxyEngine extends ProxyEngine return null; } - Set sessions = clientSessions.get(serverSession); - if (sessions == null) - { - sessions = Collections.newSetFromMap(new ConcurrentHashMap()); - Set existing = clientSessions.putIfAbsent(serverSession, sessions); - if (existing != null) - sessions = existing; - } + @SuppressWarnings("unchecked") + Set sessions = (Set)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE); sessions.add(clientSession); convert(clientVersion, serverVersion, headers); @@ -164,26 +159,10 @@ public class SPDYProxyEngine extends ProxyEngine logger.debug("P -> S {}", serverSynInfo); StreamFrameListener listener = new ProxyStreamFrameListener(clientStream); - if (serverSynInfo.isClose()) - { - serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, new Handler.Adapter() - { - @Override - public void failed(Stream context, Throwable x) - { - logger.debug(x); - rst(clientStream); - } - }); - return null; - } - else - { - StreamHandler streamHandler = new StreamHandler(clientStream); - clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, streamHandler); - serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, streamHandler); - return this; - } + StreamHandler handler = new StreamHandler(clientStream); + clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler); + serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, handler); + return this; } @Override @@ -196,7 +175,7 @@ public class SPDYProxyEngine extends ProxyEngine public void onHeaders(Stream stream, HeadersInfo headersInfo) { // TODO - throw new UnsupportedOperationException("Not yet implemented"); + throw new UnsupportedOperationException("Not Yet Implemented"); } @Override @@ -222,19 +201,20 @@ public class SPDYProxyEngine extends ProxyEngine { try { - Future session = serverSessions.get(host); + Session session = serverSessions.get(host); if (session == null) { SPDYClient client = factory.newSPDYClient(version); - session = client.connect(address, sessionListener); - Future existing = serverSessions.putIfAbsent(host, session); + session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS); + session.setAttribute(CLIENT_SESSIONS_ATTRIBUTE, Collections.newSetFromMap(new ConcurrentHashMap())); + Session existing = serverSessions.putIfAbsent(host, session); if (existing != null) { - session.cancel(true); + session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter()); session = existing; } } - return session.get(getConnectTimeout(), TimeUnit.MILLISECONDS); + return session; } catch (Exception x) { @@ -366,6 +346,8 @@ public class SPDYProxyEngine extends ProxyEngine @Override public void completed(Stream serverStream) { + serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream); + DataInfoHandler dataInfoHandler; synchronized (queue) { @@ -484,17 +466,80 @@ public class SPDYProxyEngine extends ProxyEngine } } - private class ProxySessionFrameListener extends SessionFrameListener.Adapter + private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener { + @Override + public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo) + { + logger.debug("S -> P pushed {} on {}", serverSynInfo, serverStream); + + Headers headers = new Headers(serverSynInfo.getHeaders(), false); + + Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE); + convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers); + + addResponseProxyHeaders(headers); + + StreamHandler handler = new StreamHandler(clientStream); + serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler); + clientStream.syn(new SynInfo(headers, serverSynInfo.isClose()), getTimeout(), TimeUnit.MILLISECONDS, handler); + return this; + } + + @Override + public void onRst(Session serverSession, RstInfo serverRstInfo) + { + Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId()); + if (serverStream != null) + { + Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE); + if (clientStream != null) + { + Session clientSession = clientStream.getSession(); + RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus()); + clientSession.rst(clientRstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter()); + } + } + } + @Override public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo) { - Set sessions = clientSessions.remove(serverSession); - if (sessions != null) + @SuppressWarnings("unchecked") + Set sessions = (Set)serverSession.removeAttribute(CLIENT_SESSIONS_ATTRIBUTE); + for (Session session : sessions) + session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter()); + } + + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + // Push streams never send a reply + } + + @Override + public void onHeaders(Stream stream, HeadersInfo headersInfo) + { + throw new UnsupportedOperationException(); + } + + @Override + public void onData(Stream serverStream, final DataInfo serverDataInfo) + { + logger.debug("S -> P pushed {} on {}", serverDataInfo, serverStream); + + ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose()) { - for (Session session : sessions) - session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter()); - } + @Override + public void consume(int delta) + { + super.consume(delta); + serverDataInfo.consume(delta); + } + }; + + StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE); + handler.data(clientDataInfo); } } } diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java index 782f91ce481..4bf8e02f6ab 100644 --- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java +++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java @@ -17,6 +17,7 @@ package org.eclipse.jetty.spdy.proxy; import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; @@ -32,13 +33,17 @@ import org.eclipse.jetty.spdy.ServerSPDYAsyncConnectionFactory; import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo; +import org.eclipse.jetty.spdy.api.Handler; import org.eclipse.jetty.spdy.api.Headers; +import org.eclipse.jetty.spdy.api.PingInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; +import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.SessionFrameListener; import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.StreamFrameListener; +import org.eclipse.jetty.spdy.api.StreamStatus; import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.spdy.http.HTTPSPDYHeader; @@ -553,4 +558,298 @@ public class ProxyHTTPSPDYv2Test client.goAway().get(5, TimeUnit.SECONDS); } + + @Test + public void testSYNThenREPLYAndDATA() throws Exception + { + final byte[] data = "0123456789ABCDEF".getBytes("UTF-8"); + final String header = "foo"; + InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + Headers requestHeaders = synInfo.getHeaders(); + Assert.assertNotNull(requestHeaders.get("via")); + Assert.assertNotNull(requestHeaders.get(header)); + + Headers responseHeaders = new Headers(); + responseHeaders.put(header, "baz"); + stream.reply(new ReplyInfo(responseHeaders, false)); + stream.data(new BytesDataInfo(data, true)); + return null; + } + })); + proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version())); + + Session client = factory.newSPDYClient(version()).connect(proxyAddress, null).get(5, TimeUnit.SECONDS); + + final CountDownLatch replyLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + Headers headers = new Headers(); + headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort()); + headers.put(header, "bar"); + client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter() + { + private final ByteArrayOutputStream result = new ByteArrayOutputStream(); + + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + Headers headers = replyInfo.getHeaders(); + Assert.assertNotNull(headers.get(header)); + replyLatch.countDown(); + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + result.write(dataInfo.asBytes(true), 0, dataInfo.length()); + if (dataInfo.isClose()) + { + Assert.assertArrayEquals(data, result.toByteArray()); + dataLatch.countDown(); + } + } + }); + + Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + + client.goAway().get(5, TimeUnit.SECONDS); + } + + @Test + public void testGETThenSPDYPushIsIgnored() throws Exception + { + final byte[] data = "0123456789ABCDEF".getBytes("UTF-8"); + InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + Headers responseHeaders = new Headers(); + responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1"); + responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK"); + + Headers pushHeaders = new Headers(); + pushHeaders.put(HTTPSPDYHeader.URI.name(version()), "/push"); + stream.syn(new SynInfo(pushHeaders, false), 5, TimeUnit.SECONDS, new Handler.Adapter() + { + @Override + public void completed(Stream pushStream) + { + pushStream.data(new BytesDataInfo(data, true)); + } + }); + + stream.reply(new ReplyInfo(responseHeaders, true)); + return null; + } + })); + + Socket client = new Socket(); + client.connect(proxyAddress); + OutputStream output = client.getOutputStream(); + + String request = "" + + "GET / HTTP/1.1\r\n" + + "Host: localhost:" + proxyAddress.getPort() + "\r\n" + + "\r\n"; + output.write(request.getBytes("UTF-8")); + output.flush(); + + client.setSoTimeout(1000); + InputStream input = client.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8")); + String line = reader.readLine(); + Assert.assertTrue(line.contains(" 200")); + while (line.length() > 0) + line = reader.readLine(); + Assert.assertFalse(reader.ready()); + + client.close(); + } + + @Test + public void testSYNThenSPDYPushIsReceived() throws Exception + { + final byte[] data = "0123456789ABCDEF".getBytes("UTF-8"); + InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + Headers responseHeaders = new Headers(); + responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1"); + responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK"); + stream.reply(new ReplyInfo(responseHeaders, false)); + + Headers pushHeaders = new Headers(); + pushHeaders.put(HTTPSPDYHeader.URI.name(version()), "/push"); + stream.syn(new SynInfo(pushHeaders, false), 5, TimeUnit.SECONDS, new Handler.Adapter() + { + @Override + public void completed(Stream pushStream) + { + pushStream.data(new BytesDataInfo(data, true)); + } + }); + + stream.data(new BytesDataInfo(data, true)); + + return null; + } + })); + proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version())); + + final CountDownLatch pushSynLatch = new CountDownLatch(1); + final CountDownLatch pushDataLatch = new CountDownLatch(1); + Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + pushSynLatch.countDown(); + return new StreamFrameListener.Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consume(dataInfo.length()); + if (dataInfo.isClose()) + pushDataLatch.countDown(); + } + }; + } + }).get(5, TimeUnit.SECONDS); + + Headers headers = new Headers(); + headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort()); + final CountDownLatch replyLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + replyLatch.countDown(); + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consume(dataInfo.length()); + if (dataInfo.isClose()) + dataLatch.countDown(); + } + }); + + Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(pushSynLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(pushDataLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + + client.goAway().get(5, TimeUnit.SECONDS); + } + + @Test + public void testPING() throws Exception + { + // PING is per hop, and it does not carry the information to which server to ping to + // We just verify that it works + + InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter())); + proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version())); + + final CountDownLatch pingLatch = new CountDownLatch(1); + Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter() + { + @Override + public void onPing(Session session, PingInfo pingInfo) + { + pingLatch.countDown(); + } + }).get(5, TimeUnit.SECONDS); + + client.ping().get(5, TimeUnit.SECONDS); + + Assert.assertTrue(pingLatch.await(5, TimeUnit.SECONDS)); + + client.goAway().get(5, TimeUnit.SECONDS); + } + + @Test + public void testGETThenReset() throws Exception + { + InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + Assert.assertTrue(synInfo.isClose()); + Headers requestHeaders = synInfo.getHeaders(); + Assert.assertNotNull(requestHeaders.get("via")); + + stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM)); + + return null; + } + })); + + Socket client = new Socket(); + client.connect(proxyAddress); + OutputStream output = client.getOutputStream(); + + String request = "" + + "GET / HTTP/1.1\r\n" + + "Host: localhost:" + proxyAddress.getPort() + "\r\n" + + "\r\n"; + output.write(request.getBytes("UTF-8")); + output.flush(); + + InputStream input = client.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8")); + Assert.assertNull(reader.readLine()); + + client.close(); + } + + @Test + public void testSYNThenReset() throws Exception + { + InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + Assert.assertTrue(synInfo.isClose()); + Headers requestHeaders = synInfo.getHeaders(); + Assert.assertNotNull(requestHeaders.get("via")); + + stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM)); + + return null; + } + })); + proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version())); + + final CountDownLatch resetLatch = new CountDownLatch(1); + Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter() + { + @Override + public void onRst(Session session, RstInfo rstInfo) + { + resetLatch.countDown(); + } + }).get(5, TimeUnit.SECONDS); + + Headers headers = new Headers(); + headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort()); + client.syn(new SynInfo(headers, true), null); + + Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + + client.goAway().get(5, TimeUnit.SECONDS); + } }