diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketEventHandler.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketEventHandler.java index 50362c5a665..799e81dd4a4 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketEventHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketEventHandler.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport.nio; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.transport.nio.channel.NioChannel; import org.elasticsearch.transport.nio.channel.NioSocketChannel; import org.elasticsearch.transport.nio.channel.SelectionKeyUtils; @@ -146,6 +147,16 @@ public class SocketEventHandler extends EventHandler { exceptionCaught((NioSocketChannel) channel, exception); } + /** + * This method is called when a listener attached to a channel operation throws an exception. + * + * @param listener that was called + * @param exception that occurred + */ + void listenerException(ActionListener listener, Exception exception) { + logger.warn(new ParameterizedMessage("exception while executing listener: {}", listener), exception); + } + private void exceptionCaught(NioSocketChannel channel, Exception e) { channel.getExceptionContext().accept(channel, e); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java index 5a298b34bb9..12bc51914d6 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.nio; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.transport.nio.channel.NioSocketChannel; import org.elasticsearch.transport.nio.channel.SelectionKeyUtils; import org.elasticsearch.transport.nio.channel.WriteContext; @@ -79,7 +80,7 @@ public class SocketSelector extends ESSelector { void cleanup() { WriteOperation op; while ((op = queuedWrites.poll()) != null) { - op.getListener().onFailure(new ClosedSelectorException()); + executeFailedListener(op.getListener(), new ClosedSelectorException()); } channelsToClose.addAll(newChannels); } @@ -107,7 +108,7 @@ public class SocketSelector extends ESSelector { if (isOpen() == false) { boolean wasRemoved = queuedWrites.remove(writeOperation); if (wasRemoved) { - writeOperation.getListener().onFailure(new ClosedSelectorException()); + executeFailedListener(writeOperation.getListener(), new ClosedSelectorException()); } } else { wakeup(); @@ -128,7 +129,39 @@ public class SocketSelector extends ESSelector { SelectionKeyUtils.setWriteInterested(channel); context.queueWriteOperations(writeOperation); } catch (Exception e) { - writeOperation.getListener().onFailure(e); + executeFailedListener(writeOperation.getListener(), e); + } + } + + /** + * Executes a success listener with consistent exception handling. This can only be called from current + * selector thread. + * + * @param listener to be executed + * @param value to provide to listener + */ + public void executeListener(ActionListener listener, V value) { + assert isOnCurrentThread() : "Must be on selector thread"; + try { + listener.onResponse(value); + } catch (Exception e) { + eventHandler.listenerException(listener, e); + } + } + + /** + * Executes a failed listener with consistent exception handling. This can only be called from current + * selector thread. + * + * @param listener to be executed + * @param exception to provide to listener + */ + public void executeFailedListener(ActionListener listener, Exception exception) { + assert isOnCurrentThread() : "Must be on selector thread"; + try { + listener.onFailure(exception); + } catch (Exception e) { + eventHandler.listenerException(listener, e); } } @@ -154,7 +187,7 @@ public class SocketSelector extends ESSelector { if (writeOperation.getChannel().isWritable()) { queueWriteInChannelBuffer(writeOperation); } else { - writeOperation.getListener().onFailure(new ClosedChannelException()); + executeFailedListener(writeOperation.getListener(), new ClosedChannelException()); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpWriteContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpWriteContext.java index d38cd1320d1..63b876c0987 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpWriteContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpWriteContext.java @@ -82,7 +82,7 @@ public class TcpWriteContext implements WriteContext { public void clearQueuedWriteOps(Exception e) { assert channel.getSelector().isOnCurrentThread() : "Must be on selector thread to clear queued writes"; for (WriteOperation op : queued) { - op.getListener().onFailure(e); + channel.getSelector().executeFailedListener(op.getListener(), e); } queued.clear(); } @@ -91,12 +91,12 @@ public class TcpWriteContext implements WriteContext { try { headOp.flush(); } catch (IOException e) { - headOp.getListener().onFailure(e); + channel.getSelector().executeFailedListener(headOp.getListener(), e); throw e; } if (headOp.isFullyFlushed()) { - headOp.getListener().onResponse(null); + channel.getSelector().executeListener(headOp.getListener(), null); } else { queued.push(headOp); } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java index 61a9499f8db..34a44ee4e4b 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java @@ -308,4 +308,23 @@ public class SocketSelectorTests extends ESTestCase { verify(eventHandler).handleClose(channel); verify(eventHandler).handleClose(unRegisteredChannel); } + + public void testExecuteListenerWillHandleException() throws Exception { + RuntimeException exception = new RuntimeException(); + doThrow(exception).when(listener).onResponse(null); + + socketSelector.executeListener(listener, null); + + verify(eventHandler).listenerException(listener, exception); + } + + public void testExecuteFailedListenerWillHandleException() throws Exception { + IOException ioException = new IOException(); + RuntimeException exception = new RuntimeException(); + doThrow(exception).when(listener).onFailure(ioException); + + socketSelector.executeFailedListener(listener, ioException); + + verify(eventHandler).listenerException(listener, exception); + } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java index 7e6410b6c61..16b53cd71b0 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java @@ -118,7 +118,7 @@ public class TcpWriteContextTests extends ESTestCase { ClosedChannelException e = new ClosedChannelException(); writeContext.clearQueuedWriteOps(e); - verify(listener).onFailure(e); + verify(selector).executeFailedListener(listener, e); assertFalse(writeContext.hasQueuedWriteOps()); } @@ -136,7 +136,7 @@ public class TcpWriteContextTests extends ESTestCase { writeContext.flushChannel(); verify(writeOperation).flush(); - verify(listener).onResponse(null); + verify(selector).executeListener(listener, null); assertFalse(writeContext.hasQueuedWriteOps()); } @@ -173,7 +173,7 @@ public class TcpWriteContextTests extends ESTestCase { when(writeOperation2.isFullyFlushed()).thenReturn(false); writeContext.flushChannel(); - verify(listener).onResponse(null); + verify(selector).executeListener(listener, null); verify(listener2, times(0)).onResponse(channel); assertTrue(writeContext.hasQueuedWriteOps()); @@ -181,7 +181,7 @@ public class TcpWriteContextTests extends ESTestCase { writeContext.flushChannel(); - verify(listener2).onResponse(null); + verify(selector).executeListener(listener2, null); assertFalse(writeContext.hasQueuedWriteOps()); } @@ -198,7 +198,7 @@ public class TcpWriteContextTests extends ESTestCase { when(writeOperation.getListener()).thenReturn(listener); expectThrows(IOException.class, () -> writeContext.flushChannel()); - verify(listener).onFailure(exception); + verify(selector).executeFailedListener(listener, exception); assertFalse(writeContext.hasQueuedWriteOps()); }