ARTEMIS-3240 - ensure pending transactions are rolled back on connection failure. Fix and test

This commit is contained in:
gtully 2021-04-13 14:35:40 +01:00 committed by clebertsuconic
parent bf132b51b1
commit 0f3d87799a
2 changed files with 64 additions and 0 deletions

View File

@ -682,6 +682,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override @Override
public void fail(ActiveMQException me, String message) { public void fail(ActiveMQException me, String message) {
for (Transaction tx : txMap.values()) {
tx.rollbackIfPossible();
}
if (me != null) { if (me != null) {
//filter it like the other protocols //filter it like the other protocols
if (!(me instanceof ActiveMQRemoteDisconnectException)) { if (!(me instanceof ActiveMQRemoteDisconnectException)) {

View File

@ -21,7 +21,9 @@ import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.Session; import javax.jms.Session;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.net.Socket;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -31,10 +33,15 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test; import org.junit.Test;
/** /**
@ -211,4 +218,57 @@ public class JMSConsumer2Test extends BasicOpenWireTest {
redispatchSession.close(); redispatchSession.close();
} }
@Test
public void testRedeliveryOnServerConnectionFailWithPendingAckInLocalTx() throws Exception {
// Send a message to the broker.
connection.start();
sendMessages(connection, new ActiveMQQueue(queueName), 1);
connection.close();
factory.setWatchTopicAdvisories(false);
factory.setNonBlockingRedelivery(true);
connection = (ActiveMQConnection) factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch gotMessage = new CountDownLatch(1);
consumer.setMessageListener(message -> gotMessage.countDown());
assertTrue(gotMessage.await(1, TimeUnit.SECONDS));
// want to ensure the ack has had a chance to get back to the broker
final Queue queueInstance = server.locateQueue(new SimpleString(queueName));
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return queueInstance.getAcknowledgeAttempts() > 0;
}
});
// whack the connection so there is no transaction outcome
try {
connection.getTransport().narrow(Socket.class).close();
} catch (IOException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (Exception expected) {
}
// expect rollback and redelivery on new consumer
connection = (ActiveMQConnection) factory.createConnection();
connection.start();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumer = session.createConsumer(destination);
assertNotNull(consumer.receive(2000));
session.commit();
connection.close();
}
} }