diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index 2d6c8dcca2c..1cf0c57f16c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -37,8 +37,9 @@ import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.SpinLock; +import org.eclipse.jetty.util.thread.Sweeper; -public class ConnectionPool implements Closeable, Dumpable +public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable { protected static final Logger LOG = Log.getLogger(ConnectionPool.class); @@ -294,6 +295,36 @@ public class ConnectionPool implements Closeable, Dumpable ContainerLifeCycle.dump(out, indent, actives, idles); } + @Override + public boolean sweep() + { + List toSweep = new ArrayList<>(); + try (SpinLock.Lock lock = this.lock.lock()) + { + for (Connection connection : getActiveConnections()) + { + if (connection instanceof Sweeper.Sweepable) + toSweep.add(((Sweeper.Sweepable)connection)); + } + } + + for (Sweeper.Sweepable candidate : toSweep) + { + if (candidate.sweep()) + { + boolean removed = getActiveConnections().remove(candidate); + LOG.warn("Connection swept: {}{}{} from active connections{}{}", + candidate, + System.lineSeparator(), + removed ? "Removed" : "Not removed", + System.lineSeparator(), + dump()); + } + } + + return false; + } + @Override public String toString() { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index 9e41a65bfb0..704884c1dc5 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -36,11 +36,16 @@ public class HttpExchange private final HttpRequest request; private final List listeners; private final HttpResponse response; - - enum State { PENDING, COMPLETED, TERMINATED } ; + + enum State + { + PENDING, COMPLETED, TERMINATED + } + + ; private final SpinLock _lock = new SpinLock(); - private State requestState=State.PENDING; - private State responseState=State.PENDING; + private State requestState = State.PENDING; + private State responseState = State.PENDING; private Throwable requestFailure; private Throwable responseFailure; @@ -67,7 +72,7 @@ public class HttpExchange public Throwable getRequestFailure() { - try(SpinLock.Lock lock = _lock.lock()) + try (SpinLock.Lock lock = _lock.lock()) { return requestFailure; } @@ -85,7 +90,7 @@ public class HttpExchange public Throwable getResponseFailure() { - try(SpinLock.Lock lock = _lock.lock()) + try (SpinLock.Lock lock = _lock.lock()) { return responseFailure; } @@ -105,32 +110,32 @@ public class HttpExchange public boolean requestComplete() { - try(SpinLock.Lock lock = _lock.lock()) + try (SpinLock.Lock lock = _lock.lock()) { - if (requestState!=State.PENDING) + if (requestState != State.PENDING) return false; - requestState=State.COMPLETED; + requestState = State.COMPLETED; return true; } } public boolean responseComplete() { - try(SpinLock.Lock lock = _lock.lock()) + try (SpinLock.Lock lock = _lock.lock()) { - if (responseState!=State.PENDING) + if (responseState != State.PENDING) return false; - responseState=State.COMPLETED; + responseState = State.COMPLETED; return true; } } public Result terminateRequest(Throwable failure) { - try(SpinLock.Lock lock = _lock.lock()) + try (SpinLock.Lock lock = _lock.lock()) { - requestState=State.TERMINATED; - requestFailure=failure; + requestState = State.TERMINATED; + requestFailure = failure; if (State.TERMINATED.equals(responseState)) return new Result(getRequest(), requestFailure, getResponse(), responseFailure); } @@ -139,11 +144,11 @@ public class HttpExchange public Result terminateResponse(Throwable failure) { - try(SpinLock.Lock lock = _lock.lock()) + try (SpinLock.Lock lock = _lock.lock()) { - responseState=State.TERMINATED; - responseFailure=failure; - if (State.TERMINATED.equals(requestState)) + responseState = State.TERMINATED; + responseFailure = failure; + if (requestState == State.TERMINATED) return new Result(getRequest(), requestFailure, getResponse(), responseFailure); } return null; @@ -173,23 +178,23 @@ public class HttpExchange private boolean fail(Throwable cause) { - boolean notify=false; - try(SpinLock.Lock lock = _lock.lock()) + boolean notify = false; + try (SpinLock.Lock lock = _lock.lock()) { - if (!Boolean.TRUE.equals(requestState)) + if (requestState != State.TERMINATED) { - requestState=State.TERMINATED; - notify=true; - requestFailure=cause; + requestState = State.TERMINATED; + notify = true; + requestFailure = cause; } - if (!Boolean.TRUE.equals(responseState)) + if (responseState != State.TERMINATED) { - responseState=State.TERMINATED; - notify=true; - responseFailure=cause; + responseState = State.TERMINATED; + notify = true; + responseFailure = cause; } } - + if (notify) { if (LOG.isDebugEnabled()) @@ -209,10 +214,10 @@ public class HttpExchange public void resetResponse() { - try(SpinLock.Lock lock = _lock.lock()) + try (SpinLock.Lock lock = _lock.lock()) { - responseState=State.PENDING; - responseFailure=null; + responseState = State.PENDING; + responseFailure = null; } } @@ -226,13 +231,13 @@ public class HttpExchange @Override public String toString() { - try(SpinLock.Lock lock = _lock.lock()) + try (SpinLock.Lock lock = _lock.lock()) { return String.format("%s@%x req=%s/%s res=%s/%s", - HttpExchange.class.getSimpleName(), - hashCode(), - requestState,requestFailure, - responseState,responseFailure); + HttpExchange.class.getSimpleName(), + hashCode(), + requestState, requestFailure, + responseState, responseFailure); } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java index fd565772a68..ae994c6c7e4 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java @@ -25,6 +25,7 @@ import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.thread.Sweeper; public abstract class PoolingHttpDestination extends HttpDestination implements Callback { @@ -34,6 +35,9 @@ public abstract class PoolingHttpDestination extends HttpD { super(client, origin); this.connectionPool = newConnectionPool(client); + Sweeper sweeper = client.getBean(Sweeper.class); + if (sweeper != null) + sweeper.offer(connectionPool); } protected ConnectionPool newConnectionPool(HttpClient client) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index 3166ca456cd..c68bdeceea6 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.client.http; import java.nio.channels.AsynchronousCloseException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; @@ -33,13 +34,15 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Sweeper; -public class HttpConnectionOverHTTP extends AbstractConnection implements Connection +public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, Sweeper.Sweepable { private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class); private final AtomicBoolean closed = new AtomicBoolean(); private final Promise promise; + private final AtomicInteger sweeps = new AtomicInteger(); private final Delegate delegate; private final HttpChannelOverHTTP channel; private long idleTimeout; @@ -158,14 +161,27 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec return exchange != null && exchange.getRequest().abort(failure); } + @Override + public boolean sweep() + { + if (!closed.get()) + return false; + + if (sweeps.incrementAndGet() < 4) + return false; + + return true; + } + @Override public String toString() { - return String.format("%s@%h(l:%s <-> r:%s)[%s]", + return String.format("%s@%h(l:%s <-> r:%s,closed=%b)[%s]", getClass().getSimpleName(), this, getEndPoint().getLocalAddress(), getEndPoint().getRemoteAddress(), + closed.get(), channel); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index e97df958738..42f5ff35707 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -343,17 +343,23 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump LOG.debug("Connected {} {}", connected, channel); if (connected) { - connect.timeout.cancel(); - key.interestOps(0); - return new CreateEndPoint(channel, key) + if (connect.timeout.cancel()) { - @Override - protected void failed(Throwable failure) + key.interestOps(0); + return new CreateEndPoint(channel, key) { - super.failed(failure); - connect.failed(failure); - } - }; + @Override + protected void failed(Throwable failure) + { + super.failed(failure); + connect.failed(failure); + } + }; + } + else + { + throw new SocketTimeoutException("Concurrent Connect Timeout"); + } } else { @@ -648,7 +654,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump { if (LOG.isDebugEnabled()) LOG.debug("Channel {} timed out while connecting, closing it", channel); - connect.failed(new SocketTimeoutException()); + connect.failed(new SocketTimeoutException("Connect Timeout")); } } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Sweeper.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Sweeper.java new file mode 100644 index 00000000000..1767eb04a96 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Sweeper.java @@ -0,0 +1,194 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + *

A utility class to perform periodic sweeping of resources.

+ *

{@link Sweepable} resources may be added to or removed from a + * {@link Sweeper} and the resource implementation decides whether + * it should be swept or not.

+ *

If a {@link Sweepable} resources is itself a container of + * other sweepable resources, it will forward the sweep operation + * to children resources, and so on recursively.

+ *

Typical usage is to add {@link Sweeper} as a bean to an existing + * container:

+ *
+ * Server server = new Server();
+ * server.addBean(new Sweeper(), true);
+ * server.start();
+ * 
+ * Code that knows it has sweepable resources can then lookup the + * {@link Sweeper} and offer the sweepable resources to it: + *
+ * class MyComponent implements Sweeper.Sweepable
+ * {
+ *     private final long creation;
+ *     private volatile destroyed;
+ *
+ *     MyComponent(Server server)
+ *     {
+ *         this.creation = System.nanoTime();
+ *         Sweeper sweeper = server.getBean(Sweeper.class);
+ *         sweeper.offer(this);
+ *     }
+ *
+ *     void destroy()
+ *     {
+ *         destroyed = true;
+ *     }
+ *
+ *     @Override
+ *     public boolean sweep()
+ *     {
+ *         return destroyed;
+ *     }
+ * }
+ * 
+ */ +public class Sweeper extends AbstractLifeCycle implements Runnable +{ + private static final Logger LOG = Log.getLogger(Sweeper.class); + + private final AtomicReference> items = new AtomicReference<>(); + private final AtomicReference task = new AtomicReference<>(); + private final Scheduler scheduler; + private final long period; + + public Sweeper(Scheduler scheduler, long period) + { + this.scheduler = scheduler; + this.period = period; + } + + @Override + protected void doStart() throws Exception + { + super.doStart(); + items.set(new CopyOnWriteArrayList()); + activate(); + } + + @Override + protected void doStop() throws Exception + { + deactivate(); + items.set(null); + super.doStop(); + } + + public int getSize() + { + List refs = items.get(); + return refs == null ? 0 : refs.size(); + } + + public boolean offer(Sweepable sweepable) + { + List refs = items.get(); + if (refs == null) + return false; + refs.add(sweepable); + if (LOG.isDebugEnabled()) + LOG.debug("Resource offered {}", sweepable); + return true; + } + + public boolean remove(Sweepable sweepable) + { + List refs = items.get(); + return refs != null && refs.remove(sweepable); + } + + @Override + public void run() + { + List refs = items.get(); + if (refs == null) + return; + for (Sweepable sweepable : refs) + { + try + { + if (sweepable.sweep()) + { + refs.remove(sweepable); + if (LOG.isDebugEnabled()) + LOG.debug("Resource swept {}", sweepable); + } + } + catch (Throwable x) + { + LOG.info("Exception while sweeping " + sweepable, x); + } + } + activate(); + } + + private void activate() + { + if (isRunning()) + { + Scheduler.Task t = scheduler.schedule(this, period, TimeUnit.MILLISECONDS); + if (LOG.isDebugEnabled()) + LOG.debug("Scheduled in {} ms sweep task {}", period, t); + task.set(t); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Skipping sweep task scheduling"); + } + } + + private void deactivate() + { + Scheduler.Task t = task.getAndSet(null); + if (t != null) + { + boolean cancelled = t.cancel(); + if (LOG.isDebugEnabled()) + LOG.debug("Cancelled ({}) sweep task {}", cancelled, t); + } + } + + /** + *

A {@link Sweepable} resource implements this interface to + * signal to a {@link Sweeper} or to a parent container if it + * needs to be swept or not.

+ *

Typical implementations will check their own internal state + * and return true or false from {@link #sweep()} to indicate + * whether they should be swept.

+ */ + public interface Sweepable + { + /** + * @return whether this resource should be swept + */ + public boolean sweep(); + } +} diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SweeperTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SweeperTest.java new file mode 100644 index 00000000000..113246da215 --- /dev/null +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SweeperTest.java @@ -0,0 +1,124 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SweeperTest +{ + private Scheduler scheduler; + + @Before + public void prepare() throws Exception + { + scheduler = new ScheduledExecutorScheduler(); + scheduler.start(); + } + + @After + public void dispose() throws Exception + { + scheduler.stop(); + } + + @Test + public void testResourceNotSweptIsNotRemoved() throws Exception + { + testResourceSweepRemove(false); + } + + @Test + public void testResourceSweptIsRemoved() throws Exception + { + testResourceSweepRemove(true); + } + + private void testResourceSweepRemove(final boolean sweep) throws Exception + { + long period = 1000; + final CountDownLatch taskLatch = new CountDownLatch(1); + Sweeper sweeper = new Sweeper(scheduler, period) + { + @Override + public void run() + { + super.run(); + taskLatch.countDown(); + } + }; + sweeper.start(); + + final CountDownLatch sweepLatch = new CountDownLatch(1); + sweeper.offer(new Sweeper.Sweepable() + { + @Override + public boolean sweep() + { + sweepLatch.countDown(); + return sweep; + } + }); + + Assert.assertTrue(sweepLatch.await(2 * period, TimeUnit.MILLISECONDS)); + Assert.assertTrue(taskLatch.await(2 * period, TimeUnit.MILLISECONDS)); + Assert.assertEquals(sweep ? 0 : 1, sweeper.getSize()); + + sweeper.stop(); + } + + @Test + public void testSweepThrows() throws Exception + { + long period = 500; + final CountDownLatch taskLatch = new CountDownLatch(2); + Sweeper sweeper = new Sweeper(scheduler, period) + { + @Override + public void run() + { + super.run(); + taskLatch.countDown(); + } + }; + sweeper.start(); + + final CountDownLatch sweepLatch = new CountDownLatch(2); + sweeper.offer(new Sweeper.Sweepable() + { + @Override + public boolean sweep() + { + sweepLatch.countDown(); + throw new NullPointerException(); + } + }); + + Assert.assertTrue(sweepLatch.await(4 * period, TimeUnit.MILLISECONDS)); + Assert.assertTrue(taskLatch.await(4 * period, TimeUnit.MILLISECONDS)); + Assert.assertEquals(1, sweeper.getSize()); + + sweeper.stop(); + } +} diff --git a/pom.xml b/pom.xml index 46388f02135..ba56d86414a 100644 --- a/pom.xml +++ b/pom.xml @@ -1035,5 +1035,17 @@ 8.1.3.v20150130 + + 8u40 + + + java.version + 1.8.0_40 + + + + 8.1.3.v20150130 + +