Add exception handling for write listeners (#27590)

This potential issue was exposed when I saw this PR #27542. Essentially
we currently execute the write listeners all over the place without
consistently catching and handling exceptions. Some of these exceptions
will be logged in different ways (including as low as `debug`).

This commit adds a single location where these listeners are executed.
If the listener throws an execption, the exception is caught and logged
at the `warn` level.
This commit is contained in:
Tim Brooks 2017-11-29 15:47:12 -07:00 committed by GitHub
parent 55cb8ddd80
commit b8557651aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 75 additions and 12 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.SelectionKeyUtils;
@ -146,6 +147,16 @@ public class SocketEventHandler extends EventHandler {
exceptionCaught((NioSocketChannel) channel, exception);
}
/**
* This method is called when a listener attached to a channel operation throws an exception.
*
* @param listener that was called
* @param exception that occurred
*/
<V> void listenerException(ActionListener<V> listener, Exception exception) {
logger.warn(new ParameterizedMessage("exception while executing listener: {}", listener), exception);
}
private void exceptionCaught(NioSocketChannel channel, Exception e) {
channel.getExceptionContext().accept(channel, e);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.SelectionKeyUtils;
import org.elasticsearch.transport.nio.channel.WriteContext;
@ -79,7 +80,7 @@ public class SocketSelector extends ESSelector {
void cleanup() {
WriteOperation op;
while ((op = queuedWrites.poll()) != null) {
op.getListener().onFailure(new ClosedSelectorException());
executeFailedListener(op.getListener(), new ClosedSelectorException());
}
channelsToClose.addAll(newChannels);
}
@ -107,7 +108,7 @@ public class SocketSelector extends ESSelector {
if (isOpen() == false) {
boolean wasRemoved = queuedWrites.remove(writeOperation);
if (wasRemoved) {
writeOperation.getListener().onFailure(new ClosedSelectorException());
executeFailedListener(writeOperation.getListener(), new ClosedSelectorException());
}
} else {
wakeup();
@ -128,7 +129,39 @@ public class SocketSelector extends ESSelector {
SelectionKeyUtils.setWriteInterested(channel);
context.queueWriteOperations(writeOperation);
} catch (Exception e) {
writeOperation.getListener().onFailure(e);
executeFailedListener(writeOperation.getListener(), e);
}
}
/**
* Executes a success listener with consistent exception handling. This can only be called from current
* selector thread.
*
* @param listener to be executed
* @param value to provide to listener
*/
public <V> void executeListener(ActionListener<V> listener, V value) {
assert isOnCurrentThread() : "Must be on selector thread";
try {
listener.onResponse(value);
} catch (Exception e) {
eventHandler.listenerException(listener, e);
}
}
/**
* Executes a failed listener with consistent exception handling. This can only be called from current
* selector thread.
*
* @param listener to be executed
* @param exception to provide to listener
*/
public <V> void executeFailedListener(ActionListener<V> listener, Exception exception) {
assert isOnCurrentThread() : "Must be on selector thread";
try {
listener.onFailure(exception);
} catch (Exception e) {
eventHandler.listenerException(listener, e);
}
}
@ -154,7 +187,7 @@ public class SocketSelector extends ESSelector {
if (writeOperation.getChannel().isWritable()) {
queueWriteInChannelBuffer(writeOperation);
} else {
writeOperation.getListener().onFailure(new ClosedChannelException());
executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
}
}
}

View File

@ -82,7 +82,7 @@ public class TcpWriteContext implements WriteContext {
public void clearQueuedWriteOps(Exception e) {
assert channel.getSelector().isOnCurrentThread() : "Must be on selector thread to clear queued writes";
for (WriteOperation op : queued) {
op.getListener().onFailure(e);
channel.getSelector().executeFailedListener(op.getListener(), e);
}
queued.clear();
}
@ -91,12 +91,12 @@ public class TcpWriteContext implements WriteContext {
try {
headOp.flush();
} catch (IOException e) {
headOp.getListener().onFailure(e);
channel.getSelector().executeFailedListener(headOp.getListener(), e);
throw e;
}
if (headOp.isFullyFlushed()) {
headOp.getListener().onResponse(null);
channel.getSelector().executeListener(headOp.getListener(), null);
} else {
queued.push(headOp);
}

View File

@ -308,4 +308,23 @@ public class SocketSelectorTests extends ESTestCase {
verify(eventHandler).handleClose(channel);
verify(eventHandler).handleClose(unRegisteredChannel);
}
public void testExecuteListenerWillHandleException() throws Exception {
RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).onResponse(null);
socketSelector.executeListener(listener, null);
verify(eventHandler).listenerException(listener, exception);
}
public void testExecuteFailedListenerWillHandleException() throws Exception {
IOException ioException = new IOException();
RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).onFailure(ioException);
socketSelector.executeFailedListener(listener, ioException);
verify(eventHandler).listenerException(listener, exception);
}
}

View File

@ -118,7 +118,7 @@ public class TcpWriteContextTests extends ESTestCase {
ClosedChannelException e = new ClosedChannelException();
writeContext.clearQueuedWriteOps(e);
verify(listener).onFailure(e);
verify(selector).executeFailedListener(listener, e);
assertFalse(writeContext.hasQueuedWriteOps());
}
@ -136,7 +136,7 @@ public class TcpWriteContextTests extends ESTestCase {
writeContext.flushChannel();
verify(writeOperation).flush();
verify(listener).onResponse(null);
verify(selector).executeListener(listener, null);
assertFalse(writeContext.hasQueuedWriteOps());
}
@ -173,7 +173,7 @@ public class TcpWriteContextTests extends ESTestCase {
when(writeOperation2.isFullyFlushed()).thenReturn(false);
writeContext.flushChannel();
verify(listener).onResponse(null);
verify(selector).executeListener(listener, null);
verify(listener2, times(0)).onResponse(channel);
assertTrue(writeContext.hasQueuedWriteOps());
@ -181,7 +181,7 @@ public class TcpWriteContextTests extends ESTestCase {
writeContext.flushChannel();
verify(listener2).onResponse(null);
verify(selector).executeListener(listener2, null);
assertFalse(writeContext.hasQueuedWriteOps());
}
@ -198,7 +198,7 @@ public class TcpWriteContextTests extends ESTestCase {
when(writeOperation.getListener()).thenReturn(listener);
expectThrows(IOException.class, () -> writeContext.flushChannel());
verify(listener).onFailure(exception);
verify(selector).executeFailedListener(listener, exception);
assertFalse(writeContext.hasQueuedWriteOps());
}