From dd5d165da18506b3994cfd42068f30285acc8cc5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 29 Jun 2017 14:02:50 -0500 Subject: [PATCH] Prevent channel enqueue after selector close (#25478) This commit adds additional protection to `ESSelector` and its implementations to ensure that channels are not enqueued after the selector is closed. After a channel has been added to the queue, we check that the selector is open. If it is not, then we remove the channel from the queue. If the channel is removed successfully, we throw an `IllegalStateException`. --- .../transport/nio/AcceptingSelector.java | 1 + .../transport/nio/ESSelector.java | 33 +++++++++++++++---- .../transport/nio/SocketSelector.java | 1 + 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java index c2c9ac03a2a..ec5c9a963de 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java @@ -79,6 +79,7 @@ public class AcceptingSelector extends ESSelector { */ public void registerServerChannel(NioServerSocketChannel serverSocketChannel) { newChannels.add(serverSocketChannel); + ensureSelectorOpenForEnqueuing(newChannels, serverSocketChannel); wakeup(); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java index c5cf7e25931..44c3901d1ff 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java @@ -146,8 +146,8 @@ public abstract class ESSelector implements Closeable { } public void queueChannelClose(NioChannel channel) { - ensureOpen(); channelsToClose.offer(channel); + ensureSelectorOpenForEnqueuing(channelsToClose, channel); wakeup(); } @@ -180,6 +180,31 @@ public abstract class ESSelector implements Closeable { return isRunningFuture; } + /** + * This is a convenience method to be called after some object (normally channels) are enqueued with this + * selector. This method will check if the selector is still open. If it is open, normal operation can + * proceed. + * + * If the selector is closed, then we attempt to remove the object from the queue. If the removal + * succeeds then we throw an {@link IllegalStateException} indicating that normal operation failed. If + * the object cannot be removed from the queue, then the object has already been handled by the selector + * and operation can proceed normally. + * + * If this method is called from the selector thread, we will not throw an exception as the selector + * thread can manipulate its queues internally even if it is no longer open. + * + * @param queue the queue to which the object was added + * @param objectAdded the objected added + * @param the object type + */ + void ensureSelectorOpenForEnqueuing(ConcurrentLinkedQueue queue, O objectAdded) { + if (isClosed.get() && isOnCurrentThread() == false) { + if (queue.remove(objectAdded)) { + throw new IllegalStateException("selector is already closed"); + } + } + } + private void closeChannel(NioChannel channel) { try { eventHandler.handleClose(channel); @@ -187,10 +212,4 @@ public abstract class ESSelector implements Closeable { registeredChannels.remove(channel); } } - - private void ensureOpen() { - if (isClosed.get()) { - throw new IllegalStateException("selector is already closed"); - } - } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java index 24f68504d8f..ac40afe9bcc 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java @@ -84,6 +84,7 @@ public class SocketSelector extends ESSelector { */ public void registerSocketChannel(NioSocketChannel nioSocketChannel) { newChannels.offer(nioSocketChannel); + ensureSelectorOpenForEnqueuing(newChannels, nioSocketChannel); wakeup(); }