Prevent channel enqueue after selector close (#25478)

This commit adds additional protection to `ESSelector` and its
implementations to ensure that channels are not enqueued after the
selector is closed.

After a channel has been added to the queue, we check that the selector
is open. If it is not, then we remove the channel from the queue. If the
channel is removed successfully, we throw an `IllegalStateException`.
This commit is contained in:
Tim Brooks 2017-06-29 14:02:50 -05:00 committed by GitHub
parent 99aa04b79c
commit dd5d165da1
3 changed files with 28 additions and 7 deletions

View File

@ -79,6 +79,7 @@ public class AcceptingSelector extends ESSelector {
*/
public void registerServerChannel(NioServerSocketChannel serverSocketChannel) {
newChannels.add(serverSocketChannel);
ensureSelectorOpenForEnqueuing(newChannels, serverSocketChannel);
wakeup();
}

View File

@ -146,8 +146,8 @@ public abstract class ESSelector implements Closeable {
}
public void queueChannelClose(NioChannel channel) {
ensureOpen();
channelsToClose.offer(channel);
ensureSelectorOpenForEnqueuing(channelsToClose, channel);
wakeup();
}
@ -180,6 +180,31 @@ public abstract class ESSelector implements Closeable {
return isRunningFuture;
}
/**
* This is a convenience method to be called after some object (normally channels) are enqueued with this
* selector. This method will check if the selector is still open. If it is open, normal operation can
* proceed.
*
* If the selector is closed, then we attempt to remove the object from the queue. If the removal
* succeeds then we throw an {@link IllegalStateException} indicating that normal operation failed. If
* the object cannot be removed from the queue, then the object has already been handled by the selector
* and operation can proceed normally.
*
* If this method is called from the selector thread, we will not throw an exception as the selector
* thread can manipulate its queues internally even if it is no longer open.
*
* @param queue the queue to which the object was added
* @param objectAdded the objected added
* @param <O> the object type
*/
<O> void ensureSelectorOpenForEnqueuing(ConcurrentLinkedQueue<O> queue, O objectAdded) {
if (isClosed.get() && isOnCurrentThread() == false) {
if (queue.remove(objectAdded)) {
throw new IllegalStateException("selector is already closed");
}
}
}
private void closeChannel(NioChannel channel) {
try {
eventHandler.handleClose(channel);
@ -187,10 +212,4 @@ public abstract class ESSelector implements Closeable {
registeredChannels.remove(channel);
}
}
private void ensureOpen() {
if (isClosed.get()) {
throw new IllegalStateException("selector is already closed");
}
}
}

View File

@ -84,6 +84,7 @@ public class SocketSelector extends ESSelector {
*/
public void registerSocketChannel(NioSocketChannel nioSocketChannel) {
newChannels.offer(nioSocketChannel);
ensureSelectorOpenForEnqueuing(newChannels, nioSocketChannel);
wakeup();
}