444416 Interative Callback pattern in AsyncProxyServlet

This commit is contained in:
Greg Wilkins 2014-09-18 16:20:08 +10:00
parent 3b066ca2ae
commit ca07a9947e
1 changed files with 69 additions and 11 deletions

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException; import java.nio.channels.WritePendingException;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ReadListener; import javax.servlet.ReadListener;
import javax.servlet.ServletConfig; import javax.servlet.ServletConfig;
import javax.servlet.ServletException; import javax.servlet.ServletException;
@ -112,12 +114,25 @@ public class AsyncProxyServlet extends ProxyServlet
} }
} }
/**
* State machine for reader
* null PENDING SUCCESS FAILURE
* ---------------------------------------
* onRequestContent call null null null null
* onRequestContent called PENDING - (iterate) -
* succeeded SUCCESS SUCCESS->ODA - -
* failed FAILED FAILED - -
*
*/
private enum ReadState { OFFER, PENDING, SUCCESS, FAILURE };
protected class StreamReader implements ReadListener, Callback protected class StreamReader implements ReadListener, Callback
{ {
private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()]; private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
private final Request proxyRequest; private final Request proxyRequest;
private final HttpServletRequest request; private final HttpServletRequest request;
private final DeferredContentProvider provider; private final DeferredContentProvider provider;
private final AtomicReference<ReadState> state = new AtomicReference<>(null);
protected StreamReader(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider) protected StreamReader(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
{ {
@ -129,10 +144,13 @@ public class AsyncProxyServlet extends ProxyServlet
@Override @Override
public void onDataAvailable() throws IOException public void onDataAvailable() throws IOException
{ {
int requestId = getRequestId(request); int requestId=0;
ServletInputStream input = request.getInputStream(); ServletInputStream input = request.getInputStream();
if (_log.isDebugEnabled()) if (_log.isDebugEnabled())
{
requestId= getRequestId(request);
_log.debug("{} asynchronous read start on {}", requestId, input); _log.debug("{} asynchronous read start on {}", requestId, input);
}
// First check for isReady() because it has // First check for isReady() because it has
// side effects, and then for isFinished(). // side effects, and then for isFinished().
@ -145,9 +163,10 @@ public class AsyncProxyServlet extends ProxyServlet
{ {
if (_log.isDebugEnabled()) if (_log.isDebugEnabled())
_log.debug("{} proxying content to upstream: {} bytes", requestId, read); _log.debug("{} proxying content to upstream: {} bytes", requestId, read);
state.set(ReadState.OFFER);
onRequestContent(proxyRequest, request, provider, buffer, 0, read, this); onRequestContent(proxyRequest, request, provider, buffer, 0, read, this);
// Do not call isReady() so that we can apply backpressure. if (state.compareAndSet(ReadState.OFFER,ReadState.PENDING) || state.get()!=ReadState.SUCCESS)
break; break;
} }
} }
if (!input.isFinished()) if (!input.isFinished())
@ -179,21 +198,60 @@ public class AsyncProxyServlet extends ProxyServlet
@Override @Override
public void succeeded() public void succeeded()
{ {
try loop: while (true)
{ {
if (request.getInputStream().isReady()) switch(state.get())
onDataAvailable(); {
} case OFFER:
catch (Throwable x) if (!state.compareAndSet(ReadState.OFFER,ReadState.SUCCESS))
{ continue;
failed(x); // Nothing to do as onDataAvailable() will iterate
break loop;
case PENDING:
if (!state.compareAndSet(ReadState.PENDING,ReadState.SUCCESS))
continue;
try
{
onDataAvailable();
}
catch (Throwable x)
{
failed(x);
}
break loop;
default:
throw new IllegalStateException("state="+state.get());
}
} }
} }
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
onClientRequestFailure(proxyRequest, request, x); while (true)
{
switch(state.get())
{
case OFFER:
if (!state.compareAndSet(ReadState.OFFER,ReadState.SUCCESS))
continue;
onClientRequestFailure(proxyRequest, request, x);
break;
case PENDING:
if (!state.compareAndSet(ReadState.PENDING,ReadState.SUCCESS))
continue;
onClientRequestFailure(proxyRequest, request, x);
break;
default:
throw new IllegalStateException(x);
}
}
} }
} }