Fixes #10218 - NPE in HttpChannelOverFCGI.receive()

Simplified the code, removing all leftover cruft present since FCGI was multiplexed.
Now the FCGI implementation is very similar to HTTP1.

Made HttpConnectionOverFCGI.channel final so that it cannot NPE anymore.

The client now properly handling server-side connection closes.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-08-08 16:16:40 +02:00
parent 17c3649771
commit 24792db09c
8 changed files with 145 additions and 235 deletions

View File

@ -13,45 +13,33 @@
package org.eclipse.jetty.fcgi.client.transport.internal;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.client.transport.HttpChannel;
import org.eclipse.jetty.client.transport.HttpExchange;
import org.eclipse.jetty.client.transport.HttpReceiver;
import org.eclipse.jetty.client.transport.HttpSender;
import org.eclipse.jetty.fcgi.generator.Flusher;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpChannelOverFCGI extends HttpChannel
{
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverFCGI.class);
private final HttpConnectionOverFCGI connection;
private final Flusher flusher;
private final HttpSenderOverFCGI sender;
private final HttpReceiverOverFCGI receiver;
private final FCGIIdleTimeout idle;
private int request;
private HttpVersion version;
public HttpChannelOverFCGI(HttpConnectionOverFCGI connection, Flusher flusher, long idleTimeout)
public HttpChannelOverFCGI(HttpConnectionOverFCGI connection)
{
super(connection.getHttpDestination());
this.connection = connection;
this.flusher = flusher;
this.sender = new HttpSenderOverFCGI(this);
this.receiver = new HttpReceiverOverFCGI(this);
this.idle = new FCGIIdleTimeout(connection, idleTimeout);
}
public HttpConnectionOverFCGI getHttpConnection()
@ -81,28 +69,21 @@ public class HttpChannelOverFCGI extends HttpChannel
return receiver;
}
public boolean isFailed()
{
return sender.isFailed() || receiver.isFailed();
}
@Override
public void send(HttpExchange exchange)
{
version = exchange.getRequest().getVersion();
idle.onOpen();
sender.send(exchange);
}
@Override
public void release()
{
connection.release(this);
connection.release();
}
protected void responseBegin(int code, String reason)
{
idle.notIdle();
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
@ -119,7 +100,6 @@ public class HttpChannelOverFCGI extends HttpChannel
protected void responseHeaders()
{
idle.notIdle();
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseHeaders(exchange);
@ -127,7 +107,6 @@ public class HttpChannelOverFCGI extends HttpChannel
protected void content(Content.Chunk chunk)
{
idle.notIdle();
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.content(chunk);
@ -135,7 +114,6 @@ public class HttpChannelOverFCGI extends HttpChannel
protected void end()
{
idle.notIdle();
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.end(exchange);
@ -150,67 +128,33 @@ public class HttpChannelOverFCGI extends HttpChannel
promise.succeeded(false);
}
void eof()
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
connection.close();
}
@Override
public void exchangeTerminated(HttpExchange exchange, Result result)
{
super.exchangeTerminated(exchange, result);
idle.onClose();
HttpFields responseHeaders = result.getResponse().getHeaders();
if (result.isFailed())
connection.close(result.getFailure());
else if (!connection.closeByHTTP(responseHeaders))
else if (connection.isShutdown() || connection.isCloseByHTTP(responseHeaders))
connection.close();
else
release();
}
protected void flush(ByteBufferPool.Accumulator accumulator, Callback callback)
{
flusher.flush(accumulator, callback);
connection.getFlusher().flush(accumulator, callback);
}
void receive()
{
receiver.receive();
}
private class FCGIIdleTimeout extends IdleTimeout
{
private final HttpConnectionOverFCGI connection;
private boolean open;
public FCGIIdleTimeout(HttpConnectionOverFCGI connection, long idleTimeout)
{
super(connection.getHttpDestination().getHttpClient().getScheduler());
this.connection = connection;
setIdleTimeout(idleTimeout >= 0 ? idleTimeout : connection.getEndPoint().getIdleTimeout());
}
@Override
public void onOpen()
{
open = true;
notIdle();
super.onOpen();
}
@Override
public void onClose()
{
super.onClose();
open = false;
}
@Override
protected void onIdleExpired(TimeoutException timeout)
{
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout for request {}", request);
connection.abort(timeout);
}
@Override
public boolean isOpen()
{
return open;
}
}
}

View File

@ -13,14 +13,13 @@
package org.eclipse.jetty.fcgi.client.transport.internal;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.Destination;
@ -49,7 +48,6 @@ import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,18 +56,19 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverFCGI.class);
private final ByteBufferPool networkByteBufferPool;
private final AutoLock lock = new AutoLock();
private final LinkedList<Integer> requests = new LinkedList<>();
private final AtomicInteger requests = new AtomicInteger();
private final AtomicBoolean closed = new AtomicBoolean();
private final HttpDestination destination;
private final Promise<Connection> promise;
private final Flusher flusher;
private final Delegate delegate;
private final ClientParser parser;
private HttpChannelOverFCGI channel;
private final HttpChannelOverFCGI channel;
private RetainableByteBuffer networkBuffer;
private Object attachment;
private Runnable action;
private long idleTimeout;
private boolean shutdown;
public HttpConnectionOverFCGI(EndPoint endPoint, Destination destination, Promise<Connection> promise)
{
@ -79,7 +78,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
this.flusher = new Flusher(endPoint);
this.delegate = new Delegate(destination);
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);
this.channel = newHttpChannel();
HttpClient client = destination.getHttpClient();
this.networkByteBufferPool = client.getByteBufferPool();
}
@ -207,13 +206,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
private void shutdown()
{
// Close explicitly only if we are idle, since the request may still
// be in progress, otherwise close only if we can fail the responses.
HttpChannelOverFCGI channel = this.channel;
if (channel == null || channel.getRequest() == 0)
close();
else
failAndClose(new EOFException(String.valueOf(getEndPoint())));
// Mark this receiver as shutdown, so that we can
// close the connection when the exchange terminates.
// We cannot close the connection from here because
// the request may still be in process.
shutdown = true;
if (!parser.eof())
channel.eof();
}
boolean isShutdown()
{
return shutdown;
}
@Override
@ -226,27 +230,11 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
return false;
}
protected void release(HttpChannelOverFCGI channel)
protected void release()
{
HttpChannelOverFCGI existing = this.channel;
if (existing == channel)
{
channel.setRequest(0);
// Recycle only non-failed channels.
if (channel.isFailed())
{
channel.destroy();
this.channel = null;
}
destination.release(this);
}
else
{
if (existing == null)
channel.destroy();
else
throw new UnsupportedOperationException("FastCGI Multiplex");
}
// Restore idle timeout
getEndPoint().setIdleTimeout(idleTimeout);
destination.release(this);
}
@Override
@ -260,9 +248,8 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
if (closed.compareAndSet(false, true))
{
getHttpDestination().remove(this);
abort(failure);
channel.destroy();
getEndPoint().shutdownOutput();
if (LOG.isDebugEnabled())
LOG.debug("Shutdown {}", this);
@ -290,62 +277,25 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
return attachment;
}
protected boolean closeByHTTP(HttpFields fields)
protected boolean isCloseByHTTP(HttpFields fields)
{
if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
return false;
close();
return true;
return fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
}
protected void abort(Throwable failure)
{
HttpChannelOverFCGI channel = this.channel;
if (channel != null)
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
exchange.getRequest().abort(failure);
channel.destroy();
this.channel = null;
}
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
exchange.getRequest().abort(failure);
}
private void failAndClose(Throwable failure)
{
HttpChannelOverFCGI channel = this.channel;
if (channel != null)
channel.responseFailure(failure, Promise.from(failed ->
{
channel.responseFailure(failure, Promise.from(failed ->
{
channel.destroy();
if (failed)
close(failure);
}, x ->
{
channel.destroy();
if (failed)
close(failure);
}));
}
}
private int acquireRequest()
{
try (AutoLock ignored = lock.lock())
{
int last = requests.getLast();
int request = last + 1;
requests.addLast(request);
return request;
}
}
private void releaseRequest(int request)
{
try (AutoLock ignored = lock.lock())
{
requests.removeFirstOccurrence(request);
}
}, x -> close(failure)));
}
private Runnable getAndSetAction(Runnable action)
@ -355,17 +305,9 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
return r;
}
protected HttpChannelOverFCGI acquireHttpChannel(int id, Request request)
protected HttpChannelOverFCGI newHttpChannel()
{
if (channel == null)
channel = newHttpChannel(request);
channel.setRequest(id);
return channel;
}
protected HttpChannelOverFCGI newHttpChannel(Request request)
{
return new HttpChannelOverFCGI(this, getFlusher(), request.getIdleTimeout());
return new HttpChannelOverFCGI(this);
}
@Override
@ -388,8 +330,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
@Override
protected Iterator<HttpChannel> getHttpChannels()
{
HttpChannel channel = HttpConnectionOverFCGI.this.channel;
return channel == null ? Collections.emptyIterator() : Collections.singleton(channel).iterator();
return Collections.<HttpChannel>singleton(channel).iterator();
}
@Override
@ -398,9 +339,14 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
HttpRequest request = exchange.getRequest();
normalizeRequest(request);
int id = acquireRequest();
HttpChannelOverFCGI channel = acquireHttpChannel(id, request);
// Save the old idle timeout to restore it.
EndPoint endPoint = getEndPoint();
idleTimeout = endPoint.getIdleTimeout();
long requestIdleTimeout = request.getIdleTimeout();
if (requestIdleTimeout >= 0)
endPoint.setIdleTimeout(requestIdleTimeout);
channel.setRequest(requests.incrementAndGet());
return send(channel, exchange);
}
@ -431,11 +377,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
if (LOG.isDebugEnabled())
LOG.debug("onBegin r={},c={},reason={}", request, code, reason);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
channel.responseBegin(code, reason);
else
noChannel(request);
channel.responseBegin(code, reason);
}
@Override
@ -443,11 +385,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
if (LOG.isDebugEnabled())
LOG.debug("onHeader r={},f={}", request, field);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
channel.responseHeader(field);
else
noChannel(request);
channel.responseHeader(field);
}
@Override
@ -455,15 +393,9 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
if (LOG.isDebugEnabled())
LOG.debug("onHeaders r={} {}", request, networkBuffer);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
if (getAndSetAction(channel::responseHeaders) != null)
throw new IllegalStateException();
return true;
}
noChannel(request);
return false;
if (getAndSetAction(channel::responseHeaders) != null)
throw new IllegalStateException();
return true;
}
@Override
@ -475,21 +407,13 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
case STD_OUT ->
{
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
// No need to call networkBuffer.retain() here, since we know
// that the action will be run before releasing the networkBuffer.
// The receiver of the chunk decides whether to consume/retain it.
Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);
if (getAndSetAction(() -> channel.content(chunk)) == null)
return true;
throw new IllegalStateException();
}
else
{
noChannel(request);
}
// No need to call networkBuffer.retain() here, since we know
// that the action will be run before releasing the networkBuffer.
// The receiver of the chunk decides whether to consume/retain it.
Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);
if (getAndSetAction(() -> channel.content(chunk)) == null)
return true;
throw new IllegalStateException();
}
case STD_ERR -> LOG.info(BufferUtil.toUTF8String(buffer));
default -> throw new IllegalArgumentException();
@ -502,16 +426,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
if (LOG.isDebugEnabled())
LOG.debug("onEnd r={}", request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
releaseRequest(request);
channel.end();
}
else
{
noChannel(request);
}
channel.end();
}
@Override
@ -519,25 +434,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
if (LOG.isDebugEnabled())
LOG.debug("onFailure request={}", request, failure);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
channel.responseFailure(failure, Promise.from(failed ->
{
if (failed)
releaseRequest(request);
}, x -> releaseRequest(request)));
}
else
{
noChannel(request);
}
}
private void noChannel(int request)
{
if (LOG.isDebugEnabled())
LOG.debug("Channel not found for request {}", request);
failAndClose(failure);
}
}
}

View File

@ -61,6 +61,17 @@ public class HttpReceiverOverFCGI extends HttpReceiver
}
}
@Override
protected void dispose()
{
super.dispose();
if (chunk != null)
{
chunk.release();
chunk = null;
}
}
@Override
public Content.Chunk read(boolean fillInterestIfNeeded)
{

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.fcgi.parser;
import java.io.EOFException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.fcgi.FCGI;
@ -60,7 +61,7 @@ public abstract class Parser
protected final HeaderParser headerParser = new HeaderParser();
private final Listener listener;
private State state = State.HEADER;
private State state = State.INITIAL;
private int padding;
protected Parser(Listener listener)
@ -80,6 +81,12 @@ public abstract class Parser
{
switch (state)
{
case INITIAL ->
{
if (!buffer.hasRemaining())
return false;
state = State.HEADER;
}
case HEADER ->
{
if (!headerParser.parse(buffer))
@ -145,10 +152,19 @@ public abstract class Parser
protected abstract ContentParser findContentParser(FCGI.FrameType frameType);
public boolean eof()
{
if (state == State.INITIAL)
return false;
Throwable failure = new EOFException();
listener.onFailure(headerParser.getRequest(), failure);
return true;
}
private void reset()
{
headerParser.reset();
state = State.HEADER;
state = State.INITIAL;
padding = 0;
}
@ -190,6 +206,6 @@ public abstract class Parser
private enum State
{
HEADER, CONTENT, PADDING
INITIAL, HEADER, CONTENT, PADDING
}
}

View File

@ -270,6 +270,8 @@ public class ServerFCGIConnection extends AbstractConnection implements Connecti
private void releaseInputBuffer()
{
if (networkBuffer == null)
return;
boolean released = networkBuffer.release();
if (LOG.isDebugEnabled())
LOG.debug("releaseInputBuffer {} {}", released, this);
@ -327,6 +329,9 @@ public class ServerFCGIConnection extends AbstractConnection implements Connecti
@Override
public boolean onIdleExpired(TimeoutException timeoutException)
{
HttpStreamOverFCGI stream = this.stream;
if (stream == null)
return true;
Runnable task = stream.getHttpChannel().onIdleTimeout(timeoutException);
if (task != null)
getExecutor().execute(task);

View File

@ -32,7 +32,9 @@ import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.Request;
@ -515,6 +517,35 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void testConnectionIdleTimeout() throws Exception
{
long idleTimeout = 1000;
start(new Handler.Abstract()
{
@Override
public boolean handle(org.eclipse.jetty.server.Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
callback.succeeded();
return true;
}
});
connector.setIdleTimeout(idleTimeout);
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(2 * idleTimeout, TimeUnit.MILLISECONDS)
.send();
assertNotNull(response);
assertEquals(200, response.getStatus());
Thread.sleep(2 * idleTimeout);
assertTrue(client.getDestinations().stream()
.map(Destination::getConnectionPool)
.allMatch(ConnectionPool::isEmpty));
}
@Test
public void testConnectionIdleTimeoutIgnored() throws Exception
{
long idleTimeout = 1000;
start(new Handler.Abstract()
@ -522,9 +553,11 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Override
public boolean handle(org.eclipse.jetty.server.Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
// Handler says it will handle the idletimeout
// Handler says it will handle the idle timeout by ignoring it.
request.addIdleTimeoutListener(t -> false);
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
// Sleep an non-integral number of idle timeouts to avoid
// racing with the idle timeout ticking every idle period.
TimeUnit.MILLISECONDS.sleep(idleTimeout * 3 / 2);
callback.succeeded();
return true;
}

View File

@ -59,11 +59,16 @@ import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AbstractTest
{
@RegisterExtension
public final BeforeTestExecutionCallback printMethodName = context ->
System.err.printf("Running %s.%s() %s%n", context.getRequiredTestClass().getSimpleName(), context.getRequiredTestMethod().getName(), context.getDisplayName());
protected final HttpConfiguration httpConfig = new HttpConfiguration();
protected SslContextFactory.Server sslContextFactoryServer;
protected Server server;

View File

@ -22,7 +22,6 @@ import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.transport.HttpExchange;
import org.eclipse.jetty.client.transport.internal.HttpChannelOverHTTP;
@ -209,9 +208,9 @@ public class HttpChannelAssociationTest extends AbstractTest
return new HttpConnectionOverFCGI(endPoint, destination, promise)
{
@Override
protected HttpChannelOverFCGI newHttpChannel(Request request)
protected HttpChannelOverFCGI newHttpChannel()
{
return new HttpChannelOverFCGI(this, getFlusher(), request.getIdleTimeout())
return new HttpChannelOverFCGI(this)
{
@Override
public boolean associate(HttpExchange exchange)