Fixes #6693 - FastCGI review (#6694)

* Fixes #6693 - FastCGI review

- Removed code supporting multiplex in the client.
- Removed code supporting multiplex in the server.
- Reworked the server-side processing of a request, now more similar to HTTP/1.1.
- Improved javadocs.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-07 09:57:31 +02:00 committed by GitHub
parent 2231e6496e
commit 0bd15e0831
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 432 additions and 255 deletions

View File

@ -16,12 +16,9 @@ package org.eclipse.jetty.fcgi.client.http;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -59,19 +56,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverFCGI.class);
private final RetainableByteBufferPool networkByteBufferPool;
private final AutoLock lock = new AutoLock();
private final LinkedList<Integer> requests = new LinkedList<>();
private final Map<Integer, HttpChannelOverFCGI> activeChannels = new ConcurrentHashMap<>();
private final Queue<HttpChannelOverFCGI> idleChannels = new ConcurrentLinkedQueue<>();
private final AtomicBoolean closed = new AtomicBoolean();
private final HttpDestination destination;
private final Promise<Connection> promise;
private final Flusher flusher;
private final Delegate delegate;
private final ClientParser parser;
private HttpChannelOverFCGI channel;
private RetainableByteBuffer networkBuffer;
private Object attachment;
private final RetainableByteBufferPool retainableByteBufferPool;
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
@ -82,9 +78,8 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
this.delegate = new Delegate(destination);
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);
HttpClient client = destination.getHttpClient();
this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
}
public HttpDestination getHttpDestination()
@ -139,7 +134,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
private RetainableByteBuffer newNetworkBuffer()
{
HttpClient client = destination.getHttpClient();
return retainableByteBufferPool.acquire(client.getResponseBufferSize(), client.isUseInputDirectByteBuffers());
return networkByteBufferPool.acquire(client.getResponseBufferSize(), client.isUseInputDirectByteBuffers());
}
private void releaseNetworkBuffer()
@ -205,7 +200,8 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
// Close explicitly only if we are idle, since the request may still
// be in progress, otherwise close only if we can fail the responses.
if (activeChannels.isEmpty())
HttpChannelOverFCGI channel = this.channel;
if (channel == null || channel.getRequest() == 0)
close();
else
failAndClose(new EOFException(String.valueOf(getEndPoint())));
@ -223,19 +219,24 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
protected void release(HttpChannelOverFCGI channel)
{
if (activeChannels.remove(channel.getRequest()) == null)
{
channel.destroy();
}
else
HttpChannelOverFCGI existing = this.channel;
if (existing == channel)
{
channel.setRequest(0);
// Recycle only non-failed channels.
if (channel.isFailed())
{
channel.destroy();
this.channel = null;
}
destination.release(this);
}
else
{
if (existing == null)
channel.destroy();
else
idleChannels.offer(channel);
destination.release(this);
throw new UnsupportedOperationException("FastCGI Multiplex");
}
}
@ -290,34 +291,27 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
protected void abort(Throwable failure)
{
for (HttpChannelOverFCGI channel : activeChannels.values())
HttpChannelOverFCGI channel = this.channel;
if (channel != null)
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
exchange.getRequest().abort(failure);
channel.destroy();
}
activeChannels.clear();
HttpChannel channel = idleChannels.poll();
while (channel != null)
{
channel.destroy();
channel = idleChannels.poll();
this.channel = null;
}
}
private void failAndClose(Throwable failure)
{
boolean result = false;
for (HttpChannelOverFCGI channel : activeChannels.values())
HttpChannelOverFCGI channel = this.channel;
if (channel != null)
{
result |= channel.responseFailure(failure);
boolean result = channel.responseFailure(failure);
channel.destroy();
if (result)
close(failure);
}
if (result)
close(failure);
}
private int acquireRequest()
@ -341,7 +335,6 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
protected HttpChannelOverFCGI acquireHttpChannel(int id, Request request)
{
HttpChannelOverFCGI channel = idleChannels.poll();
if (channel == null)
channel = newHttpChannel(request);
channel.setRequest(id);
@ -373,7 +366,8 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
@Override
protected Iterator<HttpChannel> getHttpChannels()
{
return new IteratorWrapper<>(activeChannels.values().iterator());
HttpChannel channel = HttpConnectionOverFCGI.this.channel;
return channel == null ? Collections.emptyIterator() : Collections.singleton(channel).iterator();
}
@Override
@ -382,10 +376,8 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
HttpRequest request = exchange.getRequest();
normalizeRequest(request);
// FCGI may be multiplexed, so one channel for each exchange.
int id = acquireRequest();
HttpChannelOverFCGI channel = acquireHttpChannel(id, request);
activeChannels.put(id, channel);
return send(channel, exchange);
}
@ -420,7 +412,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
@Override
public void onBegin(int request, int code, String reason)
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
channel.responseBegin(code, reason);
else
@ -430,7 +422,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
@Override
public void onHeader(int request, HttpField field)
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
channel.responseHeader(field);
else
@ -440,7 +432,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
@Override
public boolean onHeaders(int request)
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
return !channel.responseHeaders();
noChannel(request);
@ -454,7 +446,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
case STD_OUT:
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
networkBuffer.retain();
@ -482,7 +474,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
@Override
public void onEnd(int request)
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
if (channel.responseSuccess())
@ -497,7 +489,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
@Override
public void onFailure(int request, Throwable failure)
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
if (channel.responseFailure(failure))
@ -515,32 +507,4 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
LOG.debug("Channel not found for request {}", request);
}
}
private static final class IteratorWrapper<T> implements Iterator<T>
{
private final Iterator<? extends T> iterator;
private IteratorWrapper(Iterator<? extends T> iterator)
{
this.iterator = iterator;
}
@Override
public boolean hasNext()
{
return iterator.hasNext();
}
@Override
public T next()
{
return iterator.next();
}
@Override
public void remove()
{
iterator.remove();
}
}
}

View File

@ -18,7 +18,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.fcgi.FCGI;
/**
* <p>Parser for the BEGIN_REQUEST frame body.</p>
* <p>Parser for the BEGIN_REQUEST frame content.</p>
* <pre>
* struct begin_request_body {
* ushort role;

View File

@ -15,6 +15,23 @@ package org.eclipse.jetty.fcgi.parser;
import java.nio.ByteBuffer;
/**
* <p>Parser for FastCGI frame content.</p>
* <p>Depending on the frame type specified in the FastCGI frame header,
* the FastCGI frame content has different formats and it is parsed by
* different implementation of this abstract class.</p>
* <p>There are these frame content types:</p>
* <ul>
* <li>{@code BEGIN_REQUEST}, to signal the begin of the request</li>
* <li>{@code PARAMS}, key/value pairs</li>
* <li>{@code STDIN}, the request body, handled as a stream</li>
* <li>{@code STDOUT}, the response body, handled as a stream</li>
* <li>{@code STDERR}, the response error, handled as a stream</li>
* <li>{@code END_REQUEST}, to signal the end of the response</li>
* </ul>
*
* @see Parser
*/
public abstract class ContentParser
{
private final HeaderParser headerParser;
@ -24,9 +41,20 @@ public abstract class ContentParser
this.headerParser = headerParser;
}
/**
* <p>Parses the bytes in the given {@code buffer} as FastCGI frame content bytes.</p>
*
* @param buffer the bytes to parse
* @return the result of the parsing
*/
public abstract Result parse(ByteBuffer buffer);
public void noContent()
/**
* <p>Invoked by the {@link Parser} when the frame content length is zero.</p>
*
* @return whether the parsing should stop
*/
public boolean noContent()
{
throw new IllegalStateException();
}
@ -41,8 +69,25 @@ public abstract class ContentParser
return headerParser.getContentLength();
}
/**
* <p>The result of the frame content parsing.</p>
*/
public enum Result
{
PENDING, ASYNC, COMPLETE
/**
* <p>Not enough bytes have been provided to the parser
* with a call to {@link ContentParser#parse(ByteBuffer)}.</p>
*/
PENDING,
/**
* <p>The frame content has been parsed, but the application
* signalled that it wants to process the content asynchronously.</p>
*/
ASYNC,
/**
* <p>The frame content parsing is complete,
* and the parser can now parse the padding bytes.</p>
*/
COMPLETE
}
}

View File

@ -16,7 +16,7 @@ package org.eclipse.jetty.fcgi.parser;
import java.nio.ByteBuffer;
/**
* <p>Parser for the END_REQUEST frame body.</p>
* <p>Parser for the END_REQUEST frame content.</p>
* <pre>
* struct end_request_body {
* uint applicationStatus;

View File

@ -20,7 +20,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>Parser for FastCGI frame headers.</p>
* <p>Parser for the FastCGI frame header.</p>
* <pre>
* struct frame_header {
* ubyte version;
@ -47,7 +47,7 @@ public class HeaderParser
private int padding;
/**
* Parses the bytes in the given {@code buffer} as FastCGI header bytes
* Parses the bytes in the given {@code buffer} as FastCGI frame header bytes
*
* @param buffer the bytes to parse
* @return whether there were enough bytes for a FastCGI header

View File

@ -22,7 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>Parser for the PARAMS frame body.</p>
* <p>Parser for the PARAMS frame content.</p>
* <pre>
* struct small_name_small_value_params_body {
* ubyte nameLength;
@ -227,9 +227,9 @@ public class ParamsContentParser extends ContentParser
}
@Override
public void noContent()
public boolean noContent()
{
onParams();
return onParams();
}
protected void onParam(String name, String value)
@ -245,16 +245,17 @@ public class ParamsContentParser extends ContentParser
}
}
protected void onParams()
protected boolean onParams()
{
try
{
listener.onHeaders(getRequest());
return listener.onHeaders(getRequest());
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Exception while invoking listener {}", listener, x);
return false;
}
}

View File

@ -36,6 +36,20 @@ import org.slf4j.LoggerFactory;
* </pre>
* <p>Depending on the {@code type}, the content may have a different format,
* so there are specialized content parsers.</p>
* <p>A typical exchange is:</p>
* <pre>
* BEGIN_REQUEST
* PARAMS (length &gt; 0)
* PARAMS (length == 0 to signal end of PARAMS frames)
* [STDIN (length &gt; 0 in case of request content)]
* STDIN (length == 0 to signal end of STDIN frames and end of request)
* ...
* STDOUT (length &gt; 0 with HTTP headers and HTTP content)
* STDOUT (length == 0 to signal end of STDOUT frames)
* [STDERR (length &gt; 0)]
* [STDERR (length == 0 to signal end of STDERR frames)]
* END_REQUEST
* </pre>
*
* @see HeaderParser
* @see ContentParser
@ -70,7 +84,10 @@ public abstract class Parser
ContentParser contentParser = findContentParser(headerParser.getFrameType());
if (headerParser.getContentLength() == 0)
{
contentParser.noContent();
padding = headerParser.getPaddingLength();
state = State.PADDING;
if (contentParser.noContent())
return true;
}
else
{
@ -89,9 +106,9 @@ public abstract class Parser
// parsing; the async operation will eventually resume parsing.
return true;
}
padding = headerParser.getPaddingLength();
state = State.PADDING;
}
padding = headerParser.getPaddingLength();
state = State.PADDING;
break;
}
case PADDING:
@ -176,7 +193,6 @@ public abstract class Parser
@Override
public void onFailure(int request, Throwable failure)
{
}
}
}

View File

@ -31,8 +31,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>The parser for STDOUT type frame bodies.</p>
* <p>STDOUT frame bodies contain both the HTTP headers (but not the response line)
* <p>The parser for STDOUT frame content.</p>
* <p>STDOUT frame content contain both the HTTP headers (but not the response line)
* and the HTTP content (either Content-Length delimited or chunked).</p>
* <p>For this reason, a special HTTP parser is used to parse the frames body.
* This special HTTP parser is configured to skip the response line, and to
@ -52,9 +52,11 @@ public class ResponseContentParser extends StreamContentParser
}
@Override
public void noContent()
public boolean noContent()
{
// Does nothing, since for responses the end of content is signaled via a FCGI_END_REQUEST frame
// Does nothing, since for responses the end of
// content is signaled via a FCGI_END_REQUEST frame.
return false;
}
@Override

View File

@ -57,14 +57,18 @@ public class StreamContentParser extends ContentParser
int length = Math.min(contentLength, buffer.remaining());
int limit = buffer.limit();
buffer.limit(buffer.position() + length);
final ByteBuffer slice = buffer.slice();
ByteBuffer slice = buffer.slice();
buffer.position(buffer.limit());
buffer.limit(limit);
contentLength -= length;
if (contentLength <= 0)
state = State.EOF;
if (onContent(slice))
return Result.ASYNC;
if (contentLength > 0)
break;
break;
}
case EOF:
{
state = State.LENGTH;
return Result.COMPLETE;
}
@ -78,7 +82,7 @@ public class StreamContentParser extends ContentParser
}
@Override
public void noContent()
public boolean noContent()
{
try
{
@ -89,6 +93,7 @@ public class StreamContentParser extends ContentParser
if (LOG.isDebugEnabled())
LOG.debug("Exception while invoking listener {}", listener, x);
}
return false;
}
protected boolean onContent(ByteBuffer buffer)
@ -111,6 +116,6 @@ public class StreamContentParser extends ContentParser
private enum State
{
LENGTH, CONTENT
LENGTH, CONTENT, EOF
}
}

View File

@ -13,11 +13,7 @@
package org.eclipse.jetty.fcgi.server;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
@ -35,114 +31,149 @@ import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpChannelOverFCGI extends HttpChannel
{
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverFCGI.class);
private static final HttpInput.Content EOF_CONTENT = new HttpInput.EofContent();
private final Queue<HttpInput.Content> _contentQueue = new LinkedList<>();
private final AutoLock _lock = new AutoLock();
private HttpInput.Content _specialContent;
private final Callback asyncFillCallback = new AsyncFillCallback();
private final ServerFCGIConnection connection;
private final HttpFields.Mutable fields = HttpFields.build();
private final Dispatcher dispatcher;
private HttpInput.Content normalContent;
private HttpInput.Content specialContent;
private String method;
private String path;
private String query;
private String version;
private HostPortHttpField hostPort;
public HttpChannelOverFCGI(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport)
public HttpChannelOverFCGI(ServerFCGIConnection connection, Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport)
{
super(connector, configuration, endPoint, transport);
this.connection = connection;
this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this);
}
@Override
public boolean onContent(HttpInput.Content content)
{
boolean b = super.onContent(content);
boolean result = super.onContent(content);
Throwable failure;
try (AutoLock l = _lock.lock())
HttpInput.Content special = this.specialContent;
Throwable failure = special == null ? null : special.getError();
if (failure == null)
{
failure = _specialContent == null ? null : _specialContent.getError();
if (failure == null)
_contentQueue.offer(content);
if (normalContent != null)
throw new IllegalStateException("onContent has unconsumed content");
normalContent = content;
}
if (failure != null)
else
{
content.failed(failure);
}
return b;
return result;
}
@Override
public boolean needContent()
{
try (AutoLock l = _lock.lock())
if (hasContent())
{
boolean hasContent = _specialContent != null || !_contentQueue.isEmpty();
if (LOG.isDebugEnabled())
LOG.debug("needContent has content? {}", hasContent);
return hasContent;
LOG.debug("needContent has immediate content {}", this);
return true;
}
parseAndFill();
if (hasContent())
{
if (LOG.isDebugEnabled())
LOG.debug("needContent has parsed content {}", this);
return true;
}
connection.getEndPoint().tryFillInterested(asyncFillCallback);
return false;
}
private boolean hasContent()
{
return specialContent != null || normalContent != null;
}
@Override
public HttpInput.Content produceContent()
{
HttpInput.Content content;
try (AutoLock l = _lock.lock())
if (!hasContent())
parseAndFill();
if (!hasContent())
return null;
HttpInput.Content content = normalContent;
if (content != null)
{
content = _contentQueue.poll();
if (content == null)
content = _specialContent;
if (LOG.isDebugEnabled())
LOG.debug("produceContent produced {} {}", content, this);
normalContent = null;
return content;
}
content = specialContent;
if (LOG.isDebugEnabled())
LOG.debug("produceContent has produced {}", content);
LOG.debug("produceContent produced special {} {}", content, this);
return content;
}
private void parseAndFill()
{
if (LOG.isDebugEnabled())
LOG.debug("parseAndFill {}", this);
connection.parseAndFill();
}
@Override
public boolean failAllContent(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failing all content with {} {}", failure, this);
List<HttpInput.Content> copy;
try (AutoLock l = _lock.lock())
LOG.debug("failing all content {}", this);
HttpInput.Content normal = normalContent;
if (normal != null)
normal.failed(failure);
HttpInput.Content special = specialContent;
if (special != null)
return special.isEof();
while (true)
{
copy = new ArrayList<>(_contentQueue);
_contentQueue.clear();
HttpInput.Content content = produceContent();
if (content == null)
return false;
special = specialContent;
if (special != null)
return special.isEof();
content.failed(failure);
}
copy.forEach(c -> c.failed(failure));
boolean atEof;
try (AutoLock l = _lock.lock())
{
atEof = _specialContent != null && _specialContent.isEof();
}
if (LOG.isDebugEnabled())
LOG.debug("failed all content, EOF = {}", atEof);
return atEof;
}
@Override
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);
LOG.debug("failed {}", this, x);
try (AutoLock l = _lock.lock())
{
Throwable error = _specialContent == null ? null : _specialContent.getError();
if (error != null && error != x)
error.addSuppressed(x);
else
_specialContent = new HttpInput.ErrorContent(x);
}
HttpInput.Content special = specialContent;
Throwable error = special == null ? null : special.getError();
if (error != null && error != x)
error.addSuppressed(x);
else
specialContent = new HttpInput.ErrorContent(x);
return getRequest().getHttpInput().onContentProducible();
}
@ -152,10 +183,7 @@ public class HttpChannelOverFCGI extends HttpChannel
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF");
try (AutoLock l = _lock.lock())
{
_specialContent = new HttpInput.EofContent();
}
specialContent = EOF_CONTENT;
return getRequest().getHttpInput().onContentProducible();
}
@ -238,20 +266,12 @@ public class HttpChannelOverFCGI extends HttpChannel
private boolean doOnIdleTimeout(Throwable x)
{
boolean neverDispatched = getState().isIdle();
boolean waitingForContent;
HttpInput.Content specialContent;
try (AutoLock l = _lock.lock())
{
waitingForContent = _contentQueue.isEmpty() || _contentQueue.peek().remaining() == 0;
specialContent = _specialContent;
}
HttpInput.Content normal = this.normalContent;
boolean waitingForContent = normal == null || normal.remaining() == 0;
if ((waitingForContent || neverDispatched) && specialContent == null)
{
x.addSuppressed(new Throwable("HttpInput idle timeout"));
try (AutoLock l = _lock.lock())
{
_specialContent = new HttpInput.ErrorContent(x);
}
specialContent = new HttpInput.ErrorContent(x);
return getRequest().getHttpInput().onContentProducible();
}
return false;
@ -260,13 +280,46 @@ public class HttpChannelOverFCGI extends HttpChannel
@Override
public void recycle()
{
try (AutoLock l = _lock.lock())
{
if (!_contentQueue.isEmpty())
throw new AssertionError("unconsumed content: " + _contentQueue);
_specialContent = null;
}
super.recycle();
HttpInput.Content normal = normalContent;
if (normal != null)
throw new AssertionError("unconsumed content: " + normal);
specialContent = null;
}
@Override
public void onCompleted()
{
super.onCompleted();
HttpInput input = getRequest().getHttpInput();
boolean consumed = input.consumeAll();
// Assume we don't arrive here from the connection's onFillable() (which already
// calls fillInterested()), because we dispatch() when all the headers are received.
// When the request/response is completed, we must arrange to call fillInterested().
connection.onCompleted(consumed);
}
private class AsyncFillCallback implements Callback
{
@Override
public void succeeded()
{
if (getRequest().getHttpInput().onContentProducible())
handle();
}
@Override
public void failed(Throwable x)
{
if (HttpChannelOverFCGI.this.failed(x))
handle();
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}
private static class Dispatcher implements Runnable

View File

@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
public class HttpTransportOverFCGI implements HttpTransport
{
private static final Logger LOG = LoggerFactory.getLogger(HttpTransportOverFCGI.class);
private final ServerGenerator generator;
private final Flusher flusher;
private final int request;
@ -48,6 +49,8 @@ public class HttpTransportOverFCGI implements HttpTransport
@Override
public void send(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("send {} {} l={}", this, request, lastContent);
boolean head = HttpMethod.HEAD.is(request.getMethod());
if (response != null)
{

View File

@ -14,8 +14,6 @@
package org.eclipse.jetty.fcgi.server;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.fcgi.generator.Flusher;
@ -24,8 +22,9 @@ import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
@ -36,19 +35,22 @@ public class ServerFCGIConnection extends AbstractConnection
{
private static final Logger LOG = LoggerFactory.getLogger(ServerFCGIConnection.class);
private final ConcurrentMap<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
private final Connector connector;
private final RetainableByteBufferPool networkByteBufferPool;
private final boolean sendStatus200;
private final Flusher flusher;
private final HttpConfiguration configuration;
private final ServerParser parser;
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;
private RetainableByteBuffer networkBuffer;
private HttpChannelOverFCGI channel;
public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfiguration configuration, boolean sendStatus200)
{
super(endPoint, connector.getExecutor());
this.connector = connector;
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
this.flusher = new Flusher(endPoint);
this.configuration = configuration;
this.sendStatus200 = sendStatus200;
@ -85,29 +87,28 @@ public class ServerFCGIConnection extends AbstractConnection
@Override
public void onFillable()
{
EndPoint endPoint = getEndPoint();
ByteBufferPool bufferPool = connector.getByteBufferPool();
ByteBuffer buffer = bufferPool.acquire(configuration.getResponseHeaderSize(), isUseInputDirectByteBuffers());
acquireInputBuffer();
try
{
while (true)
{
int read = endPoint.fill(buffer);
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'
LOG.debug("Read {} bytes from {}", read, endPoint);
int read = fillInputBuffer();
if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes from {} {}", read, getEndPoint(), this);
if (read > 0)
{
parse(buffer);
if (parse(networkBuffer.getBuffer()))
return;
}
else if (read == 0)
{
bufferPool.release(buffer);
releaseInputBuffer();
fillInterested();
break;
}
else
{
bufferPool.release(buffer);
releaseInputBuffer();
shutdown();
break;
}
@ -117,25 +118,81 @@ public class ServerFCGIConnection extends AbstractConnection
{
if (LOG.isDebugEnabled())
LOG.debug("Unable to fill endpoint", x);
bufferPool.release(buffer);
networkBuffer.clear();
releaseInputBuffer();
// TODO: fail and close ?
}
}
/**
* This is just a "consume" method, so it must not call
* fillInterested(), but just consume what's in the network
* for the current request.
*/
void parseAndFill()
{
if (LOG.isDebugEnabled())
LOG.debug("parseAndFill {}", this);
// This loop must run only until the request is completed.
// See also HttpConnection.parseAndFillForContent().
while (channel != null)
{
if (parse(networkBuffer.getBuffer()))
return;
// Check if the request was completed by the parsing.
if (channel == null)
return;
if (fillInputBuffer() <= 0)
break;
}
}
private void acquireInputBuffer()
{
if (networkBuffer == null)
networkBuffer = networkByteBufferPool.acquire(configuration.getResponseHeaderSize(), isUseInputDirectByteBuffers());
}
private void releaseInputBuffer()
{
boolean released = networkBuffer.release();
if (LOG.isDebugEnabled())
LOG.debug("releaseInputBuffer {} {}", released, this);
if (released)
networkBuffer = null;
}
private int fillInputBuffer()
{
try
{
return getEndPoint().fill(networkBuffer.getBuffer());
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not fill from {}", this, x);
return -1;
}
}
@Override
protected boolean onReadTimeout(Throwable timeout)
{
return channels.values().stream()
.mapToInt(channel -> channel.onIdleTimeout(timeout) ? 0 : 1)
.sum() == 0;
if (channel != null)
return channel.onIdleTimeout(timeout);
return true;
}
private void parse(ByteBuffer buffer)
private boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
parser.parse(buffer);
boolean result = parser.parse(buffer);
if (result)
return true;
}
return false;
}
private void shutdown()
@ -143,17 +200,23 @@ public class ServerFCGIConnection extends AbstractConnection
flusher.shutdown();
}
void onCompleted(boolean fillMore)
{
releaseInputBuffer();
if (getEndPoint().isOpen() && fillMore)
fillInterested();
}
private class ServerListener implements ServerParser.Listener
{
@Override
public void onStart(int request, FCGI.Role role, int flags)
{
// TODO: handle flags
HttpChannelOverFCGI channel = new HttpChannelOverFCGI(connector, configuration, getEndPoint(),
if (channel != null)
throw new UnsupportedOperationException("FastCGI Multiplexing");
channel = new HttpChannelOverFCGI(ServerFCGIConnection.this, connector, configuration, getEndPoint(),
new HttpTransportOverFCGI(connector.getByteBufferPool(), isUseOutputDirectByteBuffers(), sendStatus200, flusher, request));
HttpChannelOverFCGI existing = channels.putIfAbsent(request, channel);
if (existing != null)
throw new IllegalStateException();
if (LOG.isDebugEnabled())
LOG.debug("Request {} start on {}", request, channel);
}
@ -161,7 +224,6 @@ public class ServerFCGIConnection extends AbstractConnection
@Override
public void onHeader(int request, HttpField field)
{
HttpChannelOverFCGI channel = channels.get(request);
if (LOG.isDebugEnabled())
LOG.debug("Request {} header {} on {}", request, field, channel);
if (channel != null)
@ -171,13 +233,14 @@ public class ServerFCGIConnection extends AbstractConnection
@Override
public boolean onHeaders(int request)
{
HttpChannelOverFCGI channel = channels.get(request);
if (LOG.isDebugEnabled())
LOG.debug("Request {} headers on {}", request, channel);
if (channel != null)
{
channel.onRequest();
channel.dispatch();
// We have dispatched to the application, so we must stop the fill & parse loop.
return true;
}
return false;
}
@ -185,14 +248,13 @@ public class ServerFCGIConnection extends AbstractConnection
@Override
public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
HttpChannelOverFCGI channel = channels.get(request);
if (LOG.isDebugEnabled())
LOG.debug("Request {} {} content {} on {}", request, stream, buffer, channel);
if (channel != null)
{
ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
copy.put(buffer).flip();
channel.onContent(new HttpInput.Content(copy));
channel.onContent(new FastCGIContent(buffer));
// Signal that the content is processed asynchronously, to ensure backpressure.
return true;
}
return false;
}
@ -200,24 +262,52 @@ public class ServerFCGIConnection extends AbstractConnection
@Override
public void onEnd(int request)
{
HttpChannelOverFCGI channel = channels.remove(request);
if (LOG.isDebugEnabled())
LOG.debug("Request {} end on {}", request, channel);
if (channel != null)
{
channel.onContentComplete();
channel.onRequestComplete();
// Nulling out the channel signals that the
// request is complete, see also parseAndFill().
channel = null;
}
}
@Override
public void onFailure(int request, Throwable failure)
{
HttpChannelOverFCGI channel = channels.remove(request);
if (LOG.isDebugEnabled())
LOG.debug("Request {} failure on {}: {}", request, channel, failure);
if (channel != null)
channel.onBadMessage(new BadMessageException(HttpStatus.BAD_REQUEST_400, null, failure));
channel = null;
}
private class FastCGIContent extends HttpInput.Content
{
public FastCGIContent(ByteBuffer content)
{
super(content);
networkBuffer.retain();
}
@Override
public void succeeded()
{
release();
}
@Override
public void failed(Throwable x)
{
release();
}
private void release()
{
networkBuffer.release();
}
}
}
}

View File

@ -21,6 +21,7 @@ import org.eclipse.jetty.client.LeakTrackingConnectionPool;
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Handler;
@ -47,22 +48,26 @@ public abstract class AbstractHttpClientServerTest
public void start(Handler handler) throws Exception
{
server = new Server();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
ServerFCGIConnectionFactory fcgiConnectionFactory = new ServerFCGIConnectionFactory(new HttpConfiguration());
serverBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
connector = new ServerConnector(server, null, null, serverBufferPool,
1, Math.max(1, ProcessorUtils.availableProcessors() / 2), fcgiConnectionFactory);
// connector.setPort(9000);
server.addConnector(connector);
server.setHandler(handler);
server.start();
QueuedThreadPool executor = new QueuedThreadPool();
executor.setName(executor.getName() + "-client");
HttpClientTransport transport = new HttpClientTransportOverFCGI(1, "");
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
clientConnector.setExecutor(clientThreads);
if (clientBufferPool == null)
clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
clientConnector.setByteBufferPool(clientBufferPool);
HttpClientTransport transport = new HttpClientTransportOverFCGI(clientConnector, "");
transport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
{
@Override
@ -72,10 +77,6 @@ public abstract class AbstractHttpClientServerTest
}
});
client = new HttpClient(transport);
client.setExecutor(executor);
if (clientBufferPool == null)
clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
client.setByteBufferPool(clientBufferPool);
client.start();
}

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.fcgi.server;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
@ -48,18 +47,15 @@ import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
// @checkstyle-disable-check : AvoidEscapedUnicodeCharactersCheck
public class HttpClientTest extends AbstractHttpClientServerTest
{
// @checkstyle-disable-check : AvoidEscapedUnicodeCharactersCheck
@Test
public void testGETResponseWithoutContent() throws Exception
{
@ -76,7 +72,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testGETResponseWithContent() throws Exception
{
final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
start(new AbstractHandler()
{
@Override
@ -103,7 +99,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testGETResponseWithBigContent() throws Exception
{
final byte[] data = new byte[16 * 1024 * 1024];
byte[] data = new byte[16 * 1024 * 1024];
new Random().nextBytes(data);
start(new AbstractHandler()
{
@ -132,8 +128,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testGETWithParametersResponseWithContent() throws Exception
{
final String paramName1 = "a";
final String paramName2 = "b";
String paramName1 = "a";
String paramName2 = "b";
start(new AbstractHandler()
{
@Override
@ -164,8 +160,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testGETWithParametersMultiValuedResponseWithContent() throws Exception
{
final String paramName1 = "a";
final String paramName2 = "b";
String paramName1 = "a";
String paramName2 = "b";
start(new AbstractHandler()
{
@Override
@ -202,8 +198,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testPOSTWithParameters() throws Exception
{
final String paramName = "a";
final String paramValue = "\u20AC";
String paramName = "a";
String paramValue = "\u20AC";
start(new AbstractHandler()
{
@Override
@ -233,8 +229,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testPOSTWithQueryString() throws Exception
{
final String paramName = "a";
final String paramValue = "\u20AC";
String paramName = "a";
String paramValue = "\u20AC";
start(new AbstractHandler()
{
@Override
@ -265,8 +261,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testPUTWithParameters() throws Exception
{
final String paramName = "a";
final String paramValue = "\u20AC";
String paramName = "a";
String paramValue = "\u20AC";
start(new AbstractHandler()
{
@Override
@ -297,9 +293,9 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testPOSTWithParametersWithContent() throws Exception
{
final byte[] content = {0, 1, 2, 3};
final String paramName = "a";
final String paramValue = "\u20AC";
byte[] content = {0, 1, 2, 3};
String paramName = "a";
String paramValue = "\u20AC";
start(new AbstractHandler()
{
@Override
@ -318,7 +314,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
for (int i = 0; i < 256; ++i)
{
ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort() + "/?b=1")
ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort() + "/?r=" + i)
.param(paramName, paramValue)
.body(new BytesRequestContent(content))
.timeout(5, TimeUnit.SECONDS)
@ -326,14 +322,14 @@ public class HttpClientTest extends AbstractHttpClientServerTest
assertNotNull(response);
assertEquals(200, response.getStatus());
assertArrayEquals(content, response.getContent());
assertArrayEquals(content, response.getContent(), "content mismatch for request " + i);
}
}
@Test
public void testPOSTWithContentNotifiesRequestContentListener() throws Exception
{
final byte[] content = {0, 1, 2, 3};
byte[] content = {0, 1, 2, 3};
start(new EmptyServerHandler());
ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort())
@ -357,7 +353,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
{
start(new EmptyServerHandler());
final AtomicInteger progress = new AtomicInteger();
AtomicInteger progress = new AtomicInteger();
ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort())
.onRequestContent((request, buffer) ->
{
@ -385,7 +381,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
// appear as "leaked", so we use a normal ByteBufferPool.
clientBufferPool = new MappedByteBufferPool.Tagged();
final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
start(new AbstractHandler()
{
@Override
@ -411,7 +407,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testConnectionIdleTimeout() throws Exception
{
final long idleTimeout = 1000;
long idleTimeout = 1000;
start(new AbstractHandler()
{
@Override
@ -431,25 +427,26 @@ public class HttpClientTest extends AbstractHttpClientServerTest
connector.setIdleTimeout(idleTimeout);
ExecutionException x = assertThrows(ExecutionException.class, () ->
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.idleTimeout(4 * idleTimeout, TimeUnit.MILLISECONDS)
.timeout(3 * idleTimeout, TimeUnit.MILLISECONDS)
.send());
assertThat(x.getCause(), instanceOf(EOFException.class));
// Request does not fail because idle timeouts while dispatched are ignored.
ContentResponse response1 = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.idleTimeout(4 * idleTimeout, TimeUnit.MILLISECONDS)
.timeout(3 * idleTimeout, TimeUnit.MILLISECONDS)
.send();
assertNotNull(response1);
assertEquals(200, response1.getStatus());
connector.setIdleTimeout(5 * idleTimeout);
// Make another request to be sure the connection is recreated
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
// Make another request to be sure the connection works fine.
ContentResponse response2 = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.idleTimeout(4 * idleTimeout, TimeUnit.MILLISECONDS)
.timeout(3 * idleTimeout, TimeUnit.MILLISECONDS)
.send();
assertNotNull(response);
assertEquals(200, response.getStatus());
assertNotNull(response2);
assertEquals(200, response2.getStatus());
}
@Test
@ -470,7 +467,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testHEADWithResponseContentLength() throws Exception
{
final int length = 1024;
int length = 1024;
start(new AbstractHandler()
{
@Override
@ -507,7 +504,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testLongPollIsAbortedWhenClientIsStopped() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
@ -519,7 +516,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
}
});
final CountDownLatch completeLatch = new CountDownLatch(1);
CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(result ->
@ -576,7 +573,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
private void testContentDelimitedByEOFWithSlowRequest(int length) throws Exception
{
final byte[] data = new byte[length];
byte[] data = new byte[length];
new Random().nextBytes(data);
start(new AbstractHandler()
{
@ -620,10 +617,10 @@ public class HttpClientTest extends AbstractHttpClientServerTest
}
});
final AtomicInteger contentCount = new AtomicInteger();
final AtomicReference<Callback> callbackRef = new AtomicReference<>();
final AtomicReference<CountDownLatch> contentLatch = new AtomicReference<>(new CountDownLatch(1));
final CountDownLatch completeLatch = new CountDownLatch(1);
AtomicInteger contentCount = new AtomicInteger();
AtomicReference<Callback> callbackRef = new AtomicReference<>();
AtomicReference<CountDownLatch> contentLatch = new AtomicReference<>(new CountDownLatch(1));
CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onResponseContentAsync((response, content, callback) ->