Reviewed and made cosmetic changes.

This commit is contained in:
Simone Bordet 2015-02-19 11:46:09 +01:00
parent 7bb3127ee1
commit d4dfc0762f
2 changed files with 55 additions and 52 deletions

View File

@ -58,7 +58,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private static final Logger LOG = Log.getLogger(ManagedSelector.class);
private final SpinLock _lock = new SpinLock();
private boolean _selecting=false;
private boolean _selecting = false;
private final Queue<Runnable> _actions = new ConcurrentArrayQueue<>();
private final SelectorManager _selectorManager;
private final int _id;
@ -105,7 +105,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
CloseSelector close_selector = new CloseSelector();
submit(close_selector);
close_selector.await(getStopTimeout());
if (LOG.isDebugEnabled())
LOG.debug("Stopped {}", this);
}
@ -120,10 +120,11 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
_actions.offer(change);
if (_selecting)
{
if (_selector!=null)
_selector.wakeup();
Selector selector = _selector;
if (selector != null)
selector.wakeup();
// To avoid the extra select wakeup.
_selecting=false;
_selecting = false;
}
}
}
@ -191,7 +192,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
if (action == null)
{
// No more actions, so we need to select
_selecting=true;
_selecting = true;
return null;
}
}
@ -222,8 +223,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
{
try
{
Selector selector=_selector;
if (selector!=null && selector.isOpen())
Selector selector = _selector;
if (selector != null && selector.isOpen())
{
if (LOG.isDebugEnabled())
LOG.debug("Selector loop waiting on select");
@ -234,7 +235,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
try (SpinLock.Lock lock = _lock.lock())
{
// finished selecting
_selecting=false;
_selecting = false;
}
_keys = selector.selectedKeys();
@ -267,7 +268,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
if (attachment instanceof SelectableEndPoint)
{
// Try to produce a task
@SuppressWarnings("resource")
SelectableEndPoint selectable = (SelectableEndPoint)attachment;
Runnable task = selectable.onSelected();
if (task != null)
@ -339,6 +339,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
{
key.attach(connect.attachment);
boolean connected = _selectorManager.finishConnect(channel);
if (LOG.isDebugEnabled())
LOG.debug("Connected {} {}", connected, channel);
if (connected)
{
connect.timeout.cancel();
@ -416,7 +418,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
{
@Override
public void run()
{
{
if (LOG.isDebugEnabled())
LOG.debug("Destroyed {}", endPoint);
if (connection != null)
@ -425,7 +427,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
});
}
@Override
public String dump()
{
@ -435,7 +437,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
@Override
public void dump(Appendable out, String indent) throws IOException
{
out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append(System.lineSeparator());
Selector selector = _selector;
if (selector != null && selector.isOpen())
@ -475,14 +477,22 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
public void run()
{
Selector selector = _selector;
if (selector!=null)
if (selector != null && selector.isOpen())
{
Set<SelectionKey> keys = selector.keys();
_dumps.add(selector + " keys=" + keys.size());
for (SelectionKey key : keys)
_dumps.add(String.format("Key@%x{i=%d}->%s",key.hashCode(),key.interestOps(),key.attachment()));
{
try
{
_dumps.add(String.format("SelectionKey@%x{i=%d}->%s", key.hashCode(), key.interestOps(), key.attachment()));
}
catch (Throwable x)
{
LOG.ignore(x);
}
}
}
latch.countDown();
}
@ -499,7 +509,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
class Acceptor implements Runnable
{
private final ServerSocketChannel _channel;
@ -646,13 +655,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private class CloseEndPoints implements Runnable
{
private CountDownLatch _latch=new CountDownLatch(1);
CountDownLatch _allClosed ;
private final CountDownLatch _latch = new CountDownLatch(1);
private CountDownLatch _allClosed;
@Override
public void run()
{
List<EndPoint> end_points = new ArrayList<EndPoint>();
List<EndPoint> end_points = new ArrayList<>();
for (SelectionKey key : _selector.keys())
{
if (key.isValid())
@ -667,15 +676,15 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
_latch.countDown();
for (EndPoint endp : end_points)
submit(new EndPointCloser(endp,_allClosed));
submit(new EndPointCloser(endp, _allClosed));
}
public boolean await(long timeout)
{
try
{
return _latch.await(timeout, TimeUnit.MILLISECONDS) &&
_allClosed.await(timeout,TimeUnit.MILLISECONDS);
return _latch.await(timeout, TimeUnit.MILLISECONDS) &&
_allClosed.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{
@ -686,36 +695,32 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private class EndPointCloser implements Product
{
private final EndPoint _endp;
private final EndPoint _endPoint;
private final CountDownLatch _latch;
private EndPointCloser(EndPoint endPoint, CountDownLatch latch)
{
_endp = endPoint;
_endPoint = endPoint;
_latch = latch;
}
@Override
public void run()
{
closeNoExceptions(_endp.getConnection());
closeNoExceptions(_endPoint.getConnection());
_latch.countDown();
}
}
private class CloseSelector implements Runnable
{
private CountDownLatch _latch=new CountDownLatch(1);
private CountDownLatch _latch = new CountDownLatch(1);
@Override
public void run()
{
Selector selector;
try(SpinLock.Lock lock = _lock.lock())
{
selector=_selector;
_selector=null;
}
Selector selector = _selector;
_selector = null;
closeNoExceptions(selector);
_latch.countDown();
}
@ -724,7 +729,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
{
try
{
return _latch.await(timeout, TimeUnit.MILLISECONDS);
return _latch.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{

View File

@ -37,7 +37,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
private final SpinLock _lock = new SpinLock();
private boolean _updatePending;
/**
* true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
*/
@ -48,7 +48,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
* The desired value for {@link SelectionKey#interestOps()}
*/
private int _interestOps;
private final Runnable _runUpdateKey = new Runnable() { public void run() { updateKey(); } };
private final Runnable _runFillable = new Runnable() { public void run() { getFillInterest().fillable(); } };
private final Runnable _runCompleteWrite = new Runnable() { public void run() { getWriteFlusher().completeWrite(); } };
@ -80,12 +80,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
/**
* This method may run concurrently with {@link #changeInterests(int)}.
*/
int readyOps;
int oldInterestOps;
int newInterestOps;
try(SpinLock.Lock lock = _lock.lock())
try (SpinLock.Lock lock = _lock.lock())
{
_updatePending=true;
_updatePending = true;
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
readyOps = _key.readyOps();
@ -96,32 +97,33 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this);
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
return readable?(writable?_runFillableCompleteWrite:_runFillable):(writable?_runCompleteWrite:null);
return readable ? (writable ? _runFillableCompleteWrite : _runFillable)
: (writable ? _runCompleteWrite : null);
}
@Override
public void updateKey()
{
/**
* This method may run concurrently with
* {@link #changeInterests(int)} and {@link #onSelected()}.
* This method may run concurrently with {@link #changeInterests(int)}.
*/
try
{
int oldInterestOps;
int newInterestOps;
try(SpinLock.Lock lock = _lock.lock())
try (SpinLock.Lock lock = _lock.lock())
{
_updatePending=false;
_updatePending = false;
oldInterestOps = _key.interestOps();
newInterestOps = _interestOps;
if (oldInterestOps != newInterestOps)
_key.interestOps(newInterestOps);
}
if (LOG.isDebugEnabled())
LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
}
@ -147,10 +149,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
int oldInterestOps;
int newInterestOps;
boolean pending;
try(SpinLock.Lock lock = _lock.lock())
try (SpinLock.Lock lock = _lock.lock())
{
pending=_updatePending;
pending = _updatePending;
oldInterestOps = _interestOps;
newInterestOps = oldInterestOps | operation;
if (newInterestOps != oldInterestOps)
@ -194,8 +195,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
@Override
public String toString()
{
// Do NOT use synchronized (this)
// because it's very easy to deadlock when debugging is enabled.
// We do a best effort to print the right toString() and that's it.
try
{
@ -208,10 +207,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
keyInterests,
keyReadiness);
}
catch (CancelledKeyException x)
catch (Throwable x)
{
return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps);
}
}
}