Fix error handling for connection establishment on the bridge.
This commit is contained in:
Timothy Bish 2016-06-23 11:03:06 -04:00
parent d563e9019d
commit 03785a4d53
2 changed files with 206 additions and 166 deletions

View File

@ -158,8 +158,9 @@ public class SimpleJmsQueueConnector extends JmsConnector {
@Override @Override
protected void initializeForeignConnection() throws NamingException, JMSException { protected void initializeForeignConnection() throws NamingException, JMSException {
final QueueConnection newConnection; QueueConnection newConnection = null;
try {
if (foreignConnection.get() == null) { if (foreignConnection.get() == null) {
// get the connection factories // get the connection factories
if (outboundQueueConnectionFactory == null) { if (outboundQueueConnectionFactory == null) {
@ -189,6 +190,15 @@ public class SimpleJmsQueueConnector extends JmsConnector {
newConnection = (QueueConnection) foreignConnection.getAndSet(null); newConnection = (QueueConnection) foreignConnection.getAndSet(null);
} }
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(foreignConnection.get());
}
});
if (outboundClientId != null && outboundClientId.length() > 0) { if (outboundClientId != null && outboundClientId.length() > 0) {
newConnection.setClientID(getOutboundClientId()); newConnection.setClientID(getOutboundClientId());
} }
@ -200,24 +210,25 @@ public class SimpleJmsQueueConnector extends JmsConnector {
initializeInboundDestinationBridgesOutboundSide(newConnection); initializeInboundDestinationBridgesOutboundSide(newConnection);
initializeOutboundDestinationBridgesOutboundSide(newConnection); initializeOutboundDestinationBridgesOutboundSide(newConnection);
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(newConnection);
}
});
// At this point all looks good, so this our current connection now. // At this point all looks good, so this our current connection now.
foreignConnection.set(newConnection); foreignConnection.set(newConnection);
} catch (Exception ex) {
if (newConnection != null) {
try {
newConnection.close();
} catch (Exception ignore) {}
}
throw ex;
}
} }
@Override @Override
protected void initializeLocalConnection() throws NamingException, JMSException { protected void initializeLocalConnection() throws NamingException, JMSException {
final QueueConnection newConnection; QueueConnection newConnection = null;
try {
if (localConnection.get() == null) { if (localConnection.get() == null) {
// get the connection factories // get the connection factories
if (localQueueConnectionFactory == null) { if (localQueueConnectionFactory == null) {
@ -252,6 +263,15 @@ public class SimpleJmsQueueConnector extends JmsConnector {
newConnection = (QueueConnection) localConnection.getAndSet(null); newConnection = (QueueConnection) localConnection.getAndSet(null);
} }
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(localConnection.get());
}
});
if (localClientId != null && localClientId.length() > 0) { if (localClientId != null && localClientId.length() > 0) {
newConnection.setClientID(getLocalClientId()); newConnection.setClientID(getLocalClientId());
} }
@ -263,17 +283,17 @@ public class SimpleJmsQueueConnector extends JmsConnector {
initializeInboundDestinationBridgesLocalSide(newConnection); initializeInboundDestinationBridgesLocalSide(newConnection);
initializeOutboundDestinationBridgesLocalSide(newConnection); initializeOutboundDestinationBridgesLocalSide(newConnection);
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(newConnection);
}
});
// At this point all looks good, so this our current connection now. // At this point all looks good, so this our current connection now.
localConnection.set(newConnection); localConnection.set(newConnection);
} catch (Exception ex) {
if (newConnection != null) {
try {
newConnection.close();
} catch (Exception ignore) {}
}
throw ex;
}
} }
protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException { protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {

View File

@ -157,8 +157,9 @@ public class SimpleJmsTopicConnector extends JmsConnector {
@Override @Override
protected void initializeForeignConnection() throws NamingException, JMSException { protected void initializeForeignConnection() throws NamingException, JMSException {
final TopicConnection newConnection; TopicConnection newConnection = null;
try {
if (foreignConnection.get() == null) { if (foreignConnection.get() == null) {
// get the connection factories // get the connection factories
if (outboundTopicConnectionFactory == null) { if (outboundTopicConnectionFactory == null) {
@ -188,6 +189,15 @@ public class SimpleJmsTopicConnector extends JmsConnector {
newConnection = (TopicConnection) foreignConnection.getAndSet(null); newConnection = (TopicConnection) foreignConnection.getAndSet(null);
} }
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(foreignConnection.get());
}
});
if (outboundClientId != null && outboundClientId.length() > 0) { if (outboundClientId != null && outboundClientId.length() > 0) {
newConnection.setClientID(getOutboundClientId()); newConnection.setClientID(getOutboundClientId());
} }
@ -199,24 +209,25 @@ public class SimpleJmsTopicConnector extends JmsConnector {
initializeInboundDestinationBridgesOutboundSide(newConnection); initializeInboundDestinationBridgesOutboundSide(newConnection);
initializeOutboundDestinationBridgesOutboundSide(newConnection); initializeOutboundDestinationBridgesOutboundSide(newConnection);
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(newConnection);
}
});
// At this point all looks good, so this our current connection now. // At this point all looks good, so this our current connection now.
foreignConnection.set(newConnection); foreignConnection.set(newConnection);
} catch (Exception ex) {
if (newConnection != null) {
try {
newConnection.close();
} catch (Exception ignore) {}
}
throw ex;
}
} }
@Override @Override
protected void initializeLocalConnection() throws NamingException, JMSException { protected void initializeLocalConnection() throws NamingException, JMSException {
final TopicConnection newConnection; TopicConnection newConnection = null;
try {
if (localConnection.get() == null) { if (localConnection.get() == null) {
// get the connection factories // get the connection factories
if (localTopicConnectionFactory == null) { if (localTopicConnectionFactory == null) {
@ -251,6 +262,15 @@ public class SimpleJmsTopicConnector extends JmsConnector {
newConnection = (TopicConnection) localConnection.getAndSet(null); newConnection = (TopicConnection) localConnection.getAndSet(null);
} }
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(localConnection.get());
}
});
if (localClientId != null && localClientId.length() > 0) { if (localClientId != null && localClientId.length() > 0) {
newConnection.setClientID(getLocalClientId()); newConnection.setClientID(getLocalClientId());
} }
@ -262,17 +282,17 @@ public class SimpleJmsTopicConnector extends JmsConnector {
initializeInboundDestinationBridgesLocalSide(newConnection); initializeInboundDestinationBridgesLocalSide(newConnection);
initializeOutboundDestinationBridgesLocalSide(newConnection); initializeOutboundDestinationBridgesLocalSide(newConnection);
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(newConnection);
}
});
// At this point all looks good, so this our current connection now. // At this point all looks good, so this our current connection now.
localConnection.set(newConnection); localConnection.set(newConnection);
} catch (Exception ex) {
if (newConnection != null) {
try {
newConnection.close();
} catch (Exception ignore) {}
}
throw ex;
}
} }
protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException { protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {