Only Set TM on Bridge in QoS modes that require it

The bridge currently tries to assign a TM even when the quality of
service level is set to Duplicates OK.  This QoS does not use or require
a TM.  This patch stops the bridge from attempting to assign a TM for
this QoS and also checks that a TM is set of the other QoS.  If TM is
not set for a QoS that requires one, a error is logged and RunTime
exception thrown.
This commit is contained in:
Martyn Taylor 2014-12-15 16:27:27 +00:00
parent 739c0f368b
commit f28c9be8af
3 changed files with 108 additions and 20 deletions

View File

@ -101,4 +101,8 @@ public interface ActiveMQJMSBridgeLogger extends BasicLogger
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 344001, value = "Failed to start source connection" , format = Message.Format.MESSAGE_FORMAT) @Message(id = 344001, value = "Failed to start source connection" , format = Message.Format.MESSAGE_FORMAT)
void jmsBridgeSrcConnectError(@Cause Exception e); void jmsBridgeSrcConnectError(@Cause Exception e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 344002, value = "Failed to start JMS Bridge. QoS Mode: {0} requires a Transaction Manager, none found" , format = Message.Format.MESSAGE_FORMAT)
void jmsBridgeTransactionManagerMissing(QualityOfServiceMode qosMode);
} }

View File

@ -66,7 +66,6 @@ import org.apache.activemq.jms.client.ActiveMQMessage;
import org.apache.activemq.jms.server.ActiveMQJMSServerBundle; import org.apache.activemq.jms.server.ActiveMQJMSServerBundle;
import org.apache.activemq.service.extensions.ServiceUtils; import org.apache.activemq.service.extensions.ServiceUtils;
import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry; import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry;
import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistryImpl;
import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig; import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.utils.ClassloadingUtil; import org.apache.activemq.utils.ClassloadingUtil;
import org.apache.activemq.utils.DefaultSensitiveStringCodec; import org.apache.activemq.utils.DefaultSensitiveStringCodec;
@ -402,14 +401,26 @@ public final class JMSBridgeImpl implements JMSBridge
checkParams(); checkParams();
// There may already be a JTA transaction associated to the thread
boolean ok;
// Check to see if the QoSMode requires a TM
if (qualityOfServiceMode.equals(QualityOfServiceMode.AT_MOST_ONCE) ||
qualityOfServiceMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
{
if (tm == null) if (tm == null)
{ {
tm = ServiceUtils.getTransactionManager(); tm = ServiceUtils.getTransactionManager();
} }
// There may already be a JTA transaction associated to the thread if (tm == null)
{
ActiveMQJMSBridgeLogger.LOGGER.jmsBridgeTransactionManagerMissing(qualityOfServiceMode);
throw new RuntimeException();
}
boolean ok; // There may already be a JTA transaction associated to the thread
Transaction toResume = null; Transaction toResume = null;
try try
@ -425,6 +436,11 @@ public final class JMSBridgeImpl implements JMSBridge
tm.resume(toResume); tm.resume(toResume);
} }
} }
}
else
{
ok = setupJMSObjects();
}
if (ok) if (ok)
{ {
@ -2241,10 +2257,6 @@ public final class JMSBridgeImpl implements JMSBridge
{ {
registry = sl.iterator().next(); registry = sl.iterator().next();
} }
else
{
registry = ActiveMQRegistryImpl.getInstance();
}
} }
catch (Throwable e) catch (Throwable e)
{ {

View File

@ -36,6 +36,7 @@ import javax.transaction.SystemException;
import javax.transaction.Transaction; import javax.transaction.Transaction;
import javax.transaction.TransactionManager; import javax.transaction.TransactionManager;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -65,7 +66,9 @@ import org.apache.activemq.tests.util.UnitTestCase;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
/** /**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a> * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@ -84,6 +87,9 @@ public class JMSBridgeImplTest extends UnitTestCase
private JMSServerManager jmsServer; private JMSServerManager jmsServer;
@Rule
public ExpectedException thrown = ExpectedException.none();
// Static -------------------------------------------------------- // Static --------------------------------------------------------
protected static TransactionManager newTransactionManager() protected static TransactionManager newTransactionManager()
@ -633,6 +639,72 @@ public class JMSBridgeImplTest extends UnitTestCase
super.tearDown(); super.tearDown();
} }
@Test
public void testTransactionManagerNotSetForDuplicatesOK() throws Exception
{
ConnectionFactoryFactory sourceCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
ConnectionFactoryFactory targetCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
DestinationFactory sourceDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE));
DestinationFactory targetDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET));
JMSBridgeImpl bridge = new JMSBridgeImpl();
Assert.assertNotNull(bridge);
bridge.setSourceConnectionFactoryFactory(sourceCFF);
bridge.setSourceDestinationFactory(sourceDF);
bridge.setTargetConnectionFactoryFactory(targetCFF);
bridge.setTargetDestinationFactory(targetDF);
bridge.setFailureRetryInterval(10);
bridge.setMaxRetries(1);
bridge.setMaxBatchTime(-1);
bridge.setMaxBatchSize(10);
bridge.setQualityOfServiceMode(QualityOfServiceMode.DUPLICATES_OK);
Assert.assertFalse(bridge.isStarted());
bridge.start();
Field field = JMSBridgeImpl.class.getDeclaredField("tm");
field.setAccessible(true);
assertNull(field.get(bridge));
bridge.stop();
Assert.assertFalse(bridge.isStarted());
}
@Test
public void testThrowErrorWhenTMNotSetForOnceOnly() throws Exception
{
thrown.expect(RuntimeException.class);
ConnectionFactoryFactory sourceCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
ConnectionFactoryFactory targetCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
DestinationFactory sourceDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE));
DestinationFactory targetDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET));
JMSBridgeImpl bridge = new JMSBridgeImpl();
Assert.assertNotNull(bridge);
bridge.setSourceConnectionFactoryFactory(sourceCFF);
bridge.setSourceDestinationFactory(sourceDF);
bridge.setTargetConnectionFactoryFactory(targetCFF);
bridge.setTargetDestinationFactory(targetDF);
bridge.setFailureRetryInterval(10);
bridge.setMaxRetries(1);
bridge.setMaxBatchTime(-1);
bridge.setMaxBatchSize(10);
bridge.setQualityOfServiceMode(QualityOfServiceMode.ONCE_AND_ONLY_ONCE);
Assert.assertFalse(bridge.isStarted());
bridge.start();
Field field = JMSBridgeImpl.class.getDeclaredField("tm");
field.setAccessible(true);
assertNotNull(field.get(bridge));
bridge.stop();
Assert.assertFalse(bridge.isStarted());
}
// Private ------------------------------------------------------- // Private -------------------------------------------------------
// Inner classes ------------------------------------------------- // Inner classes -------------------------------------------------