This commit is contained in:
Clebert Suconic 2018-01-23 14:02:17 -05:00
commit acd20790f7
1 changed files with 58 additions and 47 deletions

View File

@ -954,65 +954,76 @@ public final class JMSBridgeImpl implements JMSBridge {
final String clientID,
final boolean isXA,
boolean isSource) throws Exception {
Connection conn;
Connection conn = null;
Object cf = cff.createConnectionFactory();
try {
if (cf instanceof ActiveMQConnectionFactory && registry != null) {
registry.register(XARecoveryConfig.newConfig((ActiveMQConnectionFactory) cf, username, password, null));
}
Object cf = cff.createConnectionFactory();
if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE && !(cf instanceof XAConnectionFactory)) {
throw new IllegalArgumentException("Connection factory must be XAConnectionFactory");
}
if (cf instanceof ActiveMQConnectionFactory && registry != null) {
registry.register(XARecoveryConfig.newConfig((ActiveMQConnectionFactory) cf, username, password, null));
}
if (username == null) {
if (isXA) {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Creating an XA connection");
if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE && !(cf instanceof XAConnectionFactory)) {
throw new IllegalArgumentException("Connection factory must be XAConnectionFactory");
}
if (username == null) {
if (isXA) {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Creating an XA connection");
}
conn = ((XAConnectionFactory) cf).createXAConnection();
} else {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Creating a non XA connection");
}
conn = ((ConnectionFactory) cf).createConnection();
}
conn = ((XAConnectionFactory) cf).createXAConnection();
} else {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Creating a non XA connection");
if (isXA) {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Creating an XA connection");
}
conn = ((XAConnectionFactory) cf).createXAConnection(username, password);
} else {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Creating a non XA connection");
}
conn = ((ConnectionFactory) cf).createConnection(username, password);
}
conn = ((ConnectionFactory) cf).createConnection();
}
} else {
if (isXA) {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Creating an XA connection");
if (clientID != null) {
conn.setClientID(clientID);
}
boolean ha = false;
BridgeFailoverListener failoverListener = null;
if (conn instanceof ActiveMQConnection) {
ActiveMQConnectionFactory activeMQCF = (ActiveMQConnectionFactory) cf;
ha = activeMQCF.isHA();
if (ha) {
ActiveMQConnection activeMQConn = (ActiveMQConnection) conn;
failoverListener = new BridgeFailoverListener(isSource);
activeMQConn.setFailoverListener(failoverListener);
}
conn = ((XAConnectionFactory) cf).createXAConnection(username, password);
} else {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Creating a non XA connection");
}
conn.setExceptionListener(new BridgeExceptionListener(ha, failoverListener, isSource));
return conn;
} catch (JMSException e) {
try {
if (conn != null) {
conn.close();
}
conn = ((ConnectionFactory) cf).createConnection(username, password);
} catch (Throwable ignored) {
}
throw e;
}
if (clientID != null) {
conn.setClientID(clientID);
}
boolean ha = false;
BridgeFailoverListener failoverListener = null;
if (conn instanceof ActiveMQConnection) {
ActiveMQConnectionFactory activeMQCF = (ActiveMQConnectionFactory) cf;
ha = activeMQCF.isHA();
if (ha) {
ActiveMQConnection activeMQConn = (ActiveMQConnection) conn;
failoverListener = new BridgeFailoverListener(isSource);
activeMQConn.setFailoverListener(failoverListener);
}
}
conn.setExceptionListener(new BridgeExceptionListener(ha, failoverListener, isSource));
return conn;
}
/*