spdy: spdy-proxy refactoring to allow multiple ProxyEngine implementations for different protocols. Header name fix to lowercase header names before creating the bytes to sent over the wire.

This commit is contained in:
Thomas Becker 2012-07-13 11:53:56 +02:00
parent b442fef60b
commit 198f713f7d
12 changed files with 307 additions and 284 deletions

View File

@ -138,6 +138,11 @@ public class StandardStream implements IStream
this.listener = listener;
}
public StreamFrameListener getStreamFrameListener()
{
return listener;
}
@Override
public void updateCloseState(boolean close, boolean local)
{

View File

@ -216,13 +216,15 @@ public class Headers implements Iterable<Headers.Header>
if (obj == null || getClass() != obj.getClass())
return false;
Header that = (Header)obj;
return name.equals(that.name) && Arrays.equals(values, that.values);
// Header names must be lowercase, thus we lowercase them before transmission, but keep them as is
// internally. That's why we've to compare them case insensitive.
return name.equalsIgnoreCase(that.name) && Arrays.equals(values, that.values);
}
@Override
public int hashCode()
{
int result = name.hashCode();
int result = name.toLowerCase().hashCode();
result = 31 * result + Arrays.hashCode(values);
return result;
}

View File

@ -40,7 +40,7 @@ public class HeadersBlockGenerator
writeCount(version, buffer, headers.size());
for (Headers.Header header : headers)
{
String name = header.name();
String name = header.name().toLowerCase();
byte[] nameBytes = name.getBytes(iso1);
writeNameLength(version, buffer, nameBytes.length);
buffer.write(nameBytes, 0, nameBytes.length);

View File

@ -22,65 +22,77 @@ import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
public class HeadersGenerateParseTest
{
@Test
public void testGenerateParse() throws Exception
private Headers headers = new Headers();
private int streamId = 13;
private byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
private final TestSPDYParserListener listener = new TestSPDYParserListener();
private final Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
private ByteBuffer buffer;
@Before
public void setUp()
{
byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
int streamId = 13;
Headers headers = new Headers();
parser.addListener(listener);
headers.put("a", "b");
buffer = createHeadersFrameBuffer(headers);
}
private ByteBuffer createHeadersFrameBuffer(Headers headers)
{
HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers);
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
assertThat("Buffer is not null", buffer, notNullValue());
return buffer;
}
Assert.assertNotNull(buffer);
TestSPDYParserListener listener = new TestSPDYParserListener();
Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
parser.addListener(listener);
@Test
public void testGenerateParse() throws Exception
{
parser.parse(buffer);
ControlFrame frame2 = listener.getControlFrame();
Assert.assertNotNull(frame2);
Assert.assertEquals(ControlFrameType.HEADERS, frame2.getType());
HeadersFrame headersFrame = (HeadersFrame)frame2;
Assert.assertEquals(SPDY.V2, headersFrame.getVersion());
Assert.assertEquals(streamId, headersFrame.getStreamId());
Assert.assertEquals(flags, headersFrame.getFlags());
Assert.assertEquals(headers, headersFrame.getHeaders());
assertExpectationsAreMet(headers);
}
@Test
public void testGenerateParseOneByteAtATime() throws Exception
{
byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
int streamId = 13;
Headers headers = new Headers();
headers.put("a", "b");
HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers);
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);
TestSPDYParserListener listener = new TestSPDYParserListener();
Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
parser.addListener(listener);
while (buffer.hasRemaining())
parser.parse(ByteBuffer.wrap(new byte[]{buffer.get()}));
ControlFrame frame2 = listener.getControlFrame();
Assert.assertNotNull(frame2);
Assert.assertEquals(ControlFrameType.HEADERS, frame2.getType());
HeadersFrame headersFrame = (HeadersFrame)frame2;
Assert.assertEquals(SPDY.V2, headersFrame.getVersion());
Assert.assertEquals(streamId, headersFrame.getStreamId());
Assert.assertEquals(flags, headersFrame.getFlags());
Assert.assertEquals(headers, headersFrame.getHeaders());
assertExpectationsAreMet(headers);
}
@Test
public void testHeadersAreTranslatedToLowerCase()
{
Headers headers = new Headers();
headers.put("Via","localhost");
parser.parse(createHeadersFrameBuffer(headers));
HeadersFrame parsedHeadersFrame = assertExpectationsAreMet(headers);
Headers.Header viaHeader = parsedHeadersFrame.getHeaders().get("via");
assertThat("Via Header name is lowercase", viaHeader.name(), is("via"));
}
private HeadersFrame assertExpectationsAreMet(Headers headers)
{
ControlFrame parsedControlFrame = listener.getControlFrame();
assertThat("listener received controlFrame", parsedControlFrame, notNullValue());
assertThat("ControlFrame type is HEADERS", ControlFrameType.HEADERS, is(parsedControlFrame.getType()));
HeadersFrame headersFrame = (HeadersFrame)parsedControlFrame;
assertThat("Version matches", SPDY.V2, is(headersFrame.getVersion()));
assertThat("StreamId matches", streamId, is(headersFrame.getStreamId()));
assertThat("flags match", flags, is(headersFrame.getFlags()));
assertThat("headers match", headers, is(headersFrame.getHeaders()));
return headersFrame;
}
}

View File

@ -32,24 +32,34 @@
</Call>
<!--
The ProxyEngine receives SPDY/x(HTTP) requests from proxy connectors below
and is configured to process requests for host "localhost".
Such requests are converted from SPDY/x(HTTP) to SPDY/2(HTTP) and forwarded
to 127.0.0.1:9090, where they are served by the upstream server above.
This ProxyEngine translates the incoming SPDY/x(HTTP) request to SPDY/2(HTTP)
-->
<New id="proxyEngine" class="org.eclipse.jetty.spdy.proxy.SPDYProxyEngine">
<New id="spdyProxyEngine" class="org.eclipse.jetty.spdy.proxy.SPDYProxyEngine">
<Arg>
<New class="org.eclipse.jetty.spdy.SPDYClient$Factory">
<Call name="start" />
<Call name="start"/>
</New>
</Arg>
<Set name="proxyInfos">
</New>
<!--
The ProxyEngineSelector receives SPDY/x(HTTP) requests from proxy connectors below
and is configured to process requests for host "localhost".
Such requests are converted from SPDY/x(HTTP) to SPDY/2(HTTP) by the configured ProxyEngine
and forwarded to 127.0.0.1:9090, where they are served by the upstream server above.
-->
<New id="proxyEngineSelector" class="org.eclipse.jetty.spdy.proxy.ProxyEngineSelector">
<Call name="putProxyEngine">
<Arg>spdy/2</Arg>
<Arg><Ref id="spdyProxyEngine" /></Arg>
</Call>
<Set name="proxyServerInfos">
<Map>
<Entry>
<Item>localhost</Item>
<Item>
<New class="org.eclipse.jetty.spdy.proxy.ProxyEngine$ProxyInfo">
<Arg type="short">2</Arg>
<New class="org.eclipse.jetty.spdy.proxy.ProxyEngineSelector$ProxyServerInfo">
<Arg type="String">spdy/2</Arg>
<Arg>127.0.0.1</Arg>
<Arg type="int">9090</Arg>
</New>
@ -69,7 +79,7 @@
<Call name="addConnector">
<Arg>
<New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector">
<Arg><Ref id="proxyEngine" /></Arg>
<Arg><Ref id="proxyEngineSelector" /></Arg>
<Set name="Port">8080</Set>
</New>
</Arg>
@ -77,7 +87,7 @@
<Call name="addConnector">
<Arg>
<New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector">
<Arg><Ref id="proxyEngine" /></Arg>
<Arg><Ref id="proxyEngineSelector" /></Arg>
<Arg><Ref id="sslContextFactory" /></Arg>
<Set name="Port">8443</Set>
</New>

View File

@ -21,19 +21,19 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HTTPSPDYProxyConnector extends AbstractHTTPSPDYServerConnector
{
public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine)
public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector)
{
this(proxyEngine, null);
this(proxyEngineSelector, null);
}
public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine, SslContextFactory sslContextFactory)
public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector, SslContextFactory sslContextFactory)
{
super(proxyEngine, sslContextFactory);
super(proxyEngineSelector, sslContextFactory);
clearAsyncConnectionFactories();
putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V3, proxyEngine));
putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V2, proxyEngineSelector));
setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1"));
}
}

View File

@ -15,32 +15,25 @@
package org.eclipse.jetty.spdy.proxy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>{@link ProxyEngine} is the base class for SPDY proxy functionalities, that is a proxy that
* accepts SPDY from its client side and converts to any protocol to its server side.</p>
* <p>{@link ProxyEngine} is the class for SPDY proxy functionalities that receives a SPDY request and converts it to
* any protocol to its server side.</p>
* <p>This class listens for SPDY events sent by clients; subclasses are responsible for translating
* these SPDY client events into appropriate events to forward to the server, in the appropriate
* protocol that is understood by the server.</p>
* <p>This class also provides configuration for the proxy rules.</p>
*/
public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter implements StreamFrameListener
public abstract class ProxyEngine
{
protected final Logger logger = Log.getLogger(getClass());
private final ConcurrentMap<String, ProxyInfo> proxyInfos = new ConcurrentHashMap<>();
private final String name;
protected ProxyEngine()
@ -60,6 +53,8 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp
}
}
public abstract StreamFrameListener proxy(Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo);
protected ProxyEngine(String name)
{
this.name = name;
@ -96,46 +91,4 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp
{
}
public Map<String, ProxyInfo> getProxyInfos()
{
return new HashMap<>(proxyInfos);
}
public void setProxyInfos(Map<String, ProxyInfo> proxyInfos)
{
this.proxyInfos.clear();
this.proxyInfos.putAll(proxyInfos);
}
public void putProxyInfo(String host, ProxyInfo proxyInfo)
{
proxyInfos.put(host, proxyInfo);
}
protected ProxyInfo getProxyInfo(String host)
{
return proxyInfos.get(host);
}
public static class ProxyInfo
{
private final short version;
private final InetSocketAddress address;
public ProxyInfo(short version, String host, int port)
{
this.version = version;
this.address = new InetSocketAddress(host, port);
}
public short getVersion()
{
return version;
}
public InetSocketAddress getAddress()
{
return address;
}
}
}

View File

@ -0,0 +1,166 @@
package org.eclipse.jetty.spdy.proxy;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>{@link ProxyEngineSelector} is the main entry point for syn stream events of a jetty SPDY proxy. It receives the
* syn stream frames from the clients, checks if there's an appropriate {@link ProxyServerInfo} for the given target
* host and forwards the syn to a {@link ProxyEngine} for the protocol defined in {@link ProxyServerInfo}.</p>
*
* <p>If no {@link ProxyServerInfo} can be found for the given target host or no {@link ProxyEngine} can be found for
* the given protocol, it resets the client stream.</p>
*
* <p>This class also provides configuration for the proxy rules.</p>
*/
public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter
{
protected final Logger logger = Log.getLogger(getClass());
private final Map<String, ProxyServerInfo> proxyInfos = new ConcurrentHashMap<>();
private final Map<String, ProxyEngine> proxyEngines = new ConcurrentHashMap<>();
@Override
public final StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo)
{
logger.debug("C -> P {} on {}", clientSynInfo, clientStream);
final Session clientSession = clientStream.getSession();
short clientVersion = clientSession.getVersion();
Headers headers = new Headers(clientSynInfo.getHeaders(), false);
Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion));
if (hostHeader == null)
{
rst(clientStream);
return null;
}
String host = hostHeader.value();
int colon = host.indexOf(':');
if (colon >= 0)
host = host.substring(0, colon);
ProxyServerInfo proxyServerInfo = getProxyServerInfo(host);
if (proxyServerInfo == null)
{
rst(clientStream);
return null;
}
String protocol = proxyServerInfo.getProtocol();
ProxyEngine proxyEngine = proxyEngines.get(protocol);
if (proxyEngine == null)
{
rst(clientStream);
return null;
}
return proxyEngine.proxy(clientStream, clientSynInfo, proxyServerInfo);
}
@Override
public void onPing(Session clientSession, PingInfo pingInfo)
{
// We do not know to which upstream server
// to send the PING so we just ignore it
}
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
// TODO:
}
public Map<String, ProxyEngine> getProxyEngines()
{
return new HashMap<>(proxyEngines);
}
public void setProxyEngines(Map<String, ProxyEngine> proxyEngines)
{
this.proxyEngines.clear();
this.proxyEngines.putAll(proxyEngines);
}
public ProxyEngine getProxyEngine(String protocol)
{
return proxyEngines.get(protocol);
}
public void putProxyEngine(String protocol, ProxyEngine proxyEngine)
{
proxyEngines.put(protocol, proxyEngine);
}
public Map<String, ProxyServerInfo> getProxyServerInfos()
{
return new HashMap<>(proxyInfos);
}
protected ProxyServerInfo getProxyServerInfo(String host)
{
return proxyInfos.get(host);
}
public void setProxyServerInfos(Map<String, ProxyServerInfo> proxyServerInfos)
{
this.proxyInfos.clear();
this.proxyInfos.putAll(proxyServerInfos);
}
public void putProxyServerInfo(String host, ProxyServerInfo proxyServerInfo)
{
proxyInfos.put(host, proxyServerInfo);
}
private void rst(Stream stream)
{
RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
stream.getSession().rst(rstInfo);
}
public static class ProxyServerInfo
{
private final String protocol;
private final String host;
private final InetSocketAddress address;
public ProxyServerInfo(String protocol, String host, int port)
{
this.protocol = protocol;
this.host = host;
this.address = new InetSocketAddress(host, port);
}
public String getProtocol()
{
return protocol;
}
public String getHost()
{
return host;
}
public InetSocketAddress getAddress()
{
return address;
}
}
}

View File

@ -24,18 +24,18 @@ import org.eclipse.jetty.spdy.http.ServerHTTPAsyncConnectionFactory;
public class ProxyHTTPAsyncConnectionFactory extends ServerHTTPAsyncConnectionFactory
{
private final short version;
private final ProxyEngine proxyEngine;
private final ProxyEngineSelector proxyEngineSelector;
public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngine proxyEngine)
public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngineSelector proxyEngineSelector)
{
super(connector);
this.version = version;
this.proxyEngine = proxyEngine;
this.proxyEngineSelector = proxyEngineSelector;
}
@Override
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
{
return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngine);
return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngineSelector);
}
}

View File

@ -46,6 +46,7 @@ import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
@ -53,17 +54,17 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{
private final Headers headers = new Headers();
private final short version;
private final ProxyEngine proxyEngine;
private final ProxyEngineSelector proxyEngineSelector;
private final HttpGenerator generator;
private final ISession session;
private Stream stream;
private HTTPStream stream;
private Buffer content;
public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endPoint, short version, ProxyEngine proxyEngine)
public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector)
{
super(connector, endPoint, connector.getServer());
this.version = version;
this.proxyEngine = proxyEngine;
this.proxyEngineSelector = proxyEngineSelector;
this.generator = (HttpGenerator)_generator;
this.session = new HTTPSession(version, connector);
this.session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr());
@ -117,7 +118,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
}
else
{
proxyEngine.onData(stream, toDataInfo(buffer, false));
stream.getStreamFrameListener().onData(stream, toDataInfo(buffer, false));
}
}
@ -128,23 +129,24 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{
assert content == null;
if (headers.isEmpty())
proxyEngine.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
proxyEngineSelector.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
else
syn(true);
}
else
{
proxyEngine.onData(stream, toDataInfo(content, true));
stream.getStreamFrameListener().onData(stream, toDataInfo(content, true));
}
headers.clear();
stream = null;
content = null;
}
private Stream syn(boolean close)
private HTTPStream syn(boolean close)
{
Stream stream = new HTTPStream(1, (byte)0, session, null);
proxyEngine.onSyn(stream, new SynInfo(headers, close));
HTTPStream stream = new HTTPStream(1, (byte)0, session, null);
StreamFrameListener streamFrameListener = proxyEngineSelector.onSyn(stream, new SynInfo(headers, close));
stream.setStreamFrameListener(streamFrameListener);
return stream;
}
@ -168,7 +170,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{
private HTTPSession(short version, SPDYServerConnector connector)
{
super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngine, null, null);
super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngineSelector, null, null);
}
@Override

View File

@ -15,10 +15,8 @@
package org.eclipse.jetty.spdy.proxy;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -30,9 +28,9 @@ import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
@ -45,11 +43,10 @@ import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
* clients into SPDY events for the servers.</p>
*/
public class SPDYProxyEngine extends ProxyEngine
public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
{
private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
private static final String CLIENT_SESSIONS_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientSessions";
private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
@ -82,68 +79,24 @@ public class SPDYProxyEngine extends ProxyEngine
this.timeout = timeout;
}
@Override
public void onPing(Session clientSession, PingInfo pingInfo)
public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
{
// We do not know to which upstream server
// to send the PING so we just ignore it
}
@Override
public void onGoAway(Session clientSession, GoAwayInfo goAwayInfo)
{
for (Session serverSession : serverSessions.values())
{
@SuppressWarnings("unchecked")
Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
if (sessions.remove(clientSession))
break;
}
}
@Override
public StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo)
{
logger.debug("C -> P {} on {}", clientSynInfo, clientStream);
final Session clientSession = clientStream.getSession();
short clientVersion = clientSession.getVersion();
Headers headers = new Headers(clientSynInfo.getHeaders(), false);
Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion));
if (hostHeader == null)
{
rst(clientStream);
return null;
}
String host = hostHeader.value();
int colon = host.indexOf(':');
if (colon >= 0)
host = host.substring(0, colon);
ProxyInfo proxyInfo = getProxyInfo(host);
if (proxyInfo == null)
{
rst(clientStream);
return null;
}
short serverVersion = proxyInfo.getVersion();
InetSocketAddress address = proxyInfo.getAddress();
Session serverSession = produceSession(host, serverVersion, address);
short serverVersion = getVersion(proxyServerInfo.getProtocol());
InetSocketAddress address = proxyServerInfo.getAddress();
Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
if (serverSession == null)
{
rst(clientStream);
return null;
}
@SuppressWarnings("unchecked")
Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
sessions.add(clientSession);
final Session clientSession = clientStream.getSession();
addRequestProxyHeaders(clientStream, headers);
customizeRequestHeaders(clientStream, headers);
convert(clientVersion, serverVersion, headers);
convert(clientSession.getVersion(), serverVersion, headers);
SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
@ -153,6 +106,19 @@ public class SPDYProxyEngine extends ProxyEngine
return this;
}
private static short getVersion(String protocol)
{
switch (protocol)
{
case "spdy/2":
return SPDY.V2;
case "spdy/3":
return SPDY.V3;
default:
throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
}
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -194,7 +160,6 @@ public class SPDYProxyEngine extends ProxyEngine
{
SPDYClient client = factory.newSPDYClient(version);
session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
session.setAttribute(CLIENT_SESSIONS_ATTRIBUTE, Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>()));
logger.debug("Proxy session connected to {}", address);
Session existing = serverSessions.putIfAbsent(host, session);
if (existing != null)
@ -513,10 +478,6 @@ public class SPDYProxyEngine extends ProxyEngine
public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
{
serverSessions.values().remove(serverSession);
@SuppressWarnings("unchecked")
Set<Session> sessions = (Set<Session>)serverSession.removeAttribute(CLIENT_SESSIONS_ATTRIBUTE);
for (Session session : sessions)
session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
}
@Override
@ -528,7 +489,7 @@ public class SPDYProxyEngine extends ProxyEngine
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
throw new UnsupportedOperationException();
throw new UnsupportedOperationException(); //TODO
}
@Override

View File

@ -91,9 +91,11 @@ public class ProxyHTTPSPDYv2Test
protected InetSocketAddress startProxy(InetSocketAddress address) throws Exception
{
proxy = new Server();
SPDYProxyEngine proxyEngine = new SPDYProxyEngine(factory);
proxyEngine.putProxyInfo("localhost", new ProxyEngine.ProxyInfo(version(), address.getHostName(), address.getPort()));
proxyConnector = new HTTPSPDYProxyConnector(proxyEngine);
ProxyEngineSelector proxyEngineSelector = new ProxyEngineSelector();
SPDYProxyEngine spdyProxyEngine = new SPDYProxyEngine(factory);
proxyEngineSelector.putProxyEngine("spdy/" + version(), spdyProxyEngine);
proxyEngineSelector.putProxyServerInfo("localhost", new ProxyEngineSelector.ProxyServerInfo("spdy/" + version(), address.getHostName(), address.getPort()));
proxyConnector = new HTTPSPDYProxyConnector(proxyEngineSelector);
proxyConnector.setPort(0);
proxy.addConnector(proxyConnector);
proxy.start();
@ -171,96 +173,6 @@ public class ProxyHTTPSPDYv2Test
Assert.assertFalse(closeLatch.await(1, TimeUnit.SECONDS));
}
@Test
public void testClosingServerClosesHTTPClient() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, true));
stream.getSession().goAway();
return null;
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
client.setSoTimeout(1000);
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
Assert.assertFalse(reader.ready());
Assert.assertNull(reader.readLine());
client.close();
}
@Test
public void testClosingServerClosesSPDYClient() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, true));
stream.getSession().goAway();
return null;
}
}));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
final CountDownLatch goAwayLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
goAwayLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
final CountDownLatch replyLatch = new CountDownLatch(1);
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.SCHEME.name(version()), "http");
headers.put(HTTPSPDYHeader.METHOD.name(version()), "GET");
headers.put(HTTPSPDYHeader.URI.name(version()), "/");
headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyLatch.countDown();
}
});
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testGETThenNoContentFromTwoClients() throws Exception
{