Second pass at the implementation of a reverse SPDY proxy.

This commit is contained in:
Simone Bordet 2012-06-11 20:46:43 +02:00
parent 479c957a68
commit 8f356ea922
6 changed files with 498 additions and 70 deletions

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -80,6 +81,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
} }
}; };
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private final List<Listener> listeners = new CopyOnWriteArrayList<>(); private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final LinkedList<FrameBytes> queue = new LinkedList<>(); private final LinkedList<FrameBytes> queue = new LinkedList<>();
@ -209,7 +211,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler) public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{ {
SettingsFrame frame = new SettingsFrame(version,settingsInfo.getFlags(),settingsInfo.getSettings()); SettingsFrame frame = new SettingsFrame(version,settingsInfo.getFlags(),settingsInfo.getSettings());
control(null,frame,timeout,unit,handler,null); control(null, frame, timeout, unit, handler, null);
} }
@Override @Override
@ -245,7 +247,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override @Override
public void goAway(long timeout, TimeUnit unit, Handler<Void> handler) public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
{ {
goAway(SessionStatus.OK,timeout,unit,handler); goAway(SessionStatus.OK, timeout, unit, handler);
} }
private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Handler<Void> handler) private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Handler<Void> handler)
@ -270,6 +272,30 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
return result; return result;
} }
@Override
public IStream getStream(int streamId)
{
return streams.get(streamId);
}
@Override
public Object getAttribute(String key)
{
return attributes.get(key);
}
@Override
public void setAttribute(String key, Object value)
{
attributes.put(key, value);
}
@Override
public Object removeAttribute(String key)
{
return attributes.remove(key);
}
@Override @Override
public void onControlFrame(ControlFrame frame) public void onControlFrame(ControlFrame frame)
{ {

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener; import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus; import org.eclipse.jetty.spdy.api.StreamStatus;
@ -113,7 +112,7 @@ public class StandardStream implements IStream
} }
@Override @Override
public Session getSession() public ISession getSession()
{ {
return session; return session;
} }

View File

@ -180,10 +180,40 @@ public interface Session
public void goAway(long timeout, TimeUnit unit, Handler<Void> handler); public void goAway(long timeout, TimeUnit unit, Handler<Void> handler);
/** /**
* @return the streams currently active in this session * @return a snapshot of the streams currently active in this session
* @see #getStream(int)
*/ */
public Set<Stream> getStreams(); public Set<Stream> getStreams();
/**
* @param streamId the id of the stream to retrieve
* @return the stream with the given stream id
* @see #getStreams()
*/
public Stream getStream(int streamId);
/**
* @param key the attribute key
* @return an arbitrary object associated with the given key to this session
* @see #setAttribute(String, Object)
*/
public Object getAttribute(String key);
/**
* @param key the attribute key
* @param value an arbitrary object to associate with the given key to this session
* @see #getAttribute(String)
* @see #removeAttribute(String)
*/
public void setAttribute(String key, Object value);
/**
* @param key the attribute key
* @return the arbitrary object associated with the given key to this session
* @see #setAttribute(String, Object)
*/
public Object removeAttribute(String key);
/** /**
* <p>Super interface for listeners with callbacks that are invoked on specific session events.</p> * <p>Super interface for listeners with callbacks that are invoked on specific session events.</p>
*/ */

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.NIOBuffer; import org.eclipse.jetty.io.nio.NIOBuffer;
import org.eclipse.jetty.server.AsyncHttpConnection; import org.eclipse.jetty.server.AsyncHttpConnection;
import org.eclipse.jetty.spdy.ISession; import org.eclipse.jetty.spdy.ISession;
import org.eclipse.jetty.spdy.IStream;
import org.eclipse.jetty.spdy.SPDYServerConnector; import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.StandardSession; import org.eclipse.jetty.spdy.StandardSession;
import org.eclipse.jetty.spdy.StandardStream; import org.eclipse.jetty.spdy.StandardStream;
@ -44,6 +45,7 @@ import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SessionStatus; import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.api.SynInfo;
@ -142,8 +144,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
private Stream syn(boolean close) private Stream syn(boolean close)
{ {
// TODO: stream id uniqueness Stream stream = new HTTPStream(1, (byte)0, session, null);
Stream stream = new HTTPStream(1, (byte)0, session);
proxyEngine.onSyn(stream, new SynInfo(headers, close)); proxyEngine.onSyn(stream, new SynInfo(headers, close));
return stream; return stream;
} }
@ -171,6 +172,13 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngine, null, null); super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngine, null, null);
} }
@Override
public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
// Not much we can do in HTTP land: just close the connection
goAway(timeout, unit, handler);
}
@Override @Override
public void goAway(long timeout, TimeUnit unit, Handler<Void> handler) public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
{ {
@ -193,24 +201,23 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{ {
private final Pattern statusRegexp = Pattern.compile("(\\d{3})\\s*(.*)"); private final Pattern statusRegexp = Pattern.compile("(\\d{3})\\s*(.*)");
private HTTPStream(int id, byte priority, ISession session) private HTTPStream(int id, byte priority, ISession session, IStream associatedStream)
{ {
super(id, priority, session, null); super(id, priority, session, associatedStream);
} }
@Override @Override
public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler) public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
{ {
// No support for pushed stream in HTTP, but we need to return a non-null stream anyway // HTTP does not support pushed streams
// TODO handler.completed(new HTTPPushStream(2, getPriority(), getSession(), this));
throw new UnsupportedOperationException();
} }
@Override @Override
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler) public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{ {
// TODO // TODO
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("Not Yet Implemented");
} }
@Override @Override
@ -304,4 +311,26 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
getEndPoint().asyncDispatch(); getEndPoint().asyncDispatch();
} }
} }
private class HTTPPushStream extends StandardStream
{
private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream)
{
super(id, priority, session, associatedStream);
}
@Override
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
// Ignore pushed headers
handler.completed(null);
}
@Override
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
// Ignore pushed data
handler.completed(null);
}
}
} }

View File

@ -23,7 +23,6 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.SPDYClient; import org.eclipse.jetty.spdy.SPDYClient;
@ -33,6 +32,7 @@ import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler; import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.Session;
@ -50,9 +50,10 @@ import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
public class SPDYProxyEngine extends ProxyEngine public class SPDYProxyEngine extends ProxyEngine
{ {
private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler"; private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
private static final String CLIENT_SESSIONS_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientSessions";
private final ConcurrentMap<String, Future<Session>> serverSessions = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
private final ConcurrentMap<Session, Set<Session>> clientSessions = new ConcurrentHashMap<>();
private final SessionFrameListener sessionListener = new ProxySessionFrameListener(); private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
private final SPDYClient.Factory factory; private final SPDYClient.Factory factory;
private volatile long connectTimeout = 15000; private volatile long connectTimeout = 15000;
@ -83,28 +84,28 @@ public class SPDYProxyEngine extends ProxyEngine
this.timeout = timeout; this.timeout = timeout;
} }
@Override
public void onPing(Session clientSession, PingInfo pingInfo)
{
// We do not know to which upstream server
// to send the PING so we just ignore it
}
@Override @Override
public void onGoAway(Session clientSession, GoAwayInfo goAwayInfo) public void onGoAway(Session clientSession, GoAwayInfo goAwayInfo)
{ {
Set<Session> target = null; for (Session serverSession : serverSessions.values())
for (Set<Session> sessions : clientSessions.values())
{ {
@SuppressWarnings("unchecked")
Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
for (Session session : sessions) for (Session session : sessions)
{ {
if (session == clientSession) if (session == clientSession)
{ {
target = sessions; sessions.remove(session);
break; return;
} }
} }
if (target != null)
break;
}
if (target != null)
{
target.remove(clientSession);
// Do not remove the Set if it's empty: there is one Set per proxied
// host, so we can afford this small leak and avoid synchronization
} }
} }
@ -146,14 +147,8 @@ public class SPDYProxyEngine extends ProxyEngine
return null; return null;
} }
Set<Session> sessions = clientSessions.get(serverSession); @SuppressWarnings("unchecked")
if (sessions == null) Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
{
sessions = Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>());
Set<Session> existing = clientSessions.putIfAbsent(serverSession, sessions);
if (existing != null)
sessions = existing;
}
sessions.add(clientSession); sessions.add(clientSession);
convert(clientVersion, serverVersion, headers); convert(clientVersion, serverVersion, headers);
@ -164,27 +159,11 @@ public class SPDYProxyEngine extends ProxyEngine
logger.debug("P -> S {}", serverSynInfo); logger.debug("P -> S {}", serverSynInfo);
StreamFrameListener listener = new ProxyStreamFrameListener(clientStream); StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
if (serverSynInfo.isClose()) StreamHandler handler = new StreamHandler(clientStream);
{ clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>() serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, handler);
{
@Override
public void failed(Stream context, Throwable x)
{
logger.debug(x);
rst(clientStream);
}
});
return null;
}
else
{
StreamHandler streamHandler = new StreamHandler(clientStream);
clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, streamHandler);
serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, streamHandler);
return this; return this;
} }
}
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
@ -196,7 +175,7 @@ public class SPDYProxyEngine extends ProxyEngine
public void onHeaders(Stream stream, HeadersInfo headersInfo) public void onHeaders(Stream stream, HeadersInfo headersInfo)
{ {
// TODO // TODO
throw new UnsupportedOperationException("Not yet implemented"); throw new UnsupportedOperationException("Not Yet Implemented");
} }
@Override @Override
@ -222,19 +201,20 @@ public class SPDYProxyEngine extends ProxyEngine
{ {
try try
{ {
Future<Session> session = serverSessions.get(host); Session session = serverSessions.get(host);
if (session == null) if (session == null)
{ {
SPDYClient client = factory.newSPDYClient(version); SPDYClient client = factory.newSPDYClient(version);
session = client.connect(address, sessionListener); session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
Future<Session> existing = serverSessions.putIfAbsent(host, session); session.setAttribute(CLIENT_SESSIONS_ATTRIBUTE, Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>()));
Session existing = serverSessions.putIfAbsent(host, session);
if (existing != null) if (existing != null)
{ {
session.cancel(true); session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
session = existing; session = existing;
} }
} }
return session.get(getConnectTimeout(), TimeUnit.MILLISECONDS); return session;
} }
catch (Exception x) catch (Exception x)
{ {
@ -366,6 +346,8 @@ public class SPDYProxyEngine extends ProxyEngine
@Override @Override
public void completed(Stream serverStream) public void completed(Stream serverStream)
{ {
serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
DataInfoHandler dataInfoHandler; DataInfoHandler dataInfoHandler;
synchronized (queue) synchronized (queue)
{ {
@ -484,17 +466,80 @@ public class SPDYProxyEngine extends ProxyEngine
} }
} }
private class ProxySessionFrameListener extends SessionFrameListener.Adapter private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener
{ {
@Override
public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
{
logger.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
Headers headers = new Headers(serverSynInfo.getHeaders(), false);
Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE);
convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers);
addResponseProxyHeaders(headers);
StreamHandler handler = new StreamHandler(clientStream);
serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
clientStream.syn(new SynInfo(headers, serverSynInfo.isClose()), getTimeout(), TimeUnit.MILLISECONDS, handler);
return this;
}
@Override
public void onRst(Session serverSession, RstInfo serverRstInfo)
{
Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
if (serverStream != null)
{
Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
if (clientStream != null)
{
Session clientSession = clientStream.getSession();
RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
clientSession.rst(clientRstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
}
}
}
@Override @Override
public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo) public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
{ {
Set<Session> sessions = clientSessions.remove(serverSession); @SuppressWarnings("unchecked")
if (sessions != null) Set<Session> sessions = (Set<Session>)serverSession.removeAttribute(CLIENT_SESSIONS_ATTRIBUTE);
{
for (Session session : sessions) for (Session session : sessions)
session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>()); session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
} }
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Push streams never send a reply
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
throw new UnsupportedOperationException();
}
@Override
public void onData(Stream serverStream, final DataInfo serverDataInfo)
{
logger.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
serverDataInfo.consume(delta);
}
};
StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
handler.data(clientDataInfo);
} }
} }
} }

View File

@ -17,6 +17,7 @@
package org.eclipse.jetty.spdy.proxy; package org.eclipse.jetty.spdy.proxy;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.OutputStream; import java.io.OutputStream;
@ -32,13 +33,17 @@ import org.eclipse.jetty.spdy.ServerSPDYAsyncConnectionFactory;
import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo; import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener; import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener; import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader; import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
@ -553,4 +558,298 @@ public class ProxyHTTPSPDYv2Test
client.goAway().get(5, TimeUnit.SECONDS); client.goAway().get(5, TimeUnit.SECONDS);
} }
@Test
public void testSYNThenREPLYAndDATA() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
final String header = "foo";
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
Assert.assertNotNull(requestHeaders.get(header));
Headers responseHeaders = new Headers();
responseHeaders.put(header, "baz");
stream.reply(new ReplyInfo(responseHeaders, false));
stream.data(new BytesDataInfo(data, true));
return null;
}
}));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
Session client = factory.newSPDYClient(version()).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
headers.put(header, "bar");
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
private final ByteArrayOutputStream result = new ByteArrayOutputStream();
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Headers headers = replyInfo.getHeaders();
Assert.assertNotNull(headers.get(header));
replyLatch.countDown();
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
result.write(dataInfo.asBytes(true), 0, dataInfo.length());
if (dataInfo.isClose())
{
Assert.assertArrayEquals(data, result.toByteArray());
dataLatch.countDown();
}
}
});
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
client.goAway().get(5, TimeUnit.SECONDS);
}
@Test
public void testGETThenSPDYPushIsIgnored() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
Headers pushHeaders = new Headers();
pushHeaders.put(HTTPSPDYHeader.URI.name(version()), "/push");
stream.syn(new SynInfo(pushHeaders, false), 5, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
{
@Override
public void completed(Stream pushStream)
{
pushStream.data(new BytesDataInfo(data, true));
}
});
stream.reply(new ReplyInfo(responseHeaders, true));
return null;
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
client.setSoTimeout(1000);
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
Assert.assertFalse(reader.ready());
client.close();
}
@Test
public void testSYNThenSPDYPushIsReceived() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, false));
Headers pushHeaders = new Headers();
pushHeaders.put(HTTPSPDYHeader.URI.name(version()), "/push");
stream.syn(new SynInfo(pushHeaders, false), 5, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
{
@Override
public void completed(Stream pushStream)
{
pushStream.data(new BytesDataInfo(data, true));
}
});
stream.data(new BytesDataInfo(data, true));
return null;
}
}));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
final CountDownLatch pushSynLatch = new CountDownLatch(1);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
pushSynLatch.countDown();
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
pushDataLatch.countDown();
}
};
}
}).get(5, TimeUnit.SECONDS);
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyLatch.countDown();
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
dataLatch.countDown();
}
});
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(pushSynLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(pushDataLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
client.goAway().get(5, TimeUnit.SECONDS);
}
@Test
public void testPING() throws Exception
{
// PING is per hop, and it does not carry the information to which server to ping to
// We just verify that it works
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
final CountDownLatch pingLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
{
@Override
public void onPing(Session session, PingInfo pingInfo)
{
pingLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
client.ping().get(5, TimeUnit.SECONDS);
Assert.assertTrue(pingLatch.await(5, TimeUnit.SECONDS));
client.goAway().get(5, TimeUnit.SECONDS);
}
@Test
public void testGETThenReset() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Assert.assertTrue(synInfo.isClose());
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
return null;
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
Assert.assertNull(reader.readLine());
client.close();
}
@Test
public void testSYNThenReset() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Assert.assertTrue(synInfo.isClose());
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
return null;
}
}));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
final CountDownLatch resetLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
{
@Override
public void onRst(Session session, RstInfo rstInfo)
{
resetLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
client.syn(new SynInfo(headers, true), null);
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
client.goAway().get(5, TimeUnit.SECONDS);
}
} }