Introduced a generic Sweeper.
The Sweeper class periodically sweeps resources added to it. Currently used to check whether the HttpClient code does not leak connections.
This commit is contained in:
parent
6fa30da0eb
commit
79e74c64e1
|
@ -36,8 +36,9 @@ import org.eclipse.jetty.util.component.Dumpable;
|
|||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.SpinLock;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
|
||||
public class ConnectionPool implements Closeable, Dumpable
|
||||
public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
||||
{
|
||||
protected static final Logger LOG = Log.getLogger(ConnectionPool.class);
|
||||
|
||||
|
@ -293,6 +294,36 @@ public class ConnectionPool implements Closeable, Dumpable
|
|||
ContainerLifeCycle.dump(out, indent, actives, idles);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean sweep()
|
||||
{
|
||||
List<Sweeper.Sweepable> toSweep = new ArrayList<>();
|
||||
try (SpinLock.Lock lock = this.lock.lock())
|
||||
{
|
||||
for (Connection connection : getActiveConnections())
|
||||
{
|
||||
if (connection instanceof Sweeper.Sweepable)
|
||||
toSweep.add(((Sweeper.Sweepable)connection));
|
||||
}
|
||||
}
|
||||
|
||||
for (Sweeper.Sweepable candidate : toSweep)
|
||||
{
|
||||
if (candidate.sweep())
|
||||
{
|
||||
boolean removed = getActiveConnections().remove(candidate);
|
||||
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
|
||||
candidate,
|
||||
System.lineSeparator(),
|
||||
removed ? "Removed" : "Not removed",
|
||||
System.lineSeparator(),
|
||||
dump());
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.eclipse.jetty.client.api.Connection;
|
|||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
|
||||
public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
|
||||
{
|
||||
|
@ -34,6 +35,9 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
|
|||
{
|
||||
super(client, origin);
|
||||
this.connectionPool = newConnectionPool(client);
|
||||
Sweeper sweeper = client.getBean(Sweeper.class);
|
||||
if (sweeper != null)
|
||||
sweeper.offer(connectionPool);
|
||||
}
|
||||
|
||||
protected ConnectionPool newConnectionPool(HttpClient client)
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.eclipse.jetty.client.http;
|
|||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.client.HttpConnection;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
|
@ -32,12 +33,14 @@ import org.eclipse.jetty.io.AbstractConnection;
|
|||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
|
||||
public class HttpConnectionOverHTTP extends AbstractConnection implements Connection
|
||||
public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, Sweeper.Sweepable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
|
||||
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
private final AtomicInteger sweeps = new AtomicInteger();
|
||||
private final Delegate delegate;
|
||||
private final HttpChannelOverHTTP channel;
|
||||
private long idleTimeout;
|
||||
|
@ -154,14 +157,27 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
|
|||
return exchange != null && exchange.getRequest().abort(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean sweep()
|
||||
{
|
||||
if (!closed.get())
|
||||
return false;
|
||||
|
||||
if (sweeps.incrementAndGet() < 4)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%h(l:%s <-> r:%s)[%s]",
|
||||
return String.format("%s@%h(l:%s <-> r:%s,closed=%b)[%s]",
|
||||
getClass().getSimpleName(),
|
||||
this,
|
||||
getEndPoint().getLocalAddress(),
|
||||
getEndPoint().getRemoteAddress(),
|
||||
closed.get(),
|
||||
channel);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,194 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util.thread;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* <p>A utility class to perform periodic sweeping of resources.</p>
|
||||
* <p>{@link Sweepable} resources may be added to or removed from a
|
||||
* {@link Sweeper} and the resource implementation decides whether
|
||||
* it should be swept or not.</p>
|
||||
* <p>If a {@link Sweepable} resources is itself a container of
|
||||
* other sweepable resources, it will forward the sweep operation
|
||||
* to children resources, and so on recursively.</p>
|
||||
* <p>Typical usage is to add {@link Sweeper} as a bean to an existing
|
||||
* container:</p>
|
||||
* <pre>
|
||||
* Server server = new Server();
|
||||
* server.addBean(new Sweeper(), true);
|
||||
* server.start();
|
||||
* </pre>
|
||||
* Code that knows it has sweepable resources can then lookup the
|
||||
* {@link Sweeper} and offer the sweepable resources to it:
|
||||
* <pre>
|
||||
* class MyComponent implements Sweeper.Sweepable
|
||||
* {
|
||||
* private final long creation;
|
||||
* private volatile destroyed;
|
||||
*
|
||||
* MyComponent(Server server)
|
||||
* {
|
||||
* this.creation = System.nanoTime();
|
||||
* Sweeper sweeper = server.getBean(Sweeper.class);
|
||||
* sweeper.offer(this);
|
||||
* }
|
||||
*
|
||||
* void destroy()
|
||||
* {
|
||||
* destroyed = true;
|
||||
* }
|
||||
*
|
||||
* @Override
|
||||
* public boolean sweep()
|
||||
* {
|
||||
* return destroyed;
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
public class Sweeper extends AbstractLifeCycle implements Runnable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(Sweeper.class);
|
||||
|
||||
private final AtomicReference<List<Sweepable>> items = new AtomicReference<>();
|
||||
private final AtomicReference<Scheduler.Task> task = new AtomicReference<>();
|
||||
private final Scheduler scheduler;
|
||||
private final long period;
|
||||
|
||||
public Sweeper(Scheduler scheduler, long period)
|
||||
{
|
||||
this.scheduler = scheduler;
|
||||
this.period = period;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
super.doStart();
|
||||
items.set(new CopyOnWriteArrayList<Sweepable>());
|
||||
activate();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
deactivate();
|
||||
items.set(null);
|
||||
super.doStop();
|
||||
}
|
||||
|
||||
public int getSize()
|
||||
{
|
||||
List<Sweepable> refs = items.get();
|
||||
return refs == null ? 0 : refs.size();
|
||||
}
|
||||
|
||||
public boolean offer(Sweepable sweepable)
|
||||
{
|
||||
List<Sweepable> refs = items.get();
|
||||
if (refs == null)
|
||||
return false;
|
||||
refs.add(sweepable);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Resource offered {}", sweepable);
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean remove(Sweepable sweepable)
|
||||
{
|
||||
List<Sweepable> refs = items.get();
|
||||
return refs != null && refs.remove(sweepable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
List<Sweepable> refs = items.get();
|
||||
if (refs == null)
|
||||
return;
|
||||
for (Sweepable sweepable : refs)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (sweepable.sweep())
|
||||
{
|
||||
refs.remove(sweepable);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Resource swept {}", sweepable);
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("Exception while sweeping " + sweepable, x);
|
||||
}
|
||||
}
|
||||
activate();
|
||||
}
|
||||
|
||||
private void activate()
|
||||
{
|
||||
if (isRunning())
|
||||
{
|
||||
Scheduler.Task t = scheduler.schedule(this, period, TimeUnit.MILLISECONDS);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Scheduled in {} ms sweep task {}", period, t);
|
||||
task.set(t);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Skipping sweep task scheduling");
|
||||
}
|
||||
}
|
||||
|
||||
private void deactivate()
|
||||
{
|
||||
Scheduler.Task t = task.getAndSet(null);
|
||||
if (t != null)
|
||||
{
|
||||
boolean cancelled = t.cancel();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Cancelled ({}) sweep task {}", cancelled, t);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>A {@link Sweepable} resource implements this interface to
|
||||
* signal to a {@link Sweeper} or to a parent container if it
|
||||
* needs to be swept or not.</p>
|
||||
* <p>Typical implementations will check their own internal state
|
||||
* and return true or false from {@link #sweep()} to indicate
|
||||
* whether they should be swept.</p>
|
||||
*/
|
||||
public interface Sweepable
|
||||
{
|
||||
/**
|
||||
* @return whether this resource should be swept
|
||||
*/
|
||||
public boolean sweep();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,124 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util.thread;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class SweeperTest
|
||||
{
|
||||
private Scheduler scheduler;
|
||||
|
||||
@Before
|
||||
public void prepare() throws Exception
|
||||
{
|
||||
scheduler = new ScheduledExecutorScheduler();
|
||||
scheduler.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void dispose() throws Exception
|
||||
{
|
||||
scheduler.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResourceNotSweptIsNotRemoved() throws Exception
|
||||
{
|
||||
testResourceSweepRemove(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResourceSweptIsRemoved() throws Exception
|
||||
{
|
||||
testResourceSweepRemove(true);
|
||||
}
|
||||
|
||||
private void testResourceSweepRemove(final boolean sweep) throws Exception
|
||||
{
|
||||
long period = 1000;
|
||||
final CountDownLatch taskLatch = new CountDownLatch(1);
|
||||
Sweeper sweeper = new Sweeper(scheduler, period)
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
super.run();
|
||||
taskLatch.countDown();
|
||||
}
|
||||
};
|
||||
sweeper.start();
|
||||
|
||||
final CountDownLatch sweepLatch = new CountDownLatch(1);
|
||||
sweeper.offer(new Sweeper.Sweepable()
|
||||
{
|
||||
@Override
|
||||
public boolean sweep()
|
||||
{
|
||||
sweepLatch.countDown();
|
||||
return sweep;
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(sweepLatch.await(2 * period, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(taskLatch.await(2 * period, TimeUnit.MILLISECONDS));
|
||||
Assert.assertEquals(sweep ? 0 : 1, sweeper.getSize());
|
||||
|
||||
sweeper.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSweepThrows() throws Exception
|
||||
{
|
||||
long period = 500;
|
||||
final CountDownLatch taskLatch = new CountDownLatch(2);
|
||||
Sweeper sweeper = new Sweeper(scheduler, period)
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
super.run();
|
||||
taskLatch.countDown();
|
||||
}
|
||||
};
|
||||
sweeper.start();
|
||||
|
||||
final CountDownLatch sweepLatch = new CountDownLatch(2);
|
||||
sweeper.offer(new Sweeper.Sweepable()
|
||||
{
|
||||
@Override
|
||||
public boolean sweep()
|
||||
{
|
||||
sweepLatch.countDown();
|
||||
throw new NullPointerException();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(sweepLatch.await(4 * period, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(taskLatch.await(4 * period, TimeUnit.MILLISECONDS));
|
||||
Assert.assertEquals(1, sweeper.getSize());
|
||||
|
||||
sweeper.stop();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue