From 850e70f27ce9337235614f5a4289f97aab39bfa6 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 16 Oct 2017 18:21:36 +0200 Subject: [PATCH] Fixes #1897 - Introduce a round-robin connection pool for HttpClient. --- .../client/RoundRobinConnectionPool.java | 208 +++++++++++++++++ .../jetty/client/ConnectionPoolTest.java | 216 ++++++++++++++++++ 2 files changed, 424 insertions(+) create mode 100644 jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java create mode 100644 jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java new file mode 100644 index 00000000000..52f83cd0645 --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -0,0 +1,208 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.Destination; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.component.ContainerLifeCycle; + +public class RoundRobinConnectionPool extends AbstractConnectionPool +{ + private final List entries; + private int index; + + public RoundRobinConnectionPool(Destination destination, int maxConnections, Callback requester) + { + super(destination, maxConnections, requester); + entries = new ArrayList<>(maxConnections); + for (int i = 0; i < maxConnections; ++i) + entries.add(new Entry()); + } + + @Override + protected void onCreated(Connection connection) + { + synchronized (this) + { + for (Entry entry : entries) + { + if (entry.connection == null) + { + entry.connection = connection; + break; + } + } + } + idle(connection, false); + } + + @Override + protected Connection activate() + { + Connection connection = null; + synchronized (this) + { + int offset = 0; + int capacity = getMaxConnectionCount(); + while (offset < capacity) + { + int idx = index + offset; + if (idx >= capacity) + idx -= capacity; + + Entry entry = entries.get(idx); + + if (entry.connection == null) + break; + + if (!entry.active) + { + entry.active = true; + entry.used++; + connection = entry.connection; + index += offset + 1; + if (index >= capacity) + index -= capacity; + break; + } + + ++offset; + } + } + return connection == null ? null : active(connection); + } + + @Override + public boolean isActive(Connection connection) + { + synchronized (this) + { + for (Entry entry : entries) + { + if (entry.connection == connection) + return entry.active; + } + return false; + } + } + + @Override + public boolean release(Connection connection) + { + boolean released = false; + synchronized (this) + { + for (Entry entry : entries) + { + if (entry.connection == connection) + { + entry.active = false; + released = true; + break; + } + } + } + if (released) + released(connection); + return idle(connection, isClosed()); + } + + @Override + public boolean remove(Connection connection) + { + boolean removed = false; + boolean active = false; + synchronized (this) + { + for (Entry entry : entries) + { + if (entry.connection == connection) + { + active = entry.active; + entry.connection = null; + entry.active = false; + entry.used = 0; + removed = true; + break; + } + } + } + if (active) + released(connection); + if (removed) + removed(connection); + return removed; + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + List connections = new ArrayList<>(); + synchronized (this) + { + connections.addAll(entries); + } + ContainerLifeCycle.dumpObject(out, this); + ContainerLifeCycle.dump(out, indent, connections); + } + + @Override + public String toString() + { + int present = 0; + int active = 0; + synchronized (this) + { + for (Entry entry : entries) + { + if (entry.connection != null) + { + ++present; + if (entry.active) + ++active; + } + } + } + return String.format("%s@%x[c=%d/%d,a=%d]", + getClass().getSimpleName(), + hashCode(), + present, + getMaxConnectionCount(), + active + ); + } + + private static class Entry + { + private Connection connection; + private boolean active; + private long used; + + @Override + public String toString() + { + return String.format("{u=%d,c=%s}", used, connection); + } + } +} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java new file mode 100644 index 00000000000..2cf61f70a28 --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java @@ -0,0 +1,216 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.client.util.FutureResponseListener; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.util.IO; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ConnectionPoolTest +{ + private Server server; + private ServerConnector connector; + private HttpClient client; + + @Parameterized.Parameters + public static ConnectionPool.Factory[] parameters() + { + return new ConnectionPool.Factory[] + { + destination -> new DuplexConnectionPool(destination, 8, destination), + destination -> new RoundRobinConnectionPool(destination, 8, destination) + }; + } + + private final ConnectionPool.Factory factory; + + public ConnectionPoolTest(ConnectionPool.Factory factory) + { + this.factory = factory; + } + + private void start(Handler handler) throws Exception + { + server = new Server(); + connector = new ServerConnector(server); + server.addConnector(connector); + server.setHandler(handler); + + HttpClientTransport transport = new HttpClientTransportOverHTTP(1); + transport.setConnectionPoolFactory(factory); + client = new HttpClient(transport, null); + server.addBean(client); + + server.start(); + } + + @After + public void dispose() throws Exception + { + if (server != null) + server.stop(); + } + + @Test + public void test() throws Exception + { + start(new EmptyServerHandler() + { + @Override + protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + switch (HttpMethod.fromString(request.getMethod())) + { + case GET: + { + int contentLength = request.getIntHeader("X-Download"); + if (contentLength > 0) + { + response.setContentLength(contentLength); + response.getOutputStream().write(new byte[contentLength]); + } + break; + } + case POST: + { + int contentLength = request.getContentLength(); + if (contentLength > 0) + response.setContentLength(contentLength); + IO.copy(request.getInputStream(), response.getOutputStream()); + break; + } + default: + { + throw new IllegalStateException(); + } + } + + if (Boolean.parseBoolean(request.getHeader("X-Close"))) + response.setHeader("Connection", "close"); + } + }); + + int parallelism = 16; + int runs = 2; + int iterations = 1024; + CountDownLatch latch = new CountDownLatch(parallelism * runs); + List failures = new CopyOnWriteArrayList<>(); + IntStream.range(0, parallelism).parallel().forEach(i -> + IntStream.range(0, runs).forEach(j -> + run(latch, iterations, failures))); + Assert.assertTrue(latch.await(iterations, TimeUnit.SECONDS)); + Assert.assertTrue(failures.toString(), failures.isEmpty()); + } + + private void run(CountDownLatch latch, int iterations, List failures) + { + long begin = System.nanoTime(); + for (int i = 0; i < iterations; ++i) + test(failures); + long end = System.nanoTime(); + long elapsed = TimeUnit.NANOSECONDS.toMillis(end - begin); + System.err.printf("%d requests in %d ms, %.3f req/s%n", iterations, elapsed, elapsed > 0 ? iterations * 1000D / elapsed : -1D); + latch.countDown(); + } + + private void test(List failures) + { + ThreadLocalRandom random = ThreadLocalRandom.current(); + // Choose a random method. + HttpMethod method = random.nextBoolean() ? HttpMethod.GET : HttpMethod.POST; + + // Choose randomly whether to close the connection on the client or on the server. + boolean clientClose = false; + if (random.nextInt(100) < 1) + clientClose = true; + boolean serverClose = false; + if (random.nextInt(100) < 1) + serverClose = true; + + int maxContentLength = 64 * 1024; + int contentLength = random.nextInt(maxContentLength) + 1; + + test(method, clientClose, serverClose, contentLength, failures); + } + + private void test(HttpMethod method, boolean clientClose, boolean serverClose, int contentLength, List failures) + { + Request request = client.newRequest("localhost", connector.getLocalPort()) + .path("/") + .method(method); + + if (clientClose) + request.header(HttpHeader.CONNECTION, "close"); + else if (serverClose) + request.header("X-Close", "true"); + + switch (method) + { + case GET: + request.header("X-Download", String.valueOf(contentLength)); + break; + case POST: + request.header(HttpHeader.CONTENT_LENGTH, String.valueOf(contentLength)); + request.content(new BytesContentProvider(new byte[contentLength])); + break; + default: + throw new IllegalStateException(); + } + + FutureResponseListener listener = new FutureResponseListener(request, contentLength); + request.send(listener); + + try + { + ContentResponse response = listener.get(5, TimeUnit.SECONDS); + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + } + catch (Throwable x) + { + failures.add(x); + } + } +}