This commit is contained in:
Clebert Suconic 2017-12-20 16:07:51 -05:00
commit 7e97dbd2e9
2 changed files with 62 additions and 0 deletions

View File

@ -55,6 +55,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.jms.bridge.ActiveMQJMSBridgeLogger;
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
import org.apache.activemq.artemis.jms.bridge.DestinationFactory;
@ -497,6 +498,8 @@ public final class JMSBridgeImpl implements JMSBridge {
ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx");
}
stopSessionFailover();
try {
tx.rollback();
abortedMessageCount += messages.size();
@ -535,6 +538,14 @@ public final class JMSBridgeImpl implements JMSBridge {
}
}
private void stopSessionFailover() {
XASession xaSource = (XASession) sourceSession;
XASession xaTarget = (XASession) targetSession;
((ClientSessionInternal) xaSource.getXAResource()).getSessionContext().releaseCommunications();
((ClientSessionInternal) xaTarget.getXAResource()).getSessionContext().releaseCommunications();
}
@Override
public synchronized boolean isStarted() {
return started;

View File

@ -34,22 +34,31 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory;
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.ra.DummyTransactionManager;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
public class JMSBridgeTest extends BridgeTestBase {
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@Rule
public Timeout timeout = new Timeout(120000);
// MaxBatchSize but no MaxBatchTime
@Test
@ -1289,6 +1298,48 @@ public class JMSBridgeTest extends BridgeTestBase {
}
}
@Test
public void testCrashDestStopBridge() throws Exception {
cff1xa = new ConnectionFactoryFactory() {
@Override
public Object createConnectionFactory() throws Exception {
ActiveMQXAConnectionFactory cf = (ActiveMQXAConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, params1));
cf.setReconnectAttempts(-1);
cf.setCallFailoverTimeout(-1);
cf.setCallTimeout(10000);
cf.setBlockOnNonDurableSend(true);
cf.setBlockOnDurableSend(true);
cf.setCacheLargeMessagesClient(true);
return cf;
}
};
JMSBridgeImpl bridge = new JMSBridgeImpl(cff0xa, cff1xa, sourceQueueFactory, targetQueueFactory, null, null, null, null, null, 1000, -1, QualityOfServiceMode.ONCE_AND_ONLY_ONCE, 10, 5000, null, null, false).setBridgeName("test-bridge");
addActiveMQComponent(bridge);
bridge.setTransactionManager(newTransactionManager());
bridge.start();
// Now crash the dest server
JMSBridgeTest.log.info("About to crash server");
jmsServer1.stop();
// Now stop the bridge while the failover is happening
JMSBridgeTest.log.info("About to stop the bridge");
bridge.stop();
// Shutdown the source server
jmsServer0.stop();
}
// Private -------------------------------------------------------------------------------
private void testStress(final QualityOfServiceMode qosMode,