Introduced LeakDetector.id() to be consistent in logging the resource ID.

This commit is contained in:
Simone Bordet 2015-03-06 12:49:52 +01:00
parent dd88d393e4
commit a13a55e242
7 changed files with 100 additions and 79 deletions

View File

@ -25,12 +25,18 @@ import org.eclipse.jetty.util.Promise;
public class LeakTrackingConnectionPool extends ConnectionPool
{
private final LeakDetector<Connection> leakDetector;
private final LeakDetector<Connection> leakDetector = new LeakDetector<Connection>()
{
@Override
protected void leaked(LeakInfo leakInfo)
{
LeakTrackingConnectionPool.this.leaked(leakInfo);
}
};
public LeakTrackingConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise, LeakDetector<Connection> leakDetector)
public LeakTrackingConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
{
super(destination, maxConnections, connectionPromise);
this.leakDetector = leakDetector;
start();
}
@ -69,13 +75,18 @@ public class LeakTrackingConnectionPool extends ConnectionPool
protected void acquired(Connection connection)
{
if (!leakDetector.acquired(connection))
LOG.info("Connection {}@{} not tracked", connection, System.identityHashCode(connection));
LOG.info("Connection {}@{} not tracked", connection, leakDetector.id(connection));
}
@Override
protected void released(Connection connection)
{
if (!leakDetector.released(connection))
LOG.info("Connection {}@{} released but not acquired", connection, System.identityHashCode(connection));
LOG.info("Connection {}@{} released but not acquired", connection, leakDetector.id(connection));
}
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
LOG.info("Connection " + leakInfo.getResourceDescription() + " leaked at:", leakInfo.getStackFrames());
}
}

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.client;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -31,7 +28,7 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -62,6 +59,9 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class HttpClientLoadTest extends AbstractHttpClientServerTest
{
private final Logger logger = Log.getLogger(HttpClientLoadTest.class);
@ -76,6 +76,8 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
{
int cores = Runtime.getRuntime().availableProcessors();
final AtomicLong connectionLeaks = new AtomicLong();
start(new LoadHandler());
server.stop();
server.removeConnector(connector);
@ -88,8 +90,6 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
client.stop();
final LeakDetector<Connection> connectionLeakDetector = new LeakDetector<Connection>();
HttpClient newClient = new HttpClient(new HttpClientTransportOverHTTP()
{
@Override
@ -100,7 +100,14 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this, connectionLeakDetector);
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo resource)
{
connectionLeaks.incrementAndGet();
}
};
}
};
}
@ -131,15 +138,17 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
run(random, iterations);
}
System.gc();
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedUnreleased(), is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), is(0L));
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), is(0L));
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedUnreleased(), is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), is(0L));
assertThat("Connection Leaks", connectionLeakDetector.getUnreleasedCount(), is(0L));
assertThat("Connection Leaks", connectionLeaks.get(), is(0L));
}
private void run(Random random, int iterations) throws InterruptedException

View File

@ -18,15 +18,13 @@
package org.eclipse.jetty.fcgi.server;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
import org.eclipse.jetty.fcgi.client.http.HttpDestinationOverFCGI;
import org.eclipse.jetty.http.HttpScheme;
@ -42,13 +40,16 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Rule;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public abstract class AbstractHttpClientServerTest
{
@Rule
public final TestTracker tracker = new TestTracker();
private LeakTrackingByteBufferPool serverBufferPool;
private LeakTrackingByteBufferPool clientBufferPool;
private LeakDetector<Connection> connectionLeakDetector;
private final AtomicLong connectionLeaks = new AtomicLong();
protected Server server;
protected ServerConnector connector;
protected HttpClient client;
@ -71,8 +72,6 @@ public abstract class AbstractHttpClientServerTest
QueuedThreadPool executor = new QueuedThreadPool();
executor.setName(executor.getName() + "-client");
connectionLeakDetector = new LeakDetector<Connection>();
client = new HttpClient(new HttpClientTransportOverFCGI(1, false, "")
{
@Override
@ -83,7 +82,14 @@ public abstract class AbstractHttpClientServerTest
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this, connectionLeakDetector);
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
connectionLeaks.incrementAndGet();
}
};
}
};
}
@ -98,15 +104,16 @@ public abstract class AbstractHttpClientServerTest
public void dispose() throws Exception
{
System.gc();
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedUnreleased(), is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), is(0L));
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), is(0L));
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedUnreleased(), is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), is(0L));
assertThat("Connection Leaks", connectionLeakDetector.getUnreleasedCount(), is(0L));
assertThat("Connection Leaks", connectionLeaks.get(), is(0L));
if (client != null)
client.stop();

View File

@ -33,16 +33,24 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
private final LeakDetector<ByteBuffer> leakDetector = new LeakDetector<ByteBuffer>()
{
public String id(ByteBuffer resource)
public String id(ByteBuffer resource)
{
return BufferUtil.toIDString(resource);
}
@Override
protected void leaked(LeakInfo leakInfo)
{
leaked.incrementAndGet();
LeakTrackingByteBufferPool.this.leaked(leakInfo);
}
};
private final static boolean NOISY = Boolean.getBoolean(LeakTrackingByteBufferPool.class.getName() + ".NOISY");
private final ByteBufferPool delegate;
private final AtomicLong leakedReleases = new AtomicLong(0);
private final AtomicLong leakedAcquires = new AtomicLong(0);
private final AtomicLong leaked = new AtomicLong(0);
public LeakTrackingByteBufferPool(ByteBufferPool delegate)
{
@ -54,12 +62,12 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
@Override
public ByteBuffer acquire(int size, boolean direct)
{
ByteBuffer buffer = delegate.acquire(size,direct);
ByteBuffer buffer = delegate.acquire(size, direct);
boolean leaked = leakDetector.acquired(buffer);
if (NOISY || !leaked)
{
leakedAcquires.incrementAndGet();
LOG.info(String.format("ByteBuffer acquire %s leaked.acquired=%s",leakDetector.id(buffer),leaked ? "normal" : "LEAK"),
LOG.info(String.format("ByteBuffer acquire %s leaked.acquired=%s", leakDetector.id(buffer), leaked ? "normal" : "LEAK"),
new Throwable("LeakStack.Acquire"));
}
return buffer;
@ -71,46 +79,47 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
if (buffer == null)
return;
boolean leaked = leakDetector.released(buffer);
if (NOISY || !leaked) {
if (NOISY || !leaked)
{
leakedReleases.incrementAndGet();
LOG.info(String.format("ByteBuffer release %s leaked.released=%s",leakDetector.id(buffer),leaked ? "normal" : "LEAK"),new Throwable(
LOG.info(String.format("ByteBuffer release %s leaked.released=%s", leakDetector.id(buffer), leaked ? "normal" : "LEAK"), new Throwable(
"LeakStack.Release"));
}
delegate.release(buffer);
}
public void clearTracking()
{
leakDetector.clear();
leakedAcquires.set(0);
leakedReleases.set(0);
}
/**
* Get the count of BufferPool.acquire() calls that detected a leak
* @return count of BufferPool.acquire() calls that detected a leak
*/
public long getLeakedAcquires()
{
return leakedAcquires.get();
}
/**
* Get the count of BufferPool.release() calls that detected a leak
* @return count of BufferPool.release() calls that detected a leak
*/
public long getLeakedReleases()
{
return leakedReleases.get();
}
/**
* At the end of the run, when the LeakDetector runs, this reports the
* number of unreleased resources.
* @return count of resources that were acquired but not released (byt the end of the run)
* @return count of resources that were acquired but not released
*/
public long getLeakedUnreleased()
public long getLeakedResources()
{
return leakDetector.getUnreleasedCount();
return leaked.get();
}
protected void leaked(LeakDetector<ByteBuffer>.LeakInfo leakInfo)
{
LOG.warn("ByteBuffer " + leakInfo.getResourceDescription() + " leaked at:", leakInfo.getStackFrames());
}
}

View File

@ -19,9 +19,6 @@
package org.eclipse.jetty.spdy.server;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -60,6 +57,9 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class SynDataReplyDataLoadTest extends AbstractTest
{
private static final int TIMEOUT = 60 * 1000;
@ -192,13 +192,15 @@ public class SynDataReplyDataLoadTest extends AbstractTest
threadPool.shutdown();
System.gc();
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedUnreleased(), is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), is(0L));
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), is(0L));
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedUnreleased(), is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), is(0L));
}
private void synCompletedData(Session session, Fields headers, int iterations) throws Exception

View File

@ -20,9 +20,9 @@ package org.eclipse.jetty.util;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
@ -55,8 +55,7 @@ import org.eclipse.jetty.util.log.Logger;
* has been released and if not, it reports a leak. Using {@link PhantomReference}s is better than overriding
* {@link #finalize()} and works also in those cases where {@link #finalize()} is not overridable.
*
* @param <T>
* the resource type.
* @param <T> the resource type.
*/
public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
{
@ -64,14 +63,12 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
private final ReferenceQueue<T> queue = new ReferenceQueue<>();
private final ConcurrentMap<String, LeakInfo> resources = new ConcurrentHashMap<>();
private final AtomicLong unreleasedCount = new AtomicLong(0);
private Thread thread;
/**
* Tracks the resource as been acquired.
*
* @param resource
* the resource that has been acquired
* @param resource the resource that has been acquired
* @return true whether the resource has been acquired normally, false if the resource has detected a leak (meaning
* that another acquire occurred before a release of the same resource)
* @see #released(Object)
@ -79,21 +76,20 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
public boolean acquired(T resource)
{
String id = id(resource);
LeakInfo info = resources.putIfAbsent(id,new LeakInfo(resource,id));
LeakInfo info = resources.putIfAbsent(id, new LeakInfo(resource,id));
if (info != null)
{
// leak detected, prior acquire exists (not released)
// Leak detected, prior acquire exists (not released) or id clash.
return false;
}
// normal behavior
// Normal behavior.
return true;
}
/**
* Tracks the resource as been released.
*
* @param resource
* the resource that has been released
* @param resource the resource that has been released
* @return true whether the resource has been released normally (based on a previous acquire). false if the resource
* has been released without a prior acquire (such as a double release scenario)
* @see #acquired(Object)
@ -104,19 +100,18 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
LeakInfo info = resources.remove(id);
if (info != null)
{
// normal behavior
// Normal behavior.
return true;
}
// leak detected (released without acquire)
// Leak detected (released without acquire).
return false;
}
/**
* Generates a unique ID for the given resource.
*
* @param resource
* the resource to generate the unique ID for
* @param resource the resource to generate the unique ID for
* @return the unique ID of the given resource
*/
public String id(T resource)
@ -164,23 +159,11 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
/**
* Callback method invoked by {@link LeakDetector} when it detects that a resource has been leaked.
*
* @param leakInfo
* the information about the leak
* @param leakInfo the information about the leak
*/
protected void leaked(LeakInfo leakInfo)
{
LOG.warn("Resource leaked: " + leakInfo.description,leakInfo.stackFrames);
unreleasedCount.incrementAndGet();
}
public void clear()
{
unreleasedCount.set(0);
}
public long getUnreleasedCount()
{
return unreleasedCount.get();
}
/**

View File

@ -18,15 +18,15 @@
package org.eclipse.jetty.websocket.common.test;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class LeakTrackingBufferPoolRule extends LeakTrackingByteBufferPool implements TestRule
{
private final String id;
@ -41,7 +41,7 @@ public class LeakTrackingBufferPoolRule extends LeakTrackingByteBufferPool imple
{
assertThat("Leaked Acquires Count for [" + id + "]",getLeakedAcquires(),is(0L));
assertThat("Leaked Releases Count for [" + id + "]",getLeakedReleases(),is(0L));
assertThat("Leaked Unrelesed Count for [" + id + "]",getLeakedUnreleased(),is(0L));
assertThat("Leaked Resource Count for [" + id + "]", getLeakedResources(),is(0L));
}
@Override