This closes #666

This commit is contained in:
Clebert Suconic 2016-07-26 14:30:11 -04:00
commit 41d87490f7
3 changed files with 120 additions and 2 deletions

View File

@ -311,7 +311,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
public void ack(Object brokerConsumer, Object message) throws Exception {
recoverContext();
try {
((ServerConsumer) brokerConsumer).individualAcknowledge(null, ((ServerMessage) message).getMessageID());
((ServerConsumer) brokerConsumer).individualAcknowledge(serverSession.getCurrentTransaction(), ((ServerMessage) message).getMessageID());
}
finally {
resetContext();

View File

@ -24,11 +24,13 @@ import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@ -294,7 +296,31 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
DeliveryState remoteState = delivery.getRemoteState();
if (remoteState != null) {
if (remoteState instanceof Accepted) {
// If we are transactional then we need ack if the msg has been accepted
if (remoteState instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) remoteState;
if (txState.getOutcome() != null) {
Outcome outcome = txState.getOutcome();
if (outcome instanceof Accepted) {
if (!delivery.remotelySettled()) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(txState.getTxnId());
delivery.disposition(txAccepted);
}
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
// from dealer, a perf hit but a must
try {
sessionSPI.ack(brokerConsumer, message);
}
catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
}
}
}
else if (remoteState instanceof Accepted) {
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
// from dealer, a perf hit but a must
try {

View File

@ -229,6 +229,98 @@ public class ProtonTest extends ActiveMQTestBase {
Assert.assertNotNull(message);
}
@Test
public void testCommitProducer() throws Throwable {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
javax.jms.Queue queue = createQueue(address);
System.out.println("queue:" + queue.getQueueName());
MessageProducer p = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage();
message.setText("Message:" + i);
p.send(message);
}
session.commit();
session.close();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
Assert.assertEquals(q.getMessageCount(), 10);
}
@Test
public void testRollbackProducer() throws Throwable {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
javax.jms.Queue queue = createQueue(address);
System.out.println("queue:" + queue.getQueueName());
MessageProducer p = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage();
message.setText("Message:" + i);
p.send(message);
}
session.rollback();
session.close();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
Assert.assertEquals(q.getMessageCount(), 0);
}
@Test
public void testCommitConsumer() throws Throwable {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = createQueue(address);
System.out.println("queue:" + queue.getQueueName());
MessageProducer p = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage();
message.setText("Message:" + i);
p.send(message);
}
session.close();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer cons = session.createConsumer(queue);
connection.start();
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) cons.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("Message:" + i, message.getText());
}
session.commit();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
Assert.assertEquals(q.getMessageCount(), 0);
}
@Test
public void testRollbackConsumer() throws Throwable {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = createQueue(address);
System.out.println("queue:" + queue.getQueueName());
MessageProducer p = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage();
message.setText("Message:" + i);
p.send(message);
}
session.close();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer cons = session.createConsumer(queue);
connection.start();
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) cons.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("Message:" + i, message.getText());
}
session.rollback();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
Assert.assertEquals(q.getMessageCount(), 10);
}
@Test
public void testResourceLimitExceptionOnAddressFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol