480452 - Large downloads via FastCGI proxy keep HttpClient connections active.

Fixed by returning false from ResponseContentParser.ResponseParser
.messageComplete(), so that the FastCGI parsing can continue and
parse the END_REQUEST frame.
This commit is contained in:
Simone Bordet 2015-10-22 20:48:57 +02:00
parent e5e3b05817
commit 5cf6629385
3 changed files with 81 additions and 30 deletions

View File

@ -18,11 +18,13 @@
package org.eclipse.jetty.fcgi.parser;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
@ -32,6 +34,14 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>The parser for STDOUT type frames.</p>
* <p>STDOUT frames contain both the HTTP headers (but not the response line)
* and the HTTP content (either Content-Length delimited or chunked).</p>
* <p>For this reason, a special HTTP parser is used to parse the frames body.
* This special HTTP parser is configured to skip the response line, and to
* parse HTTP headers and HTTP content.</p>
*/
public class ResponseContentParser extends StreamContentParser
{
private static final Logger LOG = Log.getLogger(ResponseContentParser.class);
@ -245,12 +255,12 @@ public class ResponseContentParser extends StreamContentParser
{
if (!seenResponseCode)
{
// No Status header but we have other headers, assume 200 OK
// No Status header but we have other headers, assume 200 OK.
notifyBegin(200, "OK");
notifyHeaders(fields);
}
notifyHeaders();
// Return from parsing so that we can parse the content
// Return from HTTP parsing so that we can parse the content.
return true;
}
@ -277,21 +287,34 @@ public class ResponseContentParser extends StreamContentParser
@Override
public boolean messageComplete()
{
// Return from parsing so that we can parse the next headers or the raw content.
// No need to notify the listener because it will be done by FCGI_END_REQUEST.
return true;
// No need to notify the end of the response to the
// listener because it will be done by FCGI_END_REQUEST.
return false;
}
@Override
public void earlyEOF()
{
// TODO
fail(new EOFException());
}
@Override
public void badMessage(int status, String reason)
{
// TODO
fail(new BadMessageException(status, reason));
}
protected void fail(Throwable failure)
{
try
{
listener.onFailure(request, failure);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Exception while invoking listener " + listener, x);
}
}
}

View File

@ -95,6 +95,35 @@ public class HttpClientTest extends AbstractHttpClientServerTest
}
}
@Test
public void testGETResponseWithBigContent() throws Exception
{
final byte[] data = new byte[16 * 1024 * 1024];
new Random().nextBytes(data);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
// Setting the Content-Length triggers the HTTP
// content mode for response content parsing,
// otherwise the RAW content mode is used.
response.setContentLength(data.length);
response.getOutputStream().write(data);
baseRequest.setHandled(true);
}
});
Request request = client.newRequest(scheme + "://localhost:" + connector.getLocalPort());
FutureResponseListener listener = new FutureResponseListener(request, data.length);
request.send(listener);
ContentResponse response = listener.get(15, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
byte[] content = response.getContent();
Assert.assertArrayEquals(data, content);
}
@Test
public void testGETWithParametersResponseWithContent() throws Exception
{
@ -420,7 +449,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testConnectionIdleTimeout() throws Exception
{

View File

@ -19,11 +19,9 @@
package org.eclipse.jetty.fcgi.server.proxy;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -32,7 +30,6 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.fcgi.server.ServerFCGIConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
@ -40,7 +37,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@ -51,9 +48,9 @@ import org.junit.runners.Parameterized;
public class FastCGIProxyServletTest
{
@Parameterized.Parameters
public static Collection<Object[]> parameters()
public static Object[] parameters()
{
return Arrays.asList(new Object[]{true}, new Object[]{false});
return new Object[]{true, false};
}
private final boolean sendStatus200;
@ -69,7 +66,9 @@ public class FastCGIProxyServletTest
public void prepare(HttpServlet servlet) throws Exception
{
server = new Server();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
httpConnector = new ServerConnector(server);
server.addConnector(httpConnector);
@ -89,14 +88,18 @@ public class FastCGIProxyServletTest
}
};
ServletHolder fcgiServletHolder = new ServletHolder(fcgiServlet);
context.addServlet(fcgiServletHolder, "*.php");
fcgiServletHolder.setName("fcgi");
fcgiServletHolder.setInitParameter(FastCGIProxyServlet.SCRIPT_ROOT_INIT_PARAM, "/scriptRoot");
fcgiServletHolder.setInitParameter("proxyTo", "http://localhost");
fcgiServletHolder.setInitParameter(FastCGIProxyServlet.SCRIPT_PATTERN_INIT_PARAM, "(.+?\\.php)");
context.addServlet(fcgiServletHolder, "*.php");
context.addServlet(new ServletHolder(servlet), servletPath + "/*");
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client = new HttpClient();
client.setExecutor(clientThreads);
server.addBean(client);
server.start();
@ -144,21 +147,17 @@ public class FastCGIProxyServletTest
});
Request request = client.newRequest("localhost", httpConnector.getLocalPort())
.onResponseContentAsync(new Response.AsyncContentListener()
.onResponseContentAsync((response, content, callback) ->
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
try
{
try
{
if (delay > 0)
TimeUnit.MILLISECONDS.sleep(delay);
callback.succeeded();
}
catch (InterruptedException x)
{
callback.failed(x);
}
if (delay > 0)
TimeUnit.MILLISECONDS.sleep(delay);
callback.succeeded();
}
catch (InterruptedException x)
{
callback.failed(x);
}
})
.path(path);