431642 - Implement ProxyServlet using Servlet 3.1 async I/O.
Added more tests for failure cases.
This commit is contained in:
parent
976d86b375
commit
a405454276
|
@ -21,7 +21,6 @@ package org.eclipse.jetty.proxy;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.WritePendingException;
|
||||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.ReadListener;
|
||||
import javax.servlet.ServletInputStream;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
|
@ -44,7 +43,7 @@ public class AsyncProxyServlet extends ProxyServlet
|
|||
{
|
||||
ServletInputStream input = request.getInputStream();
|
||||
DeferredContentProvider provider = new DeferredContentProvider();
|
||||
input.setReadListener(new StreamReader(input, getRequestId(request), provider));
|
||||
input.setReadListener(new StreamReader(request, provider));
|
||||
return provider;
|
||||
}
|
||||
|
||||
|
@ -53,12 +52,11 @@ public class AsyncProxyServlet extends ProxyServlet
|
|||
{
|
||||
try
|
||||
{
|
||||
int requestId = getRequestId(request);
|
||||
_log.debug("{} proxying content to downstream: {} bytes", requestId, length);
|
||||
_log.debug("{} proxying content to downstream: {} bytes", getRequestId(request), length);
|
||||
StreamWriter writeListener = (StreamWriter)request.getAttribute(WRITE_LISTENER_ATTRIBUTE);
|
||||
if (writeListener == null)
|
||||
{
|
||||
writeListener = new StreamWriter(request.getAsyncContext(), requestId);
|
||||
writeListener = new StreamWriter(request, proxyResponse);
|
||||
request.setAttribute(WRITE_LISTENER_ATTRIBUTE, writeListener);
|
||||
|
||||
// Set the data to write before calling setWriteListener(), because
|
||||
|
@ -77,29 +75,28 @@ public class AsyncProxyServlet extends ProxyServlet
|
|||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
// TODO: who calls asyncContext.complete() in this case ?
|
||||
callback.failed(x);
|
||||
onResponseFailure(request, response, proxyResponse, x);
|
||||
}
|
||||
}
|
||||
|
||||
private class StreamReader implements ReadListener, Callback
|
||||
{
|
||||
private final byte[] buffer = new byte[512];
|
||||
// private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
|
||||
private final ServletInputStream input;
|
||||
private final int requestId;
|
||||
private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
|
||||
private final HttpServletRequest request;
|
||||
private final DeferredContentProvider provider;
|
||||
|
||||
public StreamReader(ServletInputStream input, int requestId, DeferredContentProvider provider)
|
||||
public StreamReader(HttpServletRequest request, DeferredContentProvider provider)
|
||||
{
|
||||
this.input = input;
|
||||
this.requestId = requestId;
|
||||
this.request = request;
|
||||
this.provider = provider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDataAvailable() throws IOException
|
||||
{
|
||||
int requestId = getRequestId(request);
|
||||
ServletInputStream input = request.getInputStream();
|
||||
_log.debug("{} asynchronous read start on {}", requestId, input);
|
||||
|
||||
// First check for isReady() because it has
|
||||
|
@ -123,7 +120,7 @@ public class AsyncProxyServlet extends ProxyServlet
|
|||
@Override
|
||||
public void onAllDataRead() throws IOException
|
||||
{
|
||||
_log.debug("{} proxying content to upstream completed", requestId);
|
||||
_log.debug("{} proxying content to upstream completed", getRequestId(request));
|
||||
provider.close();
|
||||
}
|
||||
|
||||
|
@ -138,7 +135,7 @@ public class AsyncProxyServlet extends ProxyServlet
|
|||
{
|
||||
try
|
||||
{
|
||||
if (input.isReady())
|
||||
if (request.getInputStream().isReady())
|
||||
onDataAvailable();
|
||||
}
|
||||
catch (Throwable x)
|
||||
|
@ -150,25 +147,24 @@ public class AsyncProxyServlet extends ProxyServlet
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
// TODO: send a response error ?
|
||||
// complete the async context since we cannot throw an exception from here.
|
||||
onClientRequestFailure(request, x);
|
||||
}
|
||||
}
|
||||
|
||||
private class StreamWriter implements WriteListener
|
||||
{
|
||||
private final AsyncContext asyncContext;
|
||||
private final int requestId;
|
||||
private final HttpServletRequest request;
|
||||
private final Response proxyResponse;
|
||||
private WriteState state;
|
||||
private byte[] buffer;
|
||||
private int offset;
|
||||
private int length;
|
||||
private Callback callback;
|
||||
|
||||
private StreamWriter(AsyncContext asyncContext, int requestId)
|
||||
private StreamWriter(HttpServletRequest request, Response proxyResponse)
|
||||
{
|
||||
this.asyncContext = asyncContext;
|
||||
this.requestId = requestId;
|
||||
this.request = request;
|
||||
this.proxyResponse = proxyResponse;
|
||||
this.state = WriteState.IDLE;
|
||||
}
|
||||
|
||||
|
@ -186,7 +182,8 @@ public class AsyncProxyServlet extends ProxyServlet
|
|||
@Override
|
||||
public void onWritePossible() throws IOException
|
||||
{
|
||||
ServletOutputStream output = asyncContext.getResponse().getOutputStream();
|
||||
int requestId = getRequestId(request);
|
||||
ServletOutputStream output = request.getAsyncContext().getResponse().getOutputStream();
|
||||
if (state == WriteState.READY)
|
||||
{
|
||||
// There is data to write.
|
||||
|
@ -224,15 +221,16 @@ public class AsyncProxyServlet extends ProxyServlet
|
|||
callback = null;
|
||||
state = WriteState.IDLE;
|
||||
// Call the callback only after the whole state has been reset,
|
||||
// because the callback may trigger a reentrant call and the
|
||||
// state would be the old one
|
||||
// because the callback may trigger a reentrant call and
|
||||
// the state must already be the new one that we reset here.
|
||||
c.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable failure)
|
||||
{
|
||||
// TODO:
|
||||
HttpServletResponse response = (HttpServletResponse)request.getAsyncContext().getResponse();
|
||||
onResponseFailure(request, response, proxyResponse, failure);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.eclipse.jetty.client.api.Result;
|
|||
import org.eclipse.jetty.client.util.InputStreamContentProvider;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.HttpCookieStore;
|
||||
|
@ -495,9 +496,24 @@ public class ProxyServlet extends HttpServlet
|
|||
_log.debug("{} proxying content to upstream: {} bytes", getRequestId(request), length);
|
||||
return super.onRead(buffer, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onReadFailure(Throwable failure)
|
||||
{
|
||||
onClientRequestFailure(request, failure);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected void onClientRequestFailure(HttpServletRequest request, Throwable failure)
|
||||
{
|
||||
AsyncContext asyncContext = request.getAsyncContext();
|
||||
HttpServletResponse response = (HttpServletResponse)asyncContext.getResponse();
|
||||
response.setStatus(500);
|
||||
response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString());
|
||||
asyncContext.complete();
|
||||
}
|
||||
|
||||
protected void onRewriteFailed(HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
response.sendError(HttpServletResponse.SC_FORBIDDEN);
|
||||
|
@ -529,7 +545,7 @@ public class ProxyServlet extends HttpServlet
|
|||
if (newHeaderValue == null || newHeaderValue.trim().length() == 0)
|
||||
continue;
|
||||
|
||||
response.addHeader(headerName, newHeaderValue);
|
||||
response.setHeader(headerName, newHeaderValue);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -563,6 +579,7 @@ public class ProxyServlet extends HttpServlet
|
|||
response.setStatus(HttpServletResponse.SC_GATEWAY_TIMEOUT);
|
||||
else
|
||||
response.setStatus(HttpServletResponse.SC_BAD_GATEWAY);
|
||||
response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString());
|
||||
}
|
||||
AsyncContext asyncContext = request.getAsyncContext();
|
||||
asyncContext.complete();
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.proxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.util.IO;
|
||||
|
||||
public class EchoHttpServlet extends HttpServlet
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
IO.copy(request.getInputStream(), response.getOutputStream());
|
||||
}
|
||||
}
|
|
@ -27,12 +27,7 @@ import javax.servlet.http.HttpServletResponse;
|
|||
public class EmptyHttpServlet extends HttpServlet
|
||||
{
|
||||
@Override
|
||||
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,13 @@
|
|||
|
||||
package org.eclipse.jetty.proxy;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
@ -31,14 +37,21 @@ import javax.servlet.http.HttpServletResponse;
|
|||
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpProxy;
|
||||
import org.eclipse.jetty.client.api.ContentProvider;
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.util.BytesContentProvider;
|
||||
import org.eclipse.jetty.client.util.DeferredContentProvider;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.toolchain.test.TestTracker;
|
||||
import org.eclipse.jetty.toolchain.test.http.SimpleHttpParser;
|
||||
import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.StdErrLog;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
|
@ -86,7 +99,7 @@ public class ProxyServletFailureTest
|
|||
{
|
||||
QueuedThreadPool executor = new QueuedThreadPool();
|
||||
executor.setName("proxy");
|
||||
proxy = new Server();
|
||||
proxy = new Server(executor);
|
||||
proxyConnector = new ServerConnector(proxy);
|
||||
proxy.addConnector(proxyConnector);
|
||||
|
||||
|
@ -139,8 +152,126 @@ public class ProxyServletFailureTest
|
|||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientRequestStallsHeadersProxyIdlesTimeout() throws Exception
|
||||
{
|
||||
prepareProxy();
|
||||
int idleTimeout = 2000;
|
||||
proxyConnector.setIdleTimeout(idleTimeout);
|
||||
|
||||
prepareServer(new EchoHttpServlet());
|
||||
|
||||
try (Socket socket = new Socket("localhost", proxyConnector.getLocalPort()))
|
||||
{
|
||||
String serverHostPort = "localhost:" + serverConnector.getLocalPort();
|
||||
String request = "" +
|
||||
"GET http://" + serverHostPort + " HTTP/1.1\r\n" +
|
||||
"Host: " + serverHostPort + "\r\n";
|
||||
// Don't sent the \r\n that would signal the end of the headers.
|
||||
OutputStream output = socket.getOutputStream();
|
||||
output.write(request.getBytes("UTF-8"));
|
||||
output.flush();
|
||||
|
||||
// Wait for idle timeout to fire.
|
||||
|
||||
socket.setSoTimeout(2 * idleTimeout);
|
||||
InputStream input = socket.getInputStream();
|
||||
Assert.assertEquals(-1, input.read());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientRequestStallsContentProxyIdlesTimeout() throws Exception
|
||||
{
|
||||
prepareProxy();
|
||||
int idleTimeout = 2000;
|
||||
proxyConnector.setIdleTimeout(idleTimeout);
|
||||
|
||||
prepareServer(new EchoHttpServlet());
|
||||
|
||||
try (Socket socket = new Socket("localhost", proxyConnector.getLocalPort()))
|
||||
{
|
||||
String serverHostPort = "localhost:" + serverConnector.getLocalPort();
|
||||
String request = "" +
|
||||
"GET http://" + serverHostPort + " HTTP/1.1\r\n" +
|
||||
"Host: " + serverHostPort + "\r\n" +
|
||||
"Content-Length: 1\r\n" +
|
||||
"\r\n";
|
||||
OutputStream output = socket.getOutputStream();
|
||||
output.write(request.getBytes("UTF-8"));
|
||||
output.flush();
|
||||
|
||||
// Do not send the promised content, wait to idle timeout.
|
||||
|
||||
socket.setSoTimeout(2 * idleTimeout);
|
||||
SimpleHttpParser parser = new SimpleHttpParser();
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));
|
||||
SimpleHttpResponse response = parser.readResponse(reader);
|
||||
Assert.assertTrue(Integer.parseInt(response.getCode()) >= 500);
|
||||
String connectionHeader = response.getHeaders().get("connection");
|
||||
Assert.assertNotNull(connectionHeader);
|
||||
Assert.assertTrue(connectionHeader.contains("close"));
|
||||
Assert.assertEquals(-1, reader.read());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProxyRequestStallsContentServerIdlesTimeout() throws Exception
|
||||
{
|
||||
final byte[] content = new byte[]{'C', '0', 'F', 'F', 'E', 'E'};
|
||||
if (proxyServlet instanceof AsyncProxyServlet)
|
||||
{
|
||||
proxyServlet = new AsyncProxyServlet()
|
||||
{
|
||||
@Override
|
||||
protected ContentProvider proxyRequestContent(Request proxyRequest, HttpServletRequest request) throws IOException
|
||||
{
|
||||
return new DeferredContentProvider()
|
||||
{
|
||||
@Override
|
||||
public boolean offer(ByteBuffer buffer, Callback callback)
|
||||
{
|
||||
// Ignore all content to trigger the test condition.
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
proxyServlet = new ProxyServlet()
|
||||
{
|
||||
@Override
|
||||
protected ContentProvider proxyRequestContent(Request proxyRequest, HttpServletRequest request) throws IOException
|
||||
{
|
||||
return new BytesContentProvider(content)
|
||||
{
|
||||
@Override
|
||||
public long getLength()
|
||||
{
|
||||
// Increase the content length to trigger the test condition.
|
||||
return content.length + 1;
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
prepareProxy();
|
||||
prepareServer(new EchoHttpServlet());
|
||||
long idleTimeout = 1000;
|
||||
serverConnector.setIdleTimeout(idleTimeout);
|
||||
|
||||
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
|
||||
.content(new BytesContentProvider(content))
|
||||
.send();
|
||||
|
||||
Assert.assertEquals(500, response.getStatus());
|
||||
}
|
||||
|
||||
@Test(expected = TimeoutException.class)
|
||||
public void testClientRequestExpired() throws Exception
|
||||
public void testClientRequestExpires() throws Exception
|
||||
{
|
||||
prepareProxy();
|
||||
final long timeout = 1000;
|
||||
|
@ -242,4 +373,9 @@ public class ProxyServletFailureTest
|
|||
((StdErrLog)Log.getLogger(ServletHandler.class)).setHideStacks(false);
|
||||
}
|
||||
}
|
||||
|
||||
private interface Function<T, R>
|
||||
{
|
||||
public R apply(T arg);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||
#org.eclipse.jetty.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.client.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.proxy.LEVEL=DEBUG
|
||||
|
|
Loading…
Reference in New Issue