444416 - AsyncProxyServlet recursion.

Implemented reading of content using IteratingCallback to avoid
recursion.
This commit is contained in:
Simone Bordet 2014-09-22 18:21:35 +02:00
parent f1678124dc
commit ee0f90b6b9
2 changed files with 72 additions and 95 deletions

View File

@ -22,8 +22,6 @@ import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ReadListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@ -38,6 +36,7 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
public class AsyncProxyServlet extends ProxyServlet
{
@ -114,25 +113,12 @@ public class AsyncProxyServlet extends ProxyServlet
}
}
/**
* State machine for reader
* null PENDING SUCCESS FAILURE
* ---------------------------------------
* onRequestContent call null null null null
* onRequestContent called PENDING - (iterate) -
* succeeded SUCCESS SUCCESS->ODA - -
* failed FAILED FAILED - -
*
*/
private enum ReadState { OFFER, PENDING, SUCCESS, FAILURE };
protected class StreamReader implements ReadListener, Callback
protected class StreamReader extends IteratingCallback implements ReadListener
{
private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
private final Request proxyRequest;
private final HttpServletRequest request;
private final DeferredContentProvider provider;
private final AtomicReference<ReadState> state = new AtomicReference<>(null);
protected StreamReader(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
{
@ -144,13 +130,28 @@ public class AsyncProxyServlet extends ProxyServlet
@Override
public void onDataAvailable() throws IOException
{
int requestId=0;
ServletInputStream input = request.getInputStream();
iterate();
}
@Override
public void onAllDataRead() throws IOException
{
if (_log.isDebugEnabled())
{
requestId= getRequestId(request);
_log.debug("{} asynchronous read start on {}", requestId, input);
}
_log.debug("{} proxying content to upstream completed", getRequestId(request));
provider.close();
}
@Override
public void onError(Throwable t)
{
onClientRequestFailure(proxyRequest, request, t);
}
@Override
protected Action process() throws Exception
{
int requestId = _log.isDebugEnabled() ? getRequestId(request) : 0;
ServletInputStream input = request.getInputStream();
// First check for isReady() because it has
// side effects, and then for isFinished().
@ -163,16 +164,22 @@ public class AsyncProxyServlet extends ProxyServlet
{
if (_log.isDebugEnabled())
_log.debug("{} proxying content to upstream: {} bytes", requestId, read);
state.set(ReadState.OFFER);
onRequestContent(proxyRequest, request, provider, buffer, 0, read, this);
if (state.compareAndSet(ReadState.OFFER,ReadState.PENDING) || state.get()!=ReadState.SUCCESS)
break;
return Action.SCHEDULED;
}
}
if (!input.isFinished())
if (input.isFinished())
{
if (_log.isDebugEnabled())
_log.debug("{} asynchronous read complete on {}", requestId, input);
return Action.SUCCEEDED;
}
else
{
if (_log.isDebugEnabled())
_log.debug("{} asynchronous read pending on {}", requestId, input);
return Action.IDLE;
}
}
@ -181,77 +188,11 @@ public class AsyncProxyServlet extends ProxyServlet
provider.offer(ByteBuffer.wrap(buffer, offset, length), callback);
}
@Override
public void onAllDataRead() throws IOException
{
if (_log.isDebugEnabled())
_log.debug("{} proxying content to upstream completed", getRequestId(request));
provider.close();
}
@Override
public void onError(Throwable x)
{
failed(x);
}
@Override
public void succeeded()
{
loop: while (true)
{
switch(state.get())
{
case OFFER:
if (!state.compareAndSet(ReadState.OFFER,ReadState.SUCCESS))
continue;
// Nothing to do as onDataAvailable() will iterate
break loop;
case PENDING:
if (!state.compareAndSet(ReadState.PENDING,ReadState.SUCCESS))
continue;
try
{
onDataAvailable();
}
catch (Throwable x)
{
failed(x);
}
break loop;
default:
throw new IllegalStateException("state="+state.get());
}
}
}
@Override
public void failed(Throwable x)
{
while (true)
{
switch(state.get())
{
case OFFER:
if (!state.compareAndSet(ReadState.OFFER,ReadState.SUCCESS))
continue;
onClientRequestFailure(proxyRequest, request, x);
break;
case PENDING:
if (!state.compareAndSet(ReadState.PENDING,ReadState.SUCCESS))
continue;
onClientRequestFailure(proxyRequest, request, x);
break;
default:
throw new IllegalStateException(x);
}
}
super.failed(x);
onError(x);
}
}

View File

@ -181,7 +181,7 @@ public class ProxyServletFailureTest
}
@Test
public void testClientRequestStallsContentProxyIdlesTimeout() throws Exception
public void testClientRequestDoesNotSendContentProxyIdlesTimeout() throws Exception
{
prepareProxy();
int idleTimeout = 2000;
@ -215,6 +215,42 @@ public class ProxyServletFailureTest
}
}
@Test
public void testClientRequestStallsContentProxyIdlesTimeout() throws Exception
{
prepareProxy();
int idleTimeout = 2000;
proxyConnector.setIdleTimeout(idleTimeout);
prepareServer(new EchoHttpServlet());
try (Socket socket = new Socket("localhost", proxyConnector.getLocalPort()))
{
String serverHostPort = "localhost:" + serverConnector.getLocalPort();
String request = "" +
"GET http://" + serverHostPort + " HTTP/1.1\r\n" +
"Host: " + serverHostPort + "\r\n" +
"Content-Length: 2\r\n" +
"\r\n" +
"Z";
OutputStream output = socket.getOutputStream();
output.write(request.getBytes("UTF-8"));
output.flush();
// Do not send all the promised content, wait to idle timeout.
socket.setSoTimeout(2 * idleTimeout);
SimpleHttpParser parser = new SimpleHttpParser();
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));
SimpleHttpResponse response = parser.readResponse(reader);
Assert.assertTrue(Integer.parseInt(response.getCode()) >= 500);
String connectionHeader = response.getHeaders().get("connection");
Assert.assertNotNull(connectionHeader);
Assert.assertTrue(connectionHeader.contains("close"));
Assert.assertEquals(-1, reader.read());
}
}
@Test
public void testProxyRequestStallsContentServerIdlesTimeout() throws Exception
{