Implement socket and server ChannelContexts (#28275)

This commit is related to #27260. Currently have a channel context that
implements reading and writing logic for socket channels. Additionally,
we have exception contexts to handle exceptions. And accepting contexts
to handle accepted channels. This PR introduces a ChannelContext that
handles close and exception handling for all channel types.
Additionally, it has implementers that provide specific functionality
for socket channels (read and writing). And specific functionality for
server channels (accepting).
This commit is contained in:
Tim Brooks 2018-01-18 13:06:40 -07:00 committed by GitHub
parent de9d903b1e
commit a6a57a71d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 386 additions and 325 deletions

View File

@ -79,7 +79,7 @@ public abstract class AbstractNioChannel<S extends SelectableChannel & NetworkCh
selector.assertOnSelectorThread();
if (closeContext.isDone() == false) {
try {
closeRawChannel();
socketChannel.close();
closeContext.complete(null);
} catch (IOException e) {
closeContext.completeExceptionally(e);
@ -119,13 +119,13 @@ public abstract class AbstractNioChannel<S extends SelectableChannel & NetworkCh
closeContext.whenComplete(listener);
}
@Override
public void close() {
getContext().closeChannel();
}
// Package visibility for testing
void setSelectionKey(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
// Package visibility for testing
void closeRawChannel() throws IOException {
socketChannel.close();
}
}

View File

@ -67,7 +67,7 @@ public class AcceptorEventHandler extends EventHandler {
ChannelFactory<?, ?> channelFactory = nioServerChannel.getChannelFactory();
SocketSelector selector = selectorSupplier.get();
NioSocketChannel nioSocketChannel = channelFactory.acceptNioChannel(nioServerChannel, selector);
nioServerChannel.getAcceptContext().accept(nioSocketChannel);
nioServerChannel.getContext().acceptChannel(nioSocketChannel);
}
/**

View File

@ -26,25 +26,20 @@ import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
public class BytesChannelContext implements ChannelContext {
public class BytesChannelContext extends SocketChannelContext {
private final NioSocketChannel channel;
private final ReadConsumer readConsumer;
private final InboundChannelBuffer channelBuffer;
private final LinkedList<BytesWriteOperation> queued = new LinkedList<>();
private final AtomicBoolean isClosing = new AtomicBoolean(false);
private boolean peerClosed = false;
private boolean ioException = false;
public BytesChannelContext(NioSocketChannel channel, ReadConsumer readConsumer, InboundChannelBuffer channelBuffer) {
this.channel = channel;
public BytesChannelContext(NioSocketChannel channel, BiConsumer<NioSocketChannel, Exception> exceptionHandler,
ReadConsumer readConsumer, InboundChannelBuffer channelBuffer) {
super(channel, exceptionHandler);
this.readConsumer = readConsumer;
this.channelBuffer = channelBuffer;
}
@Override
public void channelRegistered() throws IOException {}
@Override
public int read() throws IOException {
if (channelBuffer.getRemaining() == 0) {
@ -52,16 +47,9 @@ public class BytesChannelContext implements ChannelContext {
channelBuffer.ensureCapacity(channelBuffer.getCapacity() + 1);
}
int bytesRead;
try {
bytesRead = channel.read(channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()));
} catch (IOException ex) {
ioException = true;
throw ex;
}
int bytesRead = readFromChannel(channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()));
if (bytesRead == -1) {
peerClosed = true;
if (bytesRead == 0) {
return 0;
}
@ -90,7 +78,6 @@ public class BytesChannelContext implements ChannelContext {
return;
}
// TODO: Eval if we will allow writes from sendMessage
selector.queueWriteInChannelBuffer(writeOperation);
}
@ -126,28 +113,38 @@ public class BytesChannelContext implements ChannelContext {
@Override
public boolean selectorShouldClose() {
return peerClosed || ioException || isClosing.get();
return isPeerClosed() || hasIOException() || isClosing.get();
}
@Override
public void closeFromSelector() {
public void closeFromSelector() throws IOException {
channel.getSelector().assertOnSelectorThread();
// Set to true in order to reject new writes before queuing with selector
isClosing.set(true);
channelBuffer.close();
for (BytesWriteOperation op : queued) {
channel.getSelector().executeFailedListener(op.getListener(), new ClosedChannelException());
if (channel.isOpen()) {
IOException channelCloseException = null;
try {
channel.closeFromSelector();
} catch (IOException e) {
channelCloseException = e;
}
// Set to true in order to reject new writes before queuing with selector
isClosing.set(true);
channelBuffer.close();
for (BytesWriteOperation op : queued) {
channel.getSelector().executeFailedListener(op.getListener(), new ClosedChannelException());
}
queued.clear();
if (channelCloseException != null) {
throw channelCloseException;
}
}
queued.clear();
}
private void singleFlush(BytesWriteOperation headOp) throws IOException {
try {
int written = channel.write(headOp.getBuffersToWrite());
int written = flushToChannel(headOp.getBuffersToWrite());
headOp.incrementIndex(written);
} catch (IOException e) {
channel.getSelector().executeFailedListener(headOp.getListener(), e);
ioException = true;
throw e;
}

View File

@ -20,52 +20,8 @@
package org.elasticsearch.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;
/**
* This context should implement the specific logic for a channel. When a channel receives a notification
* that it is ready to perform certain operations (read, write, etc) the {@link ChannelContext} will be
* called. This context will need to implement all protocol related logic. Additionally, if any special
* close behavior is required, it should be implemented in this context.
*
* The only methods of the context that should ever be called from a non-selector thread are
* {@link #closeChannel()} and {@link #sendMessage(ByteBuffer[], BiConsumer)}.
*/
public interface ChannelContext {
void channelRegistered() throws IOException;
int read() throws IOException;
void sendMessage(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener);
void queueWriteOperation(WriteOperation writeOperation);
void flushChannel() throws IOException;
boolean hasQueuedWriteOps();
/**
* Schedules a channel to be closed by the selector event loop with which it is registered.
* <p>
* If the channel is open and the state can be transitioned to closed, the close operation will
* be scheduled with the event loop.
* <p>
* If the channel is already set to closed, it is assumed that it is already scheduled to be closed.
* <p>
* Depending on the underlying protocol of the channel, a close operation might simply close the socket
* channel or may involve reading and writing messages.
*/
void closeChannel();
/**
* This method indicates if a selector should close this channel.
*
* @return a boolean indicating if the selector should close
*/
boolean selectorShouldClose();
/**
* This method cleans up any context resources that need to be released when a channel is closed. It
* should only be called by the selector thread.
@ -74,8 +30,16 @@ public interface ChannelContext {
*/
void closeFromSelector() throws IOException;
@FunctionalInterface
interface ReadConsumer {
int consumeReads(InboundChannelBuffer channelBuffer) throws IOException;
}
/**
* Schedules a channel to be closed by the selector event loop with which it is registered.
*
* If the channel is open and the state can be transitioned to closed, the close operation will
* be scheduled with the event loop.
*
* Depending on the underlying protocol of the channel, a close operation might simply close the socket
* channel or may involve reading and writing messages.
*/
void closeChannel();
void handleException(Exception e);
}

View File

@ -89,7 +89,6 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
try {
Socket channel = createChannel(selector, rawChannel);
assert channel.getContext() != null : "channel context should have been set on channel";
assert channel.getExceptionContext() != null : "exception handler should have been set on channel";
return channel;
} catch (Exception e) {
closeRawChannel(rawChannel, e);

View File

@ -69,7 +69,7 @@ public abstract class EventHandler {
*/
protected void handleClose(NioChannel channel) {
try {
channel.closeFromSelector();
channel.getContext().closeFromSelector();
} catch (IOException e) {
closeException(channel, e);
}

View File

@ -59,6 +59,10 @@ public final class InboundChannelBuffer implements AutoCloseable {
ensureCapacity(PAGE_SIZE);
}
public static InboundChannelBuffer allocatingInstance() {
return new InboundChannelBuffer(() -> new Page(ByteBuffer.allocate(PAGE_SIZE), () -> {}));
}
@Override
public void close() {
if (isClosed.compareAndSet(false, true)) {

View File

@ -32,6 +32,8 @@ public interface NioChannel {
InetSocketAddress getLocalAddress();
void close();
void closeFromSelector() throws IOException;
void register() throws ClosedChannelException;
@ -42,6 +44,8 @@ public interface NioChannel {
NetworkChannel getRawChannel();
ChannelContext getContext();
/**
* Adds a close listener to the channel. Multiple close listeners can be added. There is no guarantee
* about the order in which close listeners will be executed. If the channel is already closed, the

View File

@ -21,12 +21,13 @@ package org.elasticsearch.nio;
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.util.function.Consumer;
import java.util.concurrent.atomic.AtomicBoolean;
public class NioServerSocketChannel extends AbstractNioChannel<ServerSocketChannel> {
private final ChannelFactory<?, ?> channelFactory;
private Consumer<NioSocketChannel> acceptContext;
private ServerChannelContext context;
private final AtomicBoolean contextSet = new AtomicBoolean(false);
public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory<?, ?> channelFactory, AcceptingSelector selector)
throws IOException {
@ -39,17 +40,22 @@ public class NioServerSocketChannel extends AbstractNioChannel<ServerSocketChann
}
/**
* This method sets the accept context for a server socket channel. The accept context is called when a
* new channel is accepted. The parameter passed to the context is the new channel.
* This method sets the context for a server socket channel. The context is called when a new channel is
* accepted, an exception occurs, or it is time to close the channel.
*
* @param acceptContext to call
* @param context to call
*/
public void setAcceptContext(Consumer<NioSocketChannel> acceptContext) {
this.acceptContext = acceptContext;
public void setContext(ServerChannelContext context) {
if (contextSet.compareAndSet(false, true)) {
this.context = context;
} else {
throw new IllegalStateException("Context on this channel were already set. It should only be once.");
}
}
public Consumer<NioSocketChannel> getAcceptContext() {
return acceptContext;
@Override
public ServerChannelContext getContext() {
return context;
}
@Override

View File

@ -19,13 +19,10 @@
package org.elasticsearch.nio;
import org.elasticsearch.nio.utils.ExceptionsHelper;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
@ -35,9 +32,8 @@ public class NioSocketChannel extends AbstractNioChannel<SocketChannel> {
private final InetSocketAddress remoteAddress;
private final CompletableFuture<Void> connectContext = new CompletableFuture<>();
private final SocketSelector socketSelector;
private final AtomicBoolean contextsSet = new AtomicBoolean(false);
private ChannelContext context;
private BiConsumer<NioSocketChannel, Exception> exceptionContext;
private final AtomicBoolean contextSet = new AtomicBoolean(false);
private SocketChannelContext context;
private Exception connectException;
public NioSocketChannel(SocketChannel socketChannel, SocketSelector selector) throws IOException {
@ -46,25 +42,6 @@ public class NioSocketChannel extends AbstractNioChannel<SocketChannel> {
this.socketSelector = selector;
}
@Override
public void closeFromSelector() throws IOException {
getSelector().assertOnSelectorThread();
if (isOpen()) {
ArrayList<IOException> closingExceptions = new ArrayList<>(2);
try {
super.closeFromSelector();
} catch (IOException e) {
closingExceptions.add(e);
}
try {
context.closeFromSelector();
} catch (IOException e) {
closingExceptions.add(e);
}
ExceptionsHelper.rethrowAndSuppress(closingExceptions);
}
}
@Override
public SocketSelector getSelector() {
return socketSelector;
@ -94,23 +71,19 @@ public class NioSocketChannel extends AbstractNioChannel<SocketChannel> {
}
}
public void setContexts(ChannelContext context, BiConsumer<NioSocketChannel, Exception> exceptionContext) {
if (contextsSet.compareAndSet(false, true)) {
public void setContext(SocketChannelContext context) {
if (contextSet.compareAndSet(false, true)) {
this.context = context;
this.exceptionContext = exceptionContext;
} else {
throw new IllegalStateException("Contexts on this channel were already set. They should only be once.");
throw new IllegalStateException("Context on this channel were already set. It should only be once.");
}
}
public ChannelContext getContext() {
@Override
public SocketChannelContext getContext() {
return context;
}
public BiConsumer<NioSocketChannel, Exception> getExceptionContext() {
return exceptionContext;
}
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class ServerChannelContext implements ChannelContext {
private final NioServerSocketChannel channel;
private final Consumer<NioSocketChannel> acceptor;
private final BiConsumer<NioServerSocketChannel, Exception> exceptionHandler;
private final AtomicBoolean isClosing = new AtomicBoolean(false);
public ServerChannelContext(NioServerSocketChannel channel, Consumer<NioSocketChannel> acceptor,
BiConsumer<NioServerSocketChannel, Exception> exceptionHandler) {
this.channel = channel;
this.acceptor = acceptor;
this.exceptionHandler = exceptionHandler;
}
public void acceptChannel(NioSocketChannel acceptedChannel) {
acceptor.accept(acceptedChannel);
}
@Override
public void closeFromSelector() throws IOException {
channel.closeFromSelector();
}
@Override
public void closeChannel() {
if (isClosing.compareAndSet(false, true)) {
channel.getSelector().queueChannelClose(channel);
}
}
@Override
public void handleException(Exception e) {
exceptionHandler.accept(channel, e);
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;
/**
* This context should implement the specific logic for a channel. When a channel receives a notification
* that it is ready to perform certain operations (read, write, etc) the {@link SocketChannelContext} will
* be called. This context will need to implement all protocol related logic. Additionally, if any special
* close behavior is required, it should be implemented in this context.
*
* The only methods of the context that should ever be called from a non-selector thread are
* {@link #closeChannel()} and {@link #sendMessage(ByteBuffer[], BiConsumer)}.
*/
public abstract class SocketChannelContext implements ChannelContext {
protected final NioSocketChannel channel;
private final BiConsumer<NioSocketChannel, Exception> exceptionHandler;
private boolean ioException;
private boolean peerClosed;
protected SocketChannelContext(NioSocketChannel channel, BiConsumer<NioSocketChannel, Exception> exceptionHandler) {
this.channel = channel;
this.exceptionHandler = exceptionHandler;
}
@Override
public void handleException(Exception e) {
exceptionHandler.accept(channel, e);
}
public void channelRegistered() throws IOException {}
public abstract int read() throws IOException;
public abstract void sendMessage(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener);
public abstract void queueWriteOperation(WriteOperation writeOperation);
public abstract void flushChannel() throws IOException;
public abstract boolean hasQueuedWriteOps();
/**
* This method indicates if a selector should close this channel.
*
* @return a boolean indicating if the selector should close
*/
public abstract boolean selectorShouldClose();
protected boolean hasIOException() {
return ioException;
}
protected boolean isPeerClosed() {
return peerClosed;
}
protected int readFromChannel(ByteBuffer buffer) throws IOException {
try {
int bytesRead = channel.read(buffer);
if (bytesRead < 0) {
peerClosed = true;
bytesRead = 0;
}
return bytesRead;
} catch (IOException e) {
ioException = true;
throw e;
}
}
protected int readFromChannel(ByteBuffer[] buffers) throws IOException {
try {
int bytesRead = channel.read(buffers);
if (bytesRead < 0) {
peerClosed = true;
bytesRead = 0;
}
return bytesRead;
} catch (IOException e) {
ioException = true;
throw e;
}
}
protected int flushToChannel(ByteBuffer buffer) throws IOException {
try {
return channel.write(buffer);
} catch (IOException e) {
ioException = true;
throw e;
}
}
protected int flushToChannel(ByteBuffer[] buffers) throws IOException {
try {
return channel.write(buffers);
} catch (IOException e) {
ioException = true;
throw e;
}
}
@FunctionalInterface
public interface ReadConsumer {
int consumeReads(InboundChannelBuffer channelBuffer) throws IOException;
}
}

View File

@ -44,7 +44,7 @@ public class SocketEventHandler extends EventHandler {
* @param channel that was registered
*/
protected void handleRegistration(NioSocketChannel channel) throws IOException {
ChannelContext context = channel.getContext();
SocketChannelContext context = channel.getContext();
context.channelRegistered();
if (context.hasQueuedWriteOps()) {
SelectionKeyUtils.setConnectReadAndWriteInterested(channel);
@ -61,7 +61,7 @@ public class SocketEventHandler extends EventHandler {
*/
protected void registrationException(NioSocketChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("failed to register socket channel: {}", channel), exception);
exceptionCaught(channel, exception);
channel.getContext().handleException(exception);
}
/**
@ -82,7 +82,7 @@ public class SocketEventHandler extends EventHandler {
*/
protected void connectException(NioSocketChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("failed to connect to socket channel: {}", channel), exception);
exceptionCaught(channel, exception);
channel.getContext().handleException(exception);
}
/**
@ -103,7 +103,7 @@ public class SocketEventHandler extends EventHandler {
*/
protected void readException(NioSocketChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("exception while reading from socket channel: {}", channel), exception);
exceptionCaught(channel, exception);
channel.getContext().handleException(exception);
}
/**
@ -113,7 +113,7 @@ public class SocketEventHandler extends EventHandler {
* @param channel that can be written to
*/
protected void handleWrite(NioSocketChannel channel) throws IOException {
ChannelContext channelContext = channel.getContext();
SocketChannelContext channelContext = channel.getContext();
channelContext.flushChannel();
}
@ -125,20 +125,7 @@ public class SocketEventHandler extends EventHandler {
*/
protected void writeException(NioSocketChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("exception while writing to socket channel: {}", channel), exception);
exceptionCaught(channel, exception);
}
/**
* This method is called when handling an event from a channel fails due to an unexpected exception.
* An example would be if checking ready ops on a {@link java.nio.channels.SelectionKey} threw
* {@link java.nio.channels.CancelledKeyException}.
*
* @param channel that caused the exception
* @param exception that was thrown
*/
protected void genericChannelException(NioChannel channel, Exception exception) {
super.genericChannelException(channel, exception);
exceptionCaught((NioSocketChannel) channel, exception);
channel.getContext().handleException(exception);
}
/**
@ -167,8 +154,4 @@ public class SocketEventHandler extends EventHandler {
}
}
}
private void exceptionCaught(NioSocketChannel channel, Exception e) {
channel.getExceptionContext().accept(channel, e);
}
}

View File

@ -122,7 +122,7 @@ public class SocketSelector extends ESSelector {
public void queueWriteInChannelBuffer(WriteOperation writeOperation) {
assertOnSelectorThread();
NioSocketChannel channel = writeOperation.getChannel();
ChannelContext context = channel.getContext();
SocketChannelContext context = channel.getContext();
try {
SelectionKeyUtils.setWriteInterested(channel);
context.queueWriteOperation(writeOperation);

View File

@ -24,7 +24,7 @@ import java.util.function.BiConsumer;
/**
* This is a basic write operation that can be queued with a channel. The only requirements of a write
* operation is that is has a listener and a reference to its channel. The actual conversion of the write
* operation implementation to bytes will be performed by the {@link ChannelContext}.
* operation implementation to bytes will be performed by the {@link SocketChannelContext}.
*/
public interface WriteOperation {

View File

@ -27,8 +27,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
@ -41,21 +39,21 @@ public class AcceptorEventHandlerTests extends ESTestCase {
private SocketSelector socketSelector;
private ChannelFactory<NioServerSocketChannel, NioSocketChannel> channelFactory;
private NioServerSocketChannel channel;
private Consumer<NioSocketChannel> acceptedChannelCallback;
private ServerChannelContext context;
@Before
@SuppressWarnings("unchecked")
public void setUpHandler() throws IOException {
channelFactory = mock(ChannelFactory.class);
socketSelector = mock(SocketSelector.class);
acceptedChannelCallback = mock(Consumer.class);
context = mock(ServerChannelContext.class);
ArrayList<SocketSelector> selectors = new ArrayList<>();
selectors.add(socketSelector);
handler = new AcceptorEventHandler(logger, new RoundRobinSupplier<>(selectors.toArray(new SocketSelector[selectors.size()])));
AcceptingSelector selector = mock(AcceptingSelector.class);
channel = new DoNotRegisterServerChannel(mock(ServerSocketChannel.class), channelFactory, selector);
channel.setAcceptContext(acceptedChannelCallback);
channel.setContext(context);
channel.register();
}
@ -80,11 +78,11 @@ public class AcceptorEventHandlerTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testHandleAcceptCallsServerAcceptCallback() throws IOException {
NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class), socketSelector);
childChannel.setContexts(mock(ChannelContext.class), mock(BiConsumer.class));
childChannel.setContext(mock(SocketChannelContext.class));
when(channelFactory.acceptNioChannel(same(channel), same(socketSelector))).thenReturn(childChannel);
handler.acceptChannel(channel);
verify(acceptedChannelCallback).accept(childChannel);
verify(context).acceptChannel(childChannel);
}
}

View File

@ -40,7 +40,7 @@ import static org.mockito.Mockito.when;
public class BytesChannelContextTests extends ESTestCase {
private ChannelContext.ReadConsumer readConsumer;
private SocketChannelContext.ReadConsumer readConsumer;
private NioSocketChannel channel;
private BytesChannelContext context;
private InboundChannelBuffer channelBuffer;
@ -51,16 +51,14 @@ public class BytesChannelContextTests extends ESTestCase {
@Before
@SuppressWarnings("unchecked")
public void init() {
readConsumer = mock(ChannelContext.ReadConsumer.class);
readConsumer = mock(SocketChannelContext.ReadConsumer.class);
messageLength = randomInt(96) + 20;
selector = mock(SocketSelector.class);
listener = mock(BiConsumer.class);
channel = mock(NioSocketChannel.class);
Supplier<InboundChannelBuffer.Page> pageSupplier = () ->
new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {});
channelBuffer = new InboundChannelBuffer(pageSupplier);
context = new BytesChannelContext(channel, readConsumer, channelBuffer);
channelBuffer = InboundChannelBuffer.allocatingInstance();
context = new BytesChannelContext(channel, null, readConsumer, channelBuffer);
when(channel.getSelector()).thenReturn(selector);
when(selector.isOnCurrentThread()).thenReturn(true);
@ -153,11 +151,12 @@ public class BytesChannelContextTests extends ESTestCase {
}
public void testCloseClosesChannelBuffer() throws IOException {
when(channel.isOpen()).thenReturn(true);
Runnable closer = mock(Runnable.class);
Supplier<InboundChannelBuffer.Page> pageSupplier = () -> new InboundChannelBuffer.Page(ByteBuffer.allocate(1 << 14), closer);
InboundChannelBuffer buffer = new InboundChannelBuffer(pageSupplier);
buffer.ensureCapacity(1);
BytesChannelContext context = new BytesChannelContext(channel, readConsumer, buffer);
BytesChannelContext context = new BytesChannelContext(channel, null, readConsumer, buffer);
context.closeFromSelector();
verify(closer).run();
}
@ -218,6 +217,7 @@ public class BytesChannelContextTests extends ESTestCase {
assertTrue(context.hasQueuedWriteOps());
when(channel.isOpen()).thenReturn(true);
context.closeFromSelector();
verify(selector).executeFailedListener(same(listener), any(ClosedChannelException.class));

View File

@ -28,7 +28,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.function.BiConsumer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
@ -139,7 +138,7 @@ public class ChannelFactoryTests extends ESTestCase {
@Override
public NioSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException {
NioSocketChannel nioSocketChannel = new NioSocketChannel(channel, selector);
nioSocketChannel.setContexts(mock(ChannelContext.class), mock(BiConsumer.class));
nioSocketChannel.setContext(mock(SocketChannelContext.class));
return nioSocketChannel;
}

View File

@ -30,6 +30,8 @@ import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.mockito.Mockito.mock;
@ -56,52 +58,42 @@ public class NioServerSocketChannelTests extends ESTestCase {
thread.join();
}
@SuppressWarnings("unchecked")
public void testClose() throws Exception {
AtomicBoolean isClosed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
NioChannel channel = new DoNotCloseServerChannel(mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector);
try (ServerSocketChannel rawChannel = ServerSocketChannel.open()) {
NioServerSocketChannel channel = new NioServerSocketChannel(rawChannel, mock(ChannelFactory.class), selector);
channel.setContext(new ServerChannelContext(channel, mock(Consumer.class), mock(BiConsumer.class)));
channel.addCloseListener(ActionListener.toBiConsumer(new ActionListener<Void>() {
@Override
public void onResponse(Void o) {
isClosed.set(true);
latch.countDown();
}
channel.addCloseListener(ActionListener.toBiConsumer(new ActionListener<Void>() {
@Override
public void onResponse(Void o) {
isClosed.set(true);
latch.countDown();
}
@Override
public void onFailure(Exception e) {
isClosed.set(true);
latch.countDown();
}
}));
@Override
public void onFailure(Exception e) {
isClosed.set(true);
latch.countDown();
}
}));
assertTrue(channel.isOpen());
assertTrue(rawChannel.isOpen());
assertFalse(isClosed.get());
assertTrue(channel.isOpen());
assertFalse(closedRawChannel.get());
assertFalse(isClosed.get());
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
channel.addCloseListener(ActionListener.toBiConsumer(closeFuture));
selector.queueChannelClose(channel);
closeFuture.actionGet();
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
channel.addCloseListener(ActionListener.toBiConsumer(closeFuture));
selector.queueChannelClose(channel);
closeFuture.actionGet();
assertTrue(closedRawChannel.get());
assertFalse(channel.isOpen());
latch.await();
assertTrue(isClosed.get());
}
private class DoNotCloseServerChannel extends DoNotRegisterServerChannel {
private DoNotCloseServerChannel(ServerSocketChannel channel, ChannelFactory<?, ?> channelFactory, AcceptingSelector selector)
throws IOException {
super(channel, channelFactory, selector);
}
@Override
void closeRawChannel() throws IOException {
closedRawChannel.set(true);
assertFalse(rawChannel.isOpen());
assertFalse(channel.isOpen());
latch.await();
assertTrue(isClosed.get());
}
}
}

View File

@ -35,14 +35,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class NioSocketChannelTests extends ESTestCase {
private SocketSelector selector;
private AtomicBoolean closedRawChannel;
private Thread thread;
@Before
@ -50,7 +48,6 @@ public class NioSocketChannelTests extends ESTestCase {
public void startSelector() throws IOException {
selector = new SocketSelector(new SocketEventHandler(logger));
thread = new Thread(selector::runLoop);
closedRawChannel = new AtomicBoolean(false);
thread.start();
FutureUtils.get(selector.isRunningFuture());
}
@ -66,80 +63,46 @@ public class NioSocketChannelTests extends ESTestCase {
AtomicBoolean isClosed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
NioSocketChannel socketChannel = new DoNotCloseChannel(mock(SocketChannel.class), selector);
socketChannel.setContexts(mock(ChannelContext.class), mock(BiConsumer.class));
socketChannel.addCloseListener(ActionListener.toBiConsumer(new ActionListener<Void>() {
@Override
public void onResponse(Void o) {
isClosed.set(true);
latch.countDown();
}
@Override
public void onFailure(Exception e) {
isClosed.set(true);
latch.countDown();
}
}));
try(SocketChannel rawChannel = SocketChannel.open()) {
NioSocketChannel socketChannel = new NioSocketChannel(rawChannel, selector);
socketChannel.setContext(new BytesChannelContext(socketChannel, mock(BiConsumer.class),
mock(SocketChannelContext.ReadConsumer.class), InboundChannelBuffer.allocatingInstance()));
socketChannel.addCloseListener(ActionListener.toBiConsumer(new ActionListener<Void>() {
@Override
public void onResponse(Void o) {
isClosed.set(true);
latch.countDown();
}
assertTrue(socketChannel.isOpen());
assertFalse(closedRawChannel.get());
assertFalse(isClosed.get());
@Override
public void onFailure(Exception e) {
isClosed.set(true);
latch.countDown();
}
}));
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
socketChannel.addCloseListener(ActionListener.toBiConsumer(closeFuture));
selector.queueChannelClose(socketChannel);
closeFuture.actionGet();
assertTrue(socketChannel.isOpen());
assertTrue(rawChannel.isOpen());
assertFalse(isClosed.get());
assertTrue(closedRawChannel.get());
assertFalse(socketChannel.isOpen());
latch.await();
assertTrue(isClosed.get());
}
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
socketChannel.addCloseListener(ActionListener.toBiConsumer(closeFuture));
selector.queueChannelClose(socketChannel);
closeFuture.actionGet();
@SuppressWarnings("unchecked")
public void testCloseContextExceptionDoesNotStopClose() throws Exception {
AtomicBoolean isClosed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
IOException ioException = new IOException();
NioSocketChannel socketChannel = new DoNotCloseChannel(mock(SocketChannel.class), selector);
ChannelContext context = mock(ChannelContext.class);
doThrow(ioException).when(context).closeFromSelector();
socketChannel.setContexts(context, mock(BiConsumer.class));
socketChannel.addCloseListener(ActionListener.toBiConsumer(new ActionListener<Void>() {
@Override
public void onResponse(Void o) {
isClosed.set(true);
latch.countDown();
}
@Override
public void onFailure(Exception e) {
isClosed.set(true);
latch.countDown();
}
}));
assertTrue(socketChannel.isOpen());
assertFalse(closedRawChannel.get());
assertFalse(isClosed.get());
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
socketChannel.addCloseListener(ActionListener.toBiConsumer(closeFuture));
selector.queueChannelClose(socketChannel);
closeFuture.actionGet();
assertTrue(closedRawChannel.get());
assertFalse(socketChannel.isOpen());
latch.await();
assertTrue(isClosed.get());
assertFalse(rawChannel.isOpen());
assertFalse(socketChannel.isOpen());
latch.await();
assertTrue(isClosed.get());
}
}
@SuppressWarnings("unchecked")
public void testConnectSucceeds() throws Exception {
SocketChannel rawChannel = mock(SocketChannel.class);
when(rawChannel.finishConnect()).thenReturn(true);
NioSocketChannel socketChannel = new DoNotCloseChannel(rawChannel, selector);
socketChannel.setContexts(mock(ChannelContext.class), mock(BiConsumer.class));
NioSocketChannel socketChannel = new DoNotRegisterChannel(rawChannel, selector);
socketChannel.setContext(mock(SocketChannelContext.class));
selector.scheduleForRegistration(socketChannel);
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
@ -148,15 +111,14 @@ public class NioSocketChannelTests extends ESTestCase {
assertTrue(socketChannel.isConnectComplete());
assertTrue(socketChannel.isOpen());
assertFalse(closedRawChannel.get());
}
@SuppressWarnings("unchecked")
public void testConnectFails() throws Exception {
SocketChannel rawChannel = mock(SocketChannel.class);
when(rawChannel.finishConnect()).thenThrow(new ConnectException());
NioSocketChannel socketChannel = new DoNotCloseChannel(rawChannel, selector);
socketChannel.setContexts(mock(ChannelContext.class), mock(BiConsumer.class));
NioSocketChannel socketChannel = new DoNotRegisterChannel(rawChannel, selector);
socketChannel.setContext(mock(SocketChannelContext.class));
selector.scheduleForRegistration(socketChannel);
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
@ -168,16 +130,4 @@ public class NioSocketChannelTests extends ESTestCase {
// Even if connection fails the channel is 'open' until close() is called
assertTrue(socketChannel.isOpen());
}
private class DoNotCloseChannel extends DoNotRegisterChannel {
private DoNotCloseChannel(SocketChannel channel, SocketSelector selector) throws IOException {
super(channel, selector);
}
@Override
void closeRawChannel() throws IOException {
closedRawChannel.set(true);
}
}
}

View File

@ -53,9 +53,8 @@ public class SocketEventHandlerTests extends ESTestCase {
channel = new DoNotRegisterChannel(rawChannel, socketSelector);
when(rawChannel.finishConnect()).thenReturn(true);
Supplier<InboundChannelBuffer.Page> pageSupplier = () -> new InboundChannelBuffer.Page(ByteBuffer.allocate(1 << 14), () -> {});
InboundChannelBuffer buffer = new InboundChannelBuffer(pageSupplier);
channel.setContexts(new BytesChannelContext(channel, mock(ChannelContext.ReadConsumer.class), buffer), exceptionHandler);
InboundChannelBuffer buffer = InboundChannelBuffer.allocatingInstance();
channel.setContext(new BytesChannelContext(channel, exceptionHandler, mock(SocketChannelContext.ReadConsumer.class), buffer));
channel.register();
channel.finishConnect();
@ -64,7 +63,7 @@ public class SocketEventHandlerTests extends ESTestCase {
public void testRegisterCallsContext() throws IOException {
NioSocketChannel channel = mock(NioSocketChannel.class);
ChannelContext channelContext = mock(ChannelContext.class);
SocketChannelContext channelContext = mock(SocketChannelContext.class);
when(channel.getContext()).thenReturn(channelContext);
when(channel.getSelectionKey()).thenReturn(new TestSelectionKey(0));
handler.handleRegistration(channel);
@ -102,8 +101,8 @@ public class SocketEventHandlerTests extends ESTestCase {
public void testHandleReadDelegatesToContext() throws IOException {
NioSocketChannel channel = new DoNotRegisterChannel(rawChannel, mock(SocketSelector.class));
ChannelContext context = mock(ChannelContext.class);
channel.setContexts(context, exceptionHandler);
SocketChannelContext context = mock(SocketChannelContext.class);
channel.setContext(context);
when(context.read()).thenReturn(1);
handler.handleRead(channel);
@ -124,19 +123,19 @@ public class SocketEventHandlerTests extends ESTestCase {
public void testPostHandlingCallWillCloseTheChannelIfReady() throws IOException {
NioSocketChannel channel = mock(NioSocketChannel.class);
ChannelContext context = mock(ChannelContext.class);
SocketChannelContext context = mock(SocketChannelContext.class);
when(channel.getSelectionKey()).thenReturn(new TestSelectionKey(0));
when(channel.getContext()).thenReturn(context);
when(context.selectorShouldClose()).thenReturn(true);
handler.postHandling(channel);
verify(channel).closeFromSelector();
verify(context).closeFromSelector();
}
public void testPostHandlingCallWillNotCloseTheChannelIfNotReady() throws IOException {
NioSocketChannel channel = mock(NioSocketChannel.class);
ChannelContext context = mock(ChannelContext.class);
SocketChannelContext context = mock(SocketChannelContext.class);
when(channel.getSelectionKey()).thenReturn(new TestSelectionKey(0));
when(channel.getContext()).thenReturn(context);
@ -149,8 +148,8 @@ public class SocketEventHandlerTests extends ESTestCase {
public void testPostHandlingWillAddWriteIfNecessary() throws IOException {
NioSocketChannel channel = new DoNotRegisterChannel(rawChannel, mock(SocketSelector.class));
channel.setSelectionKey(new TestSelectionKey(SelectionKey.OP_READ));
ChannelContext context = mock(ChannelContext.class);
channel.setContexts(context, null);
SocketChannelContext context = mock(SocketChannelContext.class);
channel.setContext(context);
when(context.hasQueuedWriteOps()).thenReturn(true);
@ -162,8 +161,8 @@ public class SocketEventHandlerTests extends ESTestCase {
public void testPostHandlingWillRemoveWriteIfNecessary() throws IOException {
NioSocketChannel channel = new DoNotRegisterChannel(rawChannel, mock(SocketSelector.class));
channel.setSelectionKey(new TestSelectionKey(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
ChannelContext context = mock(ChannelContext.class);
channel.setContexts(context, null);
SocketChannelContext context = mock(SocketChannelContext.class);
channel.setContext(context);
when(context.hasQueuedWriteOps()).thenReturn(false);
@ -171,10 +170,4 @@ public class SocketEventHandlerTests extends ESTestCase {
handler.postHandling(channel);
assertEquals(SelectionKey.OP_READ, channel.getSelectionKey().interestOps());
}
private void setWriteAndRead(NioChannel channel) {
SelectionKeyUtils.setConnectAndReadInterested(channel);
SelectionKeyUtils.removeConnectInterested(channel);
SelectionKeyUtils.setWriteInterested(channel);
}
}

View File

@ -49,7 +49,7 @@ public class SocketSelectorTests extends ESTestCase {
private SocketEventHandler eventHandler;
private NioSocketChannel channel;
private TestSelectionKey selectionKey;
private ChannelContext channelContext;
private SocketChannelContext channelContext;
private BiConsumer<Void, Throwable> listener;
private ByteBuffer[] buffers = {ByteBuffer.allocate(1)};
private Selector rawSelector;
@ -60,7 +60,7 @@ public class SocketSelectorTests extends ESTestCase {
super.setUp();
eventHandler = mock(SocketEventHandler.class);
channel = mock(NioSocketChannel.class);
channelContext = mock(ChannelContext.class);
channelContext = mock(SocketChannelContext.class);
listener = mock(BiConsumer.class);
selectionKey = new TestSelectionKey(0);
selectionKey.attach(channel);

View File

@ -31,14 +31,15 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.AcceptingSelector;
import org.elasticsearch.nio.AcceptorEventHandler;
import org.elasticsearch.nio.BytesChannelContext;
import org.elasticsearch.nio.ChannelContext;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.BytesChannelContext;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketEventHandler;
import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.threadpool.ThreadPool;
@ -52,6 +53,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import static org.elasticsearch.common.settings.Setting.intSetting;
@ -182,18 +184,21 @@ public class NioTransport extends TcpTransport {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
};
ChannelContext.ReadConsumer nioReadConsumer = channelBuffer ->
SocketChannelContext.ReadConsumer nioReadConsumer = channelBuffer ->
consumeNetworkReads(nioChannel, BytesReference.fromByteBuffers(channelBuffer.sliceBuffersTo(channelBuffer.getIndex())));
BytesChannelContext context = new BytesChannelContext(nioChannel, nioReadConsumer, new InboundChannelBuffer(pageSupplier));
nioChannel.setContexts(context, NioTransport.this::exceptionCaught);
BiConsumer<NioSocketChannel, Exception> exceptionHandler = NioTransport.this::exceptionCaught;
BytesChannelContext context = new BytesChannelContext(nioChannel, exceptionHandler, nioReadConsumer,
new InboundChannelBuffer(pageSupplier));
nioChannel.setContext(context);
return nioChannel;
}
@Override
public TcpNioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException {
TcpNioServerSocketChannel nioServerChannel = new TcpNioServerSocketChannel(profileName, channel, this, selector);
nioServerChannel.setAcceptContext(NioTransport.this::acceptChannel);
return nioServerChannel;
TcpNioServerSocketChannel nioChannel = new TcpNioServerSocketChannel(profileName, channel, this, selector);
ServerChannelContext context = new ServerChannelContext(nioChannel, NioTransport.this::acceptChannel, (c, e) -> {});
nioChannel.setContext(context);
return nioChannel;
}
}
}

View File

@ -39,8 +39,8 @@ public class TcpNioServerSocketChannel extends NioServerSocketChannel implements
private final String profile;
public TcpNioServerSocketChannel(String profile, ServerSocketChannel socketChannel,
ChannelFactory<TcpNioServerSocketChannel, TcpNioSocketChannel> channelFactory,
AcceptingSelector selector) throws IOException {
ChannelFactory<TcpNioServerSocketChannel, TcpNioSocketChannel> channelFactory,
AcceptingSelector selector) throws IOException {
super(socketChannel, channelFactory, selector);
this.profile = profile;
}

View File

@ -32,12 +32,13 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.nio.AcceptingSelector;
import org.elasticsearch.nio.AcceptorEventHandler;
import org.elasticsearch.nio.BytesChannelContext;
import org.elasticsearch.nio.ChannelContext;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpChannel;
@ -161,17 +162,19 @@ public class MockNioTransport extends TcpTransport {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
};
ChannelContext.ReadConsumer nioReadConsumer = channelBuffer ->
SocketChannelContext.ReadConsumer nioReadConsumer = channelBuffer ->
consumeNetworkReads(nioChannel, BytesReference.fromByteBuffers(channelBuffer.sliceBuffersTo(channelBuffer.getIndex())));
BytesChannelContext context = new BytesChannelContext(nioChannel, nioReadConsumer, new InboundChannelBuffer(pageSupplier));
nioChannel.setContexts(context, MockNioTransport.this::exceptionCaught);
BytesChannelContext context = new BytesChannelContext(nioChannel, MockNioTransport.this::exceptionCaught, nioReadConsumer,
new InboundChannelBuffer(pageSupplier));
nioChannel.setContext(context);
return nioChannel;
}
@Override
public MockServerChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException {
MockServerChannel nioServerChannel = new MockServerChannel(profileName, channel, this, selector);
nioServerChannel.setAcceptContext(MockNioTransport.this::acceptChannel);
ServerChannelContext context = new ServerChannelContext(nioServerChannel, MockNioTransport.this::acceptChannel, (c, e) -> {});
nioServerChannel.setContext(context);
return nioServerChannel;
}
}