Remove manual tracking of registered channels (#27445)

This is related to #27260. Currently, every ESSelector keeps track of
all channels that are registered with it. ESSelector is just an
abstraction over a raw java nio selector. The java nio selector already
tracks its own selection keys. This commit removes our tracking and
relies on the java nio selector tracking.
This commit is contained in:
Tim Brooks 2017-11-17 16:20:09 -07:00 committed by GitHub
parent cc3be6ddda
commit ce45e29be7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 16 additions and 62 deletions

View File

@ -22,13 +22,9 @@ package org.elasticsearch.transport.nio;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
/** /**
@ -93,7 +89,6 @@ public class AcceptingSelector extends ESSelector {
newChannel.register(); newChannel.register();
SelectionKey selectionKey = newChannel.getSelectionKey(); SelectionKey selectionKey = newChannel.getSelectionKey();
selectionKey.attach(newChannel); selectionKey.attach(newChannel);
addRegisteredChannel(newChannel);
eventHandler.serverChannelRegistered(newChannel); eventHandler.serverChannelRegistered(newChannel);
} else { } else {
eventHandler.registrationException(newChannel, new ClosedChannelException()); eventHandler.registrationException(newChannel, new ClosedChannelException());

View File

@ -28,14 +28,13 @@ import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException; import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/** /**
* This is a basic selector abstraction used by {@link org.elasticsearch.transport.nio.NioTransport}. This * This is a basic selector abstraction used by {@link org.elasticsearch.transport.nio.NioTransport}. This
@ -56,7 +55,6 @@ public abstract class ESSelector implements Closeable {
private final CountDownLatch exitedLoop = new CountDownLatch(1); private final CountDownLatch exitedLoop = new CountDownLatch(1);
private final AtomicBoolean isClosed = new AtomicBoolean(false); private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final PlainActionFuture<Boolean> isRunningFuture = PlainActionFuture.newFuture(); private final PlainActionFuture<Boolean> isRunningFuture = PlainActionFuture.newFuture();
private final Set<NioChannel> registeredChannels = Collections.newSetFromMap(new ConcurrentHashMap<NioChannel, Boolean>());
private volatile Thread thread; private volatile Thread thread;
ESSelector(EventHandler eventHandler) throws IOException { ESSelector(EventHandler eventHandler) throws IOException {
@ -134,7 +132,7 @@ public abstract class ESSelector implements Closeable {
void cleanupAndCloseChannels() { void cleanupAndCloseChannels() {
cleanup(); cleanup();
channelsToClose.addAll(registeredChannels); channelsToClose.addAll(selector.keys().stream().map(sk -> (NioChannel) sk.attachment()).collect(Collectors.toList()));
closePendingChannels(); closePendingChannels();
} }
@ -171,19 +169,6 @@ public abstract class ESSelector implements Closeable {
selector.wakeup(); selector.wakeup();
} }
public Set<NioChannel> getRegisteredChannels() {
return registeredChannels;
}
public void addRegisteredChannel(NioChannel channel) {
assert registeredChannels.contains(channel) == false : "Should only register channel once";
registeredChannels.add(channel);
}
public void removeRegisteredChannel(NioChannel channel) {
registeredChannels.remove(channel);
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (isClosed.compareAndSet(false, true)) { if (isClosed.compareAndSet(false, true)) {

View File

@ -171,7 +171,6 @@ public class SocketSelector extends ESSelector {
try { try {
if (newChannel.isOpen()) { if (newChannel.isOpen()) {
newChannel.register(); newChannel.register();
addRegisteredChannel(newChannel);
SelectionKey key = newChannel.getSelectionKey(); SelectionKey key = newChannel.getSelectionKey();
key.attach(newChannel); key.attach(newChannel);
eventHandler.handleRegistration(newChannel); eventHandler.handleRegistration(newChannel);

View File

@ -115,9 +115,6 @@ public abstract class AbstractNioChannel<S extends SelectableChannel & NetworkCh
} catch (IOException e) { } catch (IOException e) {
closeContext.completeExceptionally(e); closeContext.completeExceptionally(e);
throw e; throw e;
} finally {
// There is no problem with calling this multiple times
selector.removeRegisteredChannel(this);
} }
} }
} }

View File

@ -20,7 +20,6 @@
package org.elasticsearch.transport.nio; package org.elasticsearch.transport.nio;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.utils.TestSelectionKey; import org.elasticsearch.transport.nio.utils.TestSelectionKey;
import org.junit.Before; import org.junit.Before;
@ -30,8 +29,8 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.security.PrivilegedActionException; import java.security.PrivilegedActionException;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
@ -46,6 +45,7 @@ public class AcceptingSelectorTests extends ESTestCase {
private NioServerSocketChannel serverChannel; private NioServerSocketChannel serverChannel;
private AcceptorEventHandler eventHandler; private AcceptorEventHandler eventHandler;
private TestSelectionKey selectionKey; private TestSelectionKey selectionKey;
private Selector rawSelector;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -54,7 +54,7 @@ public class AcceptingSelectorTests extends ESTestCase {
eventHandler = mock(AcceptorEventHandler.class); eventHandler = mock(AcceptorEventHandler.class);
serverChannel = mock(NioServerSocketChannel.class); serverChannel = mock(NioServerSocketChannel.class);
Selector rawSelector = mock(Selector.class); rawSelector = mock(Selector.class);
selector = new AcceptingSelector(eventHandler, rawSelector); selector = new AcceptingSelector(eventHandler, rawSelector);
this.selector.setThread(); this.selector.setThread();
@ -71,9 +71,6 @@ public class AcceptingSelectorTests extends ESTestCase {
selector.preSelect(); selector.preSelect();
verify(eventHandler).serverChannelRegistered(serverChannel); verify(eventHandler).serverChannelRegistered(serverChannel);
Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
assertEquals(1, registeredChannels.size());
assertTrue(registeredChannels.contains(serverChannel));
} }
public void testClosedChannelWillNotBeRegistered() throws Exception { public void testClosedChannelWillNotBeRegistered() throws Exception {
@ -83,10 +80,6 @@ public class AcceptingSelectorTests extends ESTestCase {
selector.preSelect(); selector.preSelect();
verify(eventHandler).registrationException(same(serverChannel), any(ClosedChannelException.class)); verify(eventHandler).registrationException(same(serverChannel), any(ClosedChannelException.class));
Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(serverChannel));
} }
public void testRegisterChannelFailsDueToException() throws Exception { public void testRegisterChannelFailsDueToException() throws Exception {
@ -98,10 +91,6 @@ public class AcceptingSelectorTests extends ESTestCase {
selector.preSelect(); selector.preSelect();
verify(eventHandler).registrationException(serverChannel, closedChannelException); verify(eventHandler).registrationException(serverChannel, closedChannelException);
Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(serverChannel));
} }
public void testAcceptEvent() throws IOException { public void testAcceptEvent() throws IOException {
@ -128,7 +117,9 @@ public class AcceptingSelectorTests extends ESTestCase {
selector.preSelect(); selector.preSelect();
assertEquals(1, selector.getRegisteredChannels().size()); TestSelectionKey key = new TestSelectionKey(0);
key.attach(serverChannel);
when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(key)));
selector.cleanupAndCloseChannels(); selector.cleanupAndCloseChannels();

View File

@ -51,19 +51,12 @@ public class ESSelectorTests extends ESTestCase {
public void testQueueChannelForClosed() throws IOException { public void testQueueChannelForClosed() throws IOException {
NioChannel channel = mock(NioChannel.class); NioChannel channel = mock(NioChannel.class);
when(channel.getSelector()).thenReturn(selector); when(channel.getSelector()).thenReturn(selector);
selector.addRegisteredChannel(channel);
selector.queueChannelClose(channel); selector.queueChannelClose(channel);
assertEquals(1, selector.getRegisteredChannels().size());
selector.singleLoop(); selector.singleLoop();
verify(handler).handleClose(channel); verify(handler).handleClose(channel);
// Will be called in the channel close method
selector.removeRegisteredChannel(channel);
assertEquals(0, selector.getRegisteredChannels().size());
} }
public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException { public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException {

View File

@ -34,7 +34,8 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException; import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.Set; import java.util.Collections;
import java.util.HashSet;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
@ -54,6 +55,7 @@ public class SocketSelectorTests extends ESTestCase {
private WriteContext writeContext; private WriteContext writeContext;
private ActionListener<NioChannel> listener; private ActionListener<NioChannel> listener;
private NetworkBytesReference bufferReference = NetworkBytesReference.wrap(new BytesArray(new byte[1])); private NetworkBytesReference bufferReference = NetworkBytesReference.wrap(new BytesArray(new byte[1]));
private Selector rawSelector;
@Before @Before
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -65,7 +67,7 @@ public class SocketSelectorTests extends ESTestCase {
listener = mock(ActionListener.class); listener = mock(ActionListener.class);
selectionKey = new TestSelectionKey(0); selectionKey = new TestSelectionKey(0);
selectionKey.attach(channel); selectionKey.attach(channel);
Selector rawSelector = mock(Selector.class); rawSelector = mock(Selector.class);
this.socketSelector = new SocketSelector(eventHandler, rawSelector); this.socketSelector = new SocketSelector(eventHandler, rawSelector);
this.socketSelector.setThread(); this.socketSelector.setThread();
@ -83,10 +85,6 @@ public class SocketSelectorTests extends ESTestCase {
socketSelector.preSelect(); socketSelector.preSelect();
verify(eventHandler).handleRegistration(channel); verify(eventHandler).handleRegistration(channel);
Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
assertEquals(1, registeredChannels.size());
assertTrue(registeredChannels.contains(channel));
} }
public void testClosedChannelWillNotBeRegistered() throws Exception { public void testClosedChannelWillNotBeRegistered() throws Exception {
@ -97,10 +95,6 @@ public class SocketSelectorTests extends ESTestCase {
verify(eventHandler).registrationException(same(channel), any(ClosedChannelException.class)); verify(eventHandler).registrationException(same(channel), any(ClosedChannelException.class));
verify(channel, times(0)).finishConnect(); verify(channel, times(0)).finishConnect();
Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(channel));
} }
public void testRegisterChannelFailsDueToException() throws Exception { public void testRegisterChannelFailsDueToException() throws Exception {
@ -113,10 +107,6 @@ public class SocketSelectorTests extends ESTestCase {
verify(eventHandler).registrationException(channel, closedChannelException); verify(eventHandler).registrationException(channel, closedChannelException);
verify(channel, times(0)).finishConnect(); verify(channel, times(0)).finishConnect();
Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(channel));
} }
public void testSuccessfullyRegisterChannelWillConnect() throws Exception { public void testSuccessfullyRegisterChannelWillConnect() throws Exception {
@ -309,6 +299,10 @@ public class SocketSelectorTests extends ESTestCase {
socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), networkBuffer, listener)); socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), networkBuffer, listener));
socketSelector.scheduleForRegistration(unRegisteredChannel); socketSelector.scheduleForRegistration(unRegisteredChannel);
TestSelectionKey testSelectionKey = new TestSelectionKey(0);
testSelectionKey.attach(channel);
when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(testSelectionKey)));
socketSelector.cleanupAndCloseChannels(); socketSelector.cleanupAndCloseChannels();
verify(listener).onFailure(any(ClosedSelectorException.class)); verify(listener).onFailure(any(ClosedSelectorException.class));