Merge branch 'master' into jetty-8
This commit is contained in:
commit
5222459aa7
|
@ -0,0 +1,191 @@
|
|||
// ========================================================================
|
||||
// Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ExpirationWithLimitedConnectionsTest
|
||||
{
|
||||
@Test
|
||||
public void testExpirationWithMaxConnectionPerAddressReached() throws Exception
|
||||
{
|
||||
final Logger logger = Log.getLogger("org.eclipse.jetty.client");
|
||||
logger.setDebugEnabled(true);
|
||||
|
||||
HttpClient client = new HttpClient();
|
||||
int maxConnectionsPerAddress = 10;
|
||||
client.setMaxConnectionsPerAddress(maxConnectionsPerAddress);
|
||||
long timeout = 1000;
|
||||
client.setTimeout(timeout);
|
||||
client.start();
|
||||
|
||||
final List<Socket> sockets = new CopyOnWriteArrayList<Socket>();
|
||||
final List<Exception> failures = new CopyOnWriteArrayList<Exception>();
|
||||
final AtomicLong processingDelay = new AtomicLong(200);
|
||||
|
||||
final ExecutorService threadPool = Executors.newCachedThreadPool();
|
||||
final ServerSocket server = new ServerSocket(0);
|
||||
threadPool.submit(new Runnable()
|
||||
{
|
||||
public void run()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
final Socket socket = server.accept();
|
||||
sockets.add(socket);
|
||||
logger.debug("CONNECTION {}", socket.getRemoteSocketAddress());
|
||||
threadPool.submit(new Runnable()
|
||||
{
|
||||
public void run()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));
|
||||
String firstLine = reader.readLine();
|
||||
String line = firstLine;
|
||||
while (line != null)
|
||||
{
|
||||
if (line.length() == 0)
|
||||
break;
|
||||
line = reader.readLine();
|
||||
}
|
||||
|
||||
if (line == null)
|
||||
break;
|
||||
|
||||
long sleep = processingDelay.get();
|
||||
logger.debug("{} {} {} ms", firstLine, socket.getRemoteSocketAddress(), sleep);
|
||||
TimeUnit.MILLISECONDS.sleep(sleep);
|
||||
|
||||
String response = "" +
|
||||
"HTTP/1.1 200 OK\r\n" +
|
||||
"Content-Length: 0\r\n" +
|
||||
"\r\n";
|
||||
OutputStream output = socket.getOutputStream();
|
||||
output.write(response.getBytes("UTF-8"));
|
||||
output.flush();
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
failures.add(x);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
failures.add(x);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
List<ContentExchange> exchanges = new ArrayList<ContentExchange>();
|
||||
|
||||
final AtomicBoolean firstExpired = new AtomicBoolean();
|
||||
int count = 0;
|
||||
int maxAdditionalRequest = 100;
|
||||
int additionalRequests = 0;
|
||||
while (true)
|
||||
{
|
||||
TimeUnit.MILLISECONDS.sleep(1); // Just avoid being too fast
|
||||
ContentExchange exchange = new ContentExchange(true)
|
||||
{
|
||||
@Override
|
||||
protected void onResponseComplete() throws IOException
|
||||
{
|
||||
logger.debug("{} {} OK", getMethod(), getRequestURI());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onExpire()
|
||||
{
|
||||
logger.debug("{} {} EXPIRED {}", getMethod(), getRequestURI(), this);
|
||||
firstExpired.compareAndSet(false, true);
|
||||
}
|
||||
};
|
||||
exchanges.add(exchange);
|
||||
Address address = new Address("localhost", server.getLocalPort());
|
||||
exchange.setAddress(address);
|
||||
exchange.setMethod("GET");
|
||||
exchange.setRequestURI("/" + count);
|
||||
exchange.setVersion("HTTP/1.1");
|
||||
exchange.setRequestHeader("Host", address.toString());
|
||||
logger.debug("{} {} SENT", exchange.getMethod(), exchange.getRequestURI());
|
||||
client.send(exchange);
|
||||
++count;
|
||||
|
||||
if (processingDelay.get() > 0)
|
||||
{
|
||||
if (client.getDestination(address, false).getConnections() == maxConnectionsPerAddress)
|
||||
{
|
||||
if (firstExpired.get())
|
||||
{
|
||||
++additionalRequests;
|
||||
if (additionalRequests == maxAdditionalRequest)
|
||||
processingDelay.set(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
++additionalRequests;
|
||||
if (additionalRequests == 2 * maxAdditionalRequest)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (ContentExchange exchange : exchanges)
|
||||
{
|
||||
int status = exchange.waitForDone();
|
||||
Assert.assertTrue(status == HttpExchange.STATUS_COMPLETED || status == HttpExchange.STATUS_EXPIRED);
|
||||
}
|
||||
|
||||
client.stop();
|
||||
|
||||
Assert.assertTrue(failures.isEmpty());
|
||||
|
||||
for (Socket socket : sockets)
|
||||
socket.close();
|
||||
server.close();
|
||||
|
||||
threadPool.shutdown();
|
||||
threadPool.awaitTermination(5, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
|
@ -8,14 +8,14 @@
|
|||
<New class="org.eclipse.jetty.monitor.ThreadMonitor">
|
||||
<Set name="scanInterval">2000</Set>
|
||||
<Set name="busyThreshold">90</Set>
|
||||
<Set name="stackDepth">3</Set>
|
||||
<Set name="stackDepth">5</Set>
|
||||
<Set name="trailLength">2</Set>
|
||||
<!-- To enable logging CPU utilization for threads above specified threshold, -->
|
||||
<!-- uncomment the following lines, changing log interval (in milliseconds) -->
|
||||
<!-- and log threshold (in percent) as desired. -->
|
||||
<!--
|
||||
<Set name="logInterval">10000</Arg>
|
||||
<Set name="logThreshold">1</Arg>
|
||||
<Set name="logInterval">10000</Set>
|
||||
<Set name="logThreshold">65</Set>
|
||||
-->
|
||||
|
||||
<!-- To enable detail dump of the server whenever a thread is detected as spinning, -->
|
||||
|
|
|
@ -12,15 +12,21 @@ package org.eclipse.jetty.server.handler;
|
|||
//You may elect to redistribute this code under either of these licenses.
|
||||
//========================================================================
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assume.*;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -31,13 +37,10 @@ import org.eclipse.jetty.io.EndPoint;
|
|||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.nio.SelectChannelConnector;
|
||||
import org.eclipse.jetty.toolchain.test.OS;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
/**
|
||||
* @version $Revision$ $Date$
|
||||
*/
|
||||
|
@ -120,7 +123,25 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest
|
|||
@Test
|
||||
public void testCONNECTBadHostPort() throws Exception
|
||||
{
|
||||
String hostPort = "badlocalhost:" + serverConnector.getLocalPort();
|
||||
String invalidHostname = "AMAZEBALLS_BADHOST.webtide.com";
|
||||
|
||||
try
|
||||
{
|
||||
InetAddress addr = InetAddress.getByName(invalidHostname);
|
||||
StringBuilder err = new StringBuilder();
|
||||
err.append("DNS Hijacking detected: ");
|
||||
err.append(invalidHostname).append(" should have not returned a valid IP address [");
|
||||
err.append(addr.getHostAddress()).append("]. ");
|
||||
err.append("Fix your DNS provider to have this test pass.");
|
||||
err.append("\nFor more info see https://en.wikipedia.org/wiki/DNS_hijacking");
|
||||
Assert.assertNull(err.toString(), addr);
|
||||
}
|
||||
catch (UnknownHostException e)
|
||||
{
|
||||
// expected path
|
||||
}
|
||||
|
||||
String hostPort = String.format("%s:%d",invalidHostname,serverConnector.getLocalPort());
|
||||
String request = "" +
|
||||
"CONNECT " + hostPort + " HTTP/1.1\r\n" +
|
||||
"Host: " + hostPort + "\r\n" +
|
||||
|
@ -137,7 +158,7 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest
|
|||
|
||||
// Expect 500 OK from the CONNECT request
|
||||
Response response = readResponse(input);
|
||||
assertEquals("500", response.getCode());
|
||||
assertEquals("Response Code", "500", response.getCode());
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
@ -105,6 +105,7 @@ public class CrossOriginFilter implements Filter
|
|||
public static final String PREFLIGHT_MAX_AGE_PARAM = "preflightMaxAge";
|
||||
public static final String ALLOW_CREDENTIALS_PARAM = "allowCredentials";
|
||||
public static final String EXPOSED_HEADERS_PARAM = "exposedHeaders";
|
||||
public static final String FORWARD_PREFLIGHT_PARAM = "forwardPreflight";
|
||||
private static final String ANY_ORIGIN = "*";
|
||||
private static final List<String> SIMPLE_HTTP_METHODS = Arrays.asList("GET", "POST", "HEAD");
|
||||
|
||||
|
@ -113,8 +114,9 @@ public class CrossOriginFilter implements Filter
|
|||
private List<String> allowedMethods = new ArrayList<String>();
|
||||
private List<String> allowedHeaders = new ArrayList<String>();
|
||||
private List<String> exposedHeaders = new ArrayList<String>();
|
||||
private int preflightMaxAge = 0;
|
||||
private int preflightMaxAge;
|
||||
private boolean allowCredentials;
|
||||
private boolean forwardPreflight;
|
||||
|
||||
public void init(FilterConfig config) throws ServletException
|
||||
{
|
||||
|
@ -172,6 +174,11 @@ public class CrossOriginFilter implements Filter
|
|||
exposedHeadersConfig = "";
|
||||
exposedHeaders.addAll(Arrays.asList(exposedHeadersConfig.split(",")));
|
||||
|
||||
String forwardPreflightConfig = config.getInitParameter(FORWARD_PREFLIGHT_PARAM);
|
||||
if (forwardPreflightConfig == null)
|
||||
forwardPreflightConfig = "true";
|
||||
forwardPreflight = Boolean.parseBoolean(forwardPreflightConfig);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Cross-origin filter configuration: " +
|
||||
|
@ -180,7 +187,8 @@ public class CrossOriginFilter implements Filter
|
|||
ALLOWED_HEADERS_PARAM + " = " + allowedHeadersConfig + ", " +
|
||||
PREFLIGHT_MAX_AGE_PARAM + " = " + preflightMaxAgeConfig + ", " +
|
||||
ALLOW_CREDENTIALS_PARAM + " = " + allowedCredentialsConfig + "," +
|
||||
EXPOSED_HEADERS_PARAM + " = " + exposedHeadersConfig
|
||||
EXPOSED_HEADERS_PARAM + " = " + exposedHeadersConfig + "," +
|
||||
FORWARD_PREFLIGHT_PARAM + " = " + forwardPreflightConfig
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -207,6 +215,10 @@ public class CrossOriginFilter implements Filter
|
|||
{
|
||||
LOG.debug("Cross-origin request to {} is a preflight cross-origin request", request.getRequestURI());
|
||||
handlePreflightResponse(request, response, origin);
|
||||
if (forwardPreflight)
|
||||
LOG.debug("Preflight cross-origin request to {} forwarded to application", request.getRequestURI());
|
||||
else
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -1,16 +1,17 @@
|
|||
// ========================================================================
|
||||
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
|
||||
package org.eclipse.jetty.servlets;
|
||||
//========================================================================
|
||||
//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
|
||||
//------------------------------------------------------------------------
|
||||
//All rights reserved. This program and the accompanying materials
|
||||
//are made available under the terms of the Eclipse Public License v1.0
|
||||
//and Apache License v2.0 which accompanies this distribution.
|
||||
//The Eclipse Public License is available at
|
||||
//http://www.eclipse.org/legal/epl-v10.html
|
||||
//The Apache License v2.0 is available at
|
||||
//http://www.opensource.org/licenses/apache2.0.php
|
||||
//You may elect to redistribute this code under either of these licenses.
|
||||
//========================================================================
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
|
@ -407,6 +408,30 @@ public class CrossOriginFilterTest
|
|||
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testForwardPreflightRequest() throws Exception
|
||||
{
|
||||
FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
|
||||
filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, "PUT");
|
||||
filterHolder.setInitParameter(CrossOriginFilter.FORWARD_PREFLIGHT_PARAM, "false");
|
||||
tester.getContext().addFilter(filterHolder, "/*", FilterMapping.DEFAULT);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
tester.getContext().addServlet(new ServletHolder(new ResourceServlet(latch)), "/*");
|
||||
|
||||
// Preflight request
|
||||
String request = "" +
|
||||
"OPTIONS / HTTP/1.1\r\n" +
|
||||
"Host: localhost\r\n" +
|
||||
CrossOriginFilter.ACCESS_CONTROL_REQUEST_METHOD_HEADER + ": PUT\r\n" +
|
||||
"Origin: http://localhost\r\n" +
|
||||
"\r\n";
|
||||
String response = tester.getResponses(request);
|
||||
Assert.assertTrue(response.contains("HTTP/1.1 200"));
|
||||
Assert.assertTrue(response.contains(CrossOriginFilter.ACCESS_CONTROL_ALLOW_METHODS_HEADER));
|
||||
Assert.assertFalse(latch.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
public static class ResourceServlet extends HttpServlet
|
||||
{
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
|
|
@ -138,6 +138,11 @@ public class StandardStream implements IStream
|
|||
this.listener = listener;
|
||||
}
|
||||
|
||||
public StreamFrameListener getStreamFrameListener()
|
||||
{
|
||||
return listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateCloseState(boolean close, boolean local)
|
||||
{
|
||||
|
|
|
@ -216,13 +216,15 @@ public class Headers implements Iterable<Headers.Header>
|
|||
if (obj == null || getClass() != obj.getClass())
|
||||
return false;
|
||||
Header that = (Header)obj;
|
||||
return name.equals(that.name) && Arrays.equals(values, that.values);
|
||||
// Header names must be lowercase, thus we lowercase them before transmission, but keep them as is
|
||||
// internally. That's why we've to compare them case insensitive.
|
||||
return name.equalsIgnoreCase(that.name) && Arrays.equals(values, that.values);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = name.hashCode();
|
||||
int result = name.toLowerCase().hashCode();
|
||||
result = 31 * result + Arrays.hashCode(values);
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ public class HeadersBlockGenerator
|
|||
writeCount(version, buffer, headers.size());
|
||||
for (Headers.Header header : headers)
|
||||
{
|
||||
String name = header.name();
|
||||
String name = header.name().toLowerCase();
|
||||
byte[] nameBytes = name.getBytes(iso1);
|
||||
writeNameLength(version, buffer, nameBytes.length);
|
||||
buffer.write(nameBytes, 0, nameBytes.length);
|
||||
|
|
|
@ -22,65 +22,77 @@ import org.eclipse.jetty.spdy.api.HeadersInfo;
|
|||
import org.eclipse.jetty.spdy.api.SPDY;
|
||||
import org.eclipse.jetty.spdy.generator.Generator;
|
||||
import org.eclipse.jetty.spdy.parser.Parser;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class HeadersGenerateParseTest
|
||||
{
|
||||
@Test
|
||||
public void testGenerateParse() throws Exception
|
||||
|
||||
private Headers headers = new Headers();
|
||||
private int streamId = 13;
|
||||
private byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
|
||||
private final TestSPDYParserListener listener = new TestSPDYParserListener();
|
||||
private final Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
|
||||
private ByteBuffer buffer;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
|
||||
int streamId = 13;
|
||||
Headers headers = new Headers();
|
||||
parser.addListener(listener);
|
||||
headers.put("a", "b");
|
||||
buffer = createHeadersFrameBuffer(headers);
|
||||
}
|
||||
|
||||
private ByteBuffer createHeadersFrameBuffer(Headers headers)
|
||||
{
|
||||
HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers);
|
||||
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
|
||||
ByteBuffer buffer = generator.control(frame1);
|
||||
assertThat("Buffer is not null", buffer, notNullValue());
|
||||
return buffer;
|
||||
}
|
||||
|
||||
Assert.assertNotNull(buffer);
|
||||
|
||||
TestSPDYParserListener listener = new TestSPDYParserListener();
|
||||
Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
|
||||
parser.addListener(listener);
|
||||
@Test
|
||||
public void testGenerateParse() throws Exception
|
||||
{
|
||||
parser.parse(buffer);
|
||||
ControlFrame frame2 = listener.getControlFrame();
|
||||
|
||||
Assert.assertNotNull(frame2);
|
||||
Assert.assertEquals(ControlFrameType.HEADERS, frame2.getType());
|
||||
HeadersFrame headersFrame = (HeadersFrame)frame2;
|
||||
Assert.assertEquals(SPDY.V2, headersFrame.getVersion());
|
||||
Assert.assertEquals(streamId, headersFrame.getStreamId());
|
||||
Assert.assertEquals(flags, headersFrame.getFlags());
|
||||
Assert.assertEquals(headers, headersFrame.getHeaders());
|
||||
assertExpectationsAreMet(headers);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGenerateParseOneByteAtATime() throws Exception
|
||||
{
|
||||
byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
|
||||
int streamId = 13;
|
||||
Headers headers = new Headers();
|
||||
headers.put("a", "b");
|
||||
HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers);
|
||||
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
|
||||
ByteBuffer buffer = generator.control(frame1);
|
||||
|
||||
Assert.assertNotNull(buffer);
|
||||
|
||||
TestSPDYParserListener listener = new TestSPDYParserListener();
|
||||
Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
|
||||
parser.addListener(listener);
|
||||
while (buffer.hasRemaining())
|
||||
parser.parse(ByteBuffer.wrap(new byte[]{buffer.get()}));
|
||||
ControlFrame frame2 = listener.getControlFrame();
|
||||
|
||||
Assert.assertNotNull(frame2);
|
||||
Assert.assertEquals(ControlFrameType.HEADERS, frame2.getType());
|
||||
HeadersFrame headersFrame = (HeadersFrame)frame2;
|
||||
Assert.assertEquals(SPDY.V2, headersFrame.getVersion());
|
||||
Assert.assertEquals(streamId, headersFrame.getStreamId());
|
||||
Assert.assertEquals(flags, headersFrame.getFlags());
|
||||
Assert.assertEquals(headers, headersFrame.getHeaders());
|
||||
assertExpectationsAreMet(headers);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHeadersAreTranslatedToLowerCase()
|
||||
{
|
||||
Headers headers = new Headers();
|
||||
headers.put("Via","localhost");
|
||||
parser.parse(createHeadersFrameBuffer(headers));
|
||||
HeadersFrame parsedHeadersFrame = assertExpectationsAreMet(headers);
|
||||
Headers.Header viaHeader = parsedHeadersFrame.getHeaders().get("via");
|
||||
assertThat("Via Header name is lowercase", viaHeader.name(), is("via"));
|
||||
}
|
||||
|
||||
private HeadersFrame assertExpectationsAreMet(Headers headers)
|
||||
{
|
||||
ControlFrame parsedControlFrame = listener.getControlFrame();
|
||||
assertThat("listener received controlFrame", parsedControlFrame, notNullValue());
|
||||
assertThat("ControlFrame type is HEADERS", ControlFrameType.HEADERS, is(parsedControlFrame.getType()));
|
||||
HeadersFrame headersFrame = (HeadersFrame)parsedControlFrame;
|
||||
assertThat("Version matches", SPDY.V2, is(headersFrame.getVersion()));
|
||||
assertThat("StreamId matches", streamId, is(headersFrame.getStreamId()));
|
||||
assertThat("flags match", flags, is(headersFrame.getFlags()));
|
||||
assertThat("headers match", headers, is(headersFrame.getHeaders()));
|
||||
return headersFrame;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,24 +32,34 @@
|
|||
</Call>
|
||||
|
||||
<!--
|
||||
The ProxyEngine receives SPDY/x(HTTP) requests from proxy connectors below
|
||||
and is configured to process requests for host "localhost".
|
||||
Such requests are converted from SPDY/x(HTTP) to SPDY/2(HTTP) and forwarded
|
||||
to 127.0.0.1:9090, where they are served by the upstream server above.
|
||||
This ProxyEngine translates the incoming SPDY/x(HTTP) request to SPDY/2(HTTP)
|
||||
-->
|
||||
<New id="proxyEngine" class="org.eclipse.jetty.spdy.proxy.SPDYProxyEngine">
|
||||
<New id="spdyProxyEngine" class="org.eclipse.jetty.spdy.proxy.SPDYProxyEngine">
|
||||
<Arg>
|
||||
<New class="org.eclipse.jetty.spdy.SPDYClient$Factory">
|
||||
<Call name="start" />
|
||||
<Call name="start"/>
|
||||
</New>
|
||||
</Arg>
|
||||
<Set name="proxyInfos">
|
||||
</New>
|
||||
|
||||
<!--
|
||||
The ProxyEngineSelector receives SPDY/x(HTTP) requests from proxy connectors below
|
||||
and is configured to process requests for host "localhost".
|
||||
Such requests are converted from SPDY/x(HTTP) to SPDY/2(HTTP) by the configured ProxyEngine
|
||||
and forwarded to 127.0.0.1:9090, where they are served by the upstream server above.
|
||||
-->
|
||||
<New id="proxyEngineSelector" class="org.eclipse.jetty.spdy.proxy.ProxyEngineSelector">
|
||||
<Call name="putProxyEngine">
|
||||
<Arg>spdy/2</Arg>
|
||||
<Arg><Ref id="spdyProxyEngine" /></Arg>
|
||||
</Call>
|
||||
<Set name="proxyServerInfos">
|
||||
<Map>
|
||||
<Entry>
|
||||
<Item>localhost</Item>
|
||||
<Item>
|
||||
<New class="org.eclipse.jetty.spdy.proxy.ProxyEngine$ProxyInfo">
|
||||
<Arg type="short">2</Arg>
|
||||
<New class="org.eclipse.jetty.spdy.proxy.ProxyEngineSelector$ProxyServerInfo">
|
||||
<Arg type="String">spdy/2</Arg>
|
||||
<Arg>127.0.0.1</Arg>
|
||||
<Arg type="int">9090</Arg>
|
||||
</New>
|
||||
|
@ -69,7 +79,7 @@
|
|||
<Call name="addConnector">
|
||||
<Arg>
|
||||
<New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector">
|
||||
<Arg><Ref id="proxyEngine" /></Arg>
|
||||
<Arg><Ref id="proxyEngineSelector" /></Arg>
|
||||
<Set name="Port">8080</Set>
|
||||
</New>
|
||||
</Arg>
|
||||
|
@ -77,7 +87,7 @@
|
|||
<Call name="addConnector">
|
||||
<Arg>
|
||||
<New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector">
|
||||
<Arg><Ref id="proxyEngine" /></Arg>
|
||||
<Arg><Ref id="proxyEngineSelector" /></Arg>
|
||||
<Arg><Ref id="sslContextFactory" /></Arg>
|
||||
<Set name="Port">8443</Set>
|
||||
</New>
|
||||
|
|
|
@ -722,6 +722,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
|
|||
logger.debug("HTTP < {} bytes of content", dataInfo.length());
|
||||
stream.data(dataInfo).get(maxIdleTime, TimeUnit.MILLISECONDS);
|
||||
content.clear();
|
||||
_bypass = false;
|
||||
content = getContentBuffer();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,19 +21,19 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
|
|||
|
||||
public class HTTPSPDYProxyConnector extends AbstractHTTPSPDYServerConnector
|
||||
{
|
||||
public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine)
|
||||
public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector)
|
||||
{
|
||||
this(proxyEngine, null);
|
||||
this(proxyEngineSelector, null);
|
||||
}
|
||||
|
||||
public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine, SslContextFactory sslContextFactory)
|
||||
public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector, SslContextFactory sslContextFactory)
|
||||
{
|
||||
super(proxyEngine, sslContextFactory);
|
||||
super(proxyEngineSelector, sslContextFactory);
|
||||
clearAsyncConnectionFactories();
|
||||
|
||||
putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
|
||||
putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
|
||||
putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V3, proxyEngine));
|
||||
putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
|
||||
putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
|
||||
putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V2, proxyEngineSelector));
|
||||
setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,32 +15,25 @@
|
|||
package org.eclipse.jetty.spdy.proxy;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.eclipse.jetty.spdy.api.Headers;
|
||||
import org.eclipse.jetty.spdy.api.Stream;
|
||||
import org.eclipse.jetty.spdy.api.StreamFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.SynInfo;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* <p>{@link ProxyEngine} is the base class for SPDY proxy functionalities, that is a proxy that
|
||||
* accepts SPDY from its client side and converts to any protocol to its server side.</p>
|
||||
* <p>{@link ProxyEngine} is the class for SPDY proxy functionalities that receives a SPDY request and converts it to
|
||||
* any protocol to its server side.</p>
|
||||
* <p>This class listens for SPDY events sent by clients; subclasses are responsible for translating
|
||||
* these SPDY client events into appropriate events to forward to the server, in the appropriate
|
||||
* protocol that is understood by the server.</p>
|
||||
* <p>This class also provides configuration for the proxy rules.</p>
|
||||
*/
|
||||
public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter implements StreamFrameListener
|
||||
public abstract class ProxyEngine
|
||||
{
|
||||
protected final Logger logger = Log.getLogger(getClass());
|
||||
private final ConcurrentMap<String, ProxyInfo> proxyInfos = new ConcurrentHashMap<>();
|
||||
private final String name;
|
||||
|
||||
protected ProxyEngine()
|
||||
|
@ -60,6 +53,8 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp
|
|||
}
|
||||
}
|
||||
|
||||
public abstract StreamFrameListener proxy(Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo);
|
||||
|
||||
protected ProxyEngine(String name)
|
||||
{
|
||||
this.name = name;
|
||||
|
@ -73,6 +68,9 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp
|
|||
protected void addRequestProxyHeaders(Stream stream, Headers headers)
|
||||
{
|
||||
addViaHeader(headers);
|
||||
String address = (String)stream.getSession().getAttribute("org.eclipse.jetty.spdy.remoteAddress");
|
||||
if (address != null)
|
||||
headers.add("X-Forwarded-For", address);
|
||||
}
|
||||
|
||||
protected void addResponseProxyHeaders(Stream stream, Headers headers)
|
||||
|
@ -93,46 +91,4 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp
|
|||
{
|
||||
}
|
||||
|
||||
public Map<String, ProxyInfo> getProxyInfos()
|
||||
{
|
||||
return new HashMap<>(proxyInfos);
|
||||
}
|
||||
|
||||
public void setProxyInfos(Map<String, ProxyInfo> proxyInfos)
|
||||
{
|
||||
this.proxyInfos.clear();
|
||||
this.proxyInfos.putAll(proxyInfos);
|
||||
}
|
||||
|
||||
public void putProxyInfo(String host, ProxyInfo proxyInfo)
|
||||
{
|
||||
proxyInfos.put(host, proxyInfo);
|
||||
}
|
||||
|
||||
protected ProxyInfo getProxyInfo(String host)
|
||||
{
|
||||
return proxyInfos.get(host);
|
||||
}
|
||||
|
||||
public static class ProxyInfo
|
||||
{
|
||||
private final short version;
|
||||
private final InetSocketAddress address;
|
||||
|
||||
public ProxyInfo(short version, String host, int port)
|
||||
{
|
||||
this.version = version;
|
||||
this.address = new InetSocketAddress(host, port);
|
||||
}
|
||||
|
||||
public short getVersion()
|
||||
{
|
||||
return version;
|
||||
}
|
||||
|
||||
public InetSocketAddress getAddress()
|
||||
{
|
||||
return address;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
package org.eclipse.jetty.spdy.proxy;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.eclipse.jetty.spdy.api.GoAwayInfo;
|
||||
import org.eclipse.jetty.spdy.api.Headers;
|
||||
import org.eclipse.jetty.spdy.api.PingInfo;
|
||||
import org.eclipse.jetty.spdy.api.RstInfo;
|
||||
import org.eclipse.jetty.spdy.api.Session;
|
||||
import org.eclipse.jetty.spdy.api.Stream;
|
||||
import org.eclipse.jetty.spdy.api.StreamFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.StreamStatus;
|
||||
import org.eclipse.jetty.spdy.api.SynInfo;
|
||||
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
|
||||
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* <p>{@link ProxyEngineSelector} is the main entry point for syn stream events of a jetty SPDY proxy. It receives the
|
||||
* syn stream frames from the clients, checks if there's an appropriate {@link ProxyServerInfo} for the given target
|
||||
* host and forwards the syn to a {@link ProxyEngine} for the protocol defined in {@link ProxyServerInfo}.</p>
|
||||
*
|
||||
* <p>If no {@link ProxyServerInfo} can be found for the given target host or no {@link ProxyEngine} can be found for
|
||||
* the given protocol, it resets the client stream.</p>
|
||||
*
|
||||
* <p>This class also provides configuration for the proxy rules.</p>
|
||||
*/
|
||||
public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter
|
||||
{
|
||||
protected final Logger logger = Log.getLogger(getClass());
|
||||
private final Map<String, ProxyServerInfo> proxyInfos = new ConcurrentHashMap<>();
|
||||
private final Map<String, ProxyEngine> proxyEngines = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public final StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo)
|
||||
{
|
||||
logger.debug("C -> P {} on {}", clientSynInfo, clientStream);
|
||||
|
||||
final Session clientSession = clientStream.getSession();
|
||||
short clientVersion = clientSession.getVersion();
|
||||
Headers headers = new Headers(clientSynInfo.getHeaders(), false);
|
||||
|
||||
Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion));
|
||||
if (hostHeader == null)
|
||||
{
|
||||
rst(clientStream);
|
||||
return null;
|
||||
}
|
||||
|
||||
String host = hostHeader.value();
|
||||
int colon = host.indexOf(':');
|
||||
if (colon >= 0)
|
||||
host = host.substring(0, colon);
|
||||
|
||||
ProxyServerInfo proxyServerInfo = getProxyServerInfo(host);
|
||||
if (proxyServerInfo == null)
|
||||
{
|
||||
rst(clientStream);
|
||||
return null;
|
||||
}
|
||||
|
||||
String protocol = proxyServerInfo.getProtocol();
|
||||
ProxyEngine proxyEngine = proxyEngines.get(protocol);
|
||||
if (proxyEngine == null)
|
||||
{
|
||||
rst(clientStream);
|
||||
return null;
|
||||
}
|
||||
|
||||
return proxyEngine.proxy(clientStream, clientSynInfo, proxyServerInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPing(Session clientSession, PingInfo pingInfo)
|
||||
{
|
||||
// We do not know to which upstream server
|
||||
// to send the PING so we just ignore it
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
|
||||
{
|
||||
// TODO:
|
||||
}
|
||||
|
||||
public Map<String, ProxyEngine> getProxyEngines()
|
||||
{
|
||||
return new HashMap<>(proxyEngines);
|
||||
}
|
||||
|
||||
public void setProxyEngines(Map<String, ProxyEngine> proxyEngines)
|
||||
{
|
||||
this.proxyEngines.clear();
|
||||
this.proxyEngines.putAll(proxyEngines);
|
||||
}
|
||||
|
||||
public ProxyEngine getProxyEngine(String protocol)
|
||||
{
|
||||
return proxyEngines.get(protocol);
|
||||
}
|
||||
|
||||
public void putProxyEngine(String protocol, ProxyEngine proxyEngine)
|
||||
{
|
||||
proxyEngines.put(protocol, proxyEngine);
|
||||
}
|
||||
|
||||
public Map<String, ProxyServerInfo> getProxyServerInfos()
|
||||
{
|
||||
return new HashMap<>(proxyInfos);
|
||||
}
|
||||
|
||||
protected ProxyServerInfo getProxyServerInfo(String host)
|
||||
{
|
||||
return proxyInfos.get(host);
|
||||
}
|
||||
|
||||
public void setProxyServerInfos(Map<String, ProxyServerInfo> proxyServerInfos)
|
||||
{
|
||||
this.proxyInfos.clear();
|
||||
this.proxyInfos.putAll(proxyServerInfos);
|
||||
}
|
||||
|
||||
public void putProxyServerInfo(String host, ProxyServerInfo proxyServerInfo)
|
||||
{
|
||||
proxyInfos.put(host, proxyServerInfo);
|
||||
}
|
||||
|
||||
private void rst(Stream stream)
|
||||
{
|
||||
RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
|
||||
stream.getSession().rst(rstInfo);
|
||||
}
|
||||
|
||||
public static class ProxyServerInfo
|
||||
{
|
||||
private final String protocol;
|
||||
private final String host;
|
||||
private final InetSocketAddress address;
|
||||
|
||||
public ProxyServerInfo(String protocol, String host, int port)
|
||||
{
|
||||
this.protocol = protocol;
|
||||
this.host = host;
|
||||
this.address = new InetSocketAddress(host, port);
|
||||
}
|
||||
|
||||
public String getProtocol()
|
||||
{
|
||||
return protocol;
|
||||
}
|
||||
|
||||
public String getHost()
|
||||
{
|
||||
return host;
|
||||
}
|
||||
|
||||
public InetSocketAddress getAddress()
|
||||
{
|
||||
return address;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -24,18 +24,18 @@ import org.eclipse.jetty.spdy.http.ServerHTTPAsyncConnectionFactory;
|
|||
public class ProxyHTTPAsyncConnectionFactory extends ServerHTTPAsyncConnectionFactory
|
||||
{
|
||||
private final short version;
|
||||
private final ProxyEngine proxyEngine;
|
||||
private final ProxyEngineSelector proxyEngineSelector;
|
||||
|
||||
public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngine proxyEngine)
|
||||
public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngineSelector proxyEngineSelector)
|
||||
{
|
||||
super(connector);
|
||||
this.version = version;
|
||||
this.proxyEngine = proxyEngine;
|
||||
this.proxyEngineSelector = proxyEngineSelector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
|
||||
{
|
||||
return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngine);
|
||||
return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngineSelector);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.eclipse.jetty.spdy.api.ReplyInfo;
|
|||
import org.eclipse.jetty.spdy.api.RstInfo;
|
||||
import org.eclipse.jetty.spdy.api.SessionStatus;
|
||||
import org.eclipse.jetty.spdy.api.Stream;
|
||||
import org.eclipse.jetty.spdy.api.StreamFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.SynInfo;
|
||||
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
|
||||
|
||||
|
@ -53,19 +54,20 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
|
|||
{
|
||||
private final Headers headers = new Headers();
|
||||
private final short version;
|
||||
private final ProxyEngine proxyEngine;
|
||||
private final ProxyEngineSelector proxyEngineSelector;
|
||||
private final HttpGenerator generator;
|
||||
private final ISession session;
|
||||
private Stream stream;
|
||||
private HTTPStream stream;
|
||||
private Buffer content;
|
||||
|
||||
public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endpoint, short version, ProxyEngine proxyEngine)
|
||||
public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector)
|
||||
{
|
||||
super(connector, endpoint, connector.getServer());
|
||||
super(connector, endPoint, connector.getServer());
|
||||
this.version = version;
|
||||
this.proxyEngine = proxyEngine;
|
||||
this.proxyEngineSelector = proxyEngineSelector;
|
||||
this.generator = (HttpGenerator)_generator;
|
||||
this.session = new HTTPSession(version, connector);
|
||||
this.session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -116,7 +118,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
|
|||
}
|
||||
else
|
||||
{
|
||||
proxyEngine.onData(stream, toDataInfo(buffer, false));
|
||||
stream.getStreamFrameListener().onData(stream, toDataInfo(buffer, false));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,23 +129,24 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
|
|||
{
|
||||
assert content == null;
|
||||
if (headers.isEmpty())
|
||||
proxyEngine.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
|
||||
proxyEngineSelector.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
|
||||
else
|
||||
syn(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
proxyEngine.onData(stream, toDataInfo(content, true));
|
||||
stream.getStreamFrameListener().onData(stream, toDataInfo(content, true));
|
||||
}
|
||||
headers.clear();
|
||||
stream = null;
|
||||
content = null;
|
||||
}
|
||||
|
||||
private Stream syn(boolean close)
|
||||
private HTTPStream syn(boolean close)
|
||||
{
|
||||
Stream stream = new HTTPStream(1, (byte)0, session, null);
|
||||
proxyEngine.onSyn(stream, new SynInfo(headers, close));
|
||||
HTTPStream stream = new HTTPStream(1, (byte)0, session, null);
|
||||
StreamFrameListener streamFrameListener = proxyEngineSelector.onSyn(stream, new SynInfo(headers, close));
|
||||
stream.setStreamFrameListener(streamFrameListener);
|
||||
return stream;
|
||||
}
|
||||
|
||||
|
@ -167,7 +170,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
|
|||
{
|
||||
private HTTPSession(short version, SPDYServerConnector connector)
|
||||
{
|
||||
super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngine, null, null);
|
||||
super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngineSelector, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -15,10 +15,8 @@
|
|||
package org.eclipse.jetty.spdy.proxy;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -30,9 +28,9 @@ import org.eclipse.jetty.spdy.api.GoAwayInfo;
|
|||
import org.eclipse.jetty.spdy.api.Handler;
|
||||
import org.eclipse.jetty.spdy.api.Headers;
|
||||
import org.eclipse.jetty.spdy.api.HeadersInfo;
|
||||
import org.eclipse.jetty.spdy.api.PingInfo;
|
||||
import org.eclipse.jetty.spdy.api.ReplyInfo;
|
||||
import org.eclipse.jetty.spdy.api.RstInfo;
|
||||
import org.eclipse.jetty.spdy.api.SPDY;
|
||||
import org.eclipse.jetty.spdy.api.Session;
|
||||
import org.eclipse.jetty.spdy.api.SessionFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.Stream;
|
||||
|
@ -45,11 +43,10 @@ import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
|
|||
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
|
||||
* clients into SPDY events for the servers.</p>
|
||||
*/
|
||||
public class SPDYProxyEngine extends ProxyEngine
|
||||
public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
|
||||
{
|
||||
private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
|
||||
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
|
||||
private static final String CLIENT_SESSIONS_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientSessions";
|
||||
|
||||
private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
|
||||
private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
|
||||
|
@ -82,68 +79,24 @@ public class SPDYProxyEngine extends ProxyEngine
|
|||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPing(Session clientSession, PingInfo pingInfo)
|
||||
public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
|
||||
{
|
||||
// We do not know to which upstream server
|
||||
// to send the PING so we just ignore it
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session clientSession, GoAwayInfo goAwayInfo)
|
||||
{
|
||||
for (Session serverSession : serverSessions.values())
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
|
||||
if (sessions.remove(clientSession))
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo)
|
||||
{
|
||||
logger.debug("C -> P {} on {}", clientSynInfo, clientStream);
|
||||
|
||||
final Session clientSession = clientStream.getSession();
|
||||
short clientVersion = clientSession.getVersion();
|
||||
Headers headers = new Headers(clientSynInfo.getHeaders(), false);
|
||||
|
||||
Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion));
|
||||
if (hostHeader == null)
|
||||
{
|
||||
rst(clientStream);
|
||||
return null;
|
||||
}
|
||||
|
||||
String host = hostHeader.value();
|
||||
int colon = host.indexOf(':');
|
||||
if (colon >= 0)
|
||||
host = host.substring(0, colon);
|
||||
ProxyInfo proxyInfo = getProxyInfo(host);
|
||||
if (proxyInfo == null)
|
||||
{
|
||||
rst(clientStream);
|
||||
return null;
|
||||
}
|
||||
|
||||
short serverVersion = proxyInfo.getVersion();
|
||||
InetSocketAddress address = proxyInfo.getAddress();
|
||||
Session serverSession = produceSession(host, serverVersion, address);
|
||||
short serverVersion = getVersion(proxyServerInfo.getProtocol());
|
||||
InetSocketAddress address = proxyServerInfo.getAddress();
|
||||
Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
|
||||
if (serverSession == null)
|
||||
{
|
||||
rst(clientStream);
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
|
||||
sessions.add(clientSession);
|
||||
final Session clientSession = clientStream.getSession();
|
||||
|
||||
addRequestProxyHeaders(clientStream, headers);
|
||||
customizeRequestHeaders(clientStream, headers);
|
||||
convert(clientVersion, serverVersion, headers);
|
||||
convert(clientSession.getVersion(), serverVersion, headers);
|
||||
|
||||
SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
|
||||
StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
|
||||
|
@ -153,6 +106,19 @@ public class SPDYProxyEngine extends ProxyEngine
|
|||
return this;
|
||||
}
|
||||
|
||||
private static short getVersion(String protocol)
|
||||
{
|
||||
switch (protocol)
|
||||
{
|
||||
case "spdy/2":
|
||||
return SPDY.V2;
|
||||
case "spdy/3":
|
||||
return SPDY.V3;
|
||||
default:
|
||||
throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
|
@ -194,7 +160,6 @@ public class SPDYProxyEngine extends ProxyEngine
|
|||
{
|
||||
SPDYClient client = factory.newSPDYClient(version);
|
||||
session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
|
||||
session.setAttribute(CLIENT_SESSIONS_ATTRIBUTE, Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>()));
|
||||
logger.debug("Proxy session connected to {}", address);
|
||||
Session existing = serverSessions.putIfAbsent(host, session);
|
||||
if (existing != null)
|
||||
|
@ -513,10 +478,6 @@ public class SPDYProxyEngine extends ProxyEngine
|
|||
public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
|
||||
{
|
||||
serverSessions.values().remove(serverSession);
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<Session> sessions = (Set<Session>)serverSession.removeAttribute(CLIENT_SESSIONS_ATTRIBUTE);
|
||||
for (Session session : sessions)
|
||||
session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -528,7 +489,7 @@ public class SPDYProxyEngine extends ProxyEngine
|
|||
@Override
|
||||
public void onHeaders(Stream stream, HeadersInfo headersInfo)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
throw new UnsupportedOperationException(); //TODO
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,6 +22,8 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
@ -36,6 +38,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
|
|||
import org.eclipse.jetty.spdy.AsyncConnectionFactory;
|
||||
import org.eclipse.jetty.spdy.api.DataInfo;
|
||||
import org.eclipse.jetty.spdy.api.Headers;
|
||||
import org.eclipse.jetty.spdy.api.SPDY;
|
||||
import org.eclipse.jetty.spdy.api.Session;
|
||||
import org.eclipse.jetty.spdy.api.SessionFrameListener;
|
||||
import org.eclipse.jetty.spdy.api.Stream;
|
||||
|
@ -331,13 +334,36 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest
|
|||
}
|
||||
}
|
||||
|
||||
private void addPushedResource(String pushedURI)
|
||||
{
|
||||
switch (version())
|
||||
{
|
||||
case SPDY.V2:
|
||||
{
|
||||
Matcher matcher = Pattern.compile("https?://[^:]+:\\d+(/.*)").matcher(pushedURI);
|
||||
Assert.assertTrue(matcher.matches());
|
||||
pushedResources.add(matcher.group(1));
|
||||
break;
|
||||
}
|
||||
case SPDY.V3:
|
||||
{
|
||||
pushedResources.add(pushedURI);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class ClientSessionFrameListener extends SessionFrameListener.Adapter
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
String path = synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version())).value();
|
||||
pushedResources.add(path);
|
||||
addPushedResource(path);
|
||||
return new DataListener();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1009,6 +1009,61 @@ public class ServerHTTPSPDYv2Test extends AbstractHTTPSPDYTest
|
|||
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGETWithMultipleMediumContentByPassed() throws Exception
|
||||
{
|
||||
final byte[] data = new byte[2048];
|
||||
Session session = startClient(version(), startHTTPServer(version(), new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
|
||||
throws IOException, ServletException
|
||||
{
|
||||
// The sequence of write/flush/write/write below triggers a condition where
|
||||
// HttpGenerator._bypass is set to true on the second write(), and the
|
||||
// third write causes an infinite spin loop on the third write().
|
||||
request.setHandled(true);
|
||||
OutputStream output = httpResponse.getOutputStream();
|
||||
output.write(data);
|
||||
output.flush();
|
||||
output.write(data);
|
||||
output.write(data);
|
||||
}
|
||||
}), null);
|
||||
|
||||
Headers headers = new Headers();
|
||||
headers.put(HTTPSPDYHeader.METHOD.name(version()), "GET");
|
||||
headers.put(HTTPSPDYHeader.URI.name(version()), "/foo");
|
||||
headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
|
||||
headers.put(HTTPSPDYHeader.SCHEME.name(version()), "http");
|
||||
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + connector.getLocalPort());
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
final AtomicInteger contentLength = new AtomicInteger();
|
||||
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
Assert.assertFalse(replyInfo.isClose());
|
||||
Headers replyHeaders = replyInfo.getHeaders();
|
||||
Assert.assertTrue(replyHeaders.get(HTTPSPDYHeader.STATUS.name(version())).value().contains("200"));
|
||||
replyLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataInfo dataInfo)
|
||||
{
|
||||
dataInfo.consume(dataInfo.available());
|
||||
contentLength.addAndGet(dataInfo.length());
|
||||
if (dataInfo.isClose())
|
||||
dataLatch.countDown();
|
||||
}
|
||||
});
|
||||
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertEquals(3 * data.length, contentLength.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPOSTThenSuspendRequestThenReadOneChunkThenComplete() throws Exception
|
||||
{
|
||||
|
|
|
@ -91,9 +91,11 @@ public class ProxyHTTPSPDYv2Test
|
|||
protected InetSocketAddress startProxy(InetSocketAddress address) throws Exception
|
||||
{
|
||||
proxy = new Server();
|
||||
SPDYProxyEngine proxyEngine = new SPDYProxyEngine(factory);
|
||||
proxyEngine.putProxyInfo("localhost", new ProxyEngine.ProxyInfo(version(), address.getHostName(), address.getPort()));
|
||||
proxyConnector = new HTTPSPDYProxyConnector(proxyEngine);
|
||||
ProxyEngineSelector proxyEngineSelector = new ProxyEngineSelector();
|
||||
SPDYProxyEngine spdyProxyEngine = new SPDYProxyEngine(factory);
|
||||
proxyEngineSelector.putProxyEngine("spdy/" + version(), spdyProxyEngine);
|
||||
proxyEngineSelector.putProxyServerInfo("localhost", new ProxyEngineSelector.ProxyServerInfo("spdy/" + version(), address.getHostName(), address.getPort()));
|
||||
proxyConnector = new HTTPSPDYProxyConnector(proxyEngineSelector);
|
||||
proxyConnector.setPort(0);
|
||||
proxy.addConnector(proxyConnector);
|
||||
proxy.start();
|
||||
|
@ -171,96 +173,6 @@ public class ProxyHTTPSPDYv2Test
|
|||
Assert.assertFalse(closeLatch.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClosingServerClosesHTTPClient() throws Exception
|
||||
{
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
Headers responseHeaders = new Headers();
|
||||
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
|
||||
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
|
||||
stream.reply(new ReplyInfo(responseHeaders, true));
|
||||
stream.getSession().goAway();
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
|
||||
Socket client = new Socket();
|
||||
client.connect(proxyAddress);
|
||||
OutputStream output = client.getOutputStream();
|
||||
|
||||
String request = "" +
|
||||
"GET / HTTP/1.1\r\n" +
|
||||
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
|
||||
"\r\n";
|
||||
output.write(request.getBytes("UTF-8"));
|
||||
output.flush();
|
||||
|
||||
client.setSoTimeout(1000);
|
||||
InputStream input = client.getInputStream();
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
|
||||
String line = reader.readLine();
|
||||
Assert.assertTrue(line.contains(" 200"));
|
||||
while (line.length() > 0)
|
||||
line = reader.readLine();
|
||||
Assert.assertFalse(reader.ready());
|
||||
|
||||
Assert.assertNull(reader.readLine());
|
||||
|
||||
client.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClosingServerClosesSPDYClient() throws Exception
|
||||
{
|
||||
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
Headers responseHeaders = new Headers();
|
||||
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
|
||||
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
|
||||
stream.reply(new ReplyInfo(responseHeaders, true));
|
||||
stream.getSession().goAway();
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
|
||||
|
||||
final CountDownLatch goAwayLatch = new CountDownLatch(1);
|
||||
Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
|
||||
{
|
||||
goAwayLatch.countDown();
|
||||
}
|
||||
}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
final CountDownLatch replyLatch = new CountDownLatch(1);
|
||||
Headers headers = new Headers();
|
||||
headers.put(HTTPSPDYHeader.SCHEME.name(version()), "http");
|
||||
headers.put(HTTPSPDYHeader.METHOD.name(version()), "GET");
|
||||
headers.put(HTTPSPDYHeader.URI.name(version()), "/");
|
||||
headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
|
||||
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
|
||||
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
replyLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGETThenNoContentFromTwoClients() throws Exception
|
||||
{
|
||||
|
|
|
@ -69,6 +69,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
|
|||
FlowControlStrategy flowControlStrategy = connector.newFlowControlStrategy(version);
|
||||
|
||||
StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator, flowControlStrategy);
|
||||
session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr());
|
||||
session.setWindowSize(connector.getInitialWindowSize());
|
||||
parser.addListener(session);
|
||||
connection.setSession(session);
|
||||
|
|
Loading…
Reference in New Issue