diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index b2e58c80220..e92e0ec776a 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -429,9 +429,14 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest { // Trigger the next request after releasing the connection. if (connectionPool.release(connection)) + { send(false); + } else + { connection.close(); + send(true); + } } else { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index 825598b484e..73a31d69cf2 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -18,12 +18,11 @@ package org.eclipse.jetty.client; -import java.util.concurrent.atomic.AtomicInteger; - import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +31,9 @@ public class RoundRobinConnectionPool extends MultiplexConnectionPool { private static final Logger LOG = LoggerFactory.getLogger(RoundRobinConnectionPool.class); - private final AtomicInteger offset = new AtomicInteger(); + private final AutoLock lock = new AutoLock(); private final Pool pool; + private int offset; public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester) { @@ -47,26 +47,32 @@ public class RoundRobinConnectionPool extends MultiplexConnectionPool } @Override - protected Connection activate() + public Connection acquire(boolean create) { - int offset = this.offset.get(); - Connection connection = activate(offset); - if (connection != null) - this.offset.getAndIncrement(); - return connection; + // If there are queued requests and connections get + // closed due to idle timeout or overuse, we want to + // aggressively try to open new connections to replace + // those that were closed to process queued requests. + return super.acquire(true); } - private Connection activate(int offset) + @Override + protected Connection activate() { - Pool.Entry entry = pool.acquireAt(Math.abs(offset % pool.getMaxEntries())); - if (LOG.isDebugEnabled()) - LOG.debug("activated '{}'", entry); - if (entry != null) + Pool.Entry entry; + try (AutoLock l = lock.lock()) { - Connection connection = entry.getPooled(); - acquired(connection); - return connection; + int index = Math.abs(offset % pool.getMaxEntries()); + entry = pool.acquireAt(index); + if (LOG.isDebugEnabled()) + LOG.debug("activated at index={} entry={}", index, entry); + if (entry != null) + ++offset; } - return null; + if (entry == null) + return null; + Connection connection = entry.getPooled(); + acquired(connection); + return connection; } } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java index c70d75da2c3..1f297def171 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java @@ -110,12 +110,9 @@ public class HttpChannelOverHTTP2 extends HttpChannel @Override public void release() { + setStream(null); connection.release(this); - } - - void onStreamClosed(IStream stream) - { - connection.onStreamClosed(stream, this); + getHttpDestination().release(getHttpConnection()); } @Override @@ -215,12 +212,5 @@ public class HttpChannelOverHTTP2 extends HttpChannel HTTP2Channel.Client channel = (HTTP2Channel.Client)((IStream)stream).getAttachment(); channel.onFailure(failure, callback); } - - @Override - public void onClosed(Stream stream) - { - // TODO: needs to call HTTP2Channel? - receiver.onClosed(stream); - } } } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 5c73784bc72..814105919e4 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -167,16 +167,6 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S } } - void onStreamClosed(IStream stream, HttpChannelOverHTTP2 channel) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} closed for {}", stream, channel); - channel.setStream(null); - // Only non-push channels are released. - if (stream.isLocal()) - getHttpDestination().release(this); - } - @Override public boolean onIdleTimeout(long idleTimeout) { @@ -239,9 +229,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S { if (!isClosed()) return false; - if (sweeps.incrementAndGet() < 4) - return false; - return true; + return sweeps.incrementAndGet() >= 4; } @Override diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index b55924a5dfb..cd550b8e511 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -235,11 +235,6 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. callback.succeeded(); } - void onClosed(Stream stream) - { - getHttpChannel().onStreamClosed((IStream)stream); - } - private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback) { contentNotifier.offer(exchange, frame, callback); diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java index 9580a98b297..85d52a1b7a2 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java @@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -180,4 +182,56 @@ public class RoundRobinConnectionPoolTest extends AbstractTest remotePorts = new CopyOnWriteArrayList<>(); + scenario.start(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) + { + remotePorts.add(request.getRemotePort()); + } + }); + scenario.client.getTransport().setConnectionPoolFactory(destination -> + { + RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex); + pool.setMaxUsageCount(maxUsage); + return pool; + }); + + CountDownLatch clientLatch = new CountDownLatch(count); + for (int i = 0; i < count; ++i) + { + scenario.client.newRequest(scenario.newURI()) + .path("/" + i) + .timeout(5, TimeUnit.SECONDS) + .send(result -> + { + if (result.getResponse().getStatus() == HttpStatus.OK_200) + clientLatch.countDown(); + }); + } + assertTrue(clientLatch.await(count, TimeUnit.SECONDS)); + assertEquals(count, remotePorts.size()); + + Map results = remotePorts.stream() + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + assertEquals(count / maxUsage, results.size(), remotePorts.toString()); + assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString()); + } }