intermittent commit
This commit is contained in:
parent
cda27ae4a1
commit
1f6d058b20
|
@ -140,8 +140,8 @@ public class HttpSender implements AsyncContentProvider.Listener
|
|||
this.contentIterator = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
|
||||
|
||||
boolean updated = updateSendState(SendState.IDLE, SendState.EXECUTE);
|
||||
assert updated;
|
||||
send();
|
||||
if (updated)
|
||||
send();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -201,7 +201,8 @@ public class HttpSender implements AsyncContentProvider.Listener
|
|||
}
|
||||
case FLUSH:
|
||||
{
|
||||
out: while (true)
|
||||
out:
|
||||
while (true)
|
||||
{
|
||||
State currentState = state.get();
|
||||
switch (currentState)
|
||||
|
@ -290,7 +291,8 @@ public class HttpSender implements AsyncContentProvider.Listener
|
|||
|
||||
if (contentChunk.isDeferred())
|
||||
{
|
||||
out: while (true)
|
||||
out:
|
||||
while (true)
|
||||
{
|
||||
currentSendState = sendState.get();
|
||||
switch (currentSendState)
|
||||
|
|
|
@ -221,8 +221,6 @@ public class SslConnection extends AbstractConnection
|
|||
// to do the fill and/or flush again and these calls will do the actually
|
||||
// handle the cause.
|
||||
|
||||
super.onFillInterestedFailed(cause);
|
||||
|
||||
synchronized(_decryptedEndPoint)
|
||||
{
|
||||
_decryptedEndPoint.getFillInterest().onFail(cause);
|
||||
|
|
|
@ -281,6 +281,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
|
||||
private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Callback callback)
|
||||
{
|
||||
new Exception().printStackTrace();
|
||||
if (goAwaySent.compareAndSet(false, true))
|
||||
{
|
||||
if (!goAwayReceived.get())
|
||||
|
|
|
@ -71,10 +71,8 @@ public interface Session
|
|||
public void removeListener(Listener listener);
|
||||
|
||||
/**
|
||||
* <p>Sends asynchronously a SYN_FRAME to create a new {@link Stream SPDY stream}.</p>
|
||||
* <p>Callers may use the returned future to wait for the stream to be created, and
|
||||
* use the stream, for example, to send data frames.</p>
|
||||
*
|
||||
* <p>Sends a SYN_FRAME to create a new {@link Stream SPDY stream}.</p>
|
||||
* <p>Callers may use the returned Stream for example, to send data frames.</p>
|
||||
*
|
||||
* @param synInfo the metadata to send on stream creation
|
||||
* @param listener the listener to invoke when events happen on the stream just created
|
||||
|
@ -97,9 +95,7 @@ public interface Session
|
|||
public void syn(SynInfo synInfo, StreamFrameListener listener, Promise<Stream> promise);
|
||||
|
||||
/**
|
||||
* <p>Sends asynchronously a RST_STREAM to abort a stream.</p>
|
||||
* <p>Callers may use the returned future to wait for the reset to be sent.</p>
|
||||
*
|
||||
* <p>Sends synchronously a RST_STREAM to abort a stream.</p>
|
||||
*
|
||||
* @param rstInfo the metadata to reset the stream
|
||||
* @return the RstInfo belonging to the reset to be sent
|
||||
|
@ -112,7 +108,6 @@ public interface Session
|
|||
* <p>Callers may pass a non-null completion callback to be notified of when the
|
||||
* reset has been actually sent.</p>
|
||||
*
|
||||
*
|
||||
* @param rstInfo the metadata to reset the stream
|
||||
* @param callback the completion callback that gets notified of reset's send
|
||||
* @see #rst(RstInfo)
|
||||
|
@ -120,12 +115,9 @@ public interface Session
|
|||
public void rst(RstInfo rstInfo, Callback callback);
|
||||
|
||||
/**
|
||||
* <p>Sends asynchronously a SETTINGS to configure the SPDY connection.</p>
|
||||
* <p>Callers may use the returned future to wait for the settings to be sent.</p>
|
||||
*
|
||||
* <p>Sends synchronously a SETTINGS to configure the SPDY connection.</p>
|
||||
*
|
||||
* @param settingsInfo the metadata to send
|
||||
* @return a future to wait for the settings to be sent
|
||||
* @see #settings(SettingsInfo, Callback)
|
||||
*/
|
||||
public void settings(SettingsInfo settingsInfo) throws ExecutionException, InterruptedException, TimeoutException;
|
||||
|
@ -143,10 +135,8 @@ public interface Session
|
|||
public void settings(SettingsInfo settingsInfo, Callback callback);
|
||||
|
||||
/**
|
||||
* <p>Sends asynchronously a PING, normally to measure round-trip time.</p>
|
||||
* <p>Callers may use the returned future to wait for the ping to be sent.</p>
|
||||
* <p>Sends synchronously a PING, normally to measure round-trip time.</p>
|
||||
*
|
||||
* @return a future for the metadata sent
|
||||
* @see #ping(PingInfo, Promise
|
||||
* @param pingInfo
|
||||
*/
|
||||
|
@ -157,8 +147,6 @@ public interface Session
|
|||
* <p>Callers may pass a non-null completion callback to be notified of when the
|
||||
* ping has been actually sent.</p>
|
||||
*
|
||||
*
|
||||
*
|
||||
* @param pingInfo
|
||||
* @param promise the completion callback that gets notified of ping's send
|
||||
* @see #ping(PingInfo)
|
||||
|
@ -167,9 +155,7 @@ public interface Session
|
|||
|
||||
/**
|
||||
* <p>Closes gracefully this session, sending a GO_AWAY frame and then closing the TCP connection.</p>
|
||||
* <p>Callers may use the returned future to wait for the go away to be sent.</p>
|
||||
*
|
||||
* @return a future to wait for the go away to be sent
|
||||
* @see #goAway(GoAwayInfo, Callback)
|
||||
* @param goAwayInfo
|
||||
*/
|
||||
|
@ -180,8 +166,6 @@ public interface Session
|
|||
* <p>Callers may pass a non-null completion callback to be notified of when the
|
||||
* go away has been actually sent.</p>
|
||||
*
|
||||
*
|
||||
*
|
||||
* @param goAwayInfo
|
||||
* @param callback the completion callback that gets notified of go away's send
|
||||
* @see #goAway(GoAwayInfo)
|
||||
|
|
|
@ -91,6 +91,11 @@
|
|||
<artifactId>spdy-server</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-client</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.npn</groupId>
|
||||
<artifactId>npn-api</artifactId>
|
||||
|
@ -103,12 +108,6 @@
|
|||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-client</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
|
|
|
@ -0,0 +1,275 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
|
||||
package org.eclipse.jetty.spdy.server.proxy;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.util.DeferredContentProvider;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
|
||||
import org.eclipse.jetty.spdy.api.DataInfo;
|
||||
import org.eclipse.jetty.spdy.api.HeadersInfo;
|
||||
import org.eclipse.jetty.spdy.api.ReplyInfo;
|
||||
import org.eclipse.jetty.spdy.api.RstInfo;
|
||||
import org.eclipse.jetty.spdy.api.Stream;
|
||||
import org.eclipse.jetty.spdy.api.StreamFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.StreamStatus;
|
||||
import org.eclipse.jetty.spdy.api.SynInfo;
|
||||
import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Fields;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* <p>{@link HTTPProxyEngine} implements a SPDY to HTTP proxy, that is, converts SPDY events received by clients into
|
||||
* HTTP events for the servers.</p>
|
||||
*/
|
||||
public class HTTPProxyEngine extends ProxyEngine implements StreamFrameListener
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HTTPProxyEngine.class);
|
||||
private static final String CLIENT_REQUEST_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.request";
|
||||
private static final Callback LOGGING_CALLBACK = new LoggingCallback();
|
||||
|
||||
private final HttpClient httpClient = new HttpClient();
|
||||
private volatile boolean committed;
|
||||
|
||||
public HTTPProxyEngine()
|
||||
{
|
||||
try
|
||||
{
|
||||
httpClient.start();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
LOG.warn("Exception while starting HttpClient: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
public long getConnectTimeout()
|
||||
{
|
||||
return httpClient.getConnectTimeout();
|
||||
}
|
||||
|
||||
public void setConnectTimeout(long connectTimeout)
|
||||
{
|
||||
httpClient.setConnectTimeout(connectTimeout);
|
||||
}
|
||||
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
return httpClient.getIdleTimeout();
|
||||
}
|
||||
|
||||
public void setIdleTimeout(long idleTimeout)
|
||||
{
|
||||
httpClient.setIdleTimeout(idleTimeout);
|
||||
}
|
||||
|
||||
public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
|
||||
{
|
||||
short version = clientStream.getSession().getVersion();
|
||||
String method = clientSynInfo.getHeaders().get(HTTPSPDYHeader.METHOD.name(version)).value();
|
||||
String path = clientSynInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value();
|
||||
|
||||
Fields headers = new Fields(clientSynInfo.getHeaders(), false);
|
||||
|
||||
removeHopHeaders(headers);
|
||||
addRequestProxyHeaders(clientStream, headers);
|
||||
customizeRequestHeaders(clientStream, headers);
|
||||
|
||||
String host = proxyServerInfo.getHost();
|
||||
int port = proxyServerInfo.getAddress().getPort();
|
||||
LOG.debug("Sending HTTP request to: {}", host + ":" + port);
|
||||
Request request = httpClient.newRequest(host, port)
|
||||
.path(path)
|
||||
.method(HttpMethod.fromString(method));
|
||||
addNonSpdyHeadersToRequest(version, headers, request);
|
||||
|
||||
if (!clientSynInfo.isClose())
|
||||
{
|
||||
clientStream.setAttribute(CLIENT_REQUEST_ATTRIBUTE, request);
|
||||
request.content(new DeferredContentProvider());
|
||||
}
|
||||
|
||||
sendRequest(clientStream, request);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private void sendRequest(final Stream clientStream, Request request)
|
||||
{
|
||||
request.send(new Response.Listener.Empty()
|
||||
{
|
||||
@Override
|
||||
public void onHeaders(final Response response)
|
||||
{
|
||||
LOG.debug("onHeaders called with response: {}. Sending replyInfo to client.", response);
|
||||
Fields responseHeaders = createResponseHeaders(clientStream, response);
|
||||
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
|
||||
clientStream.reply(replyInfo, new Callback.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
LOG.debug("failed: ", x);
|
||||
response.abort(x);
|
||||
}
|
||||
});
|
||||
committed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContent(final Response response, ByteBuffer content)
|
||||
{
|
||||
LOG.debug("onContent called with response: {} and content: {}. Sending response content to client.",
|
||||
response, content);
|
||||
final ByteBuffer contentCopy = httpClient.getByteBufferPool().acquire(content.remaining(), true);
|
||||
BufferUtil.flipPutFlip(content, contentCopy);
|
||||
ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(contentCopy, false);
|
||||
clientStream.data(dataInfo, new Callback()
|
||||
{
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
LOG.debug("failed: ", x);
|
||||
releaseBuffer();
|
||||
response.abort(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
releaseBuffer();
|
||||
}
|
||||
|
||||
private void releaseBuffer()
|
||||
{
|
||||
httpClient.getByteBufferPool().release(contentCopy);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(Response response)
|
||||
{
|
||||
LOG.debug("onSuccess called. Closing client stream.");
|
||||
clientStream.data(new ByteBufferDataInfo(BufferUtil.EMPTY_BUFFER, true), LOGGING_CALLBACK);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Response response, Throwable failure)
|
||||
{
|
||||
LOG.debug("onFailure called: {}", failure);
|
||||
if (committed)
|
||||
{
|
||||
LOG.debug("clientStream already committed. Resetting stream.");
|
||||
try
|
||||
{
|
||||
clientStream.getSession().rst(new RstInfo(clientStream.getId(), StreamStatus.INTERNAL_ERROR));
|
||||
}
|
||||
catch (InterruptedException | ExecutionException | TimeoutException e)
|
||||
{
|
||||
LOG.debug(e);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (clientStream.isClosed())
|
||||
return;
|
||||
Fields responseHeaders = createResponseHeaders(clientStream, response);
|
||||
if (failure instanceof TimeoutException)
|
||||
responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
|
||||
String.valueOf(HttpStatus.GATEWAY_TIMEOUT_504));
|
||||
else
|
||||
responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
|
||||
String.valueOf(HttpStatus.BAD_GATEWAY_502));
|
||||
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, true);
|
||||
clientStream.reply(replyInfo, LOGGING_CALLBACK);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private Fields createResponseHeaders(Stream clientStream, Response response)
|
||||
{
|
||||
Fields responseHeaders = new Fields();
|
||||
for (HttpField header : response.getHeaders())
|
||||
responseHeaders.add(header.getName(), header.getValue());
|
||||
addResponseProxyHeaders(clientStream, responseHeaders);
|
||||
return responseHeaders;
|
||||
}
|
||||
|
||||
private void addNonSpdyHeadersToRequest(short version, Fields headers, Request request)
|
||||
{
|
||||
for (Fields.Field header : headers)
|
||||
if (HTTPSPDYHeader.from(version, header.name()) == null)
|
||||
request.header(header.name(), header.value());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
// We proxy to HTTP so we do not receive replies
|
||||
throw new UnsupportedOperationException("Not Yet Implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHeaders(Stream stream, HeadersInfo headersInfo)
|
||||
{
|
||||
throw new UnsupportedOperationException("Not Yet Implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream clientStream, final DataInfo clientDataInfo)
|
||||
{
|
||||
LOG.debug("received clientDataInfo: {} for stream: {}", clientDataInfo, clientStream);
|
||||
Request request = (Request)clientStream.getAttribute(CLIENT_REQUEST_ATTRIBUTE);
|
||||
|
||||
DeferredContentProvider contentProvider = (DeferredContentProvider)request.getContent();
|
||||
contentProvider.offer(clientDataInfo.asByteBuffer(true));
|
||||
|
||||
if (clientDataInfo.isClose())
|
||||
contentProvider.close();
|
||||
}
|
||||
|
||||
static class LoggingCallback extends Callback.Adapter
|
||||
{
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
LOG.debug(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
LOG.debug("succeeded");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -25,7 +25,7 @@ import org.eclipse.jetty.server.Server;
|
|||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.spdy.api.SPDY;
|
||||
import org.eclipse.jetty.spdy.server.NPNServerConnectionFactory;
|
||||
import org.eclipse.jetty.spdy.server.http.HTTPSPDYServerConnectionFactory;
|
||||
import org.eclipse.jetty.spdy.server.SPDYServerConnectionFactory;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
|
||||
public class HTTPSPDYProxyServerConnector extends ServerConnector
|
||||
|
@ -52,8 +52,8 @@ public class HTTPSPDYProxyServerConnector extends ServerConnector
|
|||
sslContextFactory == null
|
||||
? new ConnectionFactory[]{new ProxyHTTPConnectionFactory(config, SPDY.V2, proxyEngineSelector)}
|
||||
: new ConnectionFactory[]{new NPNServerConnectionFactory("spdy/3", "spdy/2", "http/1.1"),
|
||||
new HTTPSPDYServerConnectionFactory(SPDY.V3, config),
|
||||
new HTTPSPDYServerConnectionFactory(SPDY.V2, config),
|
||||
new SPDYServerConnectionFactory(SPDY.V3, proxyEngineSelector),
|
||||
new SPDYServerConnectionFactory(SPDY.V2, proxyEngineSelector),
|
||||
new ProxyHTTPConnectionFactory(config, SPDY.V2, proxyEngineSelector)});
|
||||
NPNServerConnectionFactory npnConnectionFactory = getConnectionFactory(NPNServerConnectionFactory.class);
|
||||
if (npnConnectionFactory != null)
|
||||
|
|
|
@ -22,13 +22,14 @@ package org.eclipse.jetty.spdy.server.proxy;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.eclipse.jetty.spdy.api.Stream;
|
||||
import org.eclipse.jetty.spdy.api.StreamFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.SynInfo;
|
||||
import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
|
||||
import org.eclipse.jetty.util.Fields;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* <p>{@link ProxyEngine} is the class for SPDY proxy functionalities that receives a SPDY request and converts it to
|
||||
|
@ -39,7 +40,20 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
*/
|
||||
public abstract class ProxyEngine
|
||||
{
|
||||
protected final Logger logger = Log.getLogger(getClass());
|
||||
private static final Set<String> HOP_HEADERS = new HashSet<>();
|
||||
static
|
||||
{
|
||||
HOP_HEADERS.add("proxy-connection");
|
||||
HOP_HEADERS.add("connection");
|
||||
HOP_HEADERS.add("keep-alive");
|
||||
HOP_HEADERS.add("transfer-encoding");
|
||||
HOP_HEADERS.add("te");
|
||||
HOP_HEADERS.add("trailer");
|
||||
HOP_HEADERS.add("proxy-authorization");
|
||||
HOP_HEADERS.add("proxy-authenticate");
|
||||
HOP_HEADERS.add("upgrade");
|
||||
}
|
||||
|
||||
private final String name;
|
||||
|
||||
protected ProxyEngine()
|
||||
|
@ -71,12 +85,25 @@ public abstract class ProxyEngine
|
|||
return name;
|
||||
}
|
||||
|
||||
protected void removeHopHeaders(Fields headers)
|
||||
{
|
||||
for (String hopHeader : HOP_HEADERS)
|
||||
headers.remove(hopHeader);
|
||||
}
|
||||
|
||||
protected void addRequestProxyHeaders(Stream stream, Fields headers)
|
||||
{
|
||||
addViaHeader(headers);
|
||||
Fields.Field schemeField = headers.get(HTTPSPDYHeader.SCHEME.name(stream.getSession().getVersion()));
|
||||
if(schemeField != null)
|
||||
headers.add("X-Forwarded-Proto", schemeField.value());
|
||||
InetSocketAddress address = stream.getSession().getRemoteAddress();
|
||||
if (address != null)
|
||||
headers.add("X-Forwarded-For", address.getHostName());
|
||||
{
|
||||
headers.add("X-Forwarded-Host", address.getHostName());
|
||||
headers.add("X-Forwarded-For", address.toString());
|
||||
}
|
||||
headers.add("X-Forwarded-Server", name());
|
||||
}
|
||||
|
||||
protected void addResponseProxyHeaders(Stream stream, Fields headers)
|
||||
|
|
|
@ -50,14 +50,14 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
*/
|
||||
public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter
|
||||
{
|
||||
protected final Logger logger = Log.getLogger(getClass());
|
||||
protected final Logger LOG = Log.getLogger(getClass());
|
||||
private final Map<String, ProxyServerInfo> proxyInfos = new ConcurrentHashMap<>();
|
||||
private final Map<String, ProxyEngine> proxyEngines = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public final StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo)
|
||||
{
|
||||
logger.debug("C -> P {} on {}", clientSynInfo, clientStream);
|
||||
LOG.debug("C -> P {} on {}", clientSynInfo, clientStream);
|
||||
|
||||
final Session clientSession = clientStream.getSession();
|
||||
short clientVersion = clientSession.getVersion();
|
||||
|
@ -66,7 +66,7 @@ public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter
|
|||
Fields.Field hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion));
|
||||
if (hostHeader == null)
|
||||
{
|
||||
logger.debug("No host header found: " + headers);
|
||||
LOG.debug("No host header found: " + headers);
|
||||
rst(clientStream);
|
||||
return null;
|
||||
}
|
||||
|
@ -79,7 +79,7 @@ public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter
|
|||
ProxyServerInfo proxyServerInfo = getProxyServerInfo(host);
|
||||
if (proxyServerInfo == null)
|
||||
{
|
||||
logger.debug("No matching ProxyServerInfo found for: " + host);
|
||||
LOG.debug("No matching ProxyServerInfo found for: " + host);
|
||||
rst(clientStream);
|
||||
return null;
|
||||
}
|
||||
|
@ -88,11 +88,11 @@ public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter
|
|||
ProxyEngine proxyEngine = proxyEngines.get(protocol);
|
||||
if (proxyEngine == null)
|
||||
{
|
||||
logger.debug("No matching ProxyEngine found for: " + protocol);
|
||||
LOG.debug("No matching ProxyEngine found for: " + protocol);
|
||||
rst(clientStream);
|
||||
return null;
|
||||
}
|
||||
|
||||
LOG.debug("Forwarding request: {} -> {}", clientSynInfo, proxyServerInfo);
|
||||
return proxyEngine.proxy(clientStream, clientSynInfo, proxyServerInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -255,7 +255,9 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
|
|||
}
|
||||
|
||||
// TODO: handle better the HEAD last parameter
|
||||
HttpGenerator.ResponseInfo info = new HttpGenerator.ResponseInfo(httpVersion, fields, -1, code, reason, false);
|
||||
long contentLength = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString());
|
||||
HttpGenerator.ResponseInfo info = new HttpGenerator.ResponseInfo(httpVersion, fields, contentLength, code,
|
||||
reason, false);
|
||||
send(info, null, replyInfo.isClose());
|
||||
|
||||
if (replyInfo.isClose())
|
||||
|
|
|
@ -46,6 +46,8 @@ import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
|
|||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Fields;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
|
||||
|
@ -53,6 +55,8 @@ import org.eclipse.jetty.util.Promise;
|
|||
*/
|
||||
public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);
|
||||
|
||||
private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamHandler";
|
||||
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";
|
||||
|
||||
|
@ -130,7 +134,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
// Servers do not receive replies
|
||||
throw new IllegalStateException("Servers do not receive replies");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -143,7 +147,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
@Override
|
||||
public void onData(Stream clientStream, final DataInfo clientDataInfo)
|
||||
{
|
||||
logger.debug("C -> P {} on {}", clientDataInfo, clientStream);
|
||||
LOG.debug("C -> P {} on {}", clientDataInfo, clientStream);
|
||||
|
||||
ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
|
||||
{
|
||||
|
@ -168,7 +172,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
{
|
||||
SPDYClient client = factory.newSPDYClient(version);
|
||||
session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
|
||||
logger.debug("Proxy session connected to {}", address);
|
||||
LOG.debug("Proxy session connected to {}", address);
|
||||
Session existing = serverSessions.putIfAbsent(host, session);
|
||||
if (existing != null)
|
||||
{
|
||||
|
@ -180,7 +184,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
logger.debug(x);
|
||||
LOG.debug(x);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -221,7 +225,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
@Override
|
||||
public void onReply(final Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
logger.debug("S -> P {} on {}", replyInfo, stream);
|
||||
LOG.debug("S -> P {} on {}", replyInfo, stream);
|
||||
|
||||
short serverVersion = stream.getSession().getVersion();
|
||||
Fields headers = new Fields(replyInfo.getHeaders(), false);
|
||||
|
@ -246,7 +250,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
@Override
|
||||
public void onData(final Stream stream, final DataInfo dataInfo)
|
||||
{
|
||||
logger.debug("S -> P {} on {}", dataInfo, stream);
|
||||
LOG.debug("S -> P {} on {}", dataInfo, stream);
|
||||
|
||||
if (replyInfo != null)
|
||||
{
|
||||
|
@ -266,13 +270,13 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
logger.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
|
||||
LOG.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
logger.debug(x);
|
||||
LOG.debug(x);
|
||||
rst(clientStream);
|
||||
}
|
||||
});
|
||||
|
@ -286,13 +290,13 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
public void succeeded()
|
||||
{
|
||||
dataInfo.consume(dataInfo.length());
|
||||
logger.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
|
||||
LOG.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
logger.debug(x);
|
||||
LOG.debug(x);
|
||||
rst(clientStream);
|
||||
}
|
||||
});
|
||||
|
@ -323,7 +327,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
@Override
|
||||
public void succeeded(Stream serverStream)
|
||||
{
|
||||
logger.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream);
|
||||
LOG.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream);
|
||||
|
||||
serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
|
||||
|
||||
|
@ -336,18 +340,18 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
{
|
||||
if (dataInfoHandler.flushing)
|
||||
{
|
||||
logger.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
|
||||
LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
|
||||
dataInfoHandler = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
dataInfoHandler.flushing = true;
|
||||
logger.debug("SYN completed, queue size {}", queue.size());
|
||||
LOG.debug("SYN completed, queue size {}", queue.size());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.debug("SYN completed, queue empty");
|
||||
LOG.debug("SYN completed, queue empty");
|
||||
}
|
||||
}
|
||||
if (dataInfoHandler != null)
|
||||
|
@ -357,7 +361,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
logger.debug(x);
|
||||
LOG.debug(x);
|
||||
rst(clientStream);
|
||||
}
|
||||
|
||||
|
@ -375,18 +379,18 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
dataInfoHandler = queue.peek();
|
||||
if (dataInfoHandler.flushing)
|
||||
{
|
||||
logger.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
|
||||
LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
|
||||
serverStream = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
dataInfoHandler.flushing = true;
|
||||
logger.debug("Queued {}, queue size {}", dataInfo, queue.size());
|
||||
LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
|
||||
LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
|
||||
}
|
||||
}
|
||||
if (serverStream != null)
|
||||
|
@ -395,7 +399,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
|
||||
private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
|
||||
{
|
||||
logger.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
|
||||
LOG.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
|
||||
serverStream.data(dataInfoHandler.dataInfo, dataInfoHandler); //TODO: timeout???
|
||||
}
|
||||
|
||||
|
@ -425,11 +429,11 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
{
|
||||
assert !dataInfoHandler.flushing;
|
||||
dataInfoHandler.flushing = true;
|
||||
logger.debug("Completed {}, queue size {}", dataInfo, queue.size());
|
||||
LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.debug("Completed {}, queue empty", dataInfo);
|
||||
LOG.debug("Completed {}, queue empty", dataInfo);
|
||||
}
|
||||
}
|
||||
if (dataInfoHandler != null)
|
||||
|
@ -439,7 +443,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
logger.debug(x);
|
||||
LOG.debug(x);
|
||||
rst(clientStream);
|
||||
}
|
||||
}
|
||||
|
@ -450,7 +454,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
@Override
|
||||
public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
|
||||
{
|
||||
logger.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
|
||||
LOG.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
|
||||
|
||||
Fields headers = new Fields(serverSynInfo.getHeaders(), false);
|
||||
|
||||
|
@ -492,18 +496,19 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
|||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
// Push streams never send a reply
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHeaders(Stream stream, HeadersInfo headersInfo)
|
||||
{
|
||||
throw new UnsupportedOperationException(); //TODO
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream serverStream, final DataInfo serverDataInfo)
|
||||
{
|
||||
logger.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
|
||||
LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
|
||||
|
||||
ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
|
||||
{
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
|
||||
package org.eclipse.jetty.spdy.server.http;
|
||||
|
||||
import org.eclipse.jetty.util.Fields;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
|
||||
public class SPDYTestUtils
|
||||
{
|
||||
public static Fields createHeaders(int port, short version, String httpMethod, String path)
|
||||
{
|
||||
Fields headers = new Fields();
|
||||
headers.put(HTTPSPDYHeader.METHOD.name(version), httpMethod);
|
||||
headers.put(HTTPSPDYHeader.URI.name(version), path);
|
||||
headers.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
|
||||
headers.put(HTTPSPDYHeader.SCHEME.name(version), "http");
|
||||
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + port);
|
||||
return headers;
|
||||
}
|
||||
|
||||
public static SslContextFactory newSslContextFactory()
|
||||
{
|
||||
SslContextFactory sslContextFactory = new SslContextFactory();
|
||||
sslContextFactory.setEndpointIdentificationAlgorithm("");
|
||||
sslContextFactory.setKeyStorePath("src/test/resources/keystore.jks");
|
||||
sslContextFactory.setKeyStorePassword("storepwd");
|
||||
sslContextFactory.setTrustStorePath("src/test/resources/truststore.jks");
|
||||
sslContextFactory.setTrustStorePassword("storepwd");
|
||||
sslContextFactory.setProtocol("TLSv1");
|
||||
sslContextFactory.setIncludeProtocols("TLSv1");
|
||||
return sslContextFactory;
|
||||
}
|
||||
}
|
|
@ -80,7 +80,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", path);
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", path);
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
|
@ -119,7 +119,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", uri);
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", uri);
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
|
@ -155,7 +155,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("HEAD", path);
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "HEAD", path);
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
|
@ -200,7 +200,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("POST", path);
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "POST", path);
|
||||
headers.put("content-type", "application/x-www-form-urlencoded");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, false, (byte)0),
|
||||
|
@ -242,7 +242,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("POST", path);
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "POST", path);
|
||||
headers.put("content-type", "application/x-www-form-urlencoded");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, false, (byte)0),
|
||||
|
@ -287,7 +287,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("POST", path);
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "POST", path);
|
||||
headers.put("content-type", "application/x-www-form-urlencoded");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, false, (byte)0), new StreamFrameListener.Adapter()
|
||||
|
@ -329,7 +329,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
|
@ -375,7 +375,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
|
@ -426,7 +426,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(2);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
|
@ -481,7 +481,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
|
@ -534,7 +534,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
|
@ -587,7 +587,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
|
@ -645,7 +645,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
|
@ -698,7 +698,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
|
@ -735,7 +735,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
|
@ -780,7 +780,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
|
@ -834,7 +834,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(2);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
|
@ -916,7 +916,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
|
@ -972,7 +972,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("GET", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "GET", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
final AtomicInteger contentLength = new AtomicInteger();
|
||||
|
@ -1036,7 +1036,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("POST", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "POST", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, false, (byte)0), new StreamFrameListener.Adapter()
|
||||
{
|
||||
|
@ -1079,7 +1079,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("POST", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "POST", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, true, (byte)0), new StreamFrameListener.Adapter()
|
||||
{
|
||||
|
@ -1124,7 +1124,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("POST", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "POST", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, false, (byte)0), new StreamFrameListener.Adapter()
|
||||
{
|
||||
|
@ -1192,7 +1192,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("POST", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "POST", "/foo");
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, false, (byte)0), new StreamFrameListener.Adapter()
|
||||
{
|
||||
|
@ -1260,7 +1260,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("POST", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "POST", "/foo");
|
||||
final CountDownLatch responseLatch = new CountDownLatch(2);
|
||||
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, false, (byte)0), new StreamFrameListener.Adapter()
|
||||
{
|
||||
|
@ -1301,7 +1301,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}), null);
|
||||
|
||||
Fields headers = createHeaders("POST", "/foo");
|
||||
Fields headers = SPDYTestUtils.createHeaders(connector.getPort(), version, "POST", "/foo");
|
||||
final CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, false, (byte)0), new StreamFrameListener.Adapter()
|
||||
{
|
||||
|
@ -1320,15 +1320,4 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
|
|||
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private Fields createHeaders(String httpMethod, String path)
|
||||
{
|
||||
Fields headers = new Fields();
|
||||
headers.put(HTTPSPDYHeader.METHOD.name(version), httpMethod);
|
||||
headers.put(HTTPSPDYHeader.URI.name(version), path);
|
||||
headers.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
|
||||
headers.put(HTTPSPDYHeader.SCHEME.name(version), "http");
|
||||
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + connector.getLocalPort());
|
||||
return headers;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.eclipse.jetty.spdy.server.proxy;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
|
@ -34,16 +33,12 @@ import org.eclipse.jetty.server.Server;
|
|||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.spdy.api.BytesDataInfo;
|
||||
import org.eclipse.jetty.spdy.api.DataInfo;
|
||||
import org.eclipse.jetty.spdy.api.GoAwayInfo;
|
||||
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
|
||||
import org.eclipse.jetty.spdy.api.PingInfo;
|
||||
import org.eclipse.jetty.spdy.api.PingResultInfo;
|
||||
import org.eclipse.jetty.spdy.api.PushInfo;
|
||||
import org.eclipse.jetty.spdy.api.ReplyInfo;
|
||||
import org.eclipse.jetty.spdy.api.RstInfo;
|
||||
import org.eclipse.jetty.spdy.api.SPDY;
|
||||
import org.eclipse.jetty.spdy.api.Session;
|
||||
import org.eclipse.jetty.spdy.api.SessionFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.Stream;
|
||||
import org.eclipse.jetty.spdy.api.StreamFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.StreamStatus;
|
||||
|
@ -56,10 +51,11 @@ import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
|
|||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Fields;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestWatcher;
|
||||
|
@ -67,10 +63,10 @@ import org.junit.runner.Description;
|
|||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@Ignore // TODO: make these tests pass
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class ProxyHTTPSPDYTest
|
||||
@RunWith(Parameterized.class)
|
||||
public class ProxyHTTPToSPDYTest
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(ProxyHTTPToSPDYTest.class);
|
||||
@Rule
|
||||
public final TestWatcher testName = new TestWatcher()
|
||||
{
|
||||
|
@ -84,6 +80,7 @@ public class ProxyHTTPSPDYTest
|
|||
description.getMethodName());
|
||||
}
|
||||
};
|
||||
|
||||
private final short version;
|
||||
|
||||
@Parameterized.Parameters
|
||||
|
@ -97,7 +94,7 @@ public class ProxyHTTPSPDYTest
|
|||
private Server proxy;
|
||||
private ServerConnector proxyConnector;
|
||||
|
||||
public ProxyHTTPSPDYTest(short version)
|
||||
public ProxyHTTPToSPDYTest(short version)
|
||||
{
|
||||
this.version = version;
|
||||
}
|
||||
|
@ -120,7 +117,7 @@ public class ProxyHTTPSPDYTest
|
|||
SPDYProxyEngine spdyProxyEngine = new SPDYProxyEngine(factory);
|
||||
proxyEngineSelector.putProxyEngine("spdy/" + version, spdyProxyEngine);
|
||||
proxyEngineSelector.putProxyServerInfo("localhost", new ProxyEngineSelector.ProxyServerInfo("spdy/" + version, address.getHostName(), address.getPort()));
|
||||
proxyConnector = new HTTPSPDYProxyServerConnector(server, proxyEngineSelector);
|
||||
proxyConnector = new HTTPSPDYProxyServerConnector(proxy, proxyEngineSelector);
|
||||
proxyConnector.setPort(0);
|
||||
proxy.addConnector(proxyConnector);
|
||||
proxy.start();
|
||||
|
@ -258,6 +255,11 @@ public class ProxyHTTPSPDYTest
|
|||
client2.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHEADRequest() throws Exception{
|
||||
// fail("Not yet implemented"); //TODO:
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGETThenSmallResponseContent() throws Exception
|
||||
{
|
||||
|
@ -301,7 +303,6 @@ public class ProxyHTTPSPDYTest
|
|||
line = reader.readLine();
|
||||
for (byte datum : data)
|
||||
Assert.assertEquals(datum, reader.read());
|
||||
Assert.assertFalse(reader.ready());
|
||||
|
||||
// Perform another request so that we are sure we reset the states of parsers and generators
|
||||
output.write(request.getBytes("UTF-8"));
|
||||
|
@ -450,109 +451,6 @@ public class ProxyHTTPSPDYTest
|
|||
client.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNThenREPLY() throws Exception
|
||||
{
|
||||
final String header = "foo";
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
Fields requestHeaders = synInfo.getHeaders();
|
||||
Assert.assertNotNull(requestHeaders.get("via"));
|
||||
Assert.assertNotNull(requestHeaders.get(header));
|
||||
|
||||
Fields responseHeaders = new Fields();
|
||||
responseHeaders.put(header, "baz");
|
||||
stream.reply(new ReplyInfo(responseHeaders, true), new Callback.Adapter());
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
|
||||
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
|
||||
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
Fields headers = new Fields();
|
||||
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
|
||||
headers.put(header, "bar");
|
||||
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
Fields headers = replyInfo.getHeaders();
|
||||
Assert.assertNotNull(headers.get(header));
|
||||
replyLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNThenREPLYAndDATA() throws Exception
|
||||
{
|
||||
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
|
||||
final String header = "foo";
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
Fields requestHeaders = synInfo.getHeaders();
|
||||
Assert.assertNotNull(requestHeaders.get("via"));
|
||||
Assert.assertNotNull(requestHeaders.get(header));
|
||||
|
||||
Fields responseHeaders = new Fields();
|
||||
responseHeaders.put(header, "baz");
|
||||
stream.reply(new ReplyInfo(responseHeaders, false), new Callback.Adapter());
|
||||
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
|
||||
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
|
||||
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
Fields headers = new Fields();
|
||||
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
|
||||
headers.put(header, "bar");
|
||||
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
private final ByteArrayOutputStream result = new ByteArrayOutputStream();
|
||||
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
Fields headers = replyInfo.getHeaders();
|
||||
Assert.assertNotNull(headers.get(header));
|
||||
replyLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataInfo dataInfo)
|
||||
{
|
||||
result.write(dataInfo.asBytes(true), 0, dataInfo.length());
|
||||
if (dataInfo.isClose())
|
||||
{
|
||||
Assert.assertArrayEquals(data, result.toByteArray());
|
||||
dataLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGETThenSPDYPushIsIgnored() throws Exception
|
||||
{
|
||||
|
@ -605,114 +503,6 @@ public class ProxyHTTPSPDYTest
|
|||
client.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNThenSPDYPushIsReceived() throws Exception
|
||||
{
|
||||
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
Fields responseHeaders = new Fields();
|
||||
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
|
||||
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK");
|
||||
stream.reply(new ReplyInfo(responseHeaders, false), new Callback.Adapter());
|
||||
|
||||
Fields pushHeaders = new Fields();
|
||||
pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/push");
|
||||
stream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Promise.Adapter<Stream>()
|
||||
{
|
||||
@Override
|
||||
public void succeeded(Stream pushStream)
|
||||
{
|
||||
pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter());
|
||||
}
|
||||
});
|
||||
|
||||
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());
|
||||
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
|
||||
|
||||
final CountDownLatch pushSynLatch = new CountDownLatch(1);
|
||||
final CountDownLatch pushDataLatch = new CountDownLatch(1);
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
pushSynLatch.countDown();
|
||||
return new StreamFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onData(Stream stream, DataInfo dataInfo)
|
||||
{
|
||||
dataInfo.consume(dataInfo.length());
|
||||
if (dataInfo.isClose())
|
||||
pushDataLatch.countDown();
|
||||
}
|
||||
};
|
||||
}
|
||||
}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
Fields headers = new Fields();
|
||||
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
replyLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataInfo dataInfo)
|
||||
{
|
||||
dataInfo.consume(dataInfo.length());
|
||||
if (dataInfo.isClose())
|
||||
dataLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(pushSynLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(pushDataLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPING() throws Exception
|
||||
{
|
||||
// PING is per hop, and it does not carry the information to which server to ping to
|
||||
// We just verify that it works
|
||||
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()));
|
||||
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
|
||||
|
||||
final CountDownLatch pingLatch = new CountDownLatch(1);
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onPing(Session session, PingResultInfo pingInfo)
|
||||
{
|
||||
pingLatch.countDown();
|
||||
}
|
||||
}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
client.ping(new PingInfo(5, TimeUnit.SECONDS));
|
||||
|
||||
Assert.assertTrue(pingLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGETThenReset() throws Exception
|
||||
{
|
||||
|
@ -748,42 +538,4 @@ public class ProxyHTTPSPDYTest
|
|||
|
||||
client.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNThenReset() throws Exception
|
||||
{
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
Assert.assertTrue(synInfo.isClose());
|
||||
Fields requestHeaders = synInfo.getHeaders();
|
||||
Assert.assertNotNull(requestHeaders.get("via"));
|
||||
|
||||
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter());
|
||||
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
|
||||
|
||||
final CountDownLatch resetLatch = new CountDownLatch(1);
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onRst(Session session, RstInfo rstInfo)
|
||||
{
|
||||
resetLatch.countDown();
|
||||
}
|
||||
}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
Fields headers = new Fields();
|
||||
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
|
||||
client.syn(new SynInfo(headers, true), null);
|
||||
|
||||
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,467 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.spdy.server.proxy;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||
import org.eclipse.jetty.spdy.api.DataInfo;
|
||||
import org.eclipse.jetty.spdy.api.GoAwayInfo;
|
||||
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
|
||||
import org.eclipse.jetty.spdy.api.PingInfo;
|
||||
import org.eclipse.jetty.spdy.api.PingResultInfo;
|
||||
import org.eclipse.jetty.spdy.api.ReplyInfo;
|
||||
import org.eclipse.jetty.spdy.api.SPDY;
|
||||
import org.eclipse.jetty.spdy.api.Session;
|
||||
import org.eclipse.jetty.spdy.api.SessionFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.Stream;
|
||||
import org.eclipse.jetty.spdy.api.StreamFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.StringDataInfo;
|
||||
import org.eclipse.jetty.spdy.api.SynInfo;
|
||||
import org.eclipse.jetty.spdy.client.SPDYClient;
|
||||
import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
|
||||
import org.eclipse.jetty.spdy.server.http.SPDYTestUtils;
|
||||
import org.eclipse.jetty.util.Fields;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestWatcher;
|
||||
import org.junit.runner.Description;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
//@RunWith(value = Parameterized.class)
|
||||
@RunWith(JUnit4.class)
|
||||
public class ProxySPDYToHTTPTest
|
||||
{
|
||||
@Rule
|
||||
public final TestWatcher testName = new TestWatcher()
|
||||
{
|
||||
|
||||
@Override
|
||||
public void starting(Description description)
|
||||
{
|
||||
super.starting(description);
|
||||
System.err.printf("Running %s.%s()%n",
|
||||
description.getClassName(),
|
||||
description.getMethodName());
|
||||
}
|
||||
};
|
||||
private final short version = SPDY.V3;
|
||||
|
||||
// @Parameterized.Parameters
|
||||
// public static Collection<Short[]> parameters()
|
||||
// {
|
||||
// return Arrays.asList(new Short[]{SPDY.V2}, new Short[]{SPDY.V3});
|
||||
// }
|
||||
|
||||
private SPDYClient.Factory factory;
|
||||
private Server server;
|
||||
private Server proxy;
|
||||
private ServerConnector proxyConnector;
|
||||
private SslContextFactory sslContextFactory = SPDYTestUtils.newSslContextFactory();
|
||||
|
||||
// public ProxySPDYToHTTPTest(short version)
|
||||
// {
|
||||
// this.version = version;
|
||||
// }
|
||||
|
||||
protected InetSocketAddress startServer(Handler handler) throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
server.setHandler(handler);
|
||||
server.addConnector(connector);
|
||||
server.start();
|
||||
return new InetSocketAddress("localhost", connector.getLocalPort());
|
||||
}
|
||||
|
||||
protected InetSocketAddress startProxy(InetSocketAddress address, long proxyConnectorTimeout, long proxyEngineTimeout) throws Exception
|
||||
{
|
||||
proxy = new Server();
|
||||
ProxyEngineSelector proxyEngineSelector = new ProxyEngineSelector();
|
||||
HTTPProxyEngine httpProxyEngine = new HTTPProxyEngine();
|
||||
httpProxyEngine.setIdleTimeout(proxyEngineTimeout);
|
||||
proxyEngineSelector.putProxyEngine("http/1.1", httpProxyEngine);
|
||||
proxyEngineSelector.putProxyServerInfo("localhost", new ProxyEngineSelector.ProxyServerInfo("http/1.1", address.getHostName(), address.getPort()));
|
||||
proxyConnector = new HTTPSPDYProxyServerConnector(proxy, sslContextFactory, proxyEngineSelector);
|
||||
proxyConnector.setPort(0);
|
||||
proxyConnector.setIdleTimeout(proxyConnectorTimeout);
|
||||
proxy.addConnector(proxyConnector);
|
||||
proxy.start();
|
||||
return new InetSocketAddress("localhost", proxyConnector.getLocalPort());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() throws Exception
|
||||
{
|
||||
factory = new SPDYClient.Factory(sslContextFactory);
|
||||
factory.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void destroy() throws Exception
|
||||
{
|
||||
if (server != null)
|
||||
{
|
||||
server.stop();
|
||||
server.join();
|
||||
}
|
||||
if (proxy != null)
|
||||
{
|
||||
proxy.stop();
|
||||
proxy.join();
|
||||
}
|
||||
factory.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNThenREPLY() throws Exception
|
||||
{
|
||||
final String header = "foo";
|
||||
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new TestServerHandler(header, null)), 30000, 30000);
|
||||
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
|
||||
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
|
||||
Fields headers = SPDYTestUtils.createHeaders(proxyAddress.getPort(), version, "GET", "/");
|
||||
headers.put(header, "bar");
|
||||
headers.put("connection", "close");
|
||||
|
||||
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
Fields headers = replyInfo.getHeaders();
|
||||
assertThat("Custom set header foo is set on response", headers.get(header), is(notNullValue()));
|
||||
assertThat("HOP headers like connection are removed before forwarding",
|
||||
headers.get("connection"), is(nullValue()));
|
||||
replyLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertThat("Reply is send to SPDY client", replyLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNThenREPLYAndDATA() throws Exception
|
||||
{
|
||||
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
|
||||
final String header = "foo";
|
||||
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new TestServerHandler(header, data)), 30000, 30000);
|
||||
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
|
||||
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
|
||||
Fields headers = SPDYTestUtils.createHeaders(proxyAddress.getPort(), version, "GET", "/");
|
||||
headers.put(header, "bar");
|
||||
headers.put("connection", "close");
|
||||
|
||||
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
private final ByteArrayOutputStream result = new ByteArrayOutputStream();
|
||||
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
Fields headers = replyInfo.getHeaders();
|
||||
assertThat("custom header exists in response", headers.get(header), is(notNullValue()));
|
||||
replyLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataInfo dataInfo)
|
||||
{
|
||||
result.write(dataInfo.asBytes(true), 0, dataInfo.length());
|
||||
if (dataInfo.isClose())
|
||||
{
|
||||
assertThat("received data matches send data", result.toByteArray(), is(data));
|
||||
dataLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
assertThat("reply has been received", replyLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
assertThat("data has been received", dataLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNWithRequestContentThenREPLYAndDATA() throws Exception
|
||||
{
|
||||
final String data = "0123456789ABCDEF";
|
||||
final String header = "foo";
|
||||
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new TestServerHandler(header, null)), 30000, 30000);
|
||||
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
|
||||
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
|
||||
Fields headers = SPDYTestUtils.createHeaders(proxyAddress.getPort(), version, "POST", "/");
|
||||
headers.put(header, "bar");
|
||||
headers.put("connection", "close");
|
||||
|
||||
Stream stream = client.syn(new SynInfo(headers, false), new StreamFrameListener.Adapter()
|
||||
{
|
||||
private final ByteArrayOutputStream result = new ByteArrayOutputStream();
|
||||
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
Fields headers = replyInfo.getHeaders();
|
||||
assertThat("custom header exists in response", headers.get(header), is(notNullValue()));
|
||||
replyLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataInfo dataInfo)
|
||||
{
|
||||
result.write(dataInfo.asBytes(true), 0, dataInfo.length());
|
||||
if (dataInfo.isClose())
|
||||
{
|
||||
assertThat("received data matches send data", data, is(result.toString()));
|
||||
dataLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
stream.data(new StringDataInfo(data, true));
|
||||
|
||||
assertThat("reply has been received", replyLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
assertThat("data has been received", dataLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNWithSplitRequestContentThenREPLYAndDATA() throws Exception
|
||||
{
|
||||
final String data = "0123456789ABCDEF";
|
||||
final String data2 = "ABCDEF0123456789";
|
||||
final String header = "foo";
|
||||
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new TestServerHandler(header, null)), 30000, 30000);
|
||||
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
|
||||
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
|
||||
Fields headers = SPDYTestUtils.createHeaders(proxyAddress.getPort(), version, "POST", "/");
|
||||
headers.put(header, "bar");
|
||||
headers.put("connection", "close");
|
||||
|
||||
Stream stream = client.syn(new SynInfo(headers, false), new StreamFrameListener.Adapter()
|
||||
{
|
||||
private final ByteArrayOutputStream result = new ByteArrayOutputStream();
|
||||
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
Fields headers = replyInfo.getHeaders();
|
||||
assertThat("custom header exists in response", headers.get(header), is(notNullValue()));
|
||||
replyLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataInfo dataInfo)
|
||||
{
|
||||
result.write(dataInfo.asBytes(true), 0, dataInfo.length());
|
||||
if (dataInfo.isClose())
|
||||
{
|
||||
assertThat("received data matches send data", result.toString(), is(data + data2));
|
||||
dataLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
stream.data(new StringDataInfo(data, false));
|
||||
stream.data(new StringDataInfo(data2, true));
|
||||
|
||||
assertThat("reply has been received", replyLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
assertThat("data has been received", dataLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientTimeout() throws Exception
|
||||
{
|
||||
long timeout = 1000;
|
||||
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new TestServerHandler(null, null)), timeout, 30000);
|
||||
|
||||
final CountDownLatch goAwayLatch = new CountDownLatch(1);
|
||||
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayReceivedInfo goAwayReceivedInfo)
|
||||
{
|
||||
goAwayLatch.countDown();
|
||||
}
|
||||
}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
Fields headers = SPDYTestUtils.createHeaders(proxyAddress.getPort(), version, "POST", "/");
|
||||
client.syn(new SynInfo(headers, false), null);
|
||||
assertThat("goAway has been received by proxy", goAwayLatch.await(2 * timeout, TimeUnit.MILLISECONDS),
|
||||
is(true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerTimeout() throws Exception
|
||||
{
|
||||
final int timeout = 1000;
|
||||
final String header = "foo";
|
||||
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new DefaultHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(2 * timeout);
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}), 30000, timeout);
|
||||
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
|
||||
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
|
||||
Fields headers = SPDYTestUtils.createHeaders(proxyAddress.getPort(), version, "POST", "/");
|
||||
headers.put(header, "bar");
|
||||
headers.put("connection", "close");
|
||||
|
||||
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
Fields headers = replyInfo.getHeaders();
|
||||
assertThat("status is 504", headers.get(HTTPSPDYHeader.STATUS.name(version)).value(), is("504"));
|
||||
replyLatch.countDown();
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
assertThat("reply has been received", replyLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPING() throws Exception
|
||||
{
|
||||
// PING is per hop, and it does not carry the information to which server to ping to
|
||||
// We just verify that it works
|
||||
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(null), 30000, 30000);
|
||||
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
|
||||
|
||||
final CountDownLatch pingLatch = new CountDownLatch(1);
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onPing(Session session, PingResultInfo pingInfo)
|
||||
{
|
||||
pingLatch.countDown();
|
||||
}
|
||||
}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
client.ping(new PingInfo(5, TimeUnit.SECONDS));
|
||||
|
||||
Assert.assertTrue(pingLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private class TestServerHandler extends DefaultHandler
|
||||
{
|
||||
private final String responseHeader;
|
||||
private final byte[] responseData;
|
||||
|
||||
private TestServerHandler(String responseHeader, byte[] responseData)
|
||||
{
|
||||
this.responseHeader = responseHeader;
|
||||
this.responseData = responseData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request,
|
||||
HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
assertThat("Via Header is set", baseRequest.getHeader("X-Forwarded-For"), is(notNullValue()));
|
||||
assertThat("X-Forwarded-For Header is set", baseRequest.getHeader("X-Forwarded-For"),
|
||||
is(notNullValue()));
|
||||
assertThat("X-Forwarded-Host Header is set", baseRequest.getHeader("X-Forwarded-Host"),
|
||||
is(notNullValue()));
|
||||
assertThat("X-Forwarded-Proto Header is set", baseRequest.getHeader("X-Forwarded-Proto"),
|
||||
is(notNullValue()));
|
||||
assertThat("X-Forwarded-Server Header is set", baseRequest.getHeader("X-Forwarded-Server"),
|
||||
is(notNullValue()));
|
||||
baseRequest.setHandled(true);
|
||||
BufferedReader bufferedReader = request.getReader();
|
||||
int read;
|
||||
while ((read = bufferedReader.read()) != -1)
|
||||
response.getOutputStream().write(read);
|
||||
|
||||
if (responseHeader != null)
|
||||
response.addHeader(responseHeader, "bar");
|
||||
if (responseData != null)
|
||||
response.getOutputStream().write(responseData);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,395 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.spdy.server.proxy;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.spdy.api.BytesDataInfo;
|
||||
import org.eclipse.jetty.spdy.api.DataInfo;
|
||||
import org.eclipse.jetty.spdy.api.GoAwayInfo;
|
||||
import org.eclipse.jetty.spdy.api.PingInfo;
|
||||
import org.eclipse.jetty.spdy.api.PingResultInfo;
|
||||
import org.eclipse.jetty.spdy.api.PushInfo;
|
||||
import org.eclipse.jetty.spdy.api.ReplyInfo;
|
||||
import org.eclipse.jetty.spdy.api.RstInfo;
|
||||
import org.eclipse.jetty.spdy.api.SPDY;
|
||||
import org.eclipse.jetty.spdy.api.Session;
|
||||
import org.eclipse.jetty.spdy.api.SessionFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.Stream;
|
||||
import org.eclipse.jetty.spdy.api.StreamFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.StreamStatus;
|
||||
import org.eclipse.jetty.spdy.api.SynInfo;
|
||||
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
|
||||
import org.eclipse.jetty.spdy.client.SPDYClient;
|
||||
import org.eclipse.jetty.spdy.server.SPDYServerConnectionFactory;
|
||||
import org.eclipse.jetty.spdy.server.SPDYServerConnector;
|
||||
import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
|
||||
import org.eclipse.jetty.spdy.server.http.SPDYTestUtils;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Fields;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestWatcher;
|
||||
import org.junit.runner.Description;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class ProxySPDYToSPDYTest
|
||||
{
|
||||
@Rule
|
||||
public final TestWatcher testName = new TestWatcher()
|
||||
{
|
||||
|
||||
@Override
|
||||
public void starting(Description description)
|
||||
{
|
||||
super.starting(description);
|
||||
System.err.printf("Running %s.%s()%n",
|
||||
description.getClassName(),
|
||||
description.getMethodName());
|
||||
}
|
||||
};
|
||||
private final short version;
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<Short[]> parameters()
|
||||
{
|
||||
return Arrays.asList(new Short[]{SPDY.V2}, new Short[]{SPDY.V3});
|
||||
}
|
||||
|
||||
private SPDYClient.Factory factory;
|
||||
private Server server;
|
||||
private Server proxy;
|
||||
private ServerConnector proxyConnector;
|
||||
private SslContextFactory sslContextFactory = SPDYTestUtils.newSslContextFactory();
|
||||
|
||||
public ProxySPDYToSPDYTest(short version)
|
||||
{
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
SPDYServerConnector serverConnector = new SPDYServerConnector(server, sslContextFactory, listener);
|
||||
serverConnector.addConnectionFactory(new SPDYServerConnectionFactory(version, listener));
|
||||
serverConnector.setPort(0);
|
||||
server.addConnector(serverConnector);
|
||||
server.start();
|
||||
return new InetSocketAddress("localhost", serverConnector.getLocalPort());
|
||||
}
|
||||
|
||||
protected InetSocketAddress startProxy(InetSocketAddress address) throws Exception
|
||||
{
|
||||
proxy = new Server();
|
||||
ProxyEngineSelector proxyEngineSelector = new ProxyEngineSelector();
|
||||
SPDYProxyEngine spdyProxyEngine = new SPDYProxyEngine(factory);
|
||||
proxyEngineSelector.putProxyEngine("spdy/" + version, spdyProxyEngine);
|
||||
proxyEngineSelector.putProxyServerInfo("localhost", new ProxyEngineSelector.ProxyServerInfo("spdy/" + version, address.getHostName(), address.getPort()));
|
||||
proxyConnector = new HTTPSPDYProxyServerConnector(proxy, sslContextFactory, proxyEngineSelector);
|
||||
proxyConnector.setPort(0);
|
||||
proxy.addConnector(proxyConnector);
|
||||
proxy.start();
|
||||
return new InetSocketAddress("localhost", proxyConnector.getLocalPort());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() throws Exception
|
||||
{
|
||||
factory = new SPDYClient.Factory(sslContextFactory);
|
||||
factory.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void destroy() throws Exception
|
||||
{
|
||||
if (server != null)
|
||||
{
|
||||
server.stop();
|
||||
server.join();
|
||||
}
|
||||
if (proxy != null)
|
||||
{
|
||||
proxy.stop();
|
||||
proxy.join();
|
||||
}
|
||||
factory.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNThenREPLY() throws Exception
|
||||
{
|
||||
final String header = "foo";
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
Fields requestHeaders = synInfo.getHeaders();
|
||||
Assert.assertNotNull(requestHeaders.get("via"));
|
||||
Assert.assertNotNull(requestHeaders.get(header));
|
||||
|
||||
Fields responseHeaders = new Fields();
|
||||
responseHeaders.put(header, "baz");
|
||||
stream.reply(new ReplyInfo(responseHeaders, true), new Callback.Adapter());
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
|
||||
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
|
||||
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
Fields headers = SPDYTestUtils.createHeaders(proxyAddress.getPort(), version, "GET", "/");
|
||||
headers.put(header, "bar");
|
||||
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
Fields headers = replyInfo.getHeaders();
|
||||
Assert.assertNotNull(headers.get(header));
|
||||
replyLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNThenREPLYAndDATA() throws Exception
|
||||
{
|
||||
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
|
||||
final String header = "foo";
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
Fields requestHeaders = synInfo.getHeaders();
|
||||
Assert.assertNotNull(requestHeaders.get("via"));
|
||||
Assert.assertNotNull(requestHeaders.get(header));
|
||||
|
||||
Fields responseHeaders = new Fields();
|
||||
responseHeaders.put(header, "baz");
|
||||
stream.reply(new ReplyInfo(responseHeaders, false), new Callback.Adapter());
|
||||
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
|
||||
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
|
||||
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
Fields headers = new Fields();
|
||||
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
|
||||
headers.put(header, "bar");
|
||||
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
private final ByteArrayOutputStream result = new ByteArrayOutputStream();
|
||||
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
Fields headers = replyInfo.getHeaders();
|
||||
Assert.assertNotNull(headers.get(header));
|
||||
replyLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataInfo dataInfo)
|
||||
{
|
||||
result.write(dataInfo.asBytes(true), 0, dataInfo.length());
|
||||
if (dataInfo.isClose())
|
||||
{
|
||||
Assert.assertArrayEquals(data, result.toByteArray());
|
||||
dataLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNThenSPDYPushIsReceived() throws Exception
|
||||
{
|
||||
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
Fields responseHeaders = new Fields();
|
||||
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
|
||||
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK");
|
||||
stream.reply(new ReplyInfo(responseHeaders, false), new Callback.Adapter());
|
||||
|
||||
Fields pushHeaders = new Fields();
|
||||
pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/push");
|
||||
stream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Promise.Adapter<Stream>()
|
||||
{
|
||||
@Override
|
||||
public void succeeded(Stream pushStream)
|
||||
{
|
||||
pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter());
|
||||
}
|
||||
});
|
||||
|
||||
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());
|
||||
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
|
||||
|
||||
final CountDownLatch pushSynLatch = new CountDownLatch(1);
|
||||
final CountDownLatch pushDataLatch = new CountDownLatch(1);
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
pushSynLatch.countDown();
|
||||
return new StreamFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onData(Stream stream, DataInfo dataInfo)
|
||||
{
|
||||
dataInfo.consume(dataInfo.length());
|
||||
if (dataInfo.isClose())
|
||||
pushDataLatch.countDown();
|
||||
}
|
||||
};
|
||||
}
|
||||
}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
Fields headers = new Fields();
|
||||
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
replyLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataInfo dataInfo)
|
||||
{
|
||||
dataInfo.consume(dataInfo.length());
|
||||
if (dataInfo.isClose())
|
||||
dataLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(pushSynLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(pushDataLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPING() throws Exception
|
||||
{
|
||||
// PING is per hop, and it does not carry the information to which server to ping to
|
||||
// We just verify that it works
|
||||
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()));
|
||||
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
|
||||
|
||||
final CountDownLatch pingLatch = new CountDownLatch(1);
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onPing(Session session, PingResultInfo pingInfo)
|
||||
{
|
||||
pingLatch.countDown();
|
||||
}
|
||||
}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
client.ping(new PingInfo(5, TimeUnit.SECONDS));
|
||||
|
||||
Assert.assertTrue(pingLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSYNThenReset() throws Exception
|
||||
{
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
Assert.assertTrue(synInfo.isClose());
|
||||
Fields requestHeaders = synInfo.getHeaders();
|
||||
Assert.assertNotNull(requestHeaders.get("via"));
|
||||
|
||||
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter());
|
||||
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
|
||||
|
||||
final CountDownLatch resetLatch = new CountDownLatch(1);
|
||||
Session client = factory.newSPDYClient(version).connect(proxyAddress, new SessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onRst(Session session, RstInfo rstInfo)
|
||||
{
|
||||
resetLatch.countDown();
|
||||
}
|
||||
}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
Fields headers = new Fields();
|
||||
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
|
||||
client.syn(new SynInfo(headers, true), null);
|
||||
|
||||
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
|
@ -2,4 +2,6 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
|||
#org.eclipse.jetty.spdy.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.server.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.io.ssl.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.spdy.server.LEVEL=DEBUG
|
||||
org.eclipse.jetty.spdy.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.client.LEVEL=DEBUG
|
||||
#org.mortbay.LEVEL=DEBUG
|
||||
|
|
Loading…
Reference in New Issue