Fixes #2088 - Recycle HTTP/2 channels on the client.

Recycled channels also for FCGI.
Small improvements to HTTP/2 too.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2018-01-05 17:19:06 +01:00
parent a1a3cb7c80
commit c35b832251
6 changed files with 62 additions and 28 deletions

View File

@ -38,18 +38,17 @@ public class HttpChannelOverFCGI extends HttpChannel
{ {
private final HttpConnectionOverFCGI connection; private final HttpConnectionOverFCGI connection;
private final Flusher flusher; private final Flusher flusher;
private final int request;
private final HttpSenderOverFCGI sender; private final HttpSenderOverFCGI sender;
private final HttpReceiverOverFCGI receiver; private final HttpReceiverOverFCGI receiver;
private final FCGIIdleTimeout idle; private final FCGIIdleTimeout idle;
private int request;
private HttpVersion version; private HttpVersion version;
public HttpChannelOverFCGI(final HttpConnectionOverFCGI connection, Flusher flusher, int request, long idleTimeout) public HttpChannelOverFCGI(final HttpConnectionOverFCGI connection, Flusher flusher, long idleTimeout)
{ {
super(connection.getHttpDestination()); super(connection.getHttpDestination());
this.connection = connection; this.connection = connection;
this.flusher = flusher; this.flusher = flusher;
this.request = request;
this.sender = new HttpSenderOverFCGI(this); this.sender = new HttpSenderOverFCGI(this);
this.receiver = new HttpReceiverOverFCGI(this); this.receiver = new HttpReceiverOverFCGI(this);
this.idle = new FCGIIdleTimeout(connection, idleTimeout); this.idle = new FCGIIdleTimeout(connection, idleTimeout);
@ -60,6 +59,11 @@ public class HttpChannelOverFCGI extends HttpChannel
return request; return request;
} }
void setRequest(int request)
{
this.request = request;
}
@Override @Override
protected HttpSender getHttpSender() protected HttpSender getHttpSender()
{ {
@ -72,6 +76,11 @@ public class HttpChannelOverFCGI extends HttpChannel
return receiver; return receiver;
} }
public boolean isFailed()
{
return sender.isFailed() || receiver.isFailed();
}
@Override @Override
public void send() public void send()
{ {

View File

@ -23,7 +23,9 @@ import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousCloseException;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -56,7 +58,8 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class); private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class);
private final LinkedList<Integer> requests = new LinkedList<>(); private final LinkedList<Integer> requests = new LinkedList<>();
private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>(); private final Map<Integer, HttpChannelOverFCGI> activeChannels = new ConcurrentHashMap<>();
private final Queue<HttpChannelOverFCGI> idleChannels = new ConcurrentLinkedQueue<>();
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
private final HttpDestination destination; private final HttpDestination destination;
private final Promise<Connection> promise; private final Promise<Connection> promise;
@ -184,7 +187,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
{ {
// Close explicitly only if we are idle, since the request may still // Close explicitly only if we are idle, since the request may still
// be in progress, otherwise close only if we can fail the responses. // be in progress, otherwise close only if we can fail the responses.
if (channels.isEmpty()) if (activeChannels.isEmpty())
close(); close();
else else
failAndClose(new EOFException(String.valueOf(getEndPoint()))); failAndClose(new EOFException(String.valueOf(getEndPoint())));
@ -204,8 +207,14 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
protected void release(HttpChannelOverFCGI channel) protected void release(HttpChannelOverFCGI channel)
{ {
channels.remove(channel.getRequest()); if (activeChannels.remove(channel.getRequest()) != null)
destination.release(this); {
channel.setRequest(0);
// Recycle only non-failed channels.
if (!channel.isFailed())
idleChannels.offer(channel);
destination.release(this);
}
} }
@Override @Override
@ -249,19 +258,20 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
protected void abort(Throwable failure) protected void abort(Throwable failure)
{ {
for (HttpChannelOverFCGI channel : channels.values()) for (HttpChannelOverFCGI channel : activeChannels.values())
{ {
HttpExchange exchange = channel.getHttpExchange(); HttpExchange exchange = channel.getHttpExchange();
if (exchange != null) if (exchange != null)
exchange.getRequest().abort(failure); exchange.getRequest().abort(failure);
} }
channels.clear(); activeChannels.clear();
idleChannels.clear();
} }
private void failAndClose(Throwable failure) private void failAndClose(Throwable failure)
{ {
boolean result = false; boolean result = false;
for (HttpChannelOverFCGI channel : channels.values()) for (HttpChannelOverFCGI channel : activeChannels.values())
result |= channel.responseFailure(failure); result |= channel.responseFailure(failure);
if (result) if (result)
close(failure); close(failure);
@ -286,9 +296,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
} }
} }
protected HttpChannelOverFCGI newHttpChannel(int id, Request request) protected HttpChannelOverFCGI provideHttpChannel(int id, Request request)
{ {
return new HttpChannelOverFCGI(this, getFlusher(), id, request.getIdleTimeout()); HttpChannelOverFCGI channel = idleChannels.poll();
if (channel == null)
channel = newHttpChannel(request);
channel.setRequest(id);
return channel;
}
protected HttpChannelOverFCGI newHttpChannel(Request request)
{
return new HttpChannelOverFCGI(this, getFlusher(), request.getIdleTimeout());
} }
@Override @Override
@ -314,10 +333,10 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
Request request = exchange.getRequest(); Request request = exchange.getRequest();
normalizeRequest(request); normalizeRequest(request);
// FCGI may be multiplexed, so create one channel for each request. // FCGI may be multiplexed, so one channel for each exchange.
int id = acquireRequest(); int id = acquireRequest();
HttpChannelOverFCGI channel = newHttpChannel(id, request); HttpChannelOverFCGI channel = provideHttpChannel(id, request);
channels.put(id, channel); activeChannels.put(id, channel);
return send(channel, exchange); return send(channel, exchange);
} }
@ -351,7 +370,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override @Override
public void onBegin(int request, int code, String reason) public void onBegin(int request, int code, String reason)
{ {
HttpChannelOverFCGI channel = channels.get(request); HttpChannelOverFCGI channel = activeChannels.get(request);
if (channel != null) if (channel != null)
channel.responseBegin(code, reason); channel.responseBegin(code, reason);
else else
@ -361,7 +380,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override @Override
public void onHeader(int request, HttpField field) public void onHeader(int request, HttpField field)
{ {
HttpChannelOverFCGI channel = channels.get(request); HttpChannelOverFCGI channel = activeChannels.get(request);
if (channel != null) if (channel != null)
channel.responseHeader(field); channel.responseHeader(field);
else else
@ -371,7 +390,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override @Override
public void onHeaders(int request) public void onHeaders(int request)
{ {
HttpChannelOverFCGI channel = channels.get(request); HttpChannelOverFCGI channel = activeChannels.get(request);
if (channel != null) if (channel != null)
channel.responseHeaders(); channel.responseHeaders();
else else
@ -385,7 +404,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
{ {
case STD_OUT: case STD_OUT:
{ {
HttpChannelOverFCGI channel = channels.get(request); HttpChannelOverFCGI channel = activeChannels.get(request);
if (channel != null) if (channel != null)
{ {
CompletableCallback callback = new CompletableCallback() CompletableCallback callback = new CompletableCallback()
@ -431,7 +450,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override @Override
public void onEnd(int request) public void onEnd(int request)
{ {
HttpChannelOverFCGI channel = channels.get(request); HttpChannelOverFCGI channel = activeChannels.get(request);
if (channel != null) if (channel != null)
{ {
if (channel.responseSuccess()) if (channel.responseSuccess())
@ -446,7 +465,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override @Override
public void onFailure(int request, Throwable failure) public void onFailure(int request, Throwable failure)
{ {
HttpChannelOverFCGI channel = channels.get(request); HttpChannelOverFCGI channel = activeChannels.get(request);
if (channel != null) if (channel != null)
{ {
if (channel.responseFailure(failure)) if (channel.responseFailure(failure))

View File

@ -100,7 +100,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel
@Override @Override
public void release() public void release()
{ {
setStream(null);
connection.release(this); connection.release(this);
} }

View File

@ -65,7 +65,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
request.version(HttpVersion.HTTP_2); request.version(HttpVersion.HTTP_2);
normalizeRequest(request); normalizeRequest(request);
// One connection maps to N channels, so for each exchange we create a new channel. // One connection maps to N channels, so one channel for each exchange.
HttpChannelOverHTTP2 channel = provideHttpChannel(); HttpChannelOverHTTP2 channel = provideHttpChannel();
activeChannels.add(channel); activeChannels.add(channel);
@ -76,15 +76,21 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
{ {
HttpChannelOverHTTP2 channel = idleChannels.poll(); HttpChannelOverHTTP2 channel = idleChannels.poll();
if (channel == null) if (channel == null)
channel = new HttpChannelOverHTTP2(getHttpDestination(), this, getSession()); channel = newHttpChannel();
return channel; return channel;
} }
protected HttpChannelOverHTTP2 newHttpChannel()
{
return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession());
}
protected void release(HttpChannelOverHTTP2 channel) protected void release(HttpChannelOverHTTP2 channel)
{ {
// Only non-push channels are released. // Only non-push channels are released.
if (activeChannels.remove(channel)) if (activeChannels.remove(channel))
{ {
channel.setStream(null);
// Recycle only non-failed channels. // Recycle only non-failed channels.
if (!channel.isFailed()) if (!channel.isFailed())
idleChannels.offer(channel); idleChannels.offer(channel);

View File

@ -247,7 +247,7 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
return new HttpConnectionOverHTTP2(destination, session) return new HttpConnectionOverHTTP2(destination, session)
{ {
@Override @Override
protected HttpChannelOverHTTP2 provideHttpChannel() protected HttpChannelOverHTTP2 newHttpChannel()
{ {
return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession()) return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession())
{ {

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpChannelOverHTTP; import org.eclipse.jetty.client.http.HttpChannelOverHTTP;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
@ -146,7 +147,7 @@ public class HttpChannelAssociationTest extends AbstractTest
return new HttpConnectionOverHTTP2(destination, session) return new HttpConnectionOverHTTP2(destination, session)
{ {
@Override @Override
protected HttpChannelOverHTTP2 provideHttpChannel() protected HttpChannelOverHTTP2 newHttpChannel()
{ {
return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession()) return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession())
{ {
@ -171,9 +172,9 @@ public class HttpChannelAssociationTest extends AbstractTest
return new HttpConnectionOverFCGI(endPoint, destination, promise, isMultiplexed()) return new HttpConnectionOverFCGI(endPoint, destination, promise, isMultiplexed())
{ {
@Override @Override
protected HttpChannelOverFCGI newHttpChannel(int id, org.eclipse.jetty.client.api.Request request) protected HttpChannelOverFCGI newHttpChannel(Request request)
{ {
return new HttpChannelOverFCGI(this, getFlusher(), id, request.getIdleTimeout()) return new HttpChannelOverFCGI(this, getFlusher(), request.getIdleTimeout())
{ {
@Override @Override
public boolean associate(HttpExchange exchange) public boolean associate(HttpExchange exchange)