434056 - Support content consumed asynchronously.

Implemented content consumed asynchronously for FastCGI (on the client).
This commit is contained in:
Simone Bordet 2014-05-04 23:15:54 +02:00
parent c4b5e3d3d7
commit 871b330ba8
19 changed files with 259 additions and 84 deletions

View File

@ -103,11 +103,13 @@ public class HttpChannelOverFCGI extends HttpChannel
return exchange != null && receiver.responseHeaders(exchange);
}
protected boolean content(ByteBuffer buffer)
protected boolean content(ByteBuffer buffer, Callback callback)
{
HttpExchange exchange = getHttpExchange();
// TODO: handle callback properly
return exchange != null && receiver.responseContent(exchange, buffer, new Callback.Adapter());
if (exchange != null)
return receiver.responseContent(exchange, buffer, callback);
callback.succeeded();
return false;
}
protected boolean responseSuccess()

View File

@ -45,6 +45,7 @@ import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.CompletableCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -60,6 +61,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
private final boolean multiplexed;
private final Delegate delegate;
private final ClientParser parser;
private ByteBuffer buffer;
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed)
{
@ -98,48 +100,66 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override
public void onFillable()
{
EndPoint endPoint = getEndPoint();
HttpClient client = destination.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
try
buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
process();
}
private void process()
{
if (readAndParse())
{
while (true)
HttpClient client = destination.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
bufferPool.release(buffer);
// Don't linger the buffer around if we are idle.
buffer = null;
}
}
private boolean readAndParse()
{
EndPoint endPoint = getEndPoint();
ByteBuffer buffer = this.buffer;
while (true)
{
try
{
if (!parse(buffer))
return false;
int read = endPoint.fill(buffer);
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'.
LOG.debug("Read {} bytes from {}", read, endPoint);
if (read > 0)
{
parse(buffer);
if (!parse(buffer))
return false;
}
else if (read == 0)
{
fillInterested();
break;
return true;
}
else
{
shutdown();
break;
return true;
}
}
}
catch (Exception x)
{
LOG.debug(x);
close(x);
}
finally
{
bufferPool.release(buffer);
catch (Exception x)
{
LOG.debug(x);
close(x);
return false;
}
}
}
private void parse(ByteBuffer buffer)
private boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
parser.parse(buffer);
return !parser.parse(buffer);
}
private void shutdown()
@ -313,7 +333,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
}
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
switch (stream)
{
@ -321,7 +341,25 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
{
HttpChannelOverFCGI channel = channels.get(request);
if (channel != null)
channel.content(buffer);
{
CompletableCallback callback = new CompletableCallback()
{
@Override
public void resume()
{
LOG.debug("Content consumed asynchronously, resuming processing");
process();
}
@Override
public void abort(Throwable x)
{
close(x);
}
};
channel.content(buffer, callback);
return callback.tryComplete();
}
else
noChannel(request);
break;
@ -336,6 +374,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
throw new IllegalArgumentException();
}
}
return false;
}
@Override

View File

@ -37,7 +37,7 @@ public class BeginRequestContentParser extends ContentParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -78,7 +78,7 @@ public class BeginRequestContentParser extends ContentParser
buffer.position(buffer.position() + 5);
onStart();
reset();
return true;
return Result.COMPLETE;
}
else
{
@ -94,7 +94,7 @@ public class BeginRequestContentParser extends ContentParser
{
onStart();
reset();
return true;
return Result.COMPLETE;
}
break;
}
@ -104,7 +104,7 @@ public class BeginRequestContentParser extends ContentParser
}
}
}
return false;
return Result.PENDING;
}
private void onStart()

View File

@ -86,9 +86,9 @@ public class ClientParser extends Parser
}
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
listener.onContent(request, stream, buffer);
return listener.onContent(request, stream, buffer);
}
@Override

View File

@ -29,7 +29,7 @@ public abstract class ContentParser
this.headerParser = headerParser;
}
public abstract boolean parse(ByteBuffer buffer);
public abstract Result parse(ByteBuffer buffer);
public void noContent()
{
@ -45,4 +45,9 @@ public abstract class ContentParser
{
return headerParser.getContentLength();
}
public enum Result
{
PENDING, ASYNC, COMPLETE
}
}

View File

@ -35,7 +35,7 @@ public class EndRequestContentParser extends ContentParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -76,7 +76,7 @@ public class EndRequestContentParser extends ContentParser
buffer.position(buffer.position() + 3);
onEnd();
reset();
return true;
return Result.COMPLETE;
}
else
{
@ -92,7 +92,7 @@ public class EndRequestContentParser extends ContentParser
{
onEnd();
reset();
return true;
return Result.COMPLETE;
}
break;
}
@ -102,7 +102,7 @@ public class EndRequestContentParser extends ContentParser
}
}
}
return false;
return Result.PENDING;
}
private void onEnd()

View File

@ -32,6 +32,12 @@ public class HeaderParser
private int length;
private int padding;
/**
* Parses the bytes in the given {@code buffer} as FastCGI header bytes
*
* @param buffer the bytes to parse
* @return whether there were enough bytes for a FastCGI header
*/
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())

View File

@ -45,7 +45,7 @@ public class ParamsContentParser extends ContentParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
while (buffer.hasRemaining() || state == State.PARAM)
{
@ -185,7 +185,7 @@ public class ParamsContentParser extends ContentParser
if (length == 0)
{
reset();
return true;
return Result.COMPLETE;
}
break;
}
@ -195,7 +195,7 @@ public class ParamsContentParser extends ContentParser
}
}
}
return false;
return Result.PENDING;
}
@Override

View File

@ -29,7 +29,7 @@ public abstract class Parser
private State state = State.HEADER;
private int padding;
public void parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
while (true)
{
@ -38,7 +38,7 @@ public abstract class Parser
case HEADER:
{
if (!headerParser.parse(buffer))
return;
return false;
state = State.CONTENT;
break;
}
@ -51,8 +51,11 @@ public abstract class Parser
}
else
{
if (!contentParser.parse(buffer))
return;
ContentParser.Result result = contentParser.parse(buffer);
if (result == ContentParser.Result.PENDING)
return false;
else if (result == ContentParser.Result.ASYNC)
return true;
}
padding = headerParser.getPaddingLength();
state = State.PADDING;
@ -70,7 +73,7 @@ public abstract class Parser
{
padding -= buffer.remaining();
buffer.position(buffer.limit());
return;
return false;
}
}
default:
@ -96,7 +99,7 @@ public abstract class Parser
public void onHeaders(int request);
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer);
public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer);
public void onEnd(int request);
@ -115,8 +118,9 @@ public abstract class Parser
}
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
return false;
}
@Override

View File

@ -52,7 +52,7 @@ public class ResponseContentParser extends StreamContentParser
}
@Override
protected void onContent(ByteBuffer buffer)
protected boolean onContent(ByteBuffer buffer)
{
int request = getRequest();
ResponseParser parser = parsers.get(request);
@ -61,7 +61,7 @@ public class ResponseContentParser extends StreamContentParser
parser = new ResponseParser(listener, request);
parsers.put(request, parser);
}
parser.parse(buffer);
return parser.parse(buffer);
}
@Override
@ -87,7 +87,7 @@ public class ResponseContentParser extends StreamContentParser
this.httpParser = new FCGIHttpParser(this);
}
public void parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
LOG.debug("Response {} {} content {} {}", request, FCGI.StreamType.STD_OUT, state, buffer);
@ -117,7 +117,8 @@ public class ResponseContentParser extends StreamContentParser
}
case RAW_CONTENT:
{
notifyContent(buffer);
if (notifyContent(buffer))
return true;
remaining = 0;
break;
}
@ -133,6 +134,7 @@ public class ResponseContentParser extends StreamContentParser
}
}
}
return false;
}
@Override
@ -253,15 +255,16 @@ public class ResponseContentParser extends StreamContentParser
return false;
}
private void notifyContent(ByteBuffer buffer)
private boolean notifyContent(ByteBuffer buffer)
{
try
{
listener.onContent(request, FCGI.StreamType.STD_OUT, buffer);
return listener.onContent(request, FCGI.StreamType.STD_OUT, buffer);
}
catch (Throwable x)
{
logger.debug("Exception while invoking listener " + listener, x);
return false;
}
}

View File

@ -41,7 +41,7 @@ public class StreamContentParser extends ContentParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -59,14 +59,15 @@ public class StreamContentParser extends ContentParser
int limit = buffer.limit();
buffer.limit(buffer.position() + length);
ByteBuffer slice = buffer.slice();
onContent(slice);
buffer.position(buffer.limit());
buffer.limit(limit);
contentLength -= length;
if (onContent(slice))
return Result.ASYNC;
if (contentLength > 0)
break;
state = State.LENGTH;
return true;
return Result.COMPLETE;
}
default:
{
@ -74,7 +75,7 @@ public class StreamContentParser extends ContentParser
}
}
}
return false;
return Result.PENDING;
}
@Override
@ -90,15 +91,16 @@ public class StreamContentParser extends ContentParser
}
}
protected void onContent(ByteBuffer buffer)
protected boolean onContent(ByteBuffer buffer)
{
try
{
listener.onContent(getRequest(), streamType, buffer);
return listener.onContent(getRequest(), streamType, buffer);
}
catch (Throwable x)
{
logger.debug("Exception while invoking listener " + listener, x);
return false;
}
}

View File

@ -158,10 +158,11 @@ public class ClientGeneratorTest
ServerParser parser = new ServerParser(new ServerParser.Listener.Adapter()
{
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
Assert.assertEquals(id, request);
totalLength.addAndGet(buffer.remaining());
return false;
}
@Override

View File

@ -117,10 +117,11 @@ public class ClientParserTest
ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter()
{
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
Assert.assertEquals(id, request);
verifier.addAndGet(2);
return false;
}
@Override
@ -168,11 +169,12 @@ public class ClientParserTest
ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter()
{
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
Assert.assertEquals(id, request);
Assert.assertEquals(contentLength, buffer.remaining());
verifier.addAndGet(2);
return false;
}
@Override
@ -221,10 +223,11 @@ public class ClientParserTest
ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter()
{
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
Assert.assertEquals(id, request);
totalLength.addAndGet(buffer.remaining());
return false;
}
@Override

View File

@ -151,7 +151,7 @@ public class ServerFCGIConnection extends AbstractConnection
}
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
HttpChannelOverFCGI channel = channels.get(request);
if (LOG.isDebugEnabled())
@ -161,6 +161,7 @@ public class ServerFCGIConnection extends AbstractConnection
if (channel.content(buffer))
channel.dispatch();
}
return false;
}
@Override

View File

@ -32,6 +32,7 @@ import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.proxy.AsyncProxyServlet;
import org.eclipse.jetty.proxy.ProxyServlet;
/**
@ -57,7 +58,7 @@ import org.eclipse.jetty.proxy.ProxyServlet;
*
* @see TryFilesFilter
*/
public class FastCGIProxyServlet extends ProxyServlet.Transparent
public class FastCGIProxyServlet extends AsyncProxyServlet.Transparent
{
public static final String SCRIPT_ROOT_INIT_PARAM = "scriptRoot";
public static final String SCRIPT_PATTERN_INIT_PARAM = "scriptPattern";

View File

@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.GZIPOutputStream;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
@ -47,6 +48,7 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.IO;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback;
import org.junit.Assert;
import org.junit.Test;
@ -567,7 +569,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
// Promise some content, then flush the headers, then fail to send the content.
response.setContentLength(16);
response.flushBuffer();
throw new NullPointerException();
throw new NullPointerException("Explicitly thrown by test");
}
});
@ -627,4 +629,75 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(data, response.getContent());
}
@Test
public void testSmallAsyncContent() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
ServletOutputStream output = response.getOutputStream();
output.write(65);
output.flush();
output.write(66);
}
});
final AtomicInteger contentCount = new AtomicInteger();
final AtomicReference<Callback> callbackRef = new AtomicReference<>();
final AtomicReference<CountDownLatch> contentLatch = new AtomicReference<>(new CountDownLatch(1));
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onResponseContentAsync(new Response.AsyncContentListener()
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
contentCount.incrementAndGet();
callbackRef.set(callback);
contentLatch.get().countDown();
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
completeLatch.countDown();
}
});
Assert.assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS));
Callback callback = callbackRef.get();
// Wait a while to be sure that the parsing does not proceed.
TimeUnit.MILLISECONDS.sleep(1000);
Assert.assertEquals(1, contentCount.get());
// Succeed the content callback to proceed with parsing.
callbackRef.set(null);
contentLatch.set(new CountDownLatch(1));
callback.succeeded();
Assert.assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS));
callback = callbackRef.get();
// Wait a while to be sure that the parsing does not proceed.
TimeUnit.MILLISECONDS.sleep(1000);
Assert.assertEquals(2, contentCount.get());
Assert.assertEquals(1, completeLatch.getCount());
// Succeed the content callback to proceed with parsing.
callbackRef.set(null);
contentLatch.set(new CountDownLatch(1));
callback.succeeded();
Assert.assertTrue(completeLatch.await(555, TimeUnit.SECONDS));
Assert.assertEquals(2, contentCount.get());
}
}

View File

@ -19,9 +19,12 @@
package org.eclipse.jetty.proxy;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import javax.servlet.ReadListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
@ -79,6 +82,24 @@ public class AsyncProxyServlet extends ProxyServlet
onResponseFailure(request, response, proxyResponse, x);
}
}
public static class Transparent extends AsyncProxyServlet
{
private final TransparentDelegate delegate = new TransparentDelegate(this);
@Override
public void init(ServletConfig config) throws ServletException
{
super.init(config);
delegate.init(config);
}
@Override
protected URI rewriteURI(HttpServletRequest request)
{
return delegate.rewriteURI(request);
}
}
private class StreamReader implements ReadListener, Callback
{

View File

@ -646,26 +646,35 @@ public class ProxyServlet extends HttpServlet
*/
public static class Transparent extends ProxyServlet
{
private String _proxyTo;
private String _prefix;
private final TransparentDelegate delegate = new TransparentDelegate(this);
public Transparent()
@Override
public void init(ServletConfig config) throws ServletException
{
}
public Transparent(String proxyTo, String prefix)
{
_proxyTo = URI.create(proxyTo).normalize().toString();
_prefix = URI.create(prefix).normalize().toString();
super.init(config);
delegate.init(config);
}
@Override
public void init() throws ServletException
protected URI rewriteURI(HttpServletRequest request)
{
super.init();
return delegate.rewriteURI(request);
}
}
ServletConfig config = getServletConfig();
protected static class TransparentDelegate
{
private final ProxyServlet proxyServlet;
private String _proxyTo;
private String _prefix;
protected TransparentDelegate(ProxyServlet proxyServlet)
{
this.proxyServlet = proxyServlet;
}
protected void init(ServletConfig config) throws ServletException
{
String proxyTo = config.getInitParameter("proxyTo");
_proxyTo = proxyTo == null ? _proxyTo : proxyTo;
@ -681,13 +690,12 @@ public class ProxyServlet extends HttpServlet
}
// Adjust prefix value to account for context path
String contextPath = getServletContext().getContextPath();
String contextPath = config.getServletContext().getContextPath();
_prefix = _prefix == null ? contextPath : (contextPath + _prefix);
_log.debug(config.getServletName() + " @ " + _prefix + " to " + _proxyTo);
proxyServlet._log.debug(config.getServletName() + " @ " + _prefix + " to " + _proxyTo);
}
@Override
protected URI rewriteURI(HttpServletRequest request)
{
String path = request.getRequestURI();
@ -706,7 +714,7 @@ public class ProxyServlet extends HttpServlet
uri.append("?").append(query);
URI rewrittenURI = URI.create(uri.toString()).normalize();
if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort()))
if (!proxyServlet.validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort()))
return null;
return rewrittenURI;

View File

@ -593,8 +593,11 @@ public class ProxyServletTest
});
String proxyTo = "http://localhost:" + serverConnector.getLocalPort();
proxyServlet = new ProxyServlet.Transparent(proxyTo, prefix);
prepareProxy();
proxyServlet = new ProxyServlet.Transparent();
Map<String, String> params = new HashMap<>();
params.put("proxyTo", proxyTo);
params.put("prefix", prefix);
prepareProxy(params);
// Make the request to the proxy, it should transparently forward to the server
ContentResponse response = client.newRequest("localhost", proxyConnector.getLocalPort())
@ -632,8 +635,11 @@ public class ProxyServletTest
String proxyTo = "http://localhost:" + serverConnector.getLocalPort();
String prefix = "/proxy";
proxyServlet = new ProxyServlet.Transparent(proxyTo, prefix);
prepareProxy();
proxyServlet = new ProxyServlet.Transparent();
Map<String, String> params = new HashMap<>();
params.put("proxyTo", proxyTo);
params.put("prefix", prefix);
prepareProxy(params);
// Make the request to the proxy, it should transparently forward to the server
ContentResponse response = client.newRequest("localhost", proxyConnector.getLocalPort())