460210 - ExecutionStragegy producer for SelectManager calls onOpen from produce method

Refinements to the refactored managed selector
This commit is contained in:
Greg Wilkins 2015-02-19 13:24:08 +11:00
parent e541865cef
commit 28d2172ea7
5 changed files with 301 additions and 253 deletions

View File

@ -20,8 +20,10 @@
package org.eclipse.jetty.embedded;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Date;
import java.util.EnumSet;
import javax.servlet.DispatcherType;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
@ -39,6 +41,7 @@ import javax.servlet.http.HttpSession;
import org.eclipse.jetty.alpn.ALPN;
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NegotiatingServerConnectionFactory;
@ -63,6 +66,10 @@ public class Http2Server
{
Server server = new Server();
MBeanContainer mbContainer = new MBeanContainer(
ManagementFactory.getPlatformMBeanServer());
server.addBean(mbContainer);
ServletContextHandler context = new ServletContextHandler(server, "/",ServletContextHandler.SESSIONS);
context.setResourceBase("src/main/resources/docroot");
context.addFilter(PushSessionCacheFilter.class,"/*",EnumSet.of(DispatcherType.REQUEST));

View File

@ -27,8 +27,10 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -37,6 +39,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
@ -49,16 +53,16 @@ import org.eclipse.jetty.util.thread.SpinLock;
* happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
* with the channel.</p>
*/
public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Dumpable TODO
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
{
private static final Logger LOG = Log.getLogger(ManagedSelector.class);
private final SpinLock _lock = new SpinLock();
private boolean _selecting=false;
private final Queue<Runnable> _actions = new ConcurrentArrayQueue<>();
private final SelectorManager _selectorManager;
private final int _id;
private final ExecutionStrategy _strategy;
private State _state = State.PROCESSING;
private Selector _selector;
public ManagedSelector(SelectorManager selectorManager, int id)
@ -94,9 +98,14 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
{
if (LOG.isDebugEnabled())
LOG.debug("Stopping {}", this);
Stop stop = new Stop();
submit(stop);
stop.await(getStopTimeout());
CloseEndPoints close_endps = new CloseEndPoints();
submit(close_endps);
close_endps.await(getStopTimeout());
super.doStop();
CloseSelector close_selector = new CloseSelector();
submit(close_selector);
close_selector.await(getStopTimeout());
if (LOG.isDebugEnabled())
LOG.debug("Stopped {}", this);
}
@ -109,12 +118,11 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
try (SpinLock.Lock lock = _lock.lock())
{
_actions.offer(change);
if (_state == State.SELECTING)
if (_selecting)
{
_selector.wakeup();
// Move to PROCESSING now, so other submit()
// calls will avoid the extra select wakeup.
_state = State.PROCESSING;
// To avoid the extra select wakeup.
_selecting=false;
}
}
}
@ -181,7 +189,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
action = _actions.poll();
if (action == null)
{
_state = State.SELECTING;
// No more actions, so we need to select
_selecting=true;
return null;
}
}
@ -212,21 +221,26 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("Selector loop waiting on select");
int selected = _selector.select();
if (LOG.isDebugEnabled())
LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
try (SpinLock.Lock lock = _lock.lock())
Selector selector=_selector;
if (selector!=null && selector.isOpen())
{
_state = State.PROCESSING;
if (LOG.isDebugEnabled())
LOG.debug("Selector loop waiting on select");
int selected = selector.select();
if (LOG.isDebugEnabled())
LOG.debug("Selector loop woken up from select, {}/{} selected", selected, selector.keys().size());
try (SpinLock.Lock lock = _lock.lock())
{
// finished selecting
_selecting=false;
}
_keys = selector.selectedKeys();
_cursor = _keys.iterator();
return true;
}
_keys = _selector.selectedKeys();
_cursor = _keys.iterator();
return true;
}
catch (Throwable x)
{
@ -235,8 +249,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
LOG.warn(x);
else
LOG.debug(x);
return false;
}
return false;
}
private Runnable processSelected()
@ -252,6 +266,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
if (attachment instanceof SelectableEndPoint)
{
// Try to produce a task
@SuppressWarnings("resource")
SelectableEndPoint selectable = (SelectableEndPoint)attachment;
Runnable task = selectable.onSelected();
if (task != null)
@ -403,7 +418,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
_selectorManager.connectionClosed(connection);
_selectorManager.endPointClosed(endPoint);
}
/*
@Override
public String dump()
{
@ -428,20 +443,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
}
}
public void dumpKeysState(List<Object> dumps)
{
Selector selector = _selector;
Set<SelectionKey> keys = selector.keys();
dumps.add(selector + " keys=" + keys.size());
for (SelectionKey key : keys)
{
if (key.isValid())
dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
else
dumps.add(key.attachment() + " iOps=-1 rOps=-1");
}
}
@Override
public String toString()
{
@ -465,7 +466,15 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
@Override
public void run()
{
dumpKeysState(_dumps);
Selector selector = _selector;
if (selector!=null)
{
Set<SelectionKey> keys = selector.keys();
_dumps.add(selector + " keys=" + keys.size());
for (SelectionKey key : keys)
_dumps.add(String.format("Key@%x{i=%d}->%s",key.hashCode(),key.interestOps(),key.attachment()));
}
latch.countDown();
}
@ -481,7 +490,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
}
}
}
*/
class Acceptor implements Runnable
{
private final ServerSocketChannel _channel;
@ -626,44 +636,38 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
}
}
// TODO: convert this to produce tasks that are run by the ExecutionStrategy.
private class Stop implements Runnable
private class CloseEndPoints implements Runnable
{
private final CountDownLatch latch = new CountDownLatch(1);
private CountDownLatch _latch=new CountDownLatch(1);
CountDownLatch _allClosed ;
@Override
public void run()
{
try
List<EndPoint> end_points = new ArrayList<EndPoint>();
for (SelectionKey key : _selector.keys())
{
for (SelectionKey key : _selector.keys())
if (key.isValid())
{
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
{
EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
ManagedSelector.this._selectorManager.execute(closer);
// We are closing the SelectorManager, so we want to block the
// selector thread here until we have closed all EndPoints.
// This is different than calling close() directly, because close()
// can wait forever, while here we are limited by the stop timeout.
closer.await(getStopTimeout());
}
end_points.add((EndPoint)attachment);
}
}
closeNoExceptions(_selector);
}
finally
{
latch.countDown();
}
_allClosed = new CountDownLatch(end_points.size());
_latch.countDown();
for (EndPoint endp : end_points)
submit(new EndPointCloser(endp,_allClosed));
}
public boolean await(long timeout)
{
try
{
return latch.await(timeout, TimeUnit.MILLISECONDS);
return _latch.await(timeout, TimeUnit.MILLISECONDS) &&
_allClosed.await(timeout,TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{
@ -672,34 +676,43 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
}
}
private class EndPointCloser implements Runnable
private class EndPointCloser implements Product
{
private final CountDownLatch latch = new CountDownLatch(1);
private final EndPoint endPoint;
private final EndPoint _endp;
private final CountDownLatch _latch;
private EndPointCloser(EndPoint endPoint)
private EndPointCloser(EndPoint endPoint, CountDownLatch latch)
{
this.endPoint = endPoint;
_endp = endPoint;
_latch = latch;
}
@Override
public void run()
{
try
{
closeNoExceptions(endPoint.getConnection());
}
finally
{
latch.countDown();
}
closeNoExceptions(_endp.getConnection());
_latch.countDown();
}
}
private class CloseSelector implements Runnable
{
private CountDownLatch _latch=new CountDownLatch(1);
@Override
public void run()
{
Selector selector=_selector;
_selector=null;
closeNoExceptions(selector);
_latch.countDown();
}
private boolean await(long timeout)
public boolean await(long timeout)
{
try
{
return latch.await(timeout, TimeUnit.MILLISECONDS);
return _latch.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{
@ -707,9 +720,4 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
}
}
}
private enum State
{
PROCESSING, SELECTING
}
}

View File

@ -22,11 +22,11 @@ import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SpinLock;
/**
* An ChannelEndpoint that can be scheduled by {@link SelectorManager}.
@ -35,12 +35,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
{
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
private enum State
{
UPDATED, UPDATE_PENDING, LOCKED
}
private final SpinLock _lock = new SpinLock();
private boolean _updatePending;
private final AtomicReference<State> _interestState = new AtomicReference<>(State.UPDATED);
/**
* true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
*/
@ -83,68 +80,26 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
/**
* This method may run concurrently with {@link #changeInterests(int)}.
*/
while (true)
int readyOps;
int oldInterestOps;
int newInterestOps;
try(SpinLock.Lock lock = _lock.lock())
{
State current = _interestState.get();
if (LOG.isDebugEnabled())
LOG.debug("Processing, state {} for {}", current, this);
switch (current)
{
case UPDATE_PENDING:
case UPDATED:
{
if (!_interestState.compareAndSet(current, State.LOCKED))
continue;
_updatePending=true;
int readyOps;
try
{
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
readyOps = _key.readyOps();
int oldInterestOps = _interestOps;
int newInterestOps = oldInterestOps & ~readyOps;
_interestOps = newInterestOps;
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this);
}
finally
{
// We have been called by onSelected, so we know the
// selector will call updateKey before selecting again.
_interestState.set(State.UPDATE_PENDING);
}
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
if (readable)
{
if (writable)
return _runFillableCompleteWrite;
return _runFillable;
}
else if (writable)
{
return _runCompleteWrite;
}
else
{
return null;
}
}
case LOCKED:
{
// Wait for other operations to finish.
Thread.yield();
break;
}
default:
{
throw new IllegalStateException("Invalid state: " + current);
}
}
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
readyOps = _key.readyOps();
oldInterestOps = _interestOps;
newInterestOps = oldInterestOps & ~readyOps;
_interestOps = newInterestOps;
}
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this);
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
return readable?(writable?_runFillableCompleteWrite:_runFillable):(writable?_runCompleteWrite:null);
}
@Override
@ -154,114 +109,19 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
* This method may run concurrently with
* {@link #changeInterests(int)} and {@link #onSelected()}.
*/
while (true)
{
State current = _interestState.get();
if (LOG.isDebugEnabled())
LOG.debug("Updating key, state {} for {}", current, this);
switch (current)
{
case UPDATE_PENDING:
case UPDATED:
{
if (!_interestState.compareAndSet(current, State.LOCKED))
continue;
try
{
// Set the key interest as expected.
setKeyInterests();
return;
}
finally
{
// We have just updated the key, so we are now updated!
// and no call to unpdateKey is pending
_interestState.set(State.UPDATED);
}
}
case LOCKED:
{
// Wait for other operations to finish.
Thread.yield();
break;
}
default:
{
throw new IllegalStateException("Invalid state: " + current);
}
}
}
}
private void changeInterests(int operation)
{
/**
* This method may run concurrently with
* {@link #updateKey()} and {@link #onSelected()}.
*/
while (true)
{
State current = _interestState.get();
if (LOG.isDebugEnabled())
LOG.debug("Changing interests in state {} for {}", current, this);
switch (current)
{
case UPDATE_PENDING:
case UPDATED:
{
if (!_interestState.compareAndSet(current, State.LOCKED))
continue;
try
{
int oldInterestOps = _interestOps;
int newInterestOps = oldInterestOps | operation;
if (LOG.isDebugEnabled())
LOG.debug("changeInterests s={} {}->{} for {}", current, oldInterestOps, newInterestOps, this);
if (newInterestOps != oldInterestOps)
_interestOps = newInterestOps;
if (current==State.UPDATED)
_selector.submit(_runUpdateKey);
}
finally
{
// If we were pending a call to updateKey, then we still are.
// If we were not, then we have submitted a callback to runUpdateKey, so we now are pending.
_interestState.set(State.UPDATE_PENDING);
}
return;
}
case LOCKED:
{
// We lost the race to update _interestOps, but we
// must update it nonetheless, so yield and spin,
// waiting for our chance to update _interestOps.
Thread.yield();
break;
}
default:
{
throw new IllegalStateException("Invalid state: " + current);
}
}
}
}
private void setKeyInterests()
{
try
{
int oldInterestOps = _key.interestOps();
int newInterestOps = _interestOps;
if (oldInterestOps != newInterestOps)
_key.interestOps(newInterestOps);
int oldInterestOps;
int newInterestOps;
try(SpinLock.Lock lock = _lock.lock())
{
_updatePending=false;
oldInterestOps = _key.interestOps();
newInterestOps = _interestOps;
if (oldInterestOps != newInterestOps)
_key.interestOps(newInterestOps);
}
if (LOG.isDebugEnabled())
LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
}
@ -277,6 +137,34 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
}
}
private void changeInterests(int operation)
{
/**
* This method may run concurrently with
* {@link #updateKey()} and {@link #onSelected()}.
*/
int oldInterestOps;
int newInterestOps;
boolean pending;
try(SpinLock.Lock lock = _lock.lock())
{
pending=_updatePending;
oldInterestOps = _interestOps;
newInterestOps = oldInterestOps | operation;
if (newInterestOps != oldInterestOps)
_interestOps = newInterestOps;
}
if (LOG.isDebugEnabled())
LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
if (!pending)
_selector.submit(_runUpdateKey);
}
@Override
public void close()
{

View File

@ -53,6 +53,11 @@ public class SpinLock
}
}
public boolean isLocked()
{
return _lock.get();
}
public class Lock implements AutoCloseable
{
@Override

View File

@ -0,0 +1,140 @@
/* ------------------------------------------------------------ */
/** Spin Lock
* <p>This is a lock designed to protect VERY short sections of
* critical code. Threads attempting to take the lock will spin
* forever until the lock is available, thus it is important that
* the code protected by this lock is extremely simple and non
* blocking. The reason for this lock is that it prevents a thread
* from giving up a CPU core when contending for the lock.</p>
* <pre>
* try(SpinLock.Lock lock = spinlock.lock())
* {
* // something very quick and non blocking
* }
* </pre>
*/
package org.eclipse.jetty.util.thread;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class SpinLockTest
{
@Test
public void testLocked()
{
SpinLock lock = new SpinLock();
assertFalse(lock.isLocked());
try(SpinLock.Lock l = lock.lock())
{
assertTrue(lock.isLocked());
}
finally
{
assertFalse(lock.isLocked());
}
assertFalse(lock.isLocked());
}
@Test
public void testLockedException()
{
SpinLock lock = new SpinLock();
assertFalse(lock.isLocked());
try(SpinLock.Lock l = lock.lock())
{
assertTrue(lock.isLocked());
throw new Exception();
}
catch(Exception e)
{
assertFalse(lock.isLocked());
}
finally
{
assertFalse(lock.isLocked());
}
assertFalse(lock.isLocked());
}
@Test
public void testContend() throws Exception
{
final SpinLock lock = new SpinLock();
final CountDownLatch held0 = new CountDownLatch(1);
final CountDownLatch hold0 = new CountDownLatch(1);
Thread thread0 = new Thread()
{
@Override
public void run()
{
try(SpinLock.Lock l = lock.lock())
{
held0.countDown();
hold0.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
};
thread0.start();
held0.await();
assertTrue(lock.isLocked());
final CountDownLatch held1 = new CountDownLatch(1);
final CountDownLatch hold1 = new CountDownLatch(1);
Thread thread1 = new Thread()
{
@Override
public void run()
{
try(SpinLock.Lock l = lock.lock())
{
held1.countDown();
hold1.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
};
thread1.start();
// thread1 will be spinning here
assertFalse(held1.await(100,TimeUnit.MILLISECONDS));
// Let thread0 complete
hold0.countDown();
thread0.join();
// thread1 can progress
held1.await();
// let thread1 complete
hold1.countDown();
thread1.join();
assertFalse(lock.isLocked());
}
}