470184 - Send the proxy-to-server request more lazily.

This commit is contained in:
Simone Bordet 2015-06-15 16:31:38 +02:00
parent 2c26e82fea
commit 3958625993
3 changed files with 182 additions and 7 deletions

View File

@ -46,6 +46,7 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -544,7 +545,10 @@ public abstract class AbstractProxyServlet extends HttpServlet
boolean aborted = proxyRequest.abort(failure);
if (!aborted)
{
proxyResponse.setStatus(500);
int status = failure instanceof TimeoutException ?
HttpStatus.REQUEST_TIMEOUT_408 :
HttpStatus.INTERNAL_SERVER_ERROR_500;
proxyResponse.setStatus(status);
clientRequest.getAsyncContext().complete();
}
}

View File

@ -66,6 +66,7 @@ import org.eclipse.jetty.util.component.Destroyable;
*/
public class AsyncMiddleManServlet extends AbstractProxyServlet
{
private static final String PROXY_REQUEST_COMMITTED = AsyncMiddleManServlet.class.getName() + ".proxyRequestCommitted";
private static final String CLIENT_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".clientTransformer";
private static final String SERVER_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".serverTransformer";
@ -316,20 +317,23 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
private void process(ByteBuffer content, Callback callback, boolean finished) throws IOException
{
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
boolean committed = transformer != null;
if (transformer == null)
{
transformer = newClientRequestContentTransformer(clientRequest, proxyRequest);
clientRequest.setAttribute(CLIENT_TRANSFORMER, transformer);
}
if (!content.hasRemaining() && !finished)
boolean committed = clientRequest.getAttribute(PROXY_REQUEST_COMMITTED) != null;
int contentBytes = content.remaining();
// Skip transformation for empty non-last buffers.
if (contentBytes == 0 && !finished)
{
callback.succeeded();
return;
}
int contentBytes = content.remaining();
transform(transformer, content, finished, buffers);
int newContentBytes = 0;
@ -352,14 +356,15 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
if (_log.isDebugEnabled())
_log.debug("{} upstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes);
if (!committed)
if (!committed && (size > 0 || finished))
{
proxyRequest.header(HttpHeader.CONTENT_LENGTH, null);
clientRequest.setAttribute(PROXY_REQUEST_COMMITTED, true);
sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
}
if (size == 0)
succeeded();
callback.succeeded();
}
@Override

View File

@ -1110,7 +1110,7 @@ public class AsyncMiddleManServletTest
.send();
Assert.assertTrue(destroyLatch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertEquals(HttpStatus.GATEWAY_TIMEOUT_504, response.getStatus());
Assert.assertEquals(HttpStatus.REQUEST_TIMEOUT_408, response.getStatus());
}
@Test
@ -1281,6 +1281,172 @@ public class AsyncMiddleManServletTest
Assert.assertFalse(transformed.get());
}
@Test
public void testProxyRequestHeadersSentWhenDiscardingContent() throws Exception
{
startServer(new EchoHttpServlet());
final CountDownLatch proxyRequestLatch = new CountDownLatch(1);
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
{
return new DiscardContentTransformer();
}
@Override
protected void sendProxyRequest(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest)
{
proxyRequestLatch.countDown();
super.sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
}
});
startClient();
DeferredContentProvider content = new DeferredContentProvider();
Request request = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.content(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
// Send one chunk of content, the proxy request must not be sent.
ByteBuffer chunk1 = ByteBuffer.allocate(1024);
content.offer(chunk1);
Assert.assertFalse(proxyRequestLatch.await(1, TimeUnit.SECONDS));
// Send another chunk of content, the proxy request must not be sent.
ByteBuffer chunk2 = ByteBuffer.allocate(512);
content.offer(chunk2);
Assert.assertFalse(proxyRequestLatch.await(1, TimeUnit.SECONDS));
// Finish the content, request must be sent.
content.close();
Assert.assertTrue(proxyRequestLatch.await(1, TimeUnit.SECONDS));
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertEquals(0, response.getContent().length);
}
@Test
public void testProxyRequestHeadersNotSentUntilContent() throws Exception
{
startServer(new EchoHttpServlet());
final CountDownLatch proxyRequestLatch = new CountDownLatch(1);
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
{
return new BufferingContentTransformer();
}
@Override
protected void sendProxyRequest(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest)
{
proxyRequestLatch.countDown();
super.sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
}
});
startClient();
DeferredContentProvider content = new DeferredContentProvider();
Request request = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.content(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
// Send one chunk of content, the proxy request must not be sent.
ByteBuffer chunk1 = ByteBuffer.allocate(1024);
content.offer(chunk1);
Assert.assertFalse(proxyRequestLatch.await(1, TimeUnit.SECONDS));
// Send another chunk of content, the proxy request must not be sent.
ByteBuffer chunk2 = ByteBuffer.allocate(512);
content.offer(chunk2);
Assert.assertFalse(proxyRequestLatch.await(1, TimeUnit.SECONDS));
// Finish the content, request must be sent.
content.close();
Assert.assertTrue(proxyRequestLatch.await(1, TimeUnit.SECONDS));
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertEquals(chunk1.capacity() + chunk2.capacity(), response.getContent().length);
}
@Test
public void testProxyRequestHeadersNotSentUntilFirstContent() throws Exception
{
startServer(new EchoHttpServlet());
final CountDownLatch proxyRequestLatch = new CountDownLatch(1);
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
{
return new ContentTransformer()
{
private ByteBuffer buffer;
@Override
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
{
// Buffer only the first chunk.
if (buffer == null)
{
buffer = ByteBuffer.allocate(input.remaining());
buffer.put(input).flip();
}
else if (buffer.hasRemaining())
{
output.add(buffer);
output.add(input);
}
else
{
output.add(input);
}
}
};
}
@Override
protected void sendProxyRequest(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest)
{
proxyRequestLatch.countDown();
super.sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
}
});
startClient();
DeferredContentProvider content = new DeferredContentProvider();
Request request = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.content(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
// Send one chunk of content, the proxy request must not be sent.
ByteBuffer chunk1 = ByteBuffer.allocate(1024);
content.offer(chunk1);
Assert.assertFalse(proxyRequestLatch.await(1, TimeUnit.SECONDS));
// Send another chunk of content, the proxy request must be sent.
ByteBuffer chunk2 = ByteBuffer.allocate(512);
content.offer(chunk2);
Assert.assertTrue(proxyRequestLatch.await(5, TimeUnit.SECONDS));
// Finish the content.
content.close();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertEquals(chunk1.capacity() + chunk2.capacity(), response.getContent().length);
}
private Path prepareTargetTestsDir() throws IOException
{
final Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();