diff --git a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/Http2Server.java b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/Http2Server.java index cd001135f49..8b1ceb3dc66 100644 --- a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/Http2Server.java +++ b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/Http2Server.java @@ -20,8 +20,10 @@ package org.eclipse.jetty.embedded; import java.io.IOException; +import java.lang.management.ManagementFactory; import java.util.Date; import java.util.EnumSet; + import javax.servlet.DispatcherType; import javax.servlet.Filter; import javax.servlet.FilterChain; @@ -39,6 +41,7 @@ import javax.servlet.http.HttpSession; import org.eclipse.jetty.alpn.ALPN; import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.jmx.MBeanContainer; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.NegotiatingServerConnectionFactory; @@ -63,6 +66,10 @@ public class Http2Server { Server server = new Server(); + MBeanContainer mbContainer = new MBeanContainer( + ManagementFactory.getPlatformMBeanServer()); + server.addBean(mbContainer); + ServletContextHandler context = new ServletContextHandler(server, "/",ServletContextHandler.SESSIONS); context.setResourceBase("src/main/resources/docroot"); context.addFilter(PushSessionCacheFilter.class,"/*",EnumSet.of(DispatcherType.REQUEST)); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 38edc2bd2af..dc6576d4bf3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -27,8 +27,10 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -37,6 +39,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; @@ -49,16 +53,16 @@ import org.eclipse.jetty.util.thread.SpinLock; * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated * with the channel.

*/ -public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Dumpable TODO +public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable { private static final Logger LOG = Log.getLogger(ManagedSelector.class); private final SpinLock _lock = new SpinLock(); + private boolean _selecting=false; private final Queue _actions = new ConcurrentArrayQueue<>(); private final SelectorManager _selectorManager; private final int _id; private final ExecutionStrategy _strategy; - private State _state = State.PROCESSING; private Selector _selector; public ManagedSelector(SelectorManager selectorManager, int id) @@ -94,9 +98,14 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du { if (LOG.isDebugEnabled()) LOG.debug("Stopping {}", this); - Stop stop = new Stop(); - submit(stop); - stop.await(getStopTimeout()); + CloseEndPoints close_endps = new CloseEndPoints(); + submit(close_endps); + close_endps.await(getStopTimeout()); + super.doStop(); + CloseSelector close_selector = new CloseSelector(); + submit(close_selector); + close_selector.await(getStopTimeout()); + if (LOG.isDebugEnabled()) LOG.debug("Stopped {}", this); } @@ -109,12 +118,11 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du try (SpinLock.Lock lock = _lock.lock()) { _actions.offer(change); - if (_state == State.SELECTING) + if (_selecting) { _selector.wakeup(); - // Move to PROCESSING now, so other submit() - // calls will avoid the extra select wakeup. - _state = State.PROCESSING; + // To avoid the extra select wakeup. + _selecting=false; } } } @@ -181,7 +189,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du action = _actions.poll(); if (action == null) { - _state = State.SELECTING; + // No more actions, so we need to select + _selecting=true; return null; } } @@ -212,21 +221,26 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du { try { - if (LOG.isDebugEnabled()) - LOG.debug("Selector loop waiting on select"); - int selected = _selector.select(); - if (LOG.isDebugEnabled()) - LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size()); - - try (SpinLock.Lock lock = _lock.lock()) + Selector selector=_selector; + if (selector!=null && selector.isOpen()) { - _state = State.PROCESSING; + if (LOG.isDebugEnabled()) + LOG.debug("Selector loop waiting on select"); + int selected = selector.select(); + if (LOG.isDebugEnabled()) + LOG.debug("Selector loop woken up from select, {}/{} selected", selected, selector.keys().size()); + + try (SpinLock.Lock lock = _lock.lock()) + { + // finished selecting + _selecting=false; + } + + _keys = selector.selectedKeys(); + _cursor = _keys.iterator(); + + return true; } - - _keys = _selector.selectedKeys(); - _cursor = _keys.iterator(); - - return true; } catch (Throwable x) { @@ -235,8 +249,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du LOG.warn(x); else LOG.debug(x); - return false; } + return false; } private Runnable processSelected() @@ -252,6 +266,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du if (attachment instanceof SelectableEndPoint) { // Try to produce a task + @SuppressWarnings("resource") SelectableEndPoint selectable = (SelectableEndPoint)attachment; Runnable task = selectable.onSelected(); if (task != null) @@ -403,7 +418,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du _selectorManager.connectionClosed(connection); _selectorManager.endPointClosed(endPoint); } -/* + @Override public String dump() { @@ -428,20 +443,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du } } - public void dumpKeysState(List dumps) - { - Selector selector = _selector; - Set keys = selector.keys(); - dumps.add(selector + " keys=" + keys.size()); - for (SelectionKey key : keys) - { - if (key.isValid()) - dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps()); - else - dumps.add(key.attachment() + " iOps=-1 rOps=-1"); - } - } - @Override public String toString() { @@ -465,7 +466,15 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du @Override public void run() { - dumpKeysState(_dumps); + Selector selector = _selector; + if (selector!=null) + { + Set keys = selector.keys(); + _dumps.add(selector + " keys=" + keys.size()); + for (SelectionKey key : keys) + _dumps.add(String.format("Key@%x{i=%d}->%s",key.hashCode(),key.interestOps(),key.attachment())); + } + latch.countDown(); } @@ -481,7 +490,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du } } } -*/ + + class Acceptor implements Runnable { private final ServerSocketChannel _channel; @@ -626,44 +636,38 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du } } - // TODO: convert this to produce tasks that are run by the ExecutionStrategy. - private class Stop implements Runnable + private class CloseEndPoints implements Runnable { - private final CountDownLatch latch = new CountDownLatch(1); + private CountDownLatch _latch=new CountDownLatch(1); + CountDownLatch _allClosed ; @Override public void run() { - try + List end_points = new ArrayList(); + for (SelectionKey key : _selector.keys()) { - for (SelectionKey key : _selector.keys()) + if (key.isValid()) { Object attachment = key.attachment(); if (attachment instanceof EndPoint) - { - EndPointCloser closer = new EndPointCloser((EndPoint)attachment); - ManagedSelector.this._selectorManager.execute(closer); - // We are closing the SelectorManager, so we want to block the - // selector thread here until we have closed all EndPoints. - // This is different than calling close() directly, because close() - // can wait forever, while here we are limited by the stop timeout. - closer.await(getStopTimeout()); - } + end_points.add((EndPoint)attachment); } + } - closeNoExceptions(_selector); - } - finally - { - latch.countDown(); - } + _allClosed = new CountDownLatch(end_points.size()); + _latch.countDown(); + + for (EndPoint endp : end_points) + submit(new EndPointCloser(endp,_allClosed)); } public boolean await(long timeout) { try { - return latch.await(timeout, TimeUnit.MILLISECONDS); + return _latch.await(timeout, TimeUnit.MILLISECONDS) && + _allClosed.await(timeout,TimeUnit.MILLISECONDS); } catch (InterruptedException x) { @@ -672,34 +676,43 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du } } - private class EndPointCloser implements Runnable + private class EndPointCloser implements Product { - private final CountDownLatch latch = new CountDownLatch(1); - private final EndPoint endPoint; + private final EndPoint _endp; + private final CountDownLatch _latch; - private EndPointCloser(EndPoint endPoint) + private EndPointCloser(EndPoint endPoint, CountDownLatch latch) { - this.endPoint = endPoint; + _endp = endPoint; + _latch = latch; } @Override public void run() { - try - { - closeNoExceptions(endPoint.getConnection()); - } - finally - { - latch.countDown(); - } + closeNoExceptions(_endp.getConnection()); + _latch.countDown(); + } + } + + private class CloseSelector implements Runnable + { + private CountDownLatch _latch=new CountDownLatch(1); + + @Override + public void run() + { + Selector selector=_selector; + _selector=null; + closeNoExceptions(selector); + _latch.countDown(); } - private boolean await(long timeout) + public boolean await(long timeout) { try { - return latch.await(timeout, TimeUnit.MILLISECONDS); + return _latch.await(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException x) { @@ -707,9 +720,4 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du } } } - - private enum State - { - PROCESSING, SELECTING - } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 153edd8e036..9a57060d8fb 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -22,11 +22,11 @@ import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.SpinLock; /** * An ChannelEndpoint that can be scheduled by {@link SelectorManager}. @@ -35,12 +35,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel { public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); - private enum State - { - UPDATED, UPDATE_PENDING, LOCKED - } + private final SpinLock _lock = new SpinLock(); + private boolean _updatePending; - private final AtomicReference _interestState = new AtomicReference<>(State.UPDATED); /** * true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called */ @@ -83,68 +80,26 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel /** * This method may run concurrently with {@link #changeInterests(int)}. */ - - while (true) + int readyOps; + int oldInterestOps; + int newInterestOps; + try(SpinLock.Lock lock = _lock.lock()) { - State current = _interestState.get(); - if (LOG.isDebugEnabled()) - LOG.debug("Processing, state {} for {}", current, this); - switch (current) - { - case UPDATE_PENDING: - case UPDATED: - { - if (!_interestState.compareAndSet(current, State.LOCKED)) - continue; + _updatePending=true; - int readyOps; - try - { - // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both). - readyOps = _key.readyOps(); - int oldInterestOps = _interestOps; - int newInterestOps = oldInterestOps & ~readyOps; - _interestOps = newInterestOps; - - if (LOG.isDebugEnabled()) - LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this); - } - finally - { - // We have been called by onSelected, so we know the - // selector will call updateKey before selecting again. - _interestState.set(State.UPDATE_PENDING); - } - - boolean readable = (readyOps & SelectionKey.OP_READ) != 0; - boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0; - if (readable) - { - if (writable) - return _runFillableCompleteWrite; - return _runFillable; - } - else if (writable) - { - return _runCompleteWrite; - } - else - { - return null; - } - } - case LOCKED: - { - // Wait for other operations to finish. - Thread.yield(); - break; - } - default: - { - throw new IllegalStateException("Invalid state: " + current); - } - } + // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both). + readyOps = _key.readyOps(); + oldInterestOps = _interestOps; + newInterestOps = oldInterestOps & ~readyOps; + _interestOps = newInterestOps; } + + if (LOG.isDebugEnabled()) + LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this); + + boolean readable = (readyOps & SelectionKey.OP_READ) != 0; + boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0; + return readable?(writable?_runFillableCompleteWrite:_runFillable):(writable?_runCompleteWrite:null); } @Override @@ -154,114 +109,19 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel * This method may run concurrently with * {@link #changeInterests(int)} and {@link #onSelected()}. */ - - while (true) - { - State current = _interestState.get(); - if (LOG.isDebugEnabled()) - LOG.debug("Updating key, state {} for {}", current, this); - switch (current) - { - case UPDATE_PENDING: - case UPDATED: - { - if (!_interestState.compareAndSet(current, State.LOCKED)) - continue; - - try - { - // Set the key interest as expected. - setKeyInterests(); - return; - } - finally - { - // We have just updated the key, so we are now updated! - // and no call to unpdateKey is pending - _interestState.set(State.UPDATED); - } - } - case LOCKED: - { - // Wait for other operations to finish. - Thread.yield(); - break; - } - default: - { - throw new IllegalStateException("Invalid state: " + current); - } - } - } - } - - private void changeInterests(int operation) - { - /** - * This method may run concurrently with - * {@link #updateKey()} and {@link #onSelected()}. - */ - - while (true) - { - State current = _interestState.get(); - if (LOG.isDebugEnabled()) - LOG.debug("Changing interests in state {} for {}", current, this); - switch (current) - { - case UPDATE_PENDING: - case UPDATED: - { - if (!_interestState.compareAndSet(current, State.LOCKED)) - continue; - - try - { - int oldInterestOps = _interestOps; - int newInterestOps = oldInterestOps | operation; - - if (LOG.isDebugEnabled()) - LOG.debug("changeInterests s={} {}->{} for {}", current, oldInterestOps, newInterestOps, this); - - if (newInterestOps != oldInterestOps) - _interestOps = newInterestOps; - - if (current==State.UPDATED) - _selector.submit(_runUpdateKey); - } - finally - { - // If we were pending a call to updateKey, then we still are. - // If we were not, then we have submitted a callback to runUpdateKey, so we now are pending. - _interestState.set(State.UPDATE_PENDING); - } - - return; - } - case LOCKED: - { - // We lost the race to update _interestOps, but we - // must update it nonetheless, so yield and spin, - // waiting for our chance to update _interestOps. - Thread.yield(); - break; - } - default: - { - throw new IllegalStateException("Invalid state: " + current); - } - } - } - } - - private void setKeyInterests() - { try { - int oldInterestOps = _key.interestOps(); - int newInterestOps = _interestOps; - if (oldInterestOps != newInterestOps) - _key.interestOps(newInterestOps); + int oldInterestOps; + int newInterestOps; + try(SpinLock.Lock lock = _lock.lock()) + { + _updatePending=false; + oldInterestOps = _key.interestOps(); + newInterestOps = _interestOps; + if (oldInterestOps != newInterestOps) + _key.interestOps(newInterestOps); + } + if (LOG.isDebugEnabled()) LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this); } @@ -277,6 +137,34 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel } } + private void changeInterests(int operation) + { + /** + * This method may run concurrently with + * {@link #updateKey()} and {@link #onSelected()}. + */ + + int oldInterestOps; + int newInterestOps; + boolean pending; + + try(SpinLock.Lock lock = _lock.lock()) + { + pending=_updatePending; + oldInterestOps = _interestOps; + newInterestOps = oldInterestOps | operation; + if (newInterestOps != oldInterestOps) + _interestOps = newInterestOps; + } + + if (LOG.isDebugEnabled()) + LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this); + + if (!pending) + _selector.submit(_runUpdateKey); + } + + @Override public void close() { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SpinLock.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SpinLock.java index 23c7a18679d..ca451931fd0 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SpinLock.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SpinLock.java @@ -53,6 +53,11 @@ public class SpinLock } } + public boolean isLocked() + { + return _lock.get(); + } + public class Lock implements AutoCloseable { @Override diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SpinLockTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SpinLockTest.java new file mode 100644 index 00000000000..0533d7cb65d --- /dev/null +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SpinLockTest.java @@ -0,0 +1,140 @@ +/* ------------------------------------------------------------ */ +/** Spin Lock + *

This is a lock designed to protect VERY short sections of + * critical code. Threads attempting to take the lock will spin + * forever until the lock is available, thus it is important that + * the code protected by this lock is extremely simple and non + * blocking. The reason for this lock is that it prevents a thread + * from giving up a CPU core when contending for the lock.

+ *
+ * try(SpinLock.Lock lock = spinlock.lock())
+ * {
+ *   // something very quick and non blocking
+ * }
+ * 
+ */ + +package org.eclipse.jetty.util.thread; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +public class SpinLockTest +{ + + @Test + public void testLocked() + { + SpinLock lock = new SpinLock(); + assertFalse(lock.isLocked()); + + try(SpinLock.Lock l = lock.lock()) + { + assertTrue(lock.isLocked()); + } + finally + { + assertFalse(lock.isLocked()); + } + + assertFalse(lock.isLocked()); + } + + @Test + public void testLockedException() + { + SpinLock lock = new SpinLock(); + assertFalse(lock.isLocked()); + + try(SpinLock.Lock l = lock.lock()) + { + assertTrue(lock.isLocked()); + throw new Exception(); + } + catch(Exception e) + { + assertFalse(lock.isLocked()); + } + finally + { + assertFalse(lock.isLocked()); + } + + assertFalse(lock.isLocked()); + } + + + @Test + public void testContend() throws Exception + { + final SpinLock lock = new SpinLock(); + + final CountDownLatch held0 = new CountDownLatch(1); + final CountDownLatch hold0 = new CountDownLatch(1); + + Thread thread0 = new Thread() + { + @Override + public void run() + { + try(SpinLock.Lock l = lock.lock()) + { + held0.countDown(); + hold0.await(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + }; + thread0.start(); + held0.await(); + + assertTrue(lock.isLocked()); + + + final CountDownLatch held1 = new CountDownLatch(1); + final CountDownLatch hold1 = new CountDownLatch(1); + Thread thread1 = new Thread() + { + @Override + public void run() + { + try(SpinLock.Lock l = lock.lock()) + { + held1.countDown(); + hold1.await(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + }; + thread1.start(); + // thread1 will be spinning here + assertFalse(held1.await(100,TimeUnit.MILLISECONDS)); + + // Let thread0 complete + hold0.countDown(); + thread0.join(); + + // thread1 can progress + held1.await(); + + // let thread1 complete + hold1.countDown(); + thread1.join(); + + assertFalse(lock.isLocked()); + + } + + +}