Merged branch 'jetty-9.2.x' into 'master'.

This commit is contained in:
Simone Bordet 2015-02-12 14:32:24 +01:00
commit 465f6f7da8
4 changed files with 381 additions and 256 deletions

View File

@ -27,7 +27,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPOutputStream;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
@ -315,17 +314,18 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
}
}
protected class ProxyResponseListener extends Response.Listener.Adapter
protected class ProxyResponseListener extends Response.Listener.Adapter implements Callback
{
private final String WRITE_LISTENER_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".writeListener";
private final Callback complete = new CountingCallback(this, 2);
private final List<ByteBuffer> buffers = new ArrayList<>();
private final AtomicBoolean complete = new AtomicBoolean();
private final HttpServletRequest clientRequest;
private final HttpServletResponse proxyResponse;
private boolean hasContent;
private long contentLength;
private long length;
private Response response;
protected ProxyResponseListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse)
{
@ -381,7 +381,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
int size = buffers.size();
if (size > 0)
{
CountingCallback counter = new CountingCallback(callback, size);
Callback counter = size == 1 ? callback : new CountingCallback(callback, size);
for (int i = 0; i < size; ++i)
{
ByteBuffer buffer = buffers.get(i);
@ -390,15 +390,19 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
}
buffers.clear();
}
else
{
proxyWriter.offer(BufferUtil.EMPTY_BUFFER, callback);
}
if (finished)
proxyWriter.offer(BufferUtil.EMPTY_BUFFER, complete);
if (_log.isDebugEnabled())
_log.debug("{} downstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes);
if (committed)
{
if (size == 0)
callback.succeeded();
else
proxyWriter.onWritePossible();
proxyWriter.onWritePossible();
}
else
{
@ -411,9 +415,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
// we run into a race where the different thread calls
// onWritePossible() and succeeding the callback causes
// this method to be called again, which also may call
// onWritePossible(). We use a poison pill for this case.
if (size == 0)
proxyWriter.offer(BufferUtil.EMPTY_BUFFER, callback);
// onWritePossible().
proxyResponse.getOutputStream().setWriteListener(proxyWriter);
}
}
@ -428,61 +430,72 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
{
try
{
// If we had unknown length content, we need to call the
// transformer to signal that the content is finished.
if (contentLength < 0 && hasContent)
if (hasContent)
{
ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
transformer.transform(BufferUtil.EMPTY_BUFFER, true, buffers);
long newContentBytes = 0;
int size = buffers.size();
if (size > 0)
// If we had unknown length content, we need to call the
// transformer to signal that the content is finished.
if (contentLength < 0)
{
Callback callback = new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
if (complete.compareAndSet(false, true))
onProxyResponseFailure(clientRequest, proxyResponse, serverResponse, x);
}
};
for (int i = 0; i < size; ++i)
{
ByteBuffer buffer = buffers.get(i);
newContentBytes += buffer.remaining();
proxyWriter.offer(buffer, callback);
}
buffers.clear();
}
if (_log.isDebugEnabled())
_log.debug("{} downstream content transformation to {} bytes", getRequestId(clientRequest), newContentBytes);
ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
proxyWriter.onWritePossible();
transformer.transform(BufferUtil.EMPTY_BUFFER, true, buffers);
long newContentBytes = 0;
int size = buffers.size();
if (size > 0)
{
Callback callback = size == 1 ? complete : new CountingCallback(complete, size);
for (int i = 0; i < size; ++i)
{
ByteBuffer buffer = buffers.get(i);
newContentBytes += buffer.remaining();
proxyWriter.offer(buffer, callback);
}
buffers.clear();
}
else
{
proxyWriter.offer(BufferUtil.EMPTY_BUFFER, complete);
}
if (_log.isDebugEnabled())
_log.debug("{} downstream content transformation to {} bytes", getRequestId(clientRequest), newContentBytes);
proxyWriter.onWritePossible();
}
}
else
{
complete.succeeded();
}
}
catch (Throwable x)
{
if (complete.compareAndSet(false, true))
onProxyResponseFailure(clientRequest, proxyResponse, serverResponse, x);
complete.failed(x);
}
}
@Override
public void onComplete(Result result)
{
if (complete.compareAndSet(false, true))
{
if (result.isSucceeded())
onProxyResponseSuccess(clientRequest, proxyResponse, result.getResponse());
else
onProxyResponseFailure(clientRequest, proxyResponse, result.getResponse(), result.getFailure());
}
if (_log.isDebugEnabled())
_log.debug("{} proxying complete", getRequestId(clientRequest));
response = result.getResponse();
if (result.isSucceeded())
complete.succeeded();
else
complete.failed(result.getFailure());
}
@Override
public void succeeded()
{
onProxyResponseSuccess(clientRequest, proxyResponse, response);
}
@Override
public void failed(Throwable failure)
{
onProxyResponseFailure(clientRequest, proxyResponse, response, failure);
}
}
@ -503,7 +516,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
public boolean offer(ByteBuffer content, Callback callback)
{
if (_log.isDebugEnabled())
_log.debug("{} proxying content to downstream: {} bytes", getRequestId(clientRequest), content.remaining());
_log.debug("{} proxying content to downstream: {} bytes {}", getRequestId(clientRequest), content.remaining(), callback);
return chunks.offer(new DeferredContentProvider.Chunk(content, callback));
}

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.proxy;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
@ -51,6 +50,7 @@ import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
@ -75,49 +75,10 @@ public class AsyncMiddleManServletTest
private HttpClient client;
private Server proxy;
private ServerConnector proxyConnector;
private ServletContextHandler proxyContext;
private Server server;
private ServerConnector serverConnector;
private void prepareProxy(HttpServlet proxyServlet) throws Exception
{
prepareProxy(proxyServlet, new HashMap<String, String>());
}
private void prepareProxy(HttpServlet proxyServlet, Map<String, String> initParams) throws Exception
{
QueuedThreadPool proxyPool = new QueuedThreadPool();
proxyPool.setName("proxy");
proxy = new Server(proxyPool);
HttpConfiguration configuration = new HttpConfiguration();
configuration.setSendDateHeader(false);
configuration.setSendServerVersion(false);
String value = initParams.get("outputBufferSize");
if (value != null)
configuration.setOutputBufferSize(Integer.valueOf(value));
proxyConnector = new ServerConnector(proxy, new HttpConnectionFactory(configuration));
proxy.addConnector(proxyConnector);
proxyContext = new ServletContextHandler(proxy, "/", true, false);
ServletHolder proxyServletHolder = new ServletHolder(proxyServlet);
proxyServletHolder.setInitParameters(initParams);
proxyContext.addServlet(proxyServletHolder, "/*");
proxy.start();
}
private void prepareClient() throws Exception
{
QueuedThreadPool clientPool = new QueuedThreadPool();
clientPool.setName("client");
client = new HttpClient();
client.setExecutor(clientPool);
client.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyConnector.getLocalPort()));
client.start();
}
private void prepareServer(HttpServlet servlet) throws Exception
private void startServer(HttpServlet servlet) throws Exception
{
QueuedThreadPool serverPool = new QueuedThreadPool();
serverPool.setName("server");
@ -132,19 +93,66 @@ public class AsyncMiddleManServletTest
server.start();
}
@After
public void disposeProxy() throws Exception
private void startProxy(HttpServlet proxyServlet) throws Exception
{
client.stop();
proxy.stop();
startProxy(proxyServlet, new HashMap<String, String>());
}
private void startProxy(HttpServlet proxyServlet, Map<String, String> initParams) throws Exception
{
QueuedThreadPool proxyPool = new QueuedThreadPool();
proxyPool.setName("proxy");
proxy = new Server(proxyPool);
HttpConfiguration configuration = new HttpConfiguration();
configuration.setSendDateHeader(false);
configuration.setSendServerVersion(false);
String value = initParams.get("outputBufferSize");
if (value != null)
configuration.setOutputBufferSize(Integer.valueOf(value));
proxyConnector = new ServerConnector(proxy, new HttpConnectionFactory(configuration));
proxy.addConnector(proxyConnector);
ServletContextHandler proxyContext = new ServletContextHandler(proxy, "/", true, false);
ServletHolder proxyServletHolder = new ServletHolder(proxyServlet);
proxyServletHolder.setInitParameters(initParams);
proxyContext.addServlet(proxyServletHolder, "/*");
proxy.start();
}
private void startClient() throws Exception
{
QueuedThreadPool clientPool = new QueuedThreadPool();
clientPool.setName("client");
client = new HttpClient();
client.setExecutor(clientPool);
client.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyConnector.getLocalPort()));
client.start();
}
@After
public void disposeServer() throws Exception
public void dispose() throws Exception
{
client.stop();
proxy.stop();
server.stop();
}
@Test
public void testZeroContentLength() throws Exception
{
startServer(new EchoHttpServlet());
startProxy(new AsyncMiddleManServlet());
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testClientRequestSmallContentKnownLengthGzipped() throws Exception
{
@ -164,7 +172,7 @@ public class AsyncMiddleManServletTest
byte[] bytes = new byte[length];
new Random().nextBytes(bytes);
prepareServer(new EchoHttpServlet()
startServer(new EchoHttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -178,7 +186,7 @@ public class AsyncMiddleManServletTest
super.service(request, response);
}
});
prepareProxy(new AsyncMiddleManServlet()
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
@ -186,7 +194,7 @@ public class AsyncMiddleManServletTest
return new GZIPContentTransformer(ContentTransformer.IDENTITY);
}
});
prepareClient();
startClient();
byte[] gzipBytes = gzip(bytes);
ContentProvider gzipContent = new BytesContentProvider(gzipBytes);
@ -208,7 +216,7 @@ public class AsyncMiddleManServletTest
new Random().nextBytes(bytes);
final byte[] gzipBytes = gzip(bytes);
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -217,7 +225,7 @@ public class AsyncMiddleManServletTest
response.getOutputStream().write(gzipBytes);
}
});
prepareProxy(new AsyncMiddleManServlet()
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
@ -225,7 +233,7 @@ public class AsyncMiddleManServletTest
return new GZIPContentTransformer(ContentTransformer.IDENTITY);
}
});
prepareClient();
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
@ -241,7 +249,7 @@ public class AsyncMiddleManServletTest
String data = "<a href=\"http://google.com\">Google</a>";
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
prepareServer(new EchoHttpServlet()
startServer(new EchoHttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -250,7 +258,7 @@ public class AsyncMiddleManServletTest
super.service(request, response);
}
});
prepareProxy(new AsyncMiddleManServlet()
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
@ -264,7 +272,7 @@ public class AsyncMiddleManServletTest
return new GZIPContentTransformer(new HrefTransformer.Server());
}
});
prepareClient();
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.header(HttpHeader.CONTENT_ENCODING, "gzip")
@ -286,7 +294,7 @@ public class AsyncMiddleManServletTest
@Test
public void testUpstreamTransformationBufferedGzipped() throws Exception
{
prepareServer(new EchoHttpServlet()
startServer(new EchoHttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -295,7 +303,7 @@ public class AsyncMiddleManServletTest
super.service(request, response);
}
});
prepareProxy(new AsyncMiddleManServlet()
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
@ -303,7 +311,7 @@ public class AsyncMiddleManServletTest
return new GZIPContentTransformer(new BufferingContentTransformer());
}
});
prepareClient();
startClient();
DeferredContentProvider content = new DeferredContentProvider();
Request request = client.newRequest("localhost", serverConnector.getLocalPort());
@ -324,7 +332,7 @@ public class AsyncMiddleManServletTest
@Test
public void testDownstreamTransformationBufferedGzipped() throws Exception
{
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -341,7 +349,7 @@ public class AsyncMiddleManServletTest
}
}
});
prepareProxy(new AsyncMiddleManServlet()
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
@ -349,7 +357,7 @@ public class AsyncMiddleManServletTest
return new GZIPContentTransformer(new BufferingContentTransformer());
}
});
prepareClient();
startClient();
byte[] bytes = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8);
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
@ -366,7 +374,7 @@ public class AsyncMiddleManServletTest
public void testDiscardUpstreamAndDownstreamKnownContentLengthGzipped() throws Exception
{
final byte[] bytes = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8);
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -376,7 +384,7 @@ public class AsyncMiddleManServletTest
response.getOutputStream().write(gzip(bytes));
}
});
prepareProxy(new AsyncMiddleManServlet()
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
@ -390,7 +398,7 @@ public class AsyncMiddleManServletTest
return new GZIPContentTransformer(new DiscardContentTransformer());
}
});
prepareClient();
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.header(HttpHeader.CONTENT_ENCODING, "gzip")
@ -405,8 +413,8 @@ public class AsyncMiddleManServletTest
@Test
public void testUpstreamTransformationThrowsBeforeCommittingProxyRequest() throws Exception
{
prepareServer(new EchoHttpServlet());
prepareProxy(new AsyncMiddleManServlet()
startServer(new EchoHttpServlet());
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
@ -421,7 +429,7 @@ public class AsyncMiddleManServletTest
};
}
});
prepareClient();
startClient();
byte[] bytes = new byte[1024];
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
@ -435,8 +443,8 @@ public class AsyncMiddleManServletTest
@Test
public void testUpstreamTransformationThrowsAfterCommittingProxyRequest() throws Exception
{
prepareServer(new EchoHttpServlet());
prepareProxy(new AsyncMiddleManServlet()
startServer(new EchoHttpServlet());
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
@ -456,7 +464,7 @@ public class AsyncMiddleManServletTest
};
}
});
prepareClient();
startClient();
final CountDownLatch latch = new CountDownLatch(1);
DeferredContentProvider content = new DeferredContentProvider();
@ -518,8 +526,8 @@ public class AsyncMiddleManServletTest
private void testDownstreamTransformationThrows(HttpServlet serverServlet) throws Exception
{
prepareServer(serverServlet);
prepareProxy(new AsyncMiddleManServlet()
startServer(serverServlet);
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
@ -539,7 +547,7 @@ public class AsyncMiddleManServletTest
};
}
});
prepareClient();
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
@ -548,11 +556,110 @@ public class AsyncMiddleManServletTest
Assert.assertEquals(502, response.getStatus());
}
@Test
public void testLargeChunkedBufferedDownstreamTransformation() throws Exception
{
// Tests the race between a incomplete write performed from ProxyResponseListener.onSuccess()
// and ProxyResponseListener.onComplete() being called before the write has completed.
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
ServletOutputStream output = response.getOutputStream();
byte[] chunk = new byte[1024 * 1024];
for (int i = 0; i < 16; ++i)
{
output.write(chunk);
output.flush();
}
}
});
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
{
return new BufferingContentTransformer();
}
});
startClient();
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", serverConnector.getLocalPort())
.onResponseContent(new Response.ContentListener()
{
@Override
public void onContent(Response response, ByteBuffer content)
{
// Slow down the reader so that the
// write from the proxy gets congested.
sleep(1);
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isSucceeded());
Assert.assertEquals(200, result.getResponse().getStatus());
latch.countDown();
}
});
Assert.assertTrue(latch.await(15, TimeUnit.SECONDS));
}
@Test
public void testDownstreamTransformationKnownContentLengthDroppingLastChunk() throws Exception
{
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
byte[] chunk = new byte[1024];
int contentLength = 2 * chunk.length;
response.setContentLength(contentLength);
ServletOutputStream output = response.getOutputStream();
output.write(chunk);
output.flush();
sleep(1000);
output.write(chunk);
}
});
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
{
return new ContentTransformer()
{
@Override
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
{
if (!finished)
output.add(input);
}
};
}
});
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testClientRequestReadFailsOnFirstRead() throws Exception
{
prepareServer(new EchoHttpServlet());
prepareProxy(new AsyncMiddleManServlet()
startServer(new EchoHttpServlet());
startProxy(new AsyncMiddleManServlet()
{
@Override
protected int readClientRequestContent(ServletInputStream input, byte[] buffer) throws IOException
@ -560,7 +667,7 @@ public class AsyncMiddleManServletTest
throw new IOException("explicitly_thrown_by_test");
}
});
prepareClient();
startClient();
final CountDownLatch latch = new CountDownLatch(1);
DeferredContentProvider content = new DeferredContentProvider();
@ -587,8 +694,8 @@ public class AsyncMiddleManServletTest
@Test
public void testClientRequestReadFailsOnSecondRead() throws Exception
{
prepareServer(new EchoHttpServlet());
prepareProxy(new AsyncMiddleManServlet()
startServer(new EchoHttpServlet());
startProxy(new AsyncMiddleManServlet()
{
private int count;
@ -601,7 +708,7 @@ public class AsyncMiddleManServletTest
throw new IOException("explicitly_thrown_by_test");
}
});
prepareClient();
startClient();
final CountDownLatch latch = new CountDownLatch(1);
DeferredContentProvider content = new DeferredContentProvider();
@ -638,7 +745,7 @@ public class AsyncMiddleManServletTest
private void testProxyResponseWriteFails(final int writeCount) throws Exception
{
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -649,7 +756,7 @@ public class AsyncMiddleManServletTest
output.write(new byte[512]);
}
});
prepareProxy(new AsyncMiddleManServlet()
startProxy(new AsyncMiddleManServlet()
{
private int count;
@ -662,7 +769,7 @@ public class AsyncMiddleManServletTest
throw new IOException("explicitly_thrown_by_test");
}
});
prepareClient();
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
@ -671,7 +778,7 @@ public class AsyncMiddleManServletTest
Assert.assertEquals(502, response.getStatus());
}
private void sleep(long delay) throws IOException
private void sleep(long delay)
{
try
{
@ -679,7 +786,7 @@ public class AsyncMiddleManServletTest
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
throw new RuntimeIOException(x);
}
}

View File

@ -82,7 +82,6 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@ -124,12 +123,27 @@ public class ProxyServletTest
this.proxyServlet = (AbstractProxyServlet)proxyServletClass.newInstance();
}
private void prepareProxy() throws Exception
private void startServer(HttpServlet servlet) throws Exception
{
prepareProxy(new HashMap<String, String>());
QueuedThreadPool serverPool = new QueuedThreadPool();
serverPool.setName("server");
server = new Server(serverPool);
serverConnector = new ServerConnector(server);
server.addConnector(serverConnector);
ServletContextHandler appCtx = new ServletContextHandler(server, "/", true, false);
ServletHolder appServletHolder = new ServletHolder(servlet);
appCtx.addServlet(appServletHolder, "/*");
server.start();
}
private void prepareProxy(Map<String, String> initParams) throws Exception
private void startProxy() throws Exception
{
startProxy(new HashMap<String, String>());
}
private void startProxy(Map<String, String> initParams) throws Exception
{
QueuedThreadPool proxyPool = new QueuedThreadPool();
proxyPool.setName("proxy");
@ -150,55 +164,38 @@ public class ProxyServletTest
proxyContext.addServlet(proxyServletHolder, "/*");
proxy.start();
}
private void startClient() throws Exception
{
client = prepareClient();
}
private HttpClient prepareClient() throws Exception
{
HttpClient result = new HttpClient();
QueuedThreadPool clientPool = new QueuedThreadPool();
clientPool.setName("client");
HttpClient result = new HttpClient();
result.setExecutor(clientPool);
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyConnector.getLocalPort()));
result.start();
return result;
}
private void prepareServer(HttpServlet servlet) throws Exception
{
QueuedThreadPool serverPool = new QueuedThreadPool();
serverPool.setName("server");
server = new Server(serverPool);
serverConnector = new ServerConnector(server);
server.addConnector(serverConnector);
ServletContextHandler appCtx = new ServletContextHandler(server, "/", true, false);
ServletHolder appServletHolder = new ServletHolder(servlet);
appCtx.addServlet(appServletHolder, "/*");
server.start();
}
@After
public void disposeProxy() throws Exception
public void dispose() throws Exception
{
client.stop();
proxy.stop();
}
@After
public void disposeServer() throws Exception
{
server.stop();
}
@Test
public void testProxyDown() throws Exception
{
prepareProxy();
prepareServer(new EmptyHttpServlet());
startServer(new EmptyHttpServlet());
startProxy();
startClient();
// Shutdown the proxy
proxy.stop();
@ -218,8 +215,7 @@ public class ProxyServletTest
@Test
public void testProxyWithoutContent() throws Exception
{
prepareProxy();
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -228,6 +224,8 @@ public class ProxyServletTest
resp.addHeader(PROXIED_HEADER, "true");
}
});
startProxy();
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
@ -241,21 +239,9 @@ public class ProxyServletTest
@Test
public void testProxyWithResponseContent() throws Exception
{
prepareProxy();
HttpClient result = new HttpClient();
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyConnector.getLocalPort()));
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName("foo");
threadPool.setMaxThreads(20);
result.setExecutor(threadPool);
result.start();
ContentResponse[] responses = new ContentResponse[10];
final byte[] content = new byte[1024];
new Random().nextBytes(content);
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -265,16 +251,18 @@ public class ProxyServletTest
resp.getOutputStream().write(content);
}
});
startProxy();
startClient();
ContentResponse[] responses = new ContentResponse[10];
for (int i = 0; i < 10; ++i)
{
// Request is for the target server
responses[i] = result.newRequest("localhost", serverConnector.getLocalPort())
responses[i] = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
}
for (int i = 0; i < 10; ++i)
{
Assert.assertEquals(200, responses[i].getStatus());
@ -286,8 +274,7 @@ public class ProxyServletTest
@Test
public void testProxyWithRequestContentAndResponseContent() throws Exception
{
prepareProxy();
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -297,6 +284,8 @@ public class ProxyServletTest
IO.copy(req.getInputStream(), resp.getOutputStream());
}
});
startProxy();
startClient();
byte[] content = new byte[1024];
new Random().nextBytes(content);
@ -314,8 +303,7 @@ public class ProxyServletTest
@Test
public void testProxyWithBigRequestContentIgnored() throws Exception
{
prepareProxy();
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -335,6 +323,8 @@ public class ProxyServletTest
}
}
});
startProxy();
startClient();
byte[] content = new byte[128 * 1024];
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
@ -352,9 +342,7 @@ public class ProxyServletTest
{
final byte[] content = new byte[128 * 1024];
new Random().nextBytes(content);
prepareProxy();
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -373,6 +361,8 @@ public class ProxyServletTest
}
}
});
startProxy();
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.method(HttpMethod.POST)
@ -384,12 +374,9 @@ public class ProxyServletTest
Assert.assertTrue(response.getHeaders().containsKey(PROXIED_HEADER));
}
@Slow
@Test
public void testProxyWithBigResponseContentWithSlowReader() throws Exception
{
prepareProxy();
// Create a 6 MiB file
final int length = 6 * 1024;
Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();
@ -402,8 +389,7 @@ public class ProxyServletTest
for (int i = 0; i < length; ++i)
output.write(kb);
}
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -414,6 +400,8 @@ public class ProxyServletTest
}
}
});
startProxy();
startClient();
Request request = client.newRequest("localhost", serverConnector.getLocalPort()).path("/proxy/test");
final CountDownLatch latch = new CountDownLatch(1);
@ -449,9 +437,7 @@ public class ProxyServletTest
@Test
public void testProxyWithQueryString() throws Exception
{
prepareProxy();
String query = "a=1&b=%E2%82%AC";
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -459,7 +445,10 @@ public class ProxyServletTest
resp.getOutputStream().print(req.getQueryString());
}
});
startProxy();
startClient();
String query = "a=1&b=%E2%82%AC";
ContentResponse response = client.newRequest("http://localhost:" + serverConnector.getLocalPort() + "/?" + query)
.timeout(5, TimeUnit.SECONDS)
.send();
@ -467,13 +456,11 @@ public class ProxyServletTest
Assert.assertEquals(query, response.getContentAsString());
}
@Slow
@Test
public void testProxyLongPoll() throws Exception
{
prepareProxy();
final long timeout = 1000;
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
@ -510,6 +497,8 @@ public class ProxyServletTest
}
}
});
startProxy();
startClient();
Response response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(2 * timeout, TimeUnit.MILLISECONDS)
@ -521,8 +510,7 @@ public class ProxyServletTest
@Test
public void testProxyXForwardedHostHeaderIsPresent() throws Exception
{
prepareProxy();
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -532,6 +520,8 @@ public class ProxyServletTest
writer.flush();
}
});
startProxy();
startClient();
ContentResponse response = client.GET("http://localhost:" + serverConnector.getLocalPort());
Assert.assertThat("Response expected to contain content of X-Forwarded-Host Header from the request",
@ -542,8 +532,9 @@ public class ProxyServletTest
@Test
public void testProxyWhiteList() throws Exception
{
prepareProxy();
prepareServer(new EmptyHttpServlet());
startServer(new EmptyHttpServlet());
startProxy();
startClient();
int port = serverConnector.getLocalPort();
proxyServlet.getWhiteListHosts().add("127.0.0.1:" + port);
@ -563,8 +554,9 @@ public class ProxyServletTest
@Test
public void testProxyBlackList() throws Exception
{
prepareProxy();
prepareServer(new EmptyHttpServlet());
startServer(new EmptyHttpServlet());
startProxy();
startClient();
int port = serverConnector.getLocalPort();
proxyServlet.getBlackListHosts().add("localhost:" + port);
@ -584,8 +576,7 @@ public class ProxyServletTest
@Test
public void testClientExcludedHosts() throws Exception
{
prepareProxy();
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -594,6 +585,8 @@ public class ProxyServletTest
resp.addHeader(PROXIED_HEADER, "true");
}
});
startProxy();
startClient();
int port = serverConnector.getLocalPort();
client.getProxyConfiguration().getProxies().get(0).getExcludedAddresses().add("127.0.0.1:" + port);
@ -627,7 +620,7 @@ public class ProxyServletTest
private void testTransparentProxyWithPrefix(String prefix) throws Exception
{
final String target = "/test";
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -637,13 +630,13 @@ public class ProxyServletTest
resp.setStatus(target.equals(req.getRequestURI()) ? 200 : 404);
}
});
String proxyTo = "http://localhost:" + serverConnector.getLocalPort();
proxyServlet = new ProxyServlet.Transparent();
Map<String, String> params = new HashMap<>();
params.put("proxyTo", proxyTo);
params.put("prefix", prefix);
prepareProxy(params);
startProxy(params);
startClient();
// Make the request to the proxy, it should transparently forward to the server
ContentResponse response = client.newRequest("localhost", proxyConnector.getLocalPort())
@ -659,7 +652,7 @@ public class ProxyServletTest
{
final String target = "/test";
final String query = "a=1&b=2";
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -678,14 +671,14 @@ public class ProxyServletTest
resp.setStatus(404);
}
});
String proxyTo = "http://localhost:" + serverConnector.getLocalPort();
String prefix = "/proxy";
proxyServlet = new ProxyServlet.Transparent();
Map<String, String> params = new HashMap<>();
params.put("proxyTo", proxyTo);
params.put("prefix", prefix);
prepareProxy(params);
startProxy(params);
startClient();
// Make the request to the proxy, it should transparently forward to the server
ContentResponse response = client.newRequest("localhost", proxyConnector.getLocalPort())
@ -720,14 +713,14 @@ public class ProxyServletTest
resp.setStatus(404);
}
});
String proxyTo = "http://localhost:" + serverConnector.getLocalPort();
String prefix = "/proxy";
proxyServlet = new ProxyServlet.Transparent();
Map<String, String> params = new HashMap<>();
params.put("proxyTo", proxyTo);
params.put("prefix", prefix);
prepareProxy(params);
startProxy(params);
startClient();
// Make the request to the proxy, it should transparently forward to the server
ContentResponse response = client.newRequest("localhost", proxyConnector.getLocalPort())
@ -742,7 +735,7 @@ public class ProxyServletTest
public void testTransparentProxyWithoutPrefix() throws Exception
{
final String target = "/test";
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -752,12 +745,12 @@ public class ProxyServletTest
resp.setStatus(target.equals(req.getRequestURI()) ? 200 : 404);
}
});
final String proxyTo = "http://localhost:" + serverConnector.getLocalPort();
proxyServlet = new ProxyServlet.Transparent();
Map<String, String> initParams = new HashMap<>();
initParams.put("proxyTo", proxyTo);
prepareProxy(initParams);
startProxy(initParams);
startClient();
// Make the request to the proxy, it should transparently forward to the server
ContentResponse response = client.newRequest("localhost", proxyConnector.getLocalPort())
@ -772,7 +765,7 @@ public class ProxyServletTest
public void testCachingProxy() throws Exception
{
final byte[] content = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xA, 0xB, 0xC, 0xD, 0xE, 0xF};
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -782,7 +775,6 @@ public class ProxyServletTest
resp.getOutputStream().write(content);
}
});
// Don't do this at home: this example is not concurrent, not complete,
// it is only used for this test and to verify that ProxyServlet can be
// subclassed enough to write your own caching servlet
@ -832,7 +824,8 @@ public class ProxyServletTest
super.onProxyResponseSuccess(request, response, proxyResponse);
}
};
prepareProxy();
startProxy();
startClient();
// First request
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
@ -854,8 +847,7 @@ public class ProxyServletTest
@Test
public void testRedirectsAreProxied() throws Exception
{
prepareProxy();
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -865,6 +857,8 @@ public class ProxyServletTest
resp.sendRedirect("/");
}
});
startProxy();
startClient();
client.setFollowRedirects(false);
@ -879,8 +873,7 @@ public class ProxyServletTest
public void testGZIPContentIsProxied() throws Exception
{
final byte[] content = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
prepareProxy();
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -894,6 +887,8 @@ public class ProxyServletTest
gzipOutputStream.close();
}
});
startProxy();
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
@ -906,8 +901,7 @@ public class ProxyServletTest
@Test(expected = TimeoutException.class)
public void testWrongContentLength() throws Exception
{
prepareProxy();
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -918,6 +912,8 @@ public class ProxyServletTest
resp.getOutputStream().write(message);
}
});
startProxy();
startClient();
client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(1, TimeUnit.SECONDS)
@ -930,8 +926,7 @@ public class ProxyServletTest
public void testCookiesFromDifferentClientsAreNotMixed() throws Exception
{
final String name = "biscuit";
prepareProxy();
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -953,6 +948,8 @@ public class ProxyServletTest
}
}
});
startProxy();
startClient();
String value1 = "1";
ContentResponse response1 = client.newRequest("localhost", serverConnector.getLocalPort())
@ -997,15 +994,10 @@ public class ProxyServletTest
@Test
public void testProxyRequestFailureInTheMiddleOfProxyingSmallContent() throws Exception
{
final long proxyTimeout = 1000;
Map<String, String> proxyParams = new HashMap<>();
proxyParams.put("timeout", String.valueOf(proxyTimeout));
prepareProxy(proxyParams);
final CountDownLatch chunk1Latch = new CountDownLatch(1);
final int chunk1 = 'q';
final int chunk2 = 'w';
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -1033,6 +1025,11 @@ public class ProxyServletTest
}
}
});
final long proxyTimeout = 1000;
Map<String, String> proxyParams = new HashMap<>();
proxyParams.put("timeout", String.valueOf(proxyTimeout));
startProxy(proxyParams);
startClient();
InputStreamResponseListener listener = new InputStreamResponseListener();
int port = serverConnector.getLocalPort();
@ -1065,18 +1062,12 @@ public class ProxyServletTest
@Test
public void testProxyRequestFailureInTheMiddleOfProxyingBigContent() throws Exception
{
final long proxyTimeout = 1000;
int outputBufferSize = 1024;
Map<String, String> proxyParams = new HashMap<>();
proxyParams.put("timeout", String.valueOf(proxyTimeout));
proxyParams.put("outputBufferSize", String.valueOf(outputBufferSize));
prepareProxy(proxyParams);
final CountDownLatch chunk1Latch = new CountDownLatch(1);
final byte[] chunk1 = new byte[outputBufferSize];
new Random().nextBytes(chunk1);
final int chunk2 = 'w';
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -1104,6 +1095,12 @@ public class ProxyServletTest
}
}
});
final long proxyTimeout = 1000;
Map<String, String> proxyParams = new HashMap<>();
proxyParams.put("timeout", String.valueOf(proxyTimeout));
proxyParams.put("outputBufferSize", String.valueOf(outputBufferSize));
startProxy(proxyParams);
startClient();
InputStreamResponseListener listener = new InputStreamResponseListener();
int port = serverConnector.getLocalPort();
@ -1138,7 +1135,8 @@ public class ProxyServletTest
@Test
public void testResponseHeadersAreNotRemoved() throws Exception
{
prepareProxy();
startServer(new EmptyHttpServlet());
startProxy();
proxyContext.stop();
final String headerName = "X-Test";
final String headerValue = "test-value";
@ -1162,9 +1160,11 @@ public class ProxyServletTest
}
}), "/*", EnumSet.of(DispatcherType.REQUEST));
proxyContext.start();
prepareServer(new EmptyHttpServlet());
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort()).send();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(headerValue, response.getHeaders().get(headerName));
@ -1173,16 +1173,13 @@ public class ProxyServletTest
@Test
public void testHeadersListedByConnectionHeaderAreRemoved() throws Exception
{
prepareProxy();
final Map<String, String> hopHeaders = new LinkedHashMap<>();
hopHeaders.put(HttpHeader.TE.asString(), "gzip");
hopHeaders.put(HttpHeader.CONNECTION.asString(), "Keep-Alive, Foo, Bar");
hopHeaders.put("Foo", "abc");
hopHeaders.put("Foo", "def");
hopHeaders.put(HttpHeader.KEEP_ALIVE.asString(), "timeout=30");
prepareServer(new HttpServlet()
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -1195,11 +1192,15 @@ public class ProxyServletTest
}
}
});
startProxy();
startClient();
Request request = client.newRequest("localhost", serverConnector.getLocalPort());
for (Map.Entry<String, String> entry : hopHeaders.entrySet())
request.header(entry.getKey(), entry.getValue());
ContentResponse response = request.send();
ContentResponse response = request
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(200, response.getStatus());
}

View File

@ -64,10 +64,8 @@ public class CountingCallback implements Callback
if (count.compareAndSet(current, current - 1))
{
if (current == 1)
{
callback.succeeded();
return;
}
return;
}
}
}
@ -91,4 +89,10 @@ public class CountingCallback implements Callback
}
}
}
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
}