425043 - Track whether pools are used correctly.

Introduced LeakDetector and utility classes LeakTrackingConnectionPool
and LeakTrackingByteBufferPool to track resource pool leakages.

Fixed ConnectionPool to be more precise in closing connections when
release() cannot recycle the connection.

Fixed a leak in server's HttpConnection in case a request arrives with
the Connection: close header: a ByteBuffer was allocated but never
released.
This commit is contained in:
Simone Bordet 2014-01-07 19:43:58 +01:00
parent 6cb93e026f
commit 8720fb213c
8 changed files with 558 additions and 18 deletions

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
@ -33,9 +34,9 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class ConnectionPool implements Dumpable
public class ConnectionPool implements Closeable, Dumpable
{
private static final Logger LOG = Log.getLogger(ConnectionPool.class);
protected static final Logger LOG = Log.getLogger(ConnectionPool.class);
private final AtomicInteger connectionCount = new AtomicInteger();
private final Destination destination;
@ -65,10 +66,14 @@ public class ConnectionPool implements Dumpable
public Connection acquire()
{
Connection result = acquireIdleConnection();
if (result != null)
return result;
Connection connection = acquireIdleConnection();
if (connection == null)
connection = tryCreate();
return connection;
}
private Connection tryCreate()
{
while (true)
{
int current = connectionCount.get();
@ -91,7 +96,7 @@ public class ConnectionPool implements Dumpable
public void succeeded(Connection connection)
{
LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
activate(connection);
if (activate(connection))
connectionPromise.succeeded(connection);
}
@ -113,9 +118,9 @@ public class ConnectionPool implements Dumpable
private Connection acquireIdleConnection()
{
Connection connection = idleConnections.pollFirst();
if (connection != null)
activate(connection);
return connection;
if (connection == null)
return null;
return activate(connection) ? connection : null;
}
private boolean activate(Connection connection)
@ -123,17 +128,24 @@ public class ConnectionPool implements Dumpable
if (activeConnections.offer(connection))
{
LOG.debug("Connection active {}", connection);
acquired(connection);
return true;
}
else
{
LOG.debug("Connection active overflow {}", connection);
connection.close();
return false;
}
}
protected void acquired(Connection connection)
{
}
public boolean release(Connection connection)
{
released(connection);
if (activeConnections.remove(connection))
{
// Make sure we use "hot" connections first
@ -145,15 +157,23 @@ public class ConnectionPool implements Dumpable
else
{
LOG.debug("Connection idle overflow {}", connection);
connection.close();
}
}
return false;
}
protected void released(Connection connection)
{
}
public boolean remove(Connection connection)
{
boolean removed = activeConnections.remove(connection);
removed |= idleConnections.remove(connection);
boolean activeRemoved = activeConnections.remove(connection);
boolean idleRemoved = idleConnections.remove(connection);
if (!idleRemoved)
released(connection);
boolean removed = activeRemoved || idleRemoved;
if (removed)
{
int pooled = connectionCount.decrementAndGet();

View File

@ -0,0 +1,92 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.Promise;
public class LeakTrackingConnectionPool extends ConnectionPool
{
private final LeakDetector<Connection> leakDetector = new LeakDetector<Connection>()
{
@Override
protected void leaked(LeakInfo leakInfo)
{
LeakTrackingConnectionPool.this.leaked(leakInfo);
}
};
public LeakTrackingConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
{
super(destination, maxConnections, connectionPromise);
start();
}
private void start()
{
try
{
leakDetector.start();
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}
@Override
public void close()
{
stop();
super.close();
}
private void stop()
{
try
{
leakDetector.stop();
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}
@Override
protected void acquired(Connection connection)
{
if (!leakDetector.acquired(connection))
LOG.info("Connection {}@{} not tracked", connection, System.identityHashCode(connection));
}
@Override
protected void released(Connection connection)
{
if (!leakDetector.released(connection))
LOG.info("Connection {}@{} released but not acquired", connection, System.identityHashCode(connection));
}
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
LOG.info("Connection " + leakInfo.getResourceDescription() + " leaked at:", leakInfo.getStackFrames());
}
}

View File

@ -28,7 +28,7 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -37,16 +37,24 @@ import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.toolchain.test.annotation.Stress;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -68,12 +76,64 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
@Test
public void testIterative() throws Exception
{
start(new LoadHandler());
int cores = Runtime.getRuntime().availableProcessors();
final AtomicLong leaks = new AtomicLong();
start(new LoadHandler());
server.stop();
server.removeConnector(connector);
connector = new ServerConnector(server, connector.getExecutor(), connector.getScheduler(),
new LeakTrackingByteBufferPool(new ArrayByteBufferPool())
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
leaks.incrementAndGet();
}
}, 1, Math.min(1, cores / 2), AbstractConnectionFactory.getFactories(sslContextFactory, new HttpConnectionFactory()));
server.addConnector(connector);
server.start();
client.stop();
HttpClient newClient = new HttpClient(new HttpClientTransportOverHTTP()
{
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverHTTP(getHttpClient(), origin)
{
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo resource)
{
leaks.incrementAndGet();
}
};
}
};
}
}, sslContextFactory);
newClient.setExecutor(client.getExecutor());
client = newClient;
client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool())
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
super.leaked(leakInfo);
leaks.incrementAndGet();
}
});
client.setMaxConnectionsPerDestination(32768);
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
client.setDispatchIO(false);
client.setStrictEventOrdering(false);
client.start();
Random random = new Random();
// At least 25k requests to warmup properly (use -XX:+PrintCompilation to verify JIT activity)
@ -90,6 +150,8 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
{
run(random, iterations);
}
Assert.assertEquals(0, leaks.get());
}
private void run(Random random, int iterations) throws InterruptedException

View File

@ -0,0 +1,70 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements ByteBufferPool
{
private static final Logger LOG = Log.getLogger(LeakTrackingByteBufferPool.class);
private final LeakDetector<ByteBuffer> leakDetector = new LeakDetector<ByteBuffer>()
{
@Override
protected void leaked(LeakInfo leakInfo)
{
LeakTrackingByteBufferPool.this.leaked(leakInfo);
}
};
private final ByteBufferPool delegate;
public LeakTrackingByteBufferPool(ByteBufferPool delegate)
{
this.delegate = delegate;
addBean(leakDetector);
addBean(delegate);
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{
ByteBuffer buffer = delegate.acquire(size, direct);
if (!leakDetector.acquired(buffer))
LOG.info("ByteBuffer {}@{} not tracked", buffer, System.identityHashCode(buffer));
return buffer;
}
@Override
public void release(ByteBuffer buffer)
{
if (!leakDetector.released(buffer))
LOG.info("ByteBuffer {}@{} released but not acquired", buffer, System.identityHashCode(buffer));
delegate.release(buffer);
}
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
LOG.info("ByteBuffer " + leakInfo.getResourceDescription() + " leaked at:", leakInfo.getStackFrames());
}
}

View File

@ -210,8 +210,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
else
{
// Get a buffer
if (_requestBuffer == null)
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
_requestBuffer = getRequestBuffer();
// fill
filled = getEndPoint().fill(_requestBuffer);
@ -232,7 +231,11 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// if the request suspends, the request/response will be incomplete so the outer loop will exit.
suspended = !_channel.handle();
}
else
{
// We parsed what we could, recycle the request buffer
releaseRequestBuffer();
}
}
}
catch (EofException e)

View File

@ -0,0 +1,201 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* A facility to detect improper usage of resource pools.
* <p>
* Resource pools usually have a method to acquire a pooled resource
* and a method to released it back to the pool.
* <p>
* To detect if client code acquires a resource but never releases it,
* the resource pool can be modified to use a {@link LeakDetector}.
* The modified resource pool should call {@link #acquired(Object)} every time
* the method to acquire a resource is called, and {@link #released(Object)}
* every time the method to release the resource is called.
* {@link LeakDetector} keeps track of these resources and invokes method
* {@link #leaked(org.eclipse.jetty.util.LeakDetector.LeakInfo)} when it detects that a resource
* has been leaked (that is, acquired but never released).
* <p>
* To detect whether client code releases a resource without having
* acquired it, the resource pool can be modified to check the return value
* of {@link #released(Object)}: if false, it means that the resource was
* not acquired.
* <p>
* IMPLEMENTATION NOTES
* <p>
* This class relies on {@link System#identityHashCode(Object)} to create
* a unique id for each resource passed to {@link #acquired(Object)} and
* {@link #released(Object)}. {@link System#identityHashCode(Object)} does
* not guarantee that it will not generate the same number for different
* objects, but in practice the chance of collision is rare.
* <p>
* {@link LeakDetector} uses {@link PhantomReference}s to detect leaks.
* {@link PhantomReference}s are enqueued in their {@link ReferenceQueue}
* <em>after</em> they have been garbage collected (differently from
* {@link WeakReference}s that are enqueued <em>before</em>).
* Since the resource is now garbage collected, {@link LeakDetector} checks
* whether it has been released and if not, it reports a leak.
* Using {@link PhantomReference}s is better than overriding {@link #finalize()}
* and works also in those cases where {@link #finalize()} is not
* overridable.
*
* @param <T> the resource type.
*/
public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
{
private static final Logger LOG = Log.getLogger(LeakDetector.class);
private final ReferenceQueue<T> queue = new ReferenceQueue<>();
private final ConcurrentMap<String, LeakInfo> resources = new ConcurrentHashMap<>();
private Thread thread;
/**
* Tracks the resource as been acquired.
*
* @param resource the resource that has been acquired
* @return whether the resource has been tracked
* @see #released(Object)
*/
public boolean acquired(T resource)
{
String id = id(resource);
return resources.putIfAbsent(id, new LeakInfo(resource, id)) == null;
}
/**
* Tracks the resource as been released.
*
* @param resource the resource that has been released
* @return whether the resource has been acquired
* @see #acquired(Object)
*/
public boolean released(T resource)
{
String id = id(resource);
return resources.remove(id) != null;
}
/**
* Generates a unique ID for the given resource.
*
* @param resource the resource to generate the unique ID for
* @return the unique ID of the given resource
*/
protected String id(T resource)
{
return String.valueOf(System.identityHashCode(resource));
}
@Override
protected void doStart() throws Exception
{
super.doStart();
thread = new Thread(this, getClass().getSimpleName());
thread.setDaemon(true);
thread.start();
}
@Override
protected void doStop() throws Exception
{
thread.interrupt();
super.doStop();
}
@Override
public void run()
{
try
{
while (isRunning())
{
@SuppressWarnings("unchecked")
LeakInfo leakInfo = (LeakInfo)queue.remove();
LOG.debug("Resource GC'ed: {}", leakInfo);
if (resources.remove(leakInfo.id) != null)
leaked(leakInfo);
}
}
catch (InterruptedException x)
{
// Exit
}
}
/**
* Callback method invoked by {@link LeakDetector} when it detects that a resource has been leaked.
*
* @param leakInfo the information about the leak
*/
protected void leaked(LeakInfo leakInfo)
{
LOG.warn("Resource leaked: " + leakInfo.description, leakInfo.stackFrames);
}
/**
* Information about the leak of a resource.
*/
public class LeakInfo extends PhantomReference<T>
{
private final String id;
private final String description;
private final Throwable stackFrames;
private LeakInfo(T referent, String id)
{
super(referent, queue);
this.id = id;
this.description = referent.toString();
this.stackFrames = new Throwable();
}
/**
* @return the resource description as provided by the resource's {@link Object#toString()} method.
*/
public String getResourceDescription()
{
return description;
}
/**
* @return a Throwable instance that contains the stack frames at the time of resource acquisition.
*/
public Throwable getStackFrames()
{
return stackFrames;
}
@Override
public String toString()
{
return description;
}
}
}

View File

@ -0,0 +1,92 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
public class LeakDetectorTest
{
private LeakDetector<Object> leakDetector;
public void prepare(LeakDetector<Object> leakDetector) throws Exception
{
this.leakDetector = leakDetector;
leakDetector.start();
}
public void dispose() throws Exception
{
leakDetector.stop();
}
private void gc()
{
for (int i = 0; i < 3; ++i)
System.gc();
}
@Test
public void testResourceAcquiredAndReleased() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
prepare(new LeakDetector<Object>()
{
@Override
protected void leaked(LeakInfo leakInfo)
{
latch.countDown();
}
});
// Block to make sure "resource" goes out of scope
{
Object resource = new Object();
leakDetector.acquired(resource);
leakDetector.released(resource);
}
gc();
Assert.assertFalse(latch.await(1, TimeUnit.SECONDS));
}
@Test
public void testResourceAcquiredAndNotReleased() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
prepare(new LeakDetector<Object>()
{
@Override
protected void leaked(LeakInfo leakInfo)
{
latch.countDown();
}
});
leakDetector.acquired(new Object());
gc();
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
}
}

View File

@ -1,3 +1,3 @@
# Setup default logging implementation for during testing
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.io.LEVEL=DEBUG
#org.eclipse.jetty.util.LEVEL=DEBUG