From 8720fb213cf89a1652f3a0593549d3885c8a7f6b Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 7 Jan 2014 19:43:58 +0100 Subject: [PATCH] 425043 - Track whether pools are used correctly. Introduced LeakDetector and utility classes LeakTrackingConnectionPool and LeakTrackingByteBufferPool to track resource pool leakages. Fixed ConnectionPool to be more precise in closing connections when release() cannot recycle the connection. Fixed a leak in server's HttpConnection in case a request arrives with the Connection: close header: a ByteBuffer was allocated but never released. --- .../eclipse/jetty/client/ConnectionPool.java | 44 ++-- .../client/LeakTrackingConnectionPool.java | 92 ++++++++ .../jetty/client/HttpClientLoadTest.java | 66 +++++- .../jetty/io/LeakTrackingByteBufferPool.java | 70 ++++++ .../eclipse/jetty/server/HttpConnection.java | 9 +- .../org/eclipse/jetty/util/LeakDetector.java | 201 ++++++++++++++++++ .../eclipse/jetty/util/LeakDetectorTest.java | 92 ++++++++ .../test/resources/jetty-logging.properties | 2 +- 8 files changed, 558 insertions(+), 18 deletions(-) create mode 100644 jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java create mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/LeakDetector.java create mode 100644 jetty-util/src/test/java/org/eclipse/jetty/util/LeakDetectorTest.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index 053583712a3..42eae391298 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.client; +import java.io.Closeable; import java.io.IOException; import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; @@ -33,9 +34,9 @@ import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class ConnectionPool implements Dumpable +public class ConnectionPool implements Closeable, Dumpable { - private static final Logger LOG = Log.getLogger(ConnectionPool.class); + protected static final Logger LOG = Log.getLogger(ConnectionPool.class); private final AtomicInteger connectionCount = new AtomicInteger(); private final Destination destination; @@ -65,10 +66,14 @@ public class ConnectionPool implements Dumpable public Connection acquire() { - Connection result = acquireIdleConnection(); - if (result != null) - return result; + Connection connection = acquireIdleConnection(); + if (connection == null) + connection = tryCreate(); + return connection; + } + private Connection tryCreate() + { while (true) { int current = connectionCount.get(); @@ -91,8 +96,8 @@ public class ConnectionPool implements Dumpable public void succeeded(Connection connection) { LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection); - activate(connection); - connectionPromise.succeeded(connection); + if (activate(connection)) + connectionPromise.succeeded(connection); } @Override @@ -113,9 +118,9 @@ public class ConnectionPool implements Dumpable private Connection acquireIdleConnection() { Connection connection = idleConnections.pollFirst(); - if (connection != null) - activate(connection); - return connection; + if (connection == null) + return null; + return activate(connection) ? connection : null; } private boolean activate(Connection connection) @@ -123,17 +128,24 @@ public class ConnectionPool implements Dumpable if (activeConnections.offer(connection)) { LOG.debug("Connection active {}", connection); + acquired(connection); return true; } else { LOG.debug("Connection active overflow {}", connection); + connection.close(); return false; } } + protected void acquired(Connection connection) + { + } + public boolean release(Connection connection) { + released(connection); if (activeConnections.remove(connection)) { // Make sure we use "hot" connections first @@ -145,15 +157,23 @@ public class ConnectionPool implements Dumpable else { LOG.debug("Connection idle overflow {}", connection); + connection.close(); } } return false; } + protected void released(Connection connection) + { + } + public boolean remove(Connection connection) { - boolean removed = activeConnections.remove(connection); - removed |= idleConnections.remove(connection); + boolean activeRemoved = activeConnections.remove(connection); + boolean idleRemoved = idleConnections.remove(connection); + if (!idleRemoved) + released(connection); + boolean removed = activeRemoved || idleRemoved; if (removed) { int pooled = connectionCount.decrementAndGet(); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java new file mode 100644 index 00000000000..d2d74ac3f0f --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java @@ -0,0 +1,92 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.Destination; +import org.eclipse.jetty.util.LeakDetector; +import org.eclipse.jetty.util.Promise; + +public class LeakTrackingConnectionPool extends ConnectionPool +{ + private final LeakDetector leakDetector = new LeakDetector() + { + @Override + protected void leaked(LeakInfo leakInfo) + { + LeakTrackingConnectionPool.this.leaked(leakInfo); + } + }; + + public LeakTrackingConnectionPool(Destination destination, int maxConnections, Promise connectionPromise) + { + super(destination, maxConnections, connectionPromise); + start(); + } + + private void start() + { + try + { + leakDetector.start(); + } + catch (Exception x) + { + throw new RuntimeException(x); + } + } + + @Override + public void close() + { + stop(); + super.close(); + } + + private void stop() + { + try + { + leakDetector.stop(); + } + catch (Exception x) + { + throw new RuntimeException(x); + } + } + + @Override + protected void acquired(Connection connection) + { + if (!leakDetector.acquired(connection)) + LOG.info("Connection {}@{} not tracked", connection, System.identityHashCode(connection)); + } + + @Override + protected void released(Connection connection) + { + if (!leakDetector.released(connection)) + LOG.info("Connection {}@{} released but not acquired", connection, System.identityHashCode(connection)); + } + + protected void leaked(LeakDetector.LeakInfo leakInfo) + { + LOG.info("Connection " + leakInfo.getResourceDescription() + " leaked at:", leakInfo.getStackFrames()); + } +} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientLoadTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientLoadTest.java index 1bbb223b8b1..a16f63fbb27 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientLoadTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientLoadTest.java @@ -28,7 +28,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - +import java.util.concurrent.atomic.AtomicLong; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -37,16 +37,24 @@ import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; import org.eclipse.jetty.client.http.HttpDestinationOverHTTP; import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpScheme; +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.io.LeakTrackingByteBufferPool; +import org.eclipse.jetty.io.MappedByteBufferPool; +import org.eclipse.jetty.server.AbstractConnectionFactory; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Stress; import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.LeakDetector; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -68,12 +76,64 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest @Test public void testIterative() throws Exception { - start(new LoadHandler()); + int cores = Runtime.getRuntime().availableProcessors(); + final AtomicLong leaks = new AtomicLong(); + + start(new LoadHandler()); + server.stop(); + server.removeConnector(connector); + connector = new ServerConnector(server, connector.getExecutor(), connector.getScheduler(), + new LeakTrackingByteBufferPool(new ArrayByteBufferPool()) + { + @Override + protected void leaked(LeakDetector.LeakInfo leakInfo) + { + leaks.incrementAndGet(); + } + }, 1, Math.min(1, cores / 2), AbstractConnectionFactory.getFactories(sslContextFactory, new HttpConnectionFactory())); + server.addConnector(connector); + server.start(); + + client.stop(); + HttpClient newClient = new HttpClient(new HttpClientTransportOverHTTP() + { + @Override + public HttpDestination newHttpDestination(Origin origin) + { + return new HttpDestinationOverHTTP(getHttpClient(), origin) + { + @Override + protected ConnectionPool newConnectionPool(HttpClient client) + { + return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this) + { + @Override + protected void leaked(LeakDetector.LeakInfo resource) + { + leaks.incrementAndGet(); + } + }; + } + }; + } + }, sslContextFactory); + newClient.setExecutor(client.getExecutor()); + client = newClient; + client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool()) + { + @Override + protected void leaked(LeakDetector.LeakInfo leakInfo) + { + super.leaked(leakInfo); + leaks.incrementAndGet(); + } + }); client.setMaxConnectionsPerDestination(32768); client.setMaxRequestsQueuedPerDestination(1024 * 1024); client.setDispatchIO(false); client.setStrictEventOrdering(false); + client.start(); Random random = new Random(); // At least 25k requests to warmup properly (use -XX:+PrintCompilation to verify JIT activity) @@ -90,6 +150,8 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest { run(random, iterations); } + + Assert.assertEquals(0, leaks.get()); } private void run(Random random, int iterations) throws InterruptedException diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java new file mode 100644 index 00000000000..7cc33fbe25a --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java @@ -0,0 +1,70 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.LeakDetector; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements ByteBufferPool +{ + private static final Logger LOG = Log.getLogger(LeakTrackingByteBufferPool.class); + + private final LeakDetector leakDetector = new LeakDetector() + { + @Override + protected void leaked(LeakInfo leakInfo) + { + LeakTrackingByteBufferPool.this.leaked(leakInfo); + } + }; + private final ByteBufferPool delegate; + + public LeakTrackingByteBufferPool(ByteBufferPool delegate) + { + this.delegate = delegate; + addBean(leakDetector); + addBean(delegate); + } + + @Override + public ByteBuffer acquire(int size, boolean direct) + { + ByteBuffer buffer = delegate.acquire(size, direct); + if (!leakDetector.acquired(buffer)) + LOG.info("ByteBuffer {}@{} not tracked", buffer, System.identityHashCode(buffer)); + return buffer; + } + + @Override + public void release(ByteBuffer buffer) + { + if (!leakDetector.released(buffer)) + LOG.info("ByteBuffer {}@{} released but not acquired", buffer, System.identityHashCode(buffer)); + delegate.release(buffer); + } + + protected void leaked(LeakDetector.LeakInfo leakInfo) + { + LOG.info("ByteBuffer " + leakInfo.getResourceDescription() + " leaked at:", leakInfo.getStackFrames()); + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 1bc3c19acef..c5710b9335e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -210,8 +210,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http else { // Get a buffer - if (_requestBuffer == null) - _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT); + _requestBuffer = getRequestBuffer(); // fill filled = getEndPoint().fill(_requestBuffer); @@ -232,7 +231,11 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http // if the request suspends, the request/response will be incomplete so the outer loop will exit. suspended = !_channel.handle(); } - + else + { + // We parsed what we could, recycle the request buffer + releaseRequestBuffer(); + } } } catch (EofException e) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/LeakDetector.java b/jetty-util/src/main/java/org/eclipse/jetty/util/LeakDetector.java new file mode 100644 index 00000000000..b0ac94e9b4d --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/LeakDetector.java @@ -0,0 +1,201 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.lang.ref.PhantomReference; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + * A facility to detect improper usage of resource pools. + *

+ * Resource pools usually have a method to acquire a pooled resource + * and a method to released it back to the pool. + *

+ * To detect if client code acquires a resource but never releases it, + * the resource pool can be modified to use a {@link LeakDetector}. + * The modified resource pool should call {@link #acquired(Object)} every time + * the method to acquire a resource is called, and {@link #released(Object)} + * every time the method to release the resource is called. + * {@link LeakDetector} keeps track of these resources and invokes method + * {@link #leaked(org.eclipse.jetty.util.LeakDetector.LeakInfo)} when it detects that a resource + * has been leaked (that is, acquired but never released). + *

+ * To detect whether client code releases a resource without having + * acquired it, the resource pool can be modified to check the return value + * of {@link #released(Object)}: if false, it means that the resource was + * not acquired. + *

+ * IMPLEMENTATION NOTES + *

+ * This class relies on {@link System#identityHashCode(Object)} to create + * a unique id for each resource passed to {@link #acquired(Object)} and + * {@link #released(Object)}. {@link System#identityHashCode(Object)} does + * not guarantee that it will not generate the same number for different + * objects, but in practice the chance of collision is rare. + *

+ * {@link LeakDetector} uses {@link PhantomReference}s to detect leaks. + * {@link PhantomReference}s are enqueued in their {@link ReferenceQueue} + * after they have been garbage collected (differently from + * {@link WeakReference}s that are enqueued before). + * Since the resource is now garbage collected, {@link LeakDetector} checks + * whether it has been released and if not, it reports a leak. + * Using {@link PhantomReference}s is better than overriding {@link #finalize()} + * and works also in those cases where {@link #finalize()} is not + * overridable. + * + * @param the resource type. + */ +public class LeakDetector extends AbstractLifeCycle implements Runnable +{ + private static final Logger LOG = Log.getLogger(LeakDetector.class); + + private final ReferenceQueue queue = new ReferenceQueue<>(); + private final ConcurrentMap resources = new ConcurrentHashMap<>(); + private Thread thread; + + /** + * Tracks the resource as been acquired. + * + * @param resource the resource that has been acquired + * @return whether the resource has been tracked + * @see #released(Object) + */ + public boolean acquired(T resource) + { + String id = id(resource); + return resources.putIfAbsent(id, new LeakInfo(resource, id)) == null; + } + + /** + * Tracks the resource as been released. + * + * @param resource the resource that has been released + * @return whether the resource has been acquired + * @see #acquired(Object) + */ + public boolean released(T resource) + { + String id = id(resource); + return resources.remove(id) != null; + } + + /** + * Generates a unique ID for the given resource. + * + * @param resource the resource to generate the unique ID for + * @return the unique ID of the given resource + */ + protected String id(T resource) + { + return String.valueOf(System.identityHashCode(resource)); + } + + @Override + protected void doStart() throws Exception + { + super.doStart(); + thread = new Thread(this, getClass().getSimpleName()); + thread.setDaemon(true); + thread.start(); + } + + @Override + protected void doStop() throws Exception + { + thread.interrupt(); + super.doStop(); + } + + @Override + public void run() + { + try + { + while (isRunning()) + { + @SuppressWarnings("unchecked") + LeakInfo leakInfo = (LeakInfo)queue.remove(); + LOG.debug("Resource GC'ed: {}", leakInfo); + if (resources.remove(leakInfo.id) != null) + leaked(leakInfo); + } + } + catch (InterruptedException x) + { + // Exit + } + } + + /** + * Callback method invoked by {@link LeakDetector} when it detects that a resource has been leaked. + * + * @param leakInfo the information about the leak + */ + protected void leaked(LeakInfo leakInfo) + { + LOG.warn("Resource leaked: " + leakInfo.description, leakInfo.stackFrames); + } + + /** + * Information about the leak of a resource. + */ + public class LeakInfo extends PhantomReference + { + private final String id; + private final String description; + private final Throwable stackFrames; + + private LeakInfo(T referent, String id) + { + super(referent, queue); + this.id = id; + this.description = referent.toString(); + this.stackFrames = new Throwable(); + } + + /** + * @return the resource description as provided by the resource's {@link Object#toString()} method. + */ + public String getResourceDescription() + { + return description; + } + + /** + * @return a Throwable instance that contains the stack frames at the time of resource acquisition. + */ + public Throwable getStackFrames() + { + return stackFrames; + } + + @Override + public String toString() + { + return description; + } + } +} diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/LeakDetectorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/LeakDetectorTest.java new file mode 100644 index 00000000000..065243e6884 --- /dev/null +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/LeakDetectorTest.java @@ -0,0 +1,92 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; + +public class LeakDetectorTest +{ + private LeakDetector leakDetector; + + public void prepare(LeakDetector leakDetector) throws Exception + { + this.leakDetector = leakDetector; + leakDetector.start(); + } + + public void dispose() throws Exception + { + leakDetector.stop(); + } + + private void gc() + { + for (int i = 0; i < 3; ++i) + System.gc(); + } + + @Test + public void testResourceAcquiredAndReleased() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + prepare(new LeakDetector() + { + @Override + protected void leaked(LeakInfo leakInfo) + { + latch.countDown(); + } + }); + + // Block to make sure "resource" goes out of scope + { + Object resource = new Object(); + leakDetector.acquired(resource); + leakDetector.released(resource); + } + + gc(); + + Assert.assertFalse(latch.await(1, TimeUnit.SECONDS)); + } + + @Test + public void testResourceAcquiredAndNotReleased() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + prepare(new LeakDetector() + { + @Override + protected void leaked(LeakInfo leakInfo) + { + latch.countDown(); + } + }); + + leakDetector.acquired(new Object()); + + gc(); + + Assert.assertTrue(latch.await(1, TimeUnit.SECONDS)); + } +} diff --git a/jetty-util/src/test/resources/jetty-logging.properties b/jetty-util/src/test/resources/jetty-logging.properties index 239f2ba365d..3a0ce97e2e0 100644 --- a/jetty-util/src/test/resources/jetty-logging.properties +++ b/jetty-util/src/test/resources/jetty-logging.properties @@ -1,3 +1,3 @@ # Setup default logging implementation for during testing org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog -#org.eclipse.jetty.io.LEVEL=DEBUG +#org.eclipse.jetty.util.LEVEL=DEBUG