diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index b2d029f264..b00474dde2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -311,7 +311,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se public void ack(Object brokerConsumer, Object message) throws Exception { recoverContext(); try { - ((ServerConsumer) brokerConsumer).individualAcknowledge(null, ((ServerMessage) message).getMessageID()); + ((ServerConsumer) brokerConsumer).individualAcknowledge(serverSession.getCurrentTransaction(), ((ServerMessage) message).getMessageID()); } finally { resetContext(); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java index 5fd24d9f6d..40a454859f 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java @@ -24,11 +24,13 @@ import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Outcome; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -294,7 +296,31 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple DeliveryState remoteState = delivery.getRemoteState(); if (remoteState != null) { - if (remoteState instanceof Accepted) { + // If we are transactional then we need ack if the msg has been accepted + if (remoteState instanceof TransactionalState) { + TransactionalState txState = (TransactionalState) remoteState; + if (txState.getOutcome() != null) { + Outcome outcome = txState.getOutcome(); + if (outcome instanceof Accepted) { + if (!delivery.remotelySettled()) { + TransactionalState txAccepted = new TransactionalState(); + txAccepted.setOutcome(Accepted.getInstance()); + txAccepted.setTxnId(txState.getTxnId()); + + delivery.disposition(txAccepted); + } + //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order + // from dealer, a perf hit but a must + try { + sessionSPI.ack(brokerConsumer, message); + } + catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + } + } + } + } + else if (remoteState instanceof Accepted) { //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order // from dealer, a perf hit but a must try { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 2c68dde43d..98f0e0fef6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -229,6 +229,98 @@ public class ProtonTest extends ActiveMQTestBase { Assert.assertNotNull(message); } + @Test + public void testCommitProducer() throws Throwable { + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + javax.jms.Queue queue = createQueue(address); + System.out.println("queue:" + queue.getQueueName()); + MessageProducer p = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage(); + message.setText("Message:" + i); + p.send(message); + } + session.commit(); + session.close(); + Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); + Assert.assertEquals(q.getMessageCount(), 10); + } + + @Test + public void testRollbackProducer() throws Throwable { + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + javax.jms.Queue queue = createQueue(address); + System.out.println("queue:" + queue.getQueueName()); + MessageProducer p = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage(); + message.setText("Message:" + i); + p.send(message); + } + session.rollback(); + session.close(); + Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); + Assert.assertEquals(q.getMessageCount(), 0); + } + + @Test + public void testCommitConsumer() throws Throwable { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = createQueue(address); + System.out.println("queue:" + queue.getQueueName()); + MessageProducer p = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage(); + message.setText("Message:" + i); + p.send(message); + } + session.close(); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer cons = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) cons.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("Message:" + i, message.getText()); + } + session.commit(); + Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); + Assert.assertEquals(q.getMessageCount(), 0); + } + + @Test + public void testRollbackConsumer() throws Throwable { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = createQueue(address); + System.out.println("queue:" + queue.getQueueName()); + MessageProducer p = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage(); + message.setText("Message:" + i); + p.send(message); + } + session.close(); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer cons = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) cons.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("Message:" + i, message.getText()); + } + session.rollback(); + Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); + Assert.assertEquals(q.getMessageCount(), 10); + } + @Test public void testResourceLimitExceptionOnAddressFull() throws Exception { if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol