diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java index e116d642170..23775b4bc16 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java @@ -22,13 +22,9 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; import java.io.IOException; -import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; -import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.util.Iterator; -import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -93,7 +89,6 @@ public class AcceptingSelector extends ESSelector { newChannel.register(); SelectionKey selectionKey = newChannel.getSelectionKey(); selectionKey.attach(newChannel); - addRegisteredChannel(newChannel); eventHandler.serverChannelRegistered(newChannel); } else { eventHandler.registrationException(newChannel, new ClosedChannelException()); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java index ba0fae3ee31..667107f7b3e 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java @@ -28,14 +28,13 @@ import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.util.Collections; import java.util.Iterator; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; /** * This is a basic selector abstraction used by {@link org.elasticsearch.transport.nio.NioTransport}. This @@ -56,7 +55,6 @@ public abstract class ESSelector implements Closeable { private final CountDownLatch exitedLoop = new CountDownLatch(1); private final AtomicBoolean isClosed = new AtomicBoolean(false); private final PlainActionFuture isRunningFuture = PlainActionFuture.newFuture(); - private final Set registeredChannels = Collections.newSetFromMap(new ConcurrentHashMap()); private volatile Thread thread; ESSelector(EventHandler eventHandler) throws IOException { @@ -134,7 +132,7 @@ public abstract class ESSelector implements Closeable { void cleanupAndCloseChannels() { cleanup(); - channelsToClose.addAll(registeredChannels); + channelsToClose.addAll(selector.keys().stream().map(sk -> (NioChannel) sk.attachment()).collect(Collectors.toList())); closePendingChannels(); } @@ -171,19 +169,6 @@ public abstract class ESSelector implements Closeable { selector.wakeup(); } - public Set getRegisteredChannels() { - return registeredChannels; - } - - public void addRegisteredChannel(NioChannel channel) { - assert registeredChannels.contains(channel) == false : "Should only register channel once"; - registeredChannels.add(channel); - } - - public void removeRegisteredChannel(NioChannel channel) { - registeredChannels.remove(channel); - } - @Override public void close() throws IOException { if (isClosed.compareAndSet(false, true)) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java index 9c90463421a..5a298b34bb9 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java @@ -171,7 +171,6 @@ public class SocketSelector extends ESSelector { try { if (newChannel.isOpen()) { newChannel.register(); - addRegisteredChannel(newChannel); SelectionKey key = newChannel.getSelectionKey(); key.attach(newChannel); eventHandler.handleRegistration(newChannel); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java index a7208beb661..21f46631c6e 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java @@ -115,9 +115,6 @@ public abstract class AbstractNioChannel registeredChannels = selector.getRegisteredChannels(); - assertEquals(1, registeredChannels.size()); - assertTrue(registeredChannels.contains(serverChannel)); } public void testClosedChannelWillNotBeRegistered() throws Exception { @@ -83,10 +80,6 @@ public class AcceptingSelectorTests extends ESTestCase { selector.preSelect(); verify(eventHandler).registrationException(same(serverChannel), any(ClosedChannelException.class)); - - Set registeredChannels = selector.getRegisteredChannels(); - assertEquals(0, registeredChannels.size()); - assertFalse(registeredChannels.contains(serverChannel)); } public void testRegisterChannelFailsDueToException() throws Exception { @@ -98,10 +91,6 @@ public class AcceptingSelectorTests extends ESTestCase { selector.preSelect(); verify(eventHandler).registrationException(serverChannel, closedChannelException); - - Set registeredChannels = selector.getRegisteredChannels(); - assertEquals(0, registeredChannels.size()); - assertFalse(registeredChannels.contains(serverChannel)); } public void testAcceptEvent() throws IOException { @@ -128,7 +117,9 @@ public class AcceptingSelectorTests extends ESTestCase { selector.preSelect(); - assertEquals(1, selector.getRegisteredChannels().size()); + TestSelectionKey key = new TestSelectionKey(0); + key.attach(serverChannel); + when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(key))); selector.cleanupAndCloseChannels(); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/ESSelectorTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/ESSelectorTests.java index afcd42dcb52..69c2c00489d 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/ESSelectorTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/ESSelectorTests.java @@ -51,19 +51,12 @@ public class ESSelectorTests extends ESTestCase { public void testQueueChannelForClosed() throws IOException { NioChannel channel = mock(NioChannel.class); when(channel.getSelector()).thenReturn(selector); - selector.addRegisteredChannel(channel); selector.queueChannelClose(channel); - assertEquals(1, selector.getRegisteredChannels().size()); - selector.singleLoop(); verify(handler).handleClose(channel); - // Will be called in the channel close method - selector.removeRegisteredChannel(channel); - - assertEquals(0, selector.getRegisteredChannels().size()); } public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException { diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java index fdaed26a557..670134d9bee 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java @@ -34,7 +34,8 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.util.Set; +import java.util.Collections; +import java.util.HashSet; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -54,6 +55,7 @@ public class SocketSelectorTests extends ESTestCase { private WriteContext writeContext; private ActionListener listener; private NetworkBytesReference bufferReference = NetworkBytesReference.wrap(new BytesArray(new byte[1])); + private Selector rawSelector; @Before @SuppressWarnings("unchecked") @@ -65,7 +67,7 @@ public class SocketSelectorTests extends ESTestCase { listener = mock(ActionListener.class); selectionKey = new TestSelectionKey(0); selectionKey.attach(channel); - Selector rawSelector = mock(Selector.class); + rawSelector = mock(Selector.class); this.socketSelector = new SocketSelector(eventHandler, rawSelector); this.socketSelector.setThread(); @@ -83,10 +85,6 @@ public class SocketSelectorTests extends ESTestCase { socketSelector.preSelect(); verify(eventHandler).handleRegistration(channel); - - Set registeredChannels = socketSelector.getRegisteredChannels(); - assertEquals(1, registeredChannels.size()); - assertTrue(registeredChannels.contains(channel)); } public void testClosedChannelWillNotBeRegistered() throws Exception { @@ -97,10 +95,6 @@ public class SocketSelectorTests extends ESTestCase { verify(eventHandler).registrationException(same(channel), any(ClosedChannelException.class)); verify(channel, times(0)).finishConnect(); - - Set registeredChannels = socketSelector.getRegisteredChannels(); - assertEquals(0, registeredChannels.size()); - assertFalse(registeredChannels.contains(channel)); } public void testRegisterChannelFailsDueToException() throws Exception { @@ -113,10 +107,6 @@ public class SocketSelectorTests extends ESTestCase { verify(eventHandler).registrationException(channel, closedChannelException); verify(channel, times(0)).finishConnect(); - - Set registeredChannels = socketSelector.getRegisteredChannels(); - assertEquals(0, registeredChannels.size()); - assertFalse(registeredChannels.contains(channel)); } public void testSuccessfullyRegisterChannelWillConnect() throws Exception { @@ -309,6 +299,10 @@ public class SocketSelectorTests extends ESTestCase { socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), networkBuffer, listener)); socketSelector.scheduleForRegistration(unRegisteredChannel); + TestSelectionKey testSelectionKey = new TestSelectionKey(0); + testSelectionKey.attach(channel); + when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(testSelectionKey))); + socketSelector.cleanupAndCloseChannels(); verify(listener).onFailure(any(ClosedSelectorException.class));