Apply patch to use individual ack for messages in a TX to avoid
unmatched ack when ack range is non-sequential
This commit is contained in:
Timothy Bish 2014-12-01 14:16:01 -05:00
parent 9797d3b957
commit 9edf907aed
3 changed files with 74 additions and 24 deletions

View File

@ -1203,29 +1203,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
@Override
void doCommit() throws Exception {
if (!dispatchedInTx.isEmpty()) {
for (MessageDispatch md : dispatchedInTx) {
MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
MessageDispatch md = dispatchedInTx.getFirst();
MessageAck pendingTxAck = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, dispatchedInTx.size());
pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
pendingTxAck.setFirstMessageId(dispatchedInTx.getLast().getMessage().getMessageId());
LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
sendToActiveMQ(pendingTxAck, new ResponseHandler() {
@Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
exception.printStackTrace();
sender.close();
}
}
pumpProtonToSocket();
}
});
}
dispatchedInTx.clear();
sendToActiveMQ(pendingTxAck, new ResponseHandler() {
@Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
exception.printStackTrace();
sender.close();
}
}
pumpProtonToSocket();
}
});
}
}

View File

@ -54,6 +54,8 @@ import org.slf4j.LoggerFactory;
public class AmqpTestSupport {
public static final String MESSAGE_NUMBER = "MessageNumber";
@Rule public TestName name = new TestName();
protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
@ -249,9 +251,10 @@ public class AmqpTestSupport {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = session.createProducer(destination);
for (int i = 0; i < count; i++) {
for (int i = 1; i <= count; i++) {
TextMessage message = session.createTextMessage();
message.setText("TextMessage: " + i);
message.setIntProperty(MESSAGE_NUMBER, i);
p.send(message);
}

View File

@ -18,6 +18,9 @@ package org.apache.activemq.transport.amqp;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -140,7 +143,7 @@ public class JMSClientTest extends JMSClientTestSupport {
final int msgCount = 1;
connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getDestinationName());
sendMessages(connection, queue, msgCount);
@ -170,7 +173,7 @@ public class JMSClientTest extends JMSClientTestSupport {
final int msgCount = 1;
connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getDestinationName());
sendMessages(connection, queue, msgCount);
@ -206,6 +209,50 @@ public class JMSClientTest extends JMSClientTestSupport {
session.close();
}
@Test(timeout = 60000)
public void testRollbackSomeThenReceiveAndCommit() throws Exception {
int totalCount = 5;
int consumeBeforeRollback = 2;
connection = createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getDestinationName());
sendMessages(connection, queue, totalCount);
QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
assertEquals(totalCount, proxy.getQueueSize());
MessageConsumer consumer = session.createConsumer(queue);
for(int i = 1; i <= consumeBeforeRollback; i++) {
Message message = consumer.receive(1000);
assertNotNull(message);
assertEquals("Unexpected message number", i, message.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER));
}
session.rollback();
assertEquals(totalCount, proxy.getQueueSize());
// Consume again..check we receive all the messages.
Set<Integer> messageNumbers = new HashSet<Integer>();
for(int i = 1; i <= totalCount; i++) {
messageNumbers.add(i);
}
for(int i = 1; i <= totalCount; i++) {
Message message = consumer.receive(1000);
assertNotNull(message);
int msgNum = message.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER);
messageNumbers.remove(msgNum);
}
session.commit();
assertTrue("Did not consume all expected messages, missing messages: " + messageNumbers, messageNumbers.isEmpty());
assertEquals("Queue should have no messages left after commit", 0, proxy.getQueueSize());
}
@Test(timeout=60000)
public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
@ -213,7 +260,7 @@ public class JMSClientTest extends JMSClientTestSupport {
final int msgCount = 500;
connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getDestinationName());
sendMessages(connection, queue, msgCount);
@ -757,7 +804,7 @@ public class JMSClientTest extends JMSClientTestSupport {
ActiveMQAdmin.enableJMSFrameTracing();
connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getDestinationName());
connection.start();