diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java index 3eeb9e67a1..41b6167c4b 100644 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java @@ -101,4 +101,8 @@ public interface ActiveMQJMSBridgeLogger extends BasicLogger @LogMessage(level = Logger.Level.ERROR) @Message(id = 344001, value = "Failed to start source connection" , format = Message.Format.MESSAGE_FORMAT) void jmsBridgeSrcConnectError(@Cause Exception e); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 344002, value = "Failed to start JMS Bridge. QoS Mode: {0} requires a Transaction Manager, none found" , format = Message.Format.MESSAGE_FORMAT) + void jmsBridgeTransactionManagerMissing(QualityOfServiceMode qosMode); } diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java index cefc92817d..0301e844c2 100644 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java @@ -66,7 +66,6 @@ import org.apache.activemq.jms.client.ActiveMQMessage; import org.apache.activemq.jms.server.ActiveMQJMSServerBundle; import org.apache.activemq.service.extensions.ServiceUtils; import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry; -import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistryImpl; import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig; import org.apache.activemq.utils.ClassloadingUtil; import org.apache.activemq.utils.DefaultSensitiveStringCodec; @@ -402,28 +401,45 @@ public final class JMSBridgeImpl implements JMSBridge checkParams(); - if (tm == null) - { - tm = ServiceUtils.getTransactionManager(); - } - // There may already be a JTA transaction associated to the thread boolean ok; - Transaction toResume = null; - try + // Check to see if the QoSMode requires a TM + if (qualityOfServiceMode.equals(QualityOfServiceMode.AT_MOST_ONCE) || + qualityOfServiceMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE)) { - toResume = tm.suspend(); - - ok = setupJMSObjects(); - } - finally - { - if (toResume != null) + if (tm == null) { - tm.resume(toResume); + tm = ServiceUtils.getTransactionManager(); } + + if (tm == null) + { + ActiveMQJMSBridgeLogger.LOGGER.jmsBridgeTransactionManagerMissing(qualityOfServiceMode); + throw new RuntimeException(); + } + + // There may already be a JTA transaction associated to the thread + + Transaction toResume = null; + try + { + toResume = tm.suspend(); + + ok = setupJMSObjects(); + } + finally + { + if (toResume != null) + { + tm.resume(toResume); + } + } + } + else + { + ok = setupJMSObjects(); } if (ok) @@ -2241,10 +2257,6 @@ public final class JMSBridgeImpl implements JMSBridge { registry = sl.iterator().next(); } - else - { - registry = ActiveMQRegistryImpl.getInstance(); - } } catch (Throwable e) { diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java index 1ec6dbce45..c28251a616 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java @@ -36,6 +36,7 @@ import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import java.lang.management.ManagementFactory; +import java.lang.reflect.Field; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -65,7 +66,9 @@ import org.apache.activemq.tests.util.UnitTestCase; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; /** * @author Jeff Mesnil @@ -84,6 +87,9 @@ public class JMSBridgeImplTest extends UnitTestCase private JMSServerManager jmsServer; + @Rule + public ExpectedException thrown = ExpectedException.none(); + // Static -------------------------------------------------------- protected static TransactionManager newTransactionManager() @@ -633,6 +639,72 @@ public class JMSBridgeImplTest extends UnitTestCase super.tearDown(); } + @Test + public void testTransactionManagerNotSetForDuplicatesOK() throws Exception + { + + ConnectionFactoryFactory sourceCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory()); + ConnectionFactoryFactory targetCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory()); + DestinationFactory sourceDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE)); + DestinationFactory targetDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET)); + + JMSBridgeImpl bridge = new JMSBridgeImpl(); + Assert.assertNotNull(bridge); + + bridge.setSourceConnectionFactoryFactory(sourceCFF); + bridge.setSourceDestinationFactory(sourceDF); + bridge.setTargetConnectionFactoryFactory(targetCFF); + bridge.setTargetDestinationFactory(targetDF); + bridge.setFailureRetryInterval(10); + bridge.setMaxRetries(1); + bridge.setMaxBatchTime(-1); + bridge.setMaxBatchSize(10); + bridge.setQualityOfServiceMode(QualityOfServiceMode.DUPLICATES_OK); + + Assert.assertFalse(bridge.isStarted()); + bridge.start(); + + Field field = JMSBridgeImpl.class.getDeclaredField("tm"); + field.setAccessible(true); + assertNull(field.get(bridge)); + + bridge.stop(); + Assert.assertFalse(bridge.isStarted()); + } + + @Test + public void testThrowErrorWhenTMNotSetForOnceOnly() throws Exception + { + thrown.expect(RuntimeException.class); + + ConnectionFactoryFactory sourceCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory()); + ConnectionFactoryFactory targetCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory()); + DestinationFactory sourceDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE)); + DestinationFactory targetDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET)); + + JMSBridgeImpl bridge = new JMSBridgeImpl(); + Assert.assertNotNull(bridge); + + bridge.setSourceConnectionFactoryFactory(sourceCFF); + bridge.setSourceDestinationFactory(sourceDF); + bridge.setTargetConnectionFactoryFactory(targetCFF); + bridge.setTargetDestinationFactory(targetDF); + bridge.setFailureRetryInterval(10); + bridge.setMaxRetries(1); + bridge.setMaxBatchTime(-1); + bridge.setMaxBatchSize(10); + bridge.setQualityOfServiceMode(QualityOfServiceMode.ONCE_AND_ONLY_ONCE); + + Assert.assertFalse(bridge.isStarted()); + bridge.start(); + + Field field = JMSBridgeImpl.class.getDeclaredField("tm"); + field.setAccessible(true); + assertNotNull(field.get(bridge)); + + bridge.stop(); + Assert.assertFalse(bridge.isStarted()); + } // Private ------------------------------------------------------- // Inner classes -------------------------------------------------