432270 - Slow requests with response content delimited by EOF fail.

Fixed also in the FastCGI module.
This commit is contained in:
Simone Bordet 2014-04-11 15:57:26 +02:00
parent eb45d45dbf
commit 5eeda38f0a
13 changed files with 249 additions and 71 deletions

View File

@ -215,6 +215,10 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
LOG.debug("Closed {}", this);
}
public void release(Connection connection)
{
}
public void close(Connection connection)
{
}

View File

@ -140,8 +140,11 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
protected abstract void send(C connection, HttpExchange exchange);
public void release(C connection)
@Override
public void release(Connection c)
{
@SuppressWarnings("unchecked")
C connection = (C)c;
LOG.debug("{} released", connection);
HttpClient client = getHttpClient();
if (client.isRunning())

View File

@ -28,8 +28,6 @@ import org.eclipse.jetty.fcgi.generator.Flusher;
import org.eclipse.jetty.fcgi.generator.Generator;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.IdleTimeout;
@ -83,42 +81,43 @@ public class HttpChannelOverFCGI extends HttpChannel
return receiver.abort(cause);
}
protected void responseBegin(int code, String reason)
protected boolean responseBegin(int code, String reason)
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
{
exchange.getResponse().version(version).status(code).reason(reason);
receiver.responseBegin(exchange);
}
if (exchange == null)
return false;
exchange.getResponse().version(version).status(code).reason(reason);
return receiver.responseBegin(exchange);
}
protected void responseHeader(HttpField field)
protected boolean responseHeader(HttpField field)
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseHeader(exchange, field);
return exchange != null && receiver.responseHeader(exchange, field);
}
protected void responseHeaders()
protected boolean responseHeaders()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseHeaders(exchange);
return exchange != null && receiver.responseHeaders(exchange);
}
protected void content(ByteBuffer buffer)
protected boolean content(ByteBuffer buffer)
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseContent(exchange, buffer);
return exchange != null && receiver.responseContent(exchange, buffer);
}
protected void responseSuccess()
protected boolean responseSuccess()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseSuccess(exchange);
return exchange != null && receiver.responseSuccess(exchange);
}
protected boolean responseFailure(Throwable failure)
{
HttpExchange exchange = getHttpExchange();
return exchange != null && receiver.responseFailure(failure);
}
@Override
@ -126,12 +125,10 @@ public class HttpChannelOverFCGI extends HttpChannel
{
super.exchangeTerminated(result);
idle.onClose();
boolean close = result.isFailed();
HttpFields responseHeaders = result.getResponse().getHeaders();
close |= responseHeaders.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
if (close)
connection.close();
else
if (result.isFailed())
connection.close(result.getFailure());
else if (!connection.closeByHTTP(responseHeaders))
connection.release(this);
}
@ -154,7 +151,8 @@ public class HttpChannelOverFCGI extends HttpChannel
@Override
protected void onIdleExpired(TimeoutException timeout)
{
LOG.debug("Idle timeout for request {}", request);
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout for request {}", request);
connection.abort(timeout);
}

View File

@ -69,7 +69,7 @@ public class HttpClientTransportOverFCGI extends AbstractHttpClientTransport
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpConnectionOverFCGI connection = new HttpConnectionOverFCGI(endPoint, destination);
HttpConnectionOverFCGI connection = new HttpConnectionOverFCGI(endPoint, destination, isMultiplexed());
LOG.debug("Created {}", connection);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);

View File

@ -31,7 +31,6 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.PoolingHttpDestination;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@ -39,6 +38,9 @@ import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.fcgi.generator.Flusher;
import org.eclipse.jetty.fcgi.parser.ClientParser;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
@ -55,14 +57,16 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
private final AtomicBoolean closed = new AtomicBoolean();
private final Flusher flusher;
private final HttpDestination destination;
private final boolean multiplexed;
private final Delegate delegate;
private final ClientParser parser;
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination)
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed)
{
super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
this.flusher = new Flusher(endPoint);
this.destination = destination;
this.multiplexed = multiplexed;
this.flusher = new Flusher(endPoint);
this.delegate = new Delegate(destination);
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);
@ -103,7 +107,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
while (true)
{
int read = endPoint.fill(buffer);
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'.
LOG.debug("Read {} bytes from {}", read, endPoint);
if (read > 0)
{
@ -124,7 +128,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
catch (Exception x)
{
LOG.debug(x);
// TODO: fail and close ?
close(x);
}
finally
{
@ -140,7 +144,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
private void shutdown()
{
close(new EOFException());
// Close explicitly only if we are idle, since the request may still
// be in progress, otherwise close only if we can fail the responses.
if (channels.isEmpty())
close();
else
failAndClose(new EOFException());
}
@Override
@ -153,13 +162,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
protected void release(HttpChannelOverFCGI channel)
{
channels.remove(channel.getRequest());
if (destination instanceof PoolingHttpDestination)
{
@SuppressWarnings("unchecked")
PoolingHttpDestination<HttpConnectionOverFCGI> fcgiDestination =
(PoolingHttpDestination<HttpConnectionOverFCGI>)destination;
fcgiDestination.release(this);
}
destination.release(this);
}
@Override
@ -168,7 +171,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
close(new AsynchronousCloseException());
}
private void close(Throwable failure)
protected void close(Throwable failure)
{
if (closed.compareAndSet(false, true))
{
@ -184,6 +187,16 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
}
}
protected boolean closeByHTTP(HttpFields fields)
{
if (multiplexed)
return false;
if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
return false;
close();
return true;
}
protected void abort(Throwable failure)
{
for (HttpChannelOverFCGI channel : channels.values())
@ -195,6 +208,15 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
channels.clear();
}
private void failAndClose(Throwable failure)
{
boolean result = false;
for (HttpChannelOverFCGI channel : channels.values())
result |= channel.responseFailure(failure);
if (result)
close(failure);
}
private int acquireRequest()
{
synchronized (requests)
@ -322,8 +344,23 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
HttpChannelOverFCGI channel = channels.get(request);
if (channel != null)
{
channel.responseSuccess();
releaseRequest(request);
if (channel.responseSuccess())
releaseRequest(request);
}
else
{
noChannel(request);
}
}
@Override
public void onFailure(int request, Throwable failure)
{
HttpChannelOverFCGI channel = channels.get(request);
if (channel != null)
{
if (channel.responseFailure(failure))
releaseRequest(request);
}
else
{

View File

@ -88,21 +88,36 @@ public class ServerGenerator extends Generator
return generateContent(request, buffer, true, false, callback, FCGI.FrameType.STDOUT);
}
public Result generateResponseContent(int request, ByteBuffer content, boolean lastContent, Callback callback)
public Result generateResponseContent(int request, ByteBuffer content, boolean lastContent, boolean aborted, Callback callback)
{
Result result = generateContent(request, content, false, lastContent, callback, FCGI.FrameType.STDOUT);
if (lastContent)
if (aborted)
{
// Generate the FCGI_END_REQUEST
request &= 0xFF_FF;
ByteBuffer endRequestBuffer = byteBufferPool.acquire(8, false);
BufferUtil.clearToFill(endRequestBuffer);
endRequestBuffer.putInt(0x01_03_00_00 + request);
endRequestBuffer.putInt(0x00_08_00_00);
endRequestBuffer.putLong(0x00L);
endRequestBuffer.flip();
result = result.append(endRequestBuffer, true);
Result result = new Result(byteBufferPool, callback);
if (lastContent)
result.append(generateEndRequest(request, true), true);
else
result.append(BufferUtil.EMPTY_BUFFER, false);
return result;
}
return result;
else
{
Result result = generateContent(request, content, false, lastContent, callback, FCGI.FrameType.STDOUT);
if (lastContent)
result.append(generateEndRequest(request, false), true);
return result;
}
}
private ByteBuffer generateEndRequest(int request, boolean aborted)
{
request &= 0xFF_FF;
ByteBuffer endRequestBuffer = byteBufferPool.acquire(8, false);
BufferUtil.clearToFill(endRequestBuffer);
endRequestBuffer.putInt(0x01_03_00_00 + request);
endRequestBuffer.putInt(0x00_08_00_00);
endRequestBuffer.putInt(aborted ? 1 : 0);
endRequestBuffer.putInt(0);
endRequestBuffer.flip();
return endRequestBuffer;
}
}

View File

@ -98,5 +98,13 @@ public class ClientParser extends Parser
for (StreamContentParser streamParser : streamParsers)
streamParser.end(request);
}
@Override
public void onFailure(int request, Throwable failure)
{
listener.onFailure(request, failure);
for (StreamContentParser streamParser : streamParsers)
streamParser.end(request);
}
}
}

View File

@ -107,8 +107,12 @@ public class EndRequestContentParser extends ContentParser
private void onEnd()
{
// TODO: if protocol != 0, invoke an error callback
listener.onEnd(getRequest());
if (application != 0)
listener.onFailure(getRequest(), new Exception("FastCGI application returned code " + application));
else if (protocol != 0)
listener.onFailure(getRequest(), new Exception("FastCGI server returned code " + protocol));
else
listener.onEnd(getRequest());
}
private void reset()

View File

@ -100,6 +100,8 @@ public abstract class Parser
public void onEnd(int request);
public void onFailure(int request, Throwable failure);
public static class Adapter implements Listener
{
@Override
@ -121,6 +123,12 @@ public abstract class Parser
public void onEnd(int request)
{
}
@Override
public void onFailure(int request, Throwable failure)
{
}
}
}

View File

@ -111,7 +111,7 @@ public class ClientParserTest
ByteBufferPool byteBufferPool = new MappedByteBufferPool();
ServerGenerator generator = new ServerGenerator(byteBufferPool);
Generator.Result result1 = generator.generateResponseHeaders(id, 200, "OK", fields, null);
Generator.Result result2 = generator.generateResponseContent(id, null, true, null);
Generator.Result result2 = generator.generateResponseContent(id, null, true, false, null);
final AtomicInteger verifier = new AtomicInteger();
ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter()
@ -162,7 +162,7 @@ public class ClientParserTest
ByteBufferPool byteBufferPool = new MappedByteBufferPool();
ServerGenerator generator = new ServerGenerator(byteBufferPool);
Generator.Result result1 = generator.generateResponseHeaders(id, code, "OK", fields, null);
Generator.Result result2 = generator.generateResponseContent(id, content, true, null);
Generator.Result result2 = generator.generateResponseContent(id, content, true, false, null);
final AtomicInteger verifier = new AtomicInteger();
ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter()
@ -214,7 +214,7 @@ public class ClientParserTest
ByteBufferPool byteBufferPool = new MappedByteBufferPool();
ServerGenerator generator = new ServerGenerator(byteBufferPool);
Generator.Result result1 = generator.generateResponseHeaders(id, code, "OK", fields, null);
Generator.Result result2 = generator.generateResponseContent(id, content, true, null);
Generator.Result result2 = generator.generateResponseContent(id, content, true, false, null);
final AtomicInteger totalLength = new AtomicInteger();
final AtomicBoolean verifier = new AtomicBoolean();

View File

@ -24,6 +24,8 @@ import org.eclipse.jetty.fcgi.generator.Flusher;
import org.eclipse.jetty.fcgi.generator.Generator;
import org.eclipse.jetty.fcgi.generator.ServerGenerator;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.BufferUtil;
@ -35,6 +37,8 @@ public class HttpTransportOverFCGI implements HttpTransport
private final Flusher flusher;
private final int request;
private volatile boolean head;
private volatile boolean shutdown;
private volatile boolean aborted;
public HttpTransportOverFCGI(ByteBufferPool byteBufferPool, Flusher flusher, int request)
{
@ -47,13 +51,15 @@ public class HttpTransportOverFCGI implements HttpTransport
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
{
boolean head = this.head = info.isHead();
boolean shutdown = this.shutdown = info.getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
if (head)
{
if (lastContent)
{
Generator.Result headersResult = generator.generateResponseHeaders(request, info.getStatus(), info.getReason(),
info.getHttpFields(), new Callback.Adapter());
Generator.Result contentResult = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, callback);
Generator.Result contentResult = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, aborted, callback);
flusher.flush(headersResult, contentResult);
}
else
@ -67,9 +73,12 @@ public class HttpTransportOverFCGI implements HttpTransport
{
Generator.Result headersResult = generator.generateResponseHeaders(request, info.getStatus(), info.getReason(),
info.getHttpFields(), new Callback.Adapter());
Generator.Result contentResult = generator.generateResponseContent(request, content, lastContent, callback);
Generator.Result contentResult = generator.generateResponseContent(request, content, lastContent, aborted, callback);
flusher.flush(headersResult, contentResult);
}
if (lastContent && shutdown)
flusher.shutdown();
}
@Override
@ -79,7 +88,7 @@ public class HttpTransportOverFCGI implements HttpTransport
{
if (lastContent)
{
Generator.Result result = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, callback);
Generator.Result result = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, aborted, callback);
flusher.flush(result);
}
else
@ -90,18 +99,22 @@ public class HttpTransportOverFCGI implements HttpTransport
}
else
{
Generator.Result result = generator.generateResponseContent(request, content, lastContent, callback);
Generator.Result result = generator.generateResponseContent(request, content, lastContent, aborted, callback);
flusher.flush(result);
}
if (lastContent && shutdown)
flusher.shutdown();
}
@Override
public void abort()
{
aborted = true;
}
@Override
public void completed()
{
}
@Override
public void abort()
{
}
}

View File

@ -175,5 +175,17 @@ public class ServerFCGIConnection extends AbstractConnection
channel.dispatch();
}
}
@Override
public void onFailure(int request, Throwable failure)
{
HttpChannelOverFCGI channel = channels.remove(request);
if (LOG.isDebugEnabled())
LOG.debug("Request {} failure on {}: {}", request, channel, failure);
if (channel != null)
{
channel.badMessage(400, failure.toString());
}
}
}
}

View File

@ -24,6 +24,7 @@ import java.net.URI;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -40,6 +41,8 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.IO;
@ -551,4 +554,77 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testEarlyEOF() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
// Promise some content, then flush the headers, then fail to send the content.
response.setContentLength(16);
response.flushBuffer();
throw new NullPointerException();
}
});
try
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.fail();
}
catch (ExecutionException x)
{
// Expected.
}
}
@Test
public void testSmallContentDelimitedByEOFWithSlowRequest() throws Exception
{
testContentDelimitedByEOFWithSlowRequest(1024);
}
@Test
public void testBigContentDelimitedByEOFWithSlowRequest() throws Exception
{
testContentDelimitedByEOFWithSlowRequest(128 * 1024);
}
private void testContentDelimitedByEOFWithSlowRequest(int length) throws Exception
{
final byte[] data = new byte[length];
new Random().nextBytes(data);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setHeader("Connection", "close");
response.getOutputStream().write(data);
}
});
DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(new byte[]{0}));
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.content(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
// Wait some time to simulate a slow request.
Thread.sleep(1000);
content.close();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(data, response.getContent());
}
}