SelectManager executes endpoint tasks

This commit is contained in:
Greg Wilkins 2014-12-17 17:03:15 +01:00
parent bbd2ba60e7
commit 97af3632a1
5 changed files with 19 additions and 13 deletions

View File

@ -325,12 +325,14 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private void processKey(SelectionKey key) private void processKey(SelectionKey key)
{ {
Object attachment = key.attachment(); final Object attachment = key.attachment();
try try
{ {
if (attachment instanceof SelectableEndPoint) if (attachment instanceof SelectableEndPoint)
{ {
((SelectableEndPoint)attachment).onSelected(); Runnable task=((SelectableEndPoint)attachment).onSelected();
if (task!=null)
_selectorManager.getExecutor().execute(task);
} }
else if (key.isConnectable()) else if (key.isConnectable())
{ {

View File

@ -53,6 +53,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
private int _interestOps; private int _interestOps;
private final Runnable _runUpdateKey = new Runnable() { public void run() { updateKey(); } }; private final Runnable _runUpdateKey = new Runnable() { public void run() { updateKey(); } };
private final Runnable _runFillable = new Runnable() { public void run() { getFillInterest().fillable(); } };
private final Runnable _runCompleteWrite = new Runnable() { public void run() { getWriteFlusher().completeWrite(); } };
private final Runnable _runFillableCompleteWrite = new Runnable() { public void run() { getFillInterest().fillable(); getWriteFlusher().completeWrite(); } };
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout) public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
{ {
@ -76,14 +79,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
} }
@Override @Override
public void onSelected() public Runnable onSelected()
{ {
/** /**
* This method may run concurrently with {@link #changeInterests(int)}. * This method may run concurrently with {@link #changeInterests(int)}.
*/ */
assert _selector.isSelectorThread();
while (true) while (true)
{ {
State current = _interestState.get(); State current = _interestState.get();
@ -117,11 +118,15 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
} }
if ((readyOps & SelectionKey.OP_READ) != 0) if ((readyOps & SelectionKey.OP_READ) != 0)
getFillInterest().fillable(); {
if ((readyOps & SelectionKey.OP_WRITE) != 0) if ((readyOps & SelectionKey.OP_WRITE) != 0)
getWriteFlusher().completeWrite(); return _runFillableCompleteWrite;
return _runFillable;
}
if ((readyOps & SelectionKey.OP_WRITE) != 0)
return _runCompleteWrite;
return; return null;
} }
case LOCKED: case LOCKED:
{ {

View File

@ -394,7 +394,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* Callback method invoked when a read or write events has been * Callback method invoked when a read or write events has been
* detected by the {@link ManagedSelector} for this endpoint. * detected by the {@link ManagedSelector} for this endpoint.
*/ */
void onSelected(); Runnable onSelected();
/** /**
* Callback method invoked when all the keys selected by the * Callback method invoked when all the keys selected by the

View File

@ -276,7 +276,6 @@ public class SslConnectionTest
len=5; len=5;
while(len>0) while(len>0)
len-=client.getInputStream().read(buffer); len-=client.getInputStream().read(buffer);
Assert.assertEquals(1, _dispatches.get());
client.close(); client.close();
} }
@ -308,7 +307,7 @@ public class SslConnectionTest
public void testBlockedWrite() throws Exception public void testBlockedWrite() throws Exception
{ {
Socket client = newClient(); Socket client = newClient();
client.setSoTimeout(60000); client.setSoTimeout(5000);
SocketChannel server = _connector.accept(); SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);

View File

@ -78,10 +78,10 @@ public class ExtendedServerTest extends HttpServerTestBase
} }
@Override @Override
public void onSelected() public Runnable onSelected()
{ {
_lastSelected=System.currentTimeMillis(); _lastSelected=System.currentTimeMillis();
super.onSelected(); return super.onSelected();
} }
long getLastSelected() long getLastSelected()