AMQ-6697 Adds a test to show that the described case works

Correctly ACK inside a TX and then Abort and then ACK again outside a TX
to show that the broker will then mark the message as consumed.
This commit is contained in:
Timothy Bish 2017-06-02 11:50:14 -04:00
parent 7413ee00e1
commit bd8661796b
1 changed files with 59 additions and 0 deletions

View File

@ -35,6 +35,7 @@ import javax.jms.TextMessage;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
@ -1128,4 +1129,62 @@ public class Stomp11Test extends StompTestSupport {
assertEquals(view.getDurableTopicSubscribers().length, 2); assertEquals(view.getDurableTopicSubscribers().length, 2);
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0); assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
} }
@Test(timeout = 60000)
public void testTransactionRollbackAllowsSecondAckOutsideTX() throws Exception {
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello"));
producer.close();
String frame = "STOMP\n" + "login:system\n" + "passcode:manager\n" +
"accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
QueueViewMBean queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getQueueSize());
frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
// ack it in the TX then abort
frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + "message-id:" +
received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// rollback first message
frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
assertEquals(1, queueView.getQueueSize());
// ack it outside the TX and it should be really ack'd
frame = "ACK\n" + "subscription:12345\n" + "message-id:" +
received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
assertTrue("Message not ack'd", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return queueView.getQueueSize() == 0;
}
}));
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"receipt:1\n" + "id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsub);
String receipt = stompConnection.receiveFrame();
assertTrue(receipt.contains("RECEIPT"));
}
} }