Jetty9 - Better handling for I/O interests.
When the SelectChannelEndPoint is selected, it should only remove interests for what has been selected, so that other interests are kept unchanged.
This commit is contained in:
parent
aa24693870
commit
0c1ca16e86
|
@ -54,8 +54,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
@Override
|
||||
protected boolean needsFill()
|
||||
{
|
||||
updateKey(SelectionKey.OP_READ, true);
|
||||
return false;
|
||||
return SelectChannelEndPoint.this.needsFill();
|
||||
}
|
||||
};
|
||||
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
|
||||
|
@ -63,7 +62,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
@Override
|
||||
protected void onIncompleteFlushed()
|
||||
{
|
||||
updateKey(SelectionKey.OP_WRITE, true);
|
||||
SelectChannelEndPoint.this.onIncompleteFlush();
|
||||
}
|
||||
};
|
||||
private final SelectorManager.ManagedSelector _selector;
|
||||
|
@ -91,6 +90,17 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
scheduleIdleTimeout(idleTimeout);
|
||||
}
|
||||
|
||||
protected boolean needsFill()
|
||||
{
|
||||
updateLocalInterests(SelectionKey.OP_READ, true);
|
||||
return false;
|
||||
}
|
||||
|
||||
protected void onIncompleteFlush()
|
||||
{
|
||||
updateLocalInterests(SelectionKey.OP_WRITE, true);
|
||||
}
|
||||
|
||||
private void scheduleIdleTimeout(long delay)
|
||||
{
|
||||
Future<?> newTimeout = null;
|
||||
|
@ -138,7 +148,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
@Override
|
||||
public void onSelected()
|
||||
{
|
||||
_interestOps = 0;
|
||||
int oldInterestOps = _key.interestOps();
|
||||
int readyOps = _key.readyOps();
|
||||
int newInterestOps = oldInterestOps & ~readyOps;
|
||||
setKeyInterests(oldInterestOps, newInterestOps);
|
||||
updateLocalInterests(readyOps, false);
|
||||
if (_key.isReadable())
|
||||
_readInterest.readable();
|
||||
if (_key.isWritable())
|
||||
|
@ -178,7 +192,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
}
|
||||
}
|
||||
|
||||
private void updateKey(int operation, boolean add)
|
||||
private void updateLocalInterests(int operation, boolean add)
|
||||
{
|
||||
int oldInterestOps = _interestOps;
|
||||
int newInterestOps;
|
||||
|
@ -195,12 +209,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
if (newInterestOps != oldInterestOps)
|
||||
{
|
||||
_interestOps = newInterestOps;
|
||||
LOG.debug("Key update {} -> {} for {}", oldInterestOps, newInterestOps, this);
|
||||
LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
|
||||
_selector.submit(this);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.debug("Ignoring key update {} -> {} for {}", oldInterestOps, newInterestOps, this);
|
||||
LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -214,7 +228,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
int oldInterestOps = _key.interestOps();
|
||||
int newInterestOps = _interestOps;
|
||||
if (newInterestOps != oldInterestOps)
|
||||
_key.interestOps(newInterestOps);
|
||||
setKeyInterests(oldInterestOps, newInterestOps);
|
||||
}
|
||||
}
|
||||
catch (CancelledKeyException x)
|
||||
|
@ -229,6 +243,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
}
|
||||
}
|
||||
|
||||
private void setKeyInterests(int oldInterestOps, int newInterestOps)
|
||||
{
|
||||
LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
|
||||
_key.interestOps(newInterestOps);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -218,13 +218,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
* or {@link #accept(SocketChannel)}.</p>
|
||||
*
|
||||
* @param channel the channel associated to the endpoint
|
||||
* @param selectSet the selector the channel is registered to
|
||||
* @param selector the selector the channel is registered to
|
||||
* @param selectionKey the selection key
|
||||
* @return a new endpoint
|
||||
* @throws IOException if the endPoint cannot be created
|
||||
* @see #newConnection(SocketChannel, AsyncEndPoint, Object)
|
||||
*/
|
||||
protected abstract AsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey selectionKey) throws IOException;
|
||||
protected abstract AsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
|
||||
|
||||
/**
|
||||
* <p>Factory method to create {@link AsyncConnection}.</p>
|
||||
|
@ -414,7 +414,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
if (attachment instanceof SelectableAsyncEndPoint)
|
||||
{
|
||||
key.interestOps(0);
|
||||
((SelectableAsyncEndPoint)attachment).onSelected();
|
||||
}
|
||||
else if (key.isConnectable())
|
||||
|
|
Loading…
Reference in New Issue