From 48c77b86087a3527510dfecf8c4240aa99e5615c Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 4 Jan 2018 07:09:08 -0800 Subject: [PATCH] Fixes #2088 - Recycle HTTP/2 channels on the client. (#2089) Removed the distinction between pushed and non-pushed channels; only non-pushed channels are released and recycled if they're not failed. Properly resetting HttpReceiverOverHTTP2. Signed-off-by: Simone Bordet --- .../eclipse/jetty/client/HttpReceiver.java | 5 +++ .../org/eclipse/jetty/client/HttpSender.java | 5 +++ .../client/http/HttpChannelOverHTTP2.java | 15 ++++--- .../client/http/HttpConnectionOverHTTP2.java | 39 +++++++++++++------ .../client/http/HttpReceiverOverHTTP2.java | 16 ++++++-- .../HttpClientTransportOverHTTP2Test.java | 11 ++++-- .../client/HttpChannelAssociationTest.java | 4 +- 7 files changed, 67 insertions(+), 28 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index a2d978ef0d7..40ce6c768bd 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -95,6 +95,11 @@ public abstract class HttpReceiver return channel.getHttpDestination(); } + public boolean isFailed() + { + return responseState.get() == ResponseState.FAILURE; + } + /** * Method to be invoked when the response status code is available. *

diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index bdae1570819..22a50d88ee8 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -86,6 +86,11 @@ public abstract class HttpSender implements AsyncContentProvider.Listener return channel.getHttpExchange(); } + public boolean isFailed() + { + return requestState.get() == RequestState.FAILURE; + } + @Override public void onContent() { diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java index 18fa3c560e2..004ce38f1c3 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java @@ -34,17 +34,15 @@ public class HttpChannelOverHTTP2 extends HttpChannel { private final HttpConnectionOverHTTP2 connection; private final Session session; - private final boolean push; private final HttpSenderOverHTTP2 sender; private final HttpReceiverOverHTTP2 receiver; private Stream stream; - public HttpChannelOverHTTP2(HttpDestination destination, HttpConnectionOverHTTP2 connection, Session session, boolean push) + public HttpChannelOverHTTP2(HttpDestination destination, HttpConnectionOverHTTP2 connection, Session session) { super(destination); this.connection = connection; this.session = session; - this.push = push; this.sender = new HttpSenderOverHTTP2(this); this.receiver = new HttpReceiverOverHTTP2(this); } @@ -86,6 +84,11 @@ public class HttpChannelOverHTTP2 extends HttpChannel this.stream = stream; } + public boolean isFailed() + { + return sender.isFailed() || receiver.isFailed(); + } + @Override public void send() { @@ -97,16 +100,17 @@ public class HttpChannelOverHTTP2 extends HttpChannel @Override public void release() { + setStream(null); connection.release(this); } @Override public boolean abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure) { + Stream stream = getStream(); boolean aborted = super.abort(exchange, requestFailure, responseFailure); if (aborted) { - Stream stream = getStream(); if (stream != null) stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); } @@ -117,7 +121,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel public void exchangeTerminated(HttpExchange exchange, Result result) { super.exchangeTerminated(exchange, result); - if (!push) - release(); + release(); } } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 05f8456185f..3ee9ae3681a 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -19,8 +19,10 @@ package org.eclipse.jetty.http2.client.http; import java.nio.channels.AsynchronousCloseException; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -29,6 +31,7 @@ import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.client.HttpRequest; import org.eclipse.jetty.client.SendFailure; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http2.ErrorCode; @@ -38,7 +41,8 @@ import org.eclipse.jetty.util.thread.Sweeper; public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable { - private final Set channels = ConcurrentHashMap.newKeySet(); + private final Set activeChannels = ConcurrentHashMap.newKeySet(); + private final Queue idleChannels = new ConcurrentLinkedQueue<>(); private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicInteger sweeps = new AtomicInteger(); private final Session session; @@ -57,25 +61,35 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S @Override protected SendFailure send(HttpExchange exchange) { - exchange.getRequest().version(HttpVersion.HTTP_2); - normalizeRequest(exchange.getRequest()); + HttpRequest request = exchange.getRequest(); + request.version(HttpVersion.HTTP_2); + normalizeRequest(request); // One connection maps to N channels, so for each exchange we create a new channel. - HttpChannel channel = newHttpChannel(false); - channels.add(channel); + HttpChannelOverHTTP2 channel = provideHttpChannel(); + activeChannels.add(channel); return send(channel, exchange); } - protected HttpChannelOverHTTP2 newHttpChannel(boolean push) + protected HttpChannelOverHTTP2 provideHttpChannel() { - return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession(), push); + HttpChannelOverHTTP2 channel = idleChannels.poll(); + if (channel == null) + channel = new HttpChannelOverHTTP2(getHttpDestination(), this, getSession()); + return channel; } - protected void release(HttpChannel channel) + protected void release(HttpChannelOverHTTP2 channel) { - channels.remove(channel); - getHttpDestination().release(this); + // Only non-push channels are released. + if (activeChannels.remove(channel)) + { + // Recycle only non-failed channels. + if (!channel.isFailed()) + idleChannels.offer(channel); + getHttpDestination().release(this); + } } @Override @@ -113,13 +127,14 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S private void abort(Throwable failure) { - for (HttpChannel channel : channels) + for (HttpChannel channel : activeChannels) { HttpExchange exchange = channel.getHttpExchange(); if (exchange != null) exchange.getRequest().abort(failure); } - channels.clear(); + activeChannels.clear(); + idleChannels.clear(); } @Override diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index 70913728500..dbf7c5a34cd 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -64,6 +64,13 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen return (HttpChannelOverHTTP2)super.getHttpChannel(); } + @Override + protected void reset() + { + super.reset(); + contentNotifier.reset(); + } + @Override public void onHeaders(Stream stream, HeadersFrame frame) { @@ -114,6 +121,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen HttpRequest request = exchange.getRequest(); MetaData.Request metaData = (MetaData.Request)frame.getMetaData(); HttpRequest pushRequest = (HttpRequest)getHttpDestination().getHttpClient().newRequest(metaData.getURIString()); + // TODO: copy PUSH_PROMISE headers into pushRequest. BiFunction pushListener = request.getPushListener(); if (pushListener != null) @@ -121,7 +129,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen Response.CompleteListener listener = pushListener.apply(request, pushRequest); if (listener != null) { - HttpChannelOverHTTP2 pushChannel = getHttpChannel().getHttpConnection().newHttpChannel(true); + HttpChannelOverHTTP2 pushChannel = getHttpChannel().getHttpConnection().provideHttpChannel(); List listeners = Collections.singletonList(listener); HttpExchange pushExchange = new HttpExchange(getHttpDestination(), pushRequest, listeners); pushChannel.associate(pushExchange); @@ -187,16 +195,16 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen private final Queue queue = new ArrayDeque<>(); private DataInfo dataInfo; - private boolean offer(DataInfo dataInfo) + private void offer(DataInfo dataInfo) { synchronized (this) { - return queue.offer(dataInfo); + queue.offer(dataInfo); } } @Override - protected Action process() throws Exception + protected Action process() { DataInfo dataInfo; synchronized (this) diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java index bf9f2e7e438..08377e71cb7 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java @@ -247,16 +247,19 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest return new HttpConnectionOverHTTP2(destination, session) { @Override - protected HttpChannelOverHTTP2 newHttpChannel(boolean push) + protected HttpChannelOverHTTP2 provideHttpChannel() { - return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession(), push) + return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession()) { @Override public void setStream(Stream stream) { super.setStream(stream); - streamRef.set(stream); - streamLatch.countDown(); + if (stream != null) + { + streamRef.set(stream); + streamLatch.countDown(); + } } }; } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpChannelAssociationTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpChannelAssociationTest.java index c6add504a19..ac0e3d31db3 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpChannelAssociationTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpChannelAssociationTest.java @@ -146,9 +146,9 @@ public class HttpChannelAssociationTest extends AbstractTest return new HttpConnectionOverHTTP2(destination, session) { @Override - protected HttpChannelOverHTTP2 newHttpChannel(boolean push) + protected HttpChannelOverHTTP2 provideHttpChannel() { - return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession(), push) + return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession()) { @Override public boolean associate(HttpExchange exchange)