ARTEMIS-1554 JMS bridge with transactions cannot be stopped on session failover

This commit is contained in:
xstefank 2017-12-13 09:03:30 +01:00 committed by Clebert Suconic
parent 43b72759e5
commit cfb8206650
2 changed files with 62 additions and 0 deletions

View File

@ -55,6 +55,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener; import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType; import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.jms.bridge.ActiveMQJMSBridgeLogger; import org.apache.activemq.artemis.jms.bridge.ActiveMQJMSBridgeLogger;
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory; import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
import org.apache.activemq.artemis.jms.bridge.DestinationFactory; import org.apache.activemq.artemis.jms.bridge.DestinationFactory;
@ -497,6 +498,8 @@ public final class JMSBridgeImpl implements JMSBridge {
ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx"); ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx");
} }
stopSessionFailover();
try { try {
tx.rollback(); tx.rollback();
abortedMessageCount += messages.size(); abortedMessageCount += messages.size();
@ -535,6 +538,14 @@ public final class JMSBridgeImpl implements JMSBridge {
} }
} }
private void stopSessionFailover() {
XASession xaSource = (XASession) sourceSession;
XASession xaTarget = (XASession) targetSession;
((ClientSessionInternal) xaSource.getXAResource()).getSessionContext().releaseCommunications();
((ClientSessionInternal) xaTarget.getXAResource()).getSessionContext().releaseCommunications();
}
@Override @Override
public synchronized boolean isStarted() { public synchronized boolean isStarted() {
return started; return started;

View File

@ -34,22 +34,31 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory; import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode; import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl; import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage; import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory;
import org.apache.activemq.artemis.service.extensions.ServiceUtils; import org.apache.activemq.artemis.service.extensions.ServiceUtils;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.ra.DummyTransactionManager; import org.apache.activemq.artemis.tests.integration.ra.DummyTransactionManager;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout;
public class JMSBridgeTest extends BridgeTestBase { public class JMSBridgeTest extends BridgeTestBase {
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@Rule
public Timeout timeout = new Timeout(120000);
// MaxBatchSize but no MaxBatchTime // MaxBatchSize but no MaxBatchTime
@Test @Test
@ -1289,6 +1298,48 @@ public class JMSBridgeTest extends BridgeTestBase {
} }
} }
@Test
public void testCrashDestStopBridge() throws Exception {
cff1xa = new ConnectionFactoryFactory() {
@Override
public Object createConnectionFactory() throws Exception {
ActiveMQXAConnectionFactory cf = (ActiveMQXAConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, params1));
cf.setReconnectAttempts(-1);
cf.setCallFailoverTimeout(-1);
cf.setCallTimeout(10000);
cf.setBlockOnNonDurableSend(true);
cf.setBlockOnDurableSend(true);
cf.setCacheLargeMessagesClient(true);
return cf;
}
};
JMSBridgeImpl bridge = new JMSBridgeImpl(cff0xa, cff1xa, sourceQueueFactory, targetQueueFactory, null, null, null, null, null, 1000, -1, QualityOfServiceMode.ONCE_AND_ONLY_ONCE, 10, 5000, null, null, false).setBridgeName("test-bridge");
addActiveMQComponent(bridge);
bridge.setTransactionManager(newTransactionManager());
bridge.start();
// Now crash the dest server
JMSBridgeTest.log.info("About to crash server");
jmsServer1.stop();
// Now stop the bridge while the failover is happening
JMSBridgeTest.log.info("About to stop the bridge");
bridge.stop();
// Shutdown the source server
jmsServer0.stop();
}
// Private ------------------------------------------------------------------------------- // Private -------------------------------------------------------------------------------
private void testStress(final QualityOfServiceMode qosMode, private void testStress(final QualityOfServiceMode qosMode,