diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java index 36d91e076ab..59775694ebb 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java @@ -103,11 +103,13 @@ public class HttpChannelOverFCGI extends HttpChannel return exchange != null && receiver.responseHeaders(exchange); } - protected boolean content(ByteBuffer buffer) + protected boolean content(ByteBuffer buffer, Callback callback) { HttpExchange exchange = getHttpExchange(); - // TODO: handle callback properly - return exchange != null && receiver.responseContent(exchange, buffer, new Callback.Adapter()); + if (exchange != null) + return receiver.responseContent(exchange, buffer, callback); + callback.succeeded(); + return false; } protected boolean responseSuccess() diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java index f14143bfb0c..caafebaff80 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java @@ -45,6 +45,7 @@ import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.CompletableCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -60,6 +61,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec private final boolean multiplexed; private final Delegate delegate; private final ClientParser parser; + private ByteBuffer buffer; public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed) { @@ -98,48 +100,66 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec @Override public void onFillable() { - EndPoint endPoint = getEndPoint(); HttpClient client = destination.getHttpClient(); ByteBufferPool bufferPool = client.getByteBufferPool(); - ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true); - try + buffer = bufferPool.acquire(client.getResponseBufferSize(), true); + process(); + } + + private void process() + { + if (readAndParse()) { - while (true) + HttpClient client = destination.getHttpClient(); + ByteBufferPool bufferPool = client.getByteBufferPool(); + bufferPool.release(buffer); + // Don't linger the buffer around if we are idle. + buffer = null; + } + } + + private boolean readAndParse() + { + EndPoint endPoint = getEndPoint(); + ByteBuffer buffer = this.buffer; + while (true) + { + try { + if (!parse(buffer)) + return false; + int read = endPoint.fill(buffer); if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'. LOG.debug("Read {} bytes from {}", read, endPoint); if (read > 0) { - parse(buffer); + if (!parse(buffer)) + return false; } else if (read == 0) { fillInterested(); - break; + return true; } else { shutdown(); - break; + return true; } } - } - catch (Exception x) - { - LOG.debug(x); - close(x); - } - finally - { - bufferPool.release(buffer); + catch (Exception x) + { + LOG.debug(x); + close(x); + return false; + } } } - private void parse(ByteBuffer buffer) + private boolean parse(ByteBuffer buffer) { - while (buffer.hasRemaining()) - parser.parse(buffer); + return !parser.parse(buffer); } private void shutdown() @@ -313,7 +333,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec } @Override - public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) + public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { switch (stream) { @@ -321,7 +341,25 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec { HttpChannelOverFCGI channel = channels.get(request); if (channel != null) - channel.content(buffer); + { + CompletableCallback callback = new CompletableCallback() + { + @Override + public void resume() + { + LOG.debug("Content consumed asynchronously, resuming processing"); + process(); + } + + @Override + public void abort(Throwable x) + { + close(x); + } + }; + channel.content(buffer, callback); + return callback.tryComplete(); + } else noChannel(request); break; @@ -336,6 +374,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec throw new IllegalArgumentException(); } } + return false; } @Override diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/BeginRequestContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/BeginRequestContentParser.java index 09c0755c10c..50911bc0d7e 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/BeginRequestContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/BeginRequestContentParser.java @@ -37,7 +37,7 @@ public class BeginRequestContentParser extends ContentParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { while (buffer.hasRemaining()) { @@ -78,7 +78,7 @@ public class BeginRequestContentParser extends ContentParser buffer.position(buffer.position() + 5); onStart(); reset(); - return true; + return Result.COMPLETE; } else { @@ -94,7 +94,7 @@ public class BeginRequestContentParser extends ContentParser { onStart(); reset(); - return true; + return Result.COMPLETE; } break; } @@ -104,7 +104,7 @@ public class BeginRequestContentParser extends ContentParser } } } - return false; + return Result.PENDING; } private void onStart() diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java index d7a7dc6604d..88a5e7a2e72 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java @@ -86,9 +86,9 @@ public class ClientParser extends Parser } @Override - public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) + public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { - listener.onContent(request, stream, buffer); + return listener.onContent(request, stream, buffer); } @Override diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ContentParser.java index 500396ae9ef..55bf6d2c30d 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ContentParser.java @@ -29,7 +29,7 @@ public abstract class ContentParser this.headerParser = headerParser; } - public abstract boolean parse(ByteBuffer buffer); + public abstract Result parse(ByteBuffer buffer); public void noContent() { @@ -45,4 +45,9 @@ public abstract class ContentParser { return headerParser.getContentLength(); } + + public enum Result + { + PENDING, ASYNC, COMPLETE + } } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java index 1f77eaf0ea6..b2198ad59d6 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java @@ -35,7 +35,7 @@ public class EndRequestContentParser extends ContentParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { while (buffer.hasRemaining()) { @@ -76,7 +76,7 @@ public class EndRequestContentParser extends ContentParser buffer.position(buffer.position() + 3); onEnd(); reset(); - return true; + return Result.COMPLETE; } else { @@ -92,7 +92,7 @@ public class EndRequestContentParser extends ContentParser { onEnd(); reset(); - return true; + return Result.COMPLETE; } break; } @@ -102,7 +102,7 @@ public class EndRequestContentParser extends ContentParser } } } - return false; + return Result.PENDING; } private void onEnd() diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/HeaderParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/HeaderParser.java index 496275a49fc..85c34d0c3eb 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/HeaderParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/HeaderParser.java @@ -32,6 +32,12 @@ public class HeaderParser private int length; private int padding; + /** + * Parses the bytes in the given {@code buffer} as FastCGI header bytes + * + * @param buffer the bytes to parse + * @return whether there were enough bytes for a FastCGI header + */ public boolean parse(ByteBuffer buffer) { while (buffer.hasRemaining()) diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ParamsContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ParamsContentParser.java index 7f2805a2b27..42cbee811da 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ParamsContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ParamsContentParser.java @@ -45,7 +45,7 @@ public class ParamsContentParser extends ContentParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { while (buffer.hasRemaining() || state == State.PARAM) { @@ -185,7 +185,7 @@ public class ParamsContentParser extends ContentParser if (length == 0) { reset(); - return true; + return Result.COMPLETE; } break; } @@ -195,7 +195,7 @@ public class ParamsContentParser extends ContentParser } } } - return false; + return Result.PENDING; } @Override diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java index 5739ef32012..45a348f1396 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java @@ -29,7 +29,7 @@ public abstract class Parser private State state = State.HEADER; private int padding; - public void parse(ByteBuffer buffer) + public boolean parse(ByteBuffer buffer) { while (true) { @@ -38,7 +38,7 @@ public abstract class Parser case HEADER: { if (!headerParser.parse(buffer)) - return; + return false; state = State.CONTENT; break; } @@ -51,8 +51,11 @@ public abstract class Parser } else { - if (!contentParser.parse(buffer)) - return; + ContentParser.Result result = contentParser.parse(buffer); + if (result == ContentParser.Result.PENDING) + return false; + else if (result == ContentParser.Result.ASYNC) + return true; } padding = headerParser.getPaddingLength(); state = State.PADDING; @@ -70,7 +73,7 @@ public abstract class Parser { padding -= buffer.remaining(); buffer.position(buffer.limit()); - return; + return false; } } default: @@ -96,7 +99,7 @@ public abstract class Parser public void onHeaders(int request); - public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer); + public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer); public void onEnd(int request); @@ -115,8 +118,9 @@ public abstract class Parser } @Override - public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) + public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { + return false; } @Override diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java index b130c1374ee..26983fe0e7c 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java @@ -52,7 +52,7 @@ public class ResponseContentParser extends StreamContentParser } @Override - protected void onContent(ByteBuffer buffer) + protected boolean onContent(ByteBuffer buffer) { int request = getRequest(); ResponseParser parser = parsers.get(request); @@ -61,7 +61,7 @@ public class ResponseContentParser extends StreamContentParser parser = new ResponseParser(listener, request); parsers.put(request, parser); } - parser.parse(buffer); + return parser.parse(buffer); } @Override @@ -87,7 +87,7 @@ public class ResponseContentParser extends StreamContentParser this.httpParser = new FCGIHttpParser(this); } - public void parse(ByteBuffer buffer) + public boolean parse(ByteBuffer buffer) { LOG.debug("Response {} {} content {} {}", request, FCGI.StreamType.STD_OUT, state, buffer); @@ -117,7 +117,8 @@ public class ResponseContentParser extends StreamContentParser } case RAW_CONTENT: { - notifyContent(buffer); + if (notifyContent(buffer)) + return true; remaining = 0; break; } @@ -133,6 +134,7 @@ public class ResponseContentParser extends StreamContentParser } } } + return false; } @Override @@ -253,15 +255,16 @@ public class ResponseContentParser extends StreamContentParser return false; } - private void notifyContent(ByteBuffer buffer) + private boolean notifyContent(ByteBuffer buffer) { try { - listener.onContent(request, FCGI.StreamType.STD_OUT, buffer); + return listener.onContent(request, FCGI.StreamType.STD_OUT, buffer); } catch (Throwable x) { logger.debug("Exception while invoking listener " + listener, x); + return false; } } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java index b59593dd25a..655a4d24bf6 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java @@ -41,7 +41,7 @@ public class StreamContentParser extends ContentParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { while (buffer.hasRemaining()) { @@ -59,14 +59,15 @@ public class StreamContentParser extends ContentParser int limit = buffer.limit(); buffer.limit(buffer.position() + length); ByteBuffer slice = buffer.slice(); - onContent(slice); buffer.position(buffer.limit()); buffer.limit(limit); contentLength -= length; + if (onContent(slice)) + return Result.ASYNC; if (contentLength > 0) break; state = State.LENGTH; - return true; + return Result.COMPLETE; } default: { @@ -74,7 +75,7 @@ public class StreamContentParser extends ContentParser } } } - return false; + return Result.PENDING; } @Override @@ -90,15 +91,16 @@ public class StreamContentParser extends ContentParser } } - protected void onContent(ByteBuffer buffer) + protected boolean onContent(ByteBuffer buffer) { try { - listener.onContent(getRequest(), streamType, buffer); + return listener.onContent(getRequest(), streamType, buffer); } catch (Throwable x) { logger.debug("Exception while invoking listener " + listener, x); + return false; } } diff --git a/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java b/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java index 5756c01e512..c1307599f73 100644 --- a/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java +++ b/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java @@ -158,10 +158,11 @@ public class ClientGeneratorTest ServerParser parser = new ServerParser(new ServerParser.Listener.Adapter() { @Override - public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) + public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { Assert.assertEquals(id, request); totalLength.addAndGet(buffer.remaining()); + return false; } @Override diff --git a/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java b/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java index 826ed84b43e..fdd2acb2f8b 100644 --- a/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java +++ b/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java @@ -117,10 +117,11 @@ public class ClientParserTest ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter() { @Override - public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) + public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { Assert.assertEquals(id, request); verifier.addAndGet(2); + return false; } @Override @@ -168,11 +169,12 @@ public class ClientParserTest ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter() { @Override - public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) + public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { Assert.assertEquals(id, request); Assert.assertEquals(contentLength, buffer.remaining()); verifier.addAndGet(2); + return false; } @Override @@ -221,10 +223,11 @@ public class ClientParserTest ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter() { @Override - public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) + public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { Assert.assertEquals(id, request); totalLength.addAndGet(buffer.remaining()); + return false; } @Override diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java index 2e8187966e0..1aa253a1fb1 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java @@ -151,7 +151,7 @@ public class ServerFCGIConnection extends AbstractConnection } @Override - public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) + public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { HttpChannelOverFCGI channel = channels.get(request); if (LOG.isDebugEnabled()) @@ -161,6 +161,7 @@ public class ServerFCGIConnection extends AbstractConnection if (channel.content(buffer)) channel.dispatch(); } + return false; } @Override diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/proxy/FastCGIProxyServlet.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/proxy/FastCGIProxyServlet.java index f8926e96bb9..2daa22132c6 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/proxy/FastCGIProxyServlet.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/proxy/FastCGIProxyServlet.java @@ -32,6 +32,7 @@ import org.eclipse.jetty.fcgi.FCGI; import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpScheme; +import org.eclipse.jetty.proxy.AsyncProxyServlet; import org.eclipse.jetty.proxy.ProxyServlet; /** @@ -57,7 +58,7 @@ import org.eclipse.jetty.proxy.ProxyServlet; * * @see TryFilesFilter */ -public class FastCGIProxyServlet extends ProxyServlet.Transparent +public class FastCGIProxyServlet extends AsyncProxyServlet.Transparent { public static final String SCRIPT_ROOT_INIT_PARAM = "scriptRoot"; public static final String SCRIPT_PATTERN_INIT_PARAM = "scriptPattern"; diff --git a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java index 2146f6305be..e97750f8b50 100644 --- a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java +++ b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.zip.GZIPOutputStream; import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; @@ -47,6 +48,7 @@ import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.IO; import org.eclipse.jetty.toolchain.test.annotation.Slow; +import org.eclipse.jetty.util.Callback; import org.junit.Assert; import org.junit.Test; @@ -567,7 +569,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest // Promise some content, then flush the headers, then fail to send the content. response.setContentLength(16); response.flushBuffer(); - throw new NullPointerException(); + throw new NullPointerException("Explicitly thrown by test"); } }); @@ -627,4 +629,75 @@ public class HttpClientTest extends AbstractHttpClientServerTest Assert.assertEquals(200, response.getStatus()); Assert.assertArrayEquals(data, response.getContent()); } + + @Test + public void testSmallAsyncContent() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + ServletOutputStream output = response.getOutputStream(); + output.write(65); + output.flush(); + output.write(66); + } + }); + + final AtomicInteger contentCount = new AtomicInteger(); + final AtomicReference callbackRef = new AtomicReference<>(); + final AtomicReference contentLatch = new AtomicReference<>(new CountDownLatch(1)); + final CountDownLatch completeLatch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .onResponseContentAsync(new Response.AsyncContentListener() + { + @Override + public void onContent(Response response, ByteBuffer content, Callback callback) + { + contentCount.incrementAndGet(); + callbackRef.set(callback); + contentLatch.get().countDown(); + } + }) + .send(new Response.CompleteListener() + { + @Override + public void onComplete(Result result) + { + completeLatch.countDown(); + } + }); + + Assert.assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS)); + Callback callback = callbackRef.get(); + + // Wait a while to be sure that the parsing does not proceed. + TimeUnit.MILLISECONDS.sleep(1000); + + Assert.assertEquals(1, contentCount.get()); + + // Succeed the content callback to proceed with parsing. + callbackRef.set(null); + contentLatch.set(new CountDownLatch(1)); + callback.succeeded(); + + Assert.assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS)); + callback = callbackRef.get(); + + // Wait a while to be sure that the parsing does not proceed. + TimeUnit.MILLISECONDS.sleep(1000); + + Assert.assertEquals(2, contentCount.get()); + Assert.assertEquals(1, completeLatch.getCount()); + + // Succeed the content callback to proceed with parsing. + callbackRef.set(null); + contentLatch.set(new CountDownLatch(1)); + callback.succeeded(); + + Assert.assertTrue(completeLatch.await(555, TimeUnit.SECONDS)); + Assert.assertEquals(2, contentCount.get()); + } } diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java index a1a9eedc091..7e22db075e1 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java @@ -19,9 +19,12 @@ package org.eclipse.jetty.proxy; import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.WritePendingException; import javax.servlet.ReadListener; +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; @@ -79,6 +82,24 @@ public class AsyncProxyServlet extends ProxyServlet onResponseFailure(request, response, proxyResponse, x); } } + + public static class Transparent extends AsyncProxyServlet + { + private final TransparentDelegate delegate = new TransparentDelegate(this); + + @Override + public void init(ServletConfig config) throws ServletException + { + super.init(config); + delegate.init(config); + } + + @Override + protected URI rewriteURI(HttpServletRequest request) + { + return delegate.rewriteURI(request); + } + } private class StreamReader implements ReadListener, Callback { diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java index e37bd297064..7f1fe069f96 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java @@ -646,26 +646,35 @@ public class ProxyServlet extends HttpServlet */ public static class Transparent extends ProxyServlet { - private String _proxyTo; - private String _prefix; + private final TransparentDelegate delegate = new TransparentDelegate(this); - public Transparent() + @Override + public void init(ServletConfig config) throws ServletException { - } - - public Transparent(String proxyTo, String prefix) - { - _proxyTo = URI.create(proxyTo).normalize().toString(); - _prefix = URI.create(prefix).normalize().toString(); + super.init(config); + delegate.init(config); } @Override - public void init() throws ServletException + protected URI rewriteURI(HttpServletRequest request) { - super.init(); + return delegate.rewriteURI(request); + } + } - ServletConfig config = getServletConfig(); + protected static class TransparentDelegate + { + private final ProxyServlet proxyServlet; + private String _proxyTo; + private String _prefix; + protected TransparentDelegate(ProxyServlet proxyServlet) + { + this.proxyServlet = proxyServlet; + } + + protected void init(ServletConfig config) throws ServletException + { String proxyTo = config.getInitParameter("proxyTo"); _proxyTo = proxyTo == null ? _proxyTo : proxyTo; @@ -681,13 +690,12 @@ public class ProxyServlet extends HttpServlet } // Adjust prefix value to account for context path - String contextPath = getServletContext().getContextPath(); + String contextPath = config.getServletContext().getContextPath(); _prefix = _prefix == null ? contextPath : (contextPath + _prefix); - _log.debug(config.getServletName() + " @ " + _prefix + " to " + _proxyTo); + proxyServlet._log.debug(config.getServletName() + " @ " + _prefix + " to " + _proxyTo); } - @Override protected URI rewriteURI(HttpServletRequest request) { String path = request.getRequestURI(); @@ -706,7 +714,7 @@ public class ProxyServlet extends HttpServlet uri.append("?").append(query); URI rewrittenURI = URI.create(uri.toString()).normalize(); - if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) + if (!proxyServlet.validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) return null; return rewrittenURI; diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java index 30c8b382eb1..c599fd61228 100644 --- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java +++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java @@ -593,8 +593,11 @@ public class ProxyServletTest }); String proxyTo = "http://localhost:" + serverConnector.getLocalPort(); - proxyServlet = new ProxyServlet.Transparent(proxyTo, prefix); - prepareProxy(); + proxyServlet = new ProxyServlet.Transparent(); + Map params = new HashMap<>(); + params.put("proxyTo", proxyTo); + params.put("prefix", prefix); + prepareProxy(params); // Make the request to the proxy, it should transparently forward to the server ContentResponse response = client.newRequest("localhost", proxyConnector.getLocalPort()) @@ -632,8 +635,11 @@ public class ProxyServletTest String proxyTo = "http://localhost:" + serverConnector.getLocalPort(); String prefix = "/proxy"; - proxyServlet = new ProxyServlet.Transparent(proxyTo, prefix); - prepareProxy(); + proxyServlet = new ProxyServlet.Transparent(); + Map params = new HashMap<>(); + params.put("proxyTo", proxyTo); + params.put("prefix", prefix); + prepareProxy(params); // Make the request to the proxy, it should transparently forward to the server ContentResponse response = client.newRequest("localhost", proxyConnector.getLocalPort())