diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java index 19cac25c86..b67b0b3c85 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java @@ -55,6 +55,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.client.FailoverEventListener; import org.apache.activemq.artemis.api.core.client.FailoverEventType; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.jms.bridge.ActiveMQJMSBridgeLogger; import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory; import org.apache.activemq.artemis.jms.bridge.DestinationFactory; @@ -497,6 +498,8 @@ public final class JMSBridgeImpl implements JMSBridge { ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx"); } + stopSessionFailover(); + try { tx.rollback(); abortedMessageCount += messages.size(); @@ -535,6 +538,14 @@ public final class JMSBridgeImpl implements JMSBridge { } } + private void stopSessionFailover() { + XASession xaSource = (XASession) sourceSession; + XASession xaTarget = (XASession) targetSession; + + ((ClientSessionInternal) xaSource.getXAResource()).getSessionContext().releaseCommunications(); + ((ClientSessionInternal) xaTarget.getXAResource()).getSessionContext().releaseCommunications(); + } + @Override public synchronized boolean isStarted() { return started; diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java index df45f68700..fed218ba72 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java @@ -34,22 +34,31 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; +import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory; import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode; import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl; import org.apache.activemq.artemis.jms.client.ActiveMQMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory; import org.apache.activemq.artemis.service.extensions.ServiceUtils; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.ra.DummyTransactionManager; import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; public class JMSBridgeTest extends BridgeTestBase { private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + @Rule + public Timeout timeout = new Timeout(120000); + // MaxBatchSize but no MaxBatchTime @Test @@ -1289,6 +1298,48 @@ public class JMSBridgeTest extends BridgeTestBase { } } + @Test + public void testCrashDestStopBridge() throws Exception { + cff1xa = new ConnectionFactoryFactory() { + @Override + public Object createConnectionFactory() throws Exception { + ActiveMQXAConnectionFactory cf = (ActiveMQXAConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, params1)); + + cf.setReconnectAttempts(-1); + cf.setCallFailoverTimeout(-1); + cf.setCallTimeout(10000); + cf.setBlockOnNonDurableSend(true); + cf.setBlockOnDurableSend(true); + cf.setCacheLargeMessagesClient(true); + + return cf; + } + + }; + + JMSBridgeImpl bridge = new JMSBridgeImpl(cff0xa, cff1xa, sourceQueueFactory, targetQueueFactory, null, null, null, null, null, 1000, -1, QualityOfServiceMode.ONCE_AND_ONLY_ONCE, 10, 5000, null, null, false).setBridgeName("test-bridge"); + addActiveMQComponent(bridge); + bridge.setTransactionManager(newTransactionManager()); + + bridge.start(); + + // Now crash the dest server + + JMSBridgeTest.log.info("About to crash server"); + + jmsServer1.stop(); + + // Now stop the bridge while the failover is happening + + JMSBridgeTest.log.info("About to stop the bridge"); + + bridge.stop(); + + // Shutdown the source server + + jmsServer0.stop(); + } + // Private ------------------------------------------------------------------------------- private void testStress(final QualityOfServiceMode qosMode,