First pass at the implementation of a reverse SPDY proxy.

This commit is contained in:
Simone Bordet 2012-06-11 18:53:37 +02:00
parent 457fdc74e5
commit 479c957a68
9 changed files with 1638 additions and 40 deletions

View File

@ -0,0 +1,64 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.http;
import java.io.IOException;
import org.eclipse.jetty.http.HttpSchemes;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class AbstractHTTPSPDYServerConnector extends SPDYServerConnector
{
public AbstractHTTPSPDYServerConnector(ServerSessionFrameListener listener, SslContextFactory sslContextFactory)
{
super(listener, sslContextFactory);
}
@Override
public void customize(EndPoint endPoint, Request request) throws IOException
{
super.customize(endPoint, request);
if (getSslContextFactory() != null)
request.setScheme(HttpSchemes.HTTPS);
}
@Override
public boolean isConfidential(Request request)
{
if (getSslContextFactory() != null)
{
int confidentialPort = getConfidentialPort();
return confidentialPort == 0 || confidentialPort == request.getServerPort();
}
return super.isConfidential(request);
}
@Override
public boolean isIntegral(Request request)
{
if (getSslContextFactory() != null)
{
int integralPort = getIntegralPort();
return integralPort == 0 || integralPort == request.getServerPort();
}
return super.isIntegral(request);
}
}

View File

@ -16,16 +16,10 @@
package org.eclipse.jetty.spdy.http;
import java.io.IOException;
import org.eclipse.jetty.http.HttpSchemes;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HTTPSPDYServerConnector extends SPDYServerConnector
public class HTTPSPDYServerConnector extends AbstractHTTPSPDYServerConnector
{
public HTTPSPDYServerConnector()
{
@ -57,34 +51,4 @@ public class HTTPSPDYServerConnector extends SPDYServerConnector
// The default connection factory handles plain HTTP on non-SSL or non-NPN connections
setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1"));
}
@Override
public void customize(EndPoint endPoint, Request request) throws IOException
{
super.customize(endPoint, request);
if (getSslContextFactory() != null)
request.setScheme(HttpSchemes.HTTPS);
}
@Override
public boolean isConfidential(Request request)
{
if (getSslContextFactory() != null)
{
int confidentialPort = getConfidentialPort();
return confidentialPort == 0 || confidentialPort == request.getServerPort();
}
return super.isConfidential(request);
}
@Override
public boolean isIntegral(Request request)
{
if (getSslContextFactory() != null)
{
int integralPort = getIntegralPort();
return integralPort == 0 || integralPort == request.getServerPort();
}
return super.isIntegral(request);
}
}

View File

@ -21,18 +21,23 @@ import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.server.AsyncHttpConnection;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.spdy.AsyncConnectionFactory;
import org.eclipse.jetty.spdy.SPDYServerConnector;
public class ServerHTTPAsyncConnectionFactory implements AsyncConnectionFactory
{
private final Connector connector;
private final SPDYServerConnector connector;
public ServerHTTPAsyncConnectionFactory(Connector connector)
public ServerHTTPAsyncConnectionFactory(SPDYServerConnector connector)
{
this.connector = connector;
}
public SPDYServerConnector getConnector()
{
return connector;
}
@Override
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
{

View File

@ -0,0 +1,41 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import org.eclipse.jetty.spdy.ServerSPDYAsyncConnectionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.http.AbstractHTTPSPDYServerConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HTTPSPDYProxyConnector extends AbstractHTTPSPDYServerConnector
{
public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine)
{
this(proxyEngine, null);
}
public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine, SslContextFactory sslContextFactory)
{
super(proxyEngine, sslContextFactory);
clearAsyncConnectionFactories();
putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V3, proxyEngine));
setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1"));
}
}

View File

@ -0,0 +1,118 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>{@link ProxyEngine} is the base class for SPDY proxy functionalities, that is a proxy that
* accepts SPDY from its client side and converts to any protocol to its server side.</p>
* <p>This class listens for SPDY events sent by clients; subclasses are responsible for translating
* these SPDY client events into appropriate events to forward to the server, in the appropriate
* protocol that is understood by the server.</p>
* <p>This class also provides configuration for the proxy rules.</p>
*/
public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter implements StreamFrameListener
{
protected final Logger logger = Log.getLogger(getClass());
private final ConcurrentMap<String, ProxyInfo> proxyInfos = new ConcurrentHashMap<>();
private final String name;
protected ProxyEngine()
{
this(name());
}
private static String name()
{
try
{
return InetAddress.getLocalHost().getHostName();
}
catch (UnknownHostException x)
{
return "localhost";
}
}
protected ProxyEngine(String name)
{
this.name = name;
}
public String getName()
{
return name;
}
protected void addRequestProxyHeaders(Headers headers)
{
String newValue = "";
Headers.Header header = headers.get("via");
if (header != null)
newValue = header.valuesAsString() + ", ";
newValue += "http/1.1 " + getName();
headers.put("via", newValue);
}
protected void addResponseProxyHeaders(Headers headers)
{
// TODO: add Via header
}
public void putProxyInfo(String host, ProxyInfo proxyInfo)
{
proxyInfos.put(host, proxyInfo);
}
protected ProxyInfo getProxyInfo(String host)
{
return proxyInfos.get(host);
}
public static class ProxyInfo
{
private final short version;
private final InetSocketAddress address;
public ProxyInfo(short version, String host, int port)
{
this.version = version;
this.address = new InetSocketAddress(host, port);
}
public short getVersion()
{
return version;
}
public InetSocketAddress getAddress()
{
return address;
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.http.ServerHTTPAsyncConnectionFactory;
public class ProxyHTTPAsyncConnectionFactory extends ServerHTTPAsyncConnectionFactory
{
private final short version;
private final ProxyEngine proxyEngine;
public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngine proxyEngine)
{
super(connector);
this.version = version;
this.proxyEngine = proxyEngine;
}
@Override
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
{
return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngine);
}
}

View File

@ -0,0 +1,307 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.DirectNIOBuffer;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.NIOBuffer;
import org.eclipse.jetty.server.AsyncHttpConnection;
import org.eclipse.jetty.spdy.ISession;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.StandardSession;
import org.eclipse.jetty.spdy.StandardStream;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{
private final Headers headers = new Headers();
private final short version;
private final ProxyEngine proxyEngine;
private final HttpGenerator generator;
private final ISession session;
private Stream stream;
private Buffer content;
public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endpoint, short version, ProxyEngine proxyEngine)
{
super(connector, endpoint, connector.getServer());
this.version = version;
this.proxyEngine = proxyEngine;
this.generator = (HttpGenerator)_generator;
this.session = new HTTPSession(version, connector);
}
@Override
public AsyncEndPoint getEndPoint()
{
return (AsyncEndPoint)super.getEndPoint();
}
@Override
protected void startRequest(Buffer method, Buffer uri, Buffer httpVersion) throws IOException
{
SPDYServerConnector connector = (SPDYServerConnector)getConnector();
String scheme = connector.getSslContextFactory() != null ? "https" : "http";
headers.put(HTTPSPDYHeader.SCHEME.name(version), scheme);
headers.put(HTTPSPDYHeader.METHOD.name(version), method.toString("UTF-8"));
headers.put(HTTPSPDYHeader.URI.name(version), uri.toString("UTF-8"));
headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.toString("UTF-8"));
}
@Override
protected void parsedHeader(Buffer name, Buffer value) throws IOException
{
String headerName = name.toString("UTF-8").toLowerCase();
String headerValue = value.toString("UTF-8");
switch (headerName)
{
case "host":
headers.put(HTTPSPDYHeader.HOST.name(version), headerValue);
break;
default:
headers.put(headerName, headerValue);
break;
}
}
@Override
protected void headerComplete() throws IOException
{
}
@Override
protected void content(Buffer buffer) throws IOException
{
if (content == null)
{
stream = syn(false);
content = buffer;
}
else
{
proxyEngine.onData(stream, toDataInfo(buffer, false));
}
}
@Override
public void messageComplete(long contentLength) throws IOException
{
if (stream == null)
{
assert content == null;
if (headers.isEmpty())
proxyEngine.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
else
syn(true);
}
else
{
proxyEngine.onData(stream, toDataInfo(content, true));
}
headers.clear();
stream = null;
content = null;
}
private Stream syn(boolean close)
{
// TODO: stream id uniqueness
Stream stream = new HTTPStream(1, (byte)0, session);
proxyEngine.onSyn(stream, new SynInfo(headers, close));
return stream;
}
private DataInfo toDataInfo(Buffer buffer, boolean close)
{
if (buffer instanceof ByteArrayBuffer)
return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
if (buffer instanceof NIOBuffer)
{
ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
byteBuffer.limit(buffer.putIndex());
byteBuffer.position(buffer.getIndex());
return new ByteBufferDataInfo(byteBuffer, close);
}
return new BytesDataInfo(buffer.asArray(), close);
}
private class HTTPSession extends StandardSession
{
private HTTPSession(short version, SPDYServerConnector connector)
{
super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngine, null, null);
}
@Override
public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
{
try
{
getEndPoint().close();
handler.completed(null);
}
catch (IOException x)
{
handler.failed(null, x);
}
}
}
/**
* <p>This stream will convert the SPDY invocations performed by the proxy into HTTP to be sent to the client.</p>
*/
private class HTTPStream extends StandardStream
{
private final Pattern statusRegexp = Pattern.compile("(\\d{3})\\s*(.*)");
private HTTPStream(int id, byte priority, ISession session)
{
super(id, priority, session, null);
}
@Override
public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
{
// No support for pushed stream in HTTP, but we need to return a non-null stream anyway
// TODO
throw new UnsupportedOperationException();
}
@Override
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
// TODO
throw new UnsupportedOperationException();
}
@Override
public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
try
{
Headers headers = new Headers(replyInfo.getHeaders(), false);
headers.remove(HTTPSPDYHeader.SCHEME.name(version));
String status = headers.remove(HTTPSPDYHeader.STATUS.name(version)).value();
Matcher matcher = statusRegexp.matcher(status);
matcher.matches();
int code = Integer.parseInt(matcher.group(1));
String reason = matcher.group(2);
generator.setResponse(code, reason);
String httpVersion = headers.remove(HTTPSPDYHeader.VERSION.name(version)).value();
generator.setVersion(Integer.parseInt(httpVersion.replaceAll("\\D", "")));
Headers.Header host = headers.remove(HTTPSPDYHeader.HOST.name(version));
if (host != null)
headers.put("host", host.value());
HttpFields fields = new HttpFields();
for (Headers.Header header : headers)
{
String name = camelize(header.name());
fields.put(name, header.value());
}
generator.completeHeader(fields, replyInfo.isClose());
if (replyInfo.isClose())
complete();
handler.completed(null);
}
catch (IOException x)
{
handler.failed(null, x);
}
}
private String camelize(String name)
{
char[] chars = name.toCharArray();
chars[0] = Character.toUpperCase(chars[0]);
for (int i = 0; i < chars.length; ++i)
{
char c = chars[i];
int j = i + 1;
if (c == '-' && j < chars.length)
chars[j] = Character.toUpperCase(chars[j]);
}
return new String(chars);
}
@Override
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
try
{
// Data buffer must be copied, as the ByteBuffer is pooled
ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
Buffer buffer = byteBuffer.isDirect() ?
new DirectNIOBuffer(byteBuffer, false) :
new IndirectNIOBuffer(byteBuffer, false);
generator.addContent(buffer, dataInfo.isClose());
generator.flush(unit.toMillis(timeout));
if (dataInfo.isClose())
complete();
handler.completed(null);
}
catch (IOException x)
{
handler.failed(null, x);
}
}
private void complete() throws IOException
{
generator.complete();
// We need to call asyncDispatch() as if the HTTP request
// has been suspended and now we complete the response
getEndPoint().asyncDispatch();
}
}
}

View File

@ -0,0 +1,500 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.SPDYClient;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
/**
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
* clients into SPDY events for the servers.</p>
*/
public class SPDYProxyEngine extends ProxyEngine
{
private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
private final ConcurrentMap<String, Future<Session>> serverSessions = new ConcurrentHashMap<>();
private final ConcurrentMap<Session, Set<Session>> clientSessions = new ConcurrentHashMap<>();
private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
private final SPDYClient.Factory factory;
private volatile long connectTimeout = 15000;
private volatile long timeout = 60000;
public SPDYProxyEngine(SPDYClient.Factory factory)
{
this.factory = factory;
}
public long getConnectTimeout()
{
return connectTimeout;
}
public void setConnectTimeout(long connectTimeout)
{
this.connectTimeout = connectTimeout;
}
public long getTimeout()
{
return timeout;
}
public void setTimeout(long timeout)
{
this.timeout = timeout;
}
@Override
public void onGoAway(Session clientSession, GoAwayInfo goAwayInfo)
{
Set<Session> target = null;
for (Set<Session> sessions : clientSessions.values())
{
for (Session session : sessions)
{
if (session == clientSession)
{
target = sessions;
break;
}
}
if (target != null)
break;
}
if (target != null)
{
target.remove(clientSession);
// Do not remove the Set if it's empty: there is one Set per proxied
// host, so we can afford this small leak and avoid synchronization
}
}
@Override
public StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo)
{
logger.debug("C -> P {} on {}", clientSynInfo, clientStream);
final Session clientSession = clientStream.getSession();
short clientVersion = clientSession.getVersion();
Headers headers = new Headers(clientSynInfo.getHeaders(), false);
Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion));
if (hostHeader == null)
{
rst(clientStream);
return null;
}
String host = hostHeader.value();
int colon = host.indexOf(':');
if (colon >= 0)
host = host.substring(0, colon);
ProxyInfo proxyInfo = getProxyInfo(host);
if (proxyInfo == null)
{
rst(clientStream);
return null;
}
// TODO: give a chance to modify headers and rewrite URI
short serverVersion = proxyInfo.getVersion();
InetSocketAddress address = proxyInfo.getAddress();
Session serverSession = produceSession(host, serverVersion, address);
if (serverSession == null)
{
rst(clientStream);
return null;
}
Set<Session> sessions = clientSessions.get(serverSession);
if (sessions == null)
{
sessions = Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>());
Set<Session> existing = clientSessions.putIfAbsent(serverSession, sessions);
if (existing != null)
sessions = existing;
}
sessions.add(clientSession);
convert(clientVersion, serverVersion, headers);
addRequestProxyHeaders(headers);
SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
logger.debug("P -> S {}", serverSynInfo);
StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
if (serverSynInfo.isClose())
{
serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
{
@Override
public void failed(Stream context, Throwable x)
{
logger.debug(x);
rst(clientStream);
}
});
return null;
}
else
{
StreamHandler streamHandler = new StreamHandler(clientStream);
clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, streamHandler);
serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, streamHandler);
return this;
}
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Servers do not receive replies
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
// TODO
throw new UnsupportedOperationException("Not yet implemented");
}
@Override
public void onData(Stream clientStream, final DataInfo clientDataInfo)
{
logger.debug("C -> P {} on {}", clientDataInfo, clientStream);
ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
clientDataInfo.consume(delta);
}
};
StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
streamHandler.data(serverDataInfo);
}
private Session produceSession(String host, short version, InetSocketAddress address)
{
try
{
Future<Session> session = serverSessions.get(host);
if (session == null)
{
SPDYClient client = factory.newSPDYClient(version);
session = client.connect(address, sessionListener);
Future<Session> existing = serverSessions.putIfAbsent(host, session);
if (existing != null)
{
session.cancel(true);
session = existing;
}
}
return session.get(getConnectTimeout(), TimeUnit.MILLISECONDS);
}
catch (Exception x)
{
logger.debug(x);
return null;
}
}
private void convert(short fromVersion, short toVersion, Headers headers)
{
if (fromVersion != toVersion)
{
for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
{
Headers.Header header = headers.remove(httpHeader.name(fromVersion));
if (header != null)
{
String toName = httpHeader.name(toVersion);
for (String value : header.values())
headers.add(toName, value);
}
}
}
}
private void rst(Stream stream)
{
RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
stream.getSession().rst(rstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
}
private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
{
private final Stream clientStream;
private volatile ReplyInfo replyInfo;
public ProxyStreamFrameListener(Stream clientStream)
{
this.clientStream = clientStream;
}
@Override
public void onReply(final Stream stream, ReplyInfo replyInfo)
{
short serverVersion = stream.getSession().getVersion();
Headers headers = new Headers(replyInfo.getHeaders(), false);
short clientVersion = this.clientStream.getSession().getVersion();
convert(serverVersion, clientVersion, headers);
addResponseProxyHeaders(headers);
this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
if (replyInfo.isClose())
reply();
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
// TODO
throw new UnsupportedOperationException("Not Yet Implemented");
}
@Override
public void onData(final Stream stream, final DataInfo dataInfo)
{
if (replyInfo != null)
{
if (dataInfo.isClose())
replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
reply();
}
data(dataInfo);
}
private void reply()
{
clientStream.reply(replyInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>()
{
@Override
public void failed(Void context, Throwable x)
{
logger.debug(x);
rst(clientStream);
}
});
replyInfo = null;
}
private void data(final DataInfo dataInfo)
{
clientStream.data(dataInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
{
@Override
public void completed(Void context)
{
dataInfo.consume(dataInfo.length());
}
@Override
public void failed(Void context, Throwable x)
{
logger.debug(x);
rst(clientStream);
}
});
}
}
/**
* <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p>
* <p>Instances of this class buffer DATA frames sent by clients and send them to the server.
* The buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive
* from the client before the SYN_STREAM has been fully sent), and between DATA frames, if the client
* is a fast producer and the server a slow consumer, or if the client is a SPDY v2 client (and hence
* without flow control) while the server is a SPDY v3 server (and hence with flow control).</p>
*/
private class StreamHandler implements Handler<Stream>
{
private final Queue<DataInfoHandler> queue = new LinkedList<>();
private final Stream clientStream;
private Stream serverStream;
private StreamHandler(Stream clientStream)
{
this.clientStream = clientStream;
}
@Override
public void completed(Stream serverStream)
{
DataInfoHandler dataInfoHandler;
synchronized (queue)
{
this.serverStream = serverStream;
dataInfoHandler = queue.peek();
if (dataInfoHandler != null)
{
if (dataInfoHandler.flushing)
{
logger.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
dataInfoHandler = null;
}
else
{
dataInfoHandler.flushing = true;
logger.debug("SYN completed, queue size {}", queue.size());
}
}
else
{
logger.debug("SYN completed, queue empty");
}
}
if (dataInfoHandler != null)
flush(serverStream, dataInfoHandler);
}
@Override
public void failed(Stream serverStream, Throwable x)
{
logger.debug(x);
rst(clientStream);
}
public void data(DataInfo dataInfo)
{
Stream serverStream;
DataInfoHandler dataInfoHandler = null;
DataInfoHandler item = new DataInfoHandler(dataInfo);
synchronized (queue)
{
queue.offer(item);
serverStream = this.serverStream;
if (serverStream != null)
{
dataInfoHandler = queue.peek();
if (dataInfoHandler.flushing)
{
logger.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
serverStream = null;
}
else
{
dataInfoHandler.flushing = true;
logger.debug("Queued {}, queue size {}", dataInfo, queue.size());
}
}
else
{
logger.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
}
}
if (serverStream != null)
flush(serverStream, dataInfoHandler);
}
private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
{
logger.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
serverStream.data(dataInfoHandler.dataInfo, getTimeout(), TimeUnit.MILLISECONDS, dataInfoHandler);
}
private class DataInfoHandler implements Handler<Void>
{
private final DataInfo dataInfo;
private boolean flushing;
private DataInfoHandler(DataInfo dataInfo)
{
this.dataInfo = dataInfo;
}
@Override
public void completed(Void context)
{
Stream serverStream;
DataInfoHandler dataInfoHandler;
synchronized (queue)
{
serverStream = StreamHandler.this.serverStream;
assert serverStream != null;
dataInfoHandler = queue.poll();
assert dataInfoHandler == this;
dataInfoHandler = queue.peek();
if (dataInfoHandler != null)
{
assert !dataInfoHandler.flushing;
dataInfoHandler.flushing = true;
logger.debug("Completed {}, queue size {}", dataInfo, queue.size());
}
else
{
logger.debug("Completed {}, queue empty", dataInfo);
}
}
if (dataInfoHandler != null)
flush(serverStream, dataInfoHandler);
}
@Override
public void failed(Void context, Throwable x)
{
logger.debug(x);
rst(clientStream);
}
}
}
private class ProxySessionFrameListener extends SessionFrameListener.Adapter
{
@Override
public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
{
Set<Session> sessions = clientSessions.remove(serverSession);
if (sessions != null)
{
for (Session session : sessions)
session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
}
}
}
}

View File

@ -0,0 +1,556 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.SPDYClient;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.ServerSPDYAsyncConnectionFactory;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatchman;
import org.junit.runners.model.FrameworkMethod;
public class ProxyHTTPSPDYv2Test
{
@Rule
public final TestWatchman testName = new TestWatchman()
{
@Override
public void starting(FrameworkMethod method)
{
super.starting(method);
System.err.printf("Running %s.%s()%n",
method.getMethod().getDeclaringClass().getName(),
method.getName());
}
};
private SPDYClient.Factory factory;
private Server server;
private Server proxy;
private SPDYServerConnector proxyConnector;
protected short version()
{
return SPDY.V2;
}
protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception
{
server = new Server();
SPDYServerConnector serverConnector = new SPDYServerConnector(listener);
serverConnector.setDefaultAsyncConnectionFactory(new ServerSPDYAsyncConnectionFactory(version(), serverConnector.getByteBufferPool(), serverConnector.getExecutor(), serverConnector.getScheduler(), listener));
serverConnector.setPort(0);
server.addConnector(serverConnector);
server.start();
return new InetSocketAddress("localhost", serverConnector.getLocalPort());
}
protected InetSocketAddress startProxy(InetSocketAddress address) throws Exception
{
proxy = new Server();
SPDYProxyEngine proxyEngine = new SPDYProxyEngine(factory);
proxyEngine.putProxyInfo("localhost", new ProxyEngine.ProxyInfo(version(), address.getHostName(), address.getPort()));
proxyConnector = new HTTPSPDYProxyConnector(proxyEngine);
proxyConnector.setPort(0);
proxy.addConnector(proxyConnector);
proxy.start();
return new InetSocketAddress("localhost", proxyConnector.getLocalPort());
}
@Before
public void init() throws Exception
{
factory = new SPDYClient.Factory();
factory.start();
}
@After
public void destroy() throws Exception
{
if (server != null)
{
server.stop();
server.join();
}
if (proxy != null)
{
proxy.stop();
proxy.join();
}
factory.stop();
}
@Test
public void testClosingClientDoesNotCloseServer() throws Exception
{
final CountDownLatch closeLatch = new CountDownLatch(1);
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, true));
return null;
}
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
closeLatch.countDown();
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
Assert.assertFalse(reader.ready());
client.close();
// Must not close, other clients may still be connected
Assert.assertFalse(closeLatch.await(1, TimeUnit.SECONDS));
}
@Test
public void testClosingServerClosesHTTPClient() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, true));
stream.getSession().goAway();
return null;
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
client.setSoTimeout(1000);
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
Assert.assertFalse(reader.ready());
Assert.assertNull(reader.readLine());
client.close();
}
@Test
public void testClosingServerClosesSPDYClient() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, true));
stream.getSession().goAway();
return null;
}
}));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
final CountDownLatch goAwayLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
goAwayLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
final CountDownLatch replyLatch = new CountDownLatch(1);
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.SCHEME.name(version()), "http");
headers.put(HTTPSPDYHeader.METHOD.name(version()), "GET");
headers.put(HTTPSPDYHeader.URI.name(version()), "/");
headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyLatch.countDown();
}
});
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testGETThenNoContentFromTwoClients() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Assert.assertTrue(synInfo.isClose());
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, true);
stream.reply(replyInfo);
return null;
}
}));
Socket client1 = new Socket();
client1.connect(proxyAddress);
OutputStream output1 = client1.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output1.write(request.getBytes("UTF-8"));
output1.flush();
InputStream input1 = client1.getInputStream();
BufferedReader reader1 = new BufferedReader(new InputStreamReader(input1, "UTF-8"));
String line = reader1.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader1.readLine();
Assert.assertFalse(reader1.ready());
// Perform another request with another client
Socket client2 = new Socket();
client2.connect(proxyAddress);
OutputStream output2 = client2.getOutputStream();
output2.write(request.getBytes("UTF-8"));
output2.flush();
InputStream input2 = client2.getInputStream();
BufferedReader reader2 = new BufferedReader(new InputStreamReader(input2, "UTF-8"));
line = reader2.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader2.readLine();
Assert.assertFalse(reader2.ready());
client1.close();
client2.close();
}
@Test
public void testGETThenSmallResponseContent() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Assert.assertTrue(synInfo.isClose());
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
stream.reply(replyInfo);
stream.data(new BytesDataInfo(data, true));
return null;
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
for (byte datum : data)
Assert.assertEquals(datum, reader.read());
Assert.assertFalse(reader.ready());
// Perform another request so that we are sure we reset the states of parsers and generators
output.write(request.getBytes("UTF-8"));
output.flush();
line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
for (byte datum : data)
Assert.assertEquals(datum, reader.read());
Assert.assertFalse(reader.ready());
client.close();
}
@Test
public void testPOSTWithSmallRequestContentThenRedirect() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
{
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
headers.put(HTTPSPDYHeader.STATUS.name(version()), "303 See Other");
stream.reply(new ReplyInfo(headers, true));
}
}
};
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"POST / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"Content-Length: " + data.length + "\r\n" +
"Content-Type: application/octet-stream\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.write(data);
output.flush();
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 303"));
while (line.length() > 0)
line = reader.readLine();
Assert.assertFalse(reader.ready());
// Perform another request so that we are sure we reset the states of parsers and generators
output.write(request.getBytes("UTF-8"));
output.write(data);
output.flush();
line = reader.readLine();
Assert.assertTrue(line.contains(" 303"));
while (line.length() > 0)
line = reader.readLine();
Assert.assertFalse(reader.ready());
client.close();
}
@Test
public void testPOSTWithSmallRequestContentThenSmallResponseContent() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
stream.reply(replyInfo);
stream.data(new BytesDataInfo(data, true));
}
}
};
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"POST / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"Content-Length: " + data.length + "\r\n" +
"Content-Type: application/octet-stream\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.write(data);
output.flush();
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
for (byte datum : data)
Assert.assertEquals(datum, reader.read());
Assert.assertFalse(reader.ready());
// Perform another request so that we are sure we reset the states of parsers and generators
output.write(request.getBytes("UTF-8"));
output.write(data);
output.flush();
line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
for (byte datum : data)
Assert.assertEquals(datum, reader.read());
Assert.assertFalse(reader.ready());
client.close();
}
@Test
public void testSYNThenREPLY() throws Exception
{
final String header = "foo";
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
Assert.assertNotNull(requestHeaders.get(header));
Headers responseHeaders = new Headers();
responseHeaders.put(header, "baz");
stream.reply(new ReplyInfo(responseHeaders, true));
return null;
}
}));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
Session client = factory.newSPDYClient(version()).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
final CountDownLatch replyLatch = new CountDownLatch(1);
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
headers.put(header, "bar");
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Headers headers = replyInfo.getHeaders();
Assert.assertNotNull(headers.get(header));
replyLatch.countDown();
}
});
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
client.goAway().get(5, TimeUnit.SECONDS);
}
}