Improve exception handling in transport local execution

Local execution of transport messages failures can create a more detailed remote transport exceptions. Also, when failing to handle an exception, the error should be logged, and not call the handler again with another exception
closes #10554
This commit is contained in:
Shay Banon 2015-04-12 17:40:58 +02:00
parent e2a05bb43f
commit 41343eca95
2 changed files with 27 additions and 23 deletions

View File

@ -93,7 +93,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private final ApplySettings settingsListener = new ApplySettings(); private final ApplySettings settingsListener = new ApplySettings();
/** if set will call requests sent to this id to shortcut and executed locally */ /** if set will call requests sent to this id to shortcut and executed locally */
volatile String localNodeId = null; volatile DiscoveryNode localNode = null;
public TransportService(Transport transport, ThreadPool threadPool) { public TransportService(Transport transport, ThreadPool threadPool) {
this(EMPTY_SETTINGS, transport, threadPool); this(EMPTY_SETTINGS, transport, threadPool);
@ -113,16 +113,14 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
/** /**
* makes the transport service aware of the local node. this allows it to optimize requests sent * makes the transport service aware of the local node. this allows it to optimize requests sent
* from the local node to it self and by pass the network stack/ serialization * from the local node to it self and by pass the network stack/ serialization
*
* @param localNode
*/ */
public void setLocalNode(DiscoveryNode localNode) { public void setLocalNode(DiscoveryNode localNode) {
localNodeId = localNode.id(); this.localNode = localNode;
} }
// for testing // for testing
String getLocalNodeId() { DiscoveryNode getLocalNode() {
return localNodeId; return localNode;
} }
protected Adapter createAdapter() { protected Adapter createAdapter() {
@ -225,25 +223,25 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
} }
public boolean nodeConnected(DiscoveryNode node) { public boolean nodeConnected(DiscoveryNode node) {
return node.id().equals(localNodeId) || transport.nodeConnected(node); return node.equals(localNode) || transport.nodeConnected(node);
} }
public void connectToNode(DiscoveryNode node) throws ConnectTransportException { public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
if (node.id().equals(localNodeId)) { if (node.equals(localNode)) {
return; return;
} }
transport.connectToNode(node); transport.connectToNode(node);
} }
public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
if (node.id().equals(localNodeId)) { if (node.equals(localNode)) {
return; return;
} }
transport.connectToNodeLight(node); transport.connectToNodeLight(node);
} }
public void disconnectFromNode(DiscoveryNode node) { public void disconnectFromNode(DiscoveryNode node) {
if (node.id().equals(localNodeId)) { if (node.equals(localNode)) {
return; return;
} }
transport.disconnectFromNode(node); transport.disconnectFromNode(node);
@ -298,7 +296,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
assert options.timeout() != null; assert options.timeout() != null;
timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler); timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
} }
if (node.id().equals(localNodeId)) { if (node.equals(localNode)) {
sendLocalRequest(requestId, action, request); sendLocalRequest(requestId, action, request);
} else { } else {
transport.sendRequest(node, requestId, action, request, options); transport.sendRequest(node, requestId, action, request, options);
@ -324,7 +322,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
} }
private void sendLocalRequest(long requestId, final String action, final TransportRequest request) { private void sendLocalRequest(long requestId, final String action, final TransportRequest request) {
final DirectResponseChannel channel = new DirectResponseChannel(action, requestId, adapter, threadPool); final DirectResponseChannel channel = new DirectResponseChannel(logger, localNode, action, requestId, adapter, threadPool);
try { try {
final TransportRequestHandler handler = adapter.handler(action); final TransportRequestHandler handler = adapter.handler(action);
if (handler == null) { if (handler == null) {
@ -619,11 +617,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
static class TimeoutInfoHolder { static class TimeoutInfoHolder {
private final DiscoveryNode node; private final DiscoveryNode node;
private final String action; private final String action;
private final long sentTime; private final long sentTime;
private final long timeoutTime; private final long timeoutTime;
TimeoutInfoHolder(DiscoveryNode node, String action, long sentTime, long timeoutTime) { TimeoutInfoHolder(DiscoveryNode node, String action, long sentTime, long timeoutTime) {
@ -687,12 +682,16 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
} }
static class DirectResponseChannel implements TransportChannel { static class DirectResponseChannel implements TransportChannel {
final ESLogger logger;
final DiscoveryNode localNode;
final private String action; final private String action;
final private long requestId; final private long requestId;
final TransportServiceAdapter adapter; final TransportServiceAdapter adapter;
final ThreadPool threadPool; final ThreadPool threadPool;
public DirectResponseChannel(String action, long requestId, TransportServiceAdapter adapter, ThreadPool threadPool) { public DirectResponseChannel(ESLogger logger, DiscoveryNode localNode, String action, long requestId, TransportServiceAdapter adapter, ThreadPool threadPool) {
this.logger = logger;
this.localNode = localNode;
this.action = action; this.action = action;
this.requestId = requestId; this.requestId = requestId;
this.adapter = adapter; this.adapter = adapter;
@ -729,11 +728,12 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
} }
} }
@SuppressWarnings("unchecked")
protected void processResponse(TransportResponseHandler handler, TransportResponse response) { protected void processResponse(TransportResponseHandler handler, TransportResponse response) {
try { try {
handler.handleResponse(response); handler.handleResponse(response);
} catch (Throwable e) { } catch (Throwable e) {
handler.handleException(new ResponseHandlerFailureTransportException(e)); processException(handler, wrapInRemote(new ResponseHandlerFailureTransportException(e)));
} }
} }
@ -742,10 +742,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
final TransportResponseHandler handler = adapter.onResponseReceived(requestId); final TransportResponseHandler handler = adapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it // ignore if its null, the adapter logs it
if (handler != null) { if (handler != null) {
if (!(error instanceof RemoteTransportException)) { final RemoteTransportException rtx = wrapInRemote(error);
error = new RemoteTransportException(error.getMessage(), error);
}
final RemoteTransportException rtx = (RemoteTransportException) error;
final String executor = handler.executor(); final String executor = handler.executor();
if (ThreadPool.Names.SAME.equals(executor)) { if (ThreadPool.Names.SAME.equals(executor)) {
processException(handler, rtx); processException(handler, rtx);
@ -761,11 +758,18 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
} }
} }
protected RemoteTransportException wrapInRemote(Throwable t) {
if (t instanceof RemoteTransportException) {
return (RemoteTransportException) t;
}
return new RemoteTransportException(localNode.name(), localNode.getAddress(), action, t);
}
protected void processException(final TransportResponseHandler handler, final RemoteTransportException rtx) { protected void processException(final TransportResponseHandler handler, final RemoteTransportException rtx) {
try { try {
handler.handleException(rtx); handler.handleException(rtx);
} catch (Throwable e) { } catch (Throwable e) {
handler.handleException(new ResponseHandlerFailureTransportException(e)); logger.error("failed to handle exception for action [{}], handler [{}]", e, action, handler);
} }
} }
} }

View File

@ -216,7 +216,7 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
@Test @Test
public void testLocalNodeConnection() throws InterruptedException { public void testLocalNodeConnection() throws InterruptedException {
assertTrue("serviceA is not connected to nodeA", serviceA.nodeConnected(nodeA)); assertTrue("serviceA is not connected to nodeA", serviceA.nodeConnected(nodeA));
if (((TransportService) serviceA).getLocalNodeId() != null) { if (((TransportService) serviceA).getLocalNode() != null) {
// this should be a noop // this should be a noop
serviceA.disconnectFromNode(nodeA); serviceA.disconnectFromNode(nodeA);
} }