Make Connection Future Err. Handling more Resilient (#42781) (#42804)

* There were a number of possible (runtime-) exceptions that could be raised in the adjusted code and prevent resolving the listener
* Relates #42350
This commit is contained in:
Armin Braun 2019-06-03 19:29:36 +02:00 committed by GitHub
parent 34fd9ce067
commit 00db9c1a2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 24 additions and 30 deletions

View File

@ -933,34 +933,20 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
if (countDown.countDown()) {
final TcpChannel handshakeChannel = channels.get(0);
try {
executeHandshake(node, handshakeChannel, connectionProfile, new ActionListener<Version>() {
@Override
public void onResponse(Version version) {
NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
long relativeMillisTime = threadPool.relativeTimeInMillis();
nodeChannels.channels.forEach(ch -> {
// Mark the channel init time
ch.getChannelStats().markAccessed(relativeMillisTime);
ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
});
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
listener.onResponse(nodeChannels);
}
@Override
public void onFailure(Exception e) {
CloseableChannel.closeChannels(channels, false);
if (e instanceof ConnectTransportException) {
listener.onFailure(e);
} else {
listener.onFailure(new ConnectTransportException(node, "general node connection failure", e));
}
}
});
executeHandshake(node, handshakeChannel, connectionProfile, ActionListener.wrap(version -> {
NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
long relativeMillisTime = threadPool.relativeTimeInMillis();
nodeChannels.channels.forEach(ch -> {
// Mark the channel init time
ch.getChannelStats().markAccessed(relativeMillisTime);
ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
});
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
listener.onResponse(nodeChannels);
}, e -> closeAndFail(e instanceof ConnectTransportException ?
e : new ConnectTransportException(node, "general node connection failure", e))));
} catch (Exception ex) {
CloseableChannel.closeChannels(channels, false);
listener.onFailure(ex);
closeAndFail(ex);
}
}
}
@ -968,15 +954,23 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
@Override
public void onFailure(Exception ex) {
if (countDown.fastForward()) {
CloseableChannel.closeChannels(channels, false);
listener.onFailure(new ConnectTransportException(node, "connect_exception", ex));
closeAndFail(new ConnectTransportException(node, "connect_exception", ex));
}
}
public void onTimeout() {
if (countDown.fastForward()) {
closeAndFail(new ConnectTransportException(node, "connect_timeout[" + connectionProfile.getConnectTimeout() + "]"));
}
}
private void closeAndFail(Exception e) {
try {
CloseableChannel.closeChannels(channels, false);
listener.onFailure(new ConnectTransportException(node, "connect_timeout[" + connectionProfile.getConnectTimeout() + "]"));
} catch (Exception ex) {
e.addSuppressed(ex);
} finally {
listener.onFailure(e);
}
}
}