diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 61816af43a..14c1042cd0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -56,6 +56,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; +import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler; import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -109,6 +110,8 @@ public class AMQPSessionCallback implements SessionCallback { private final AddressQueryCache addressQueryCache = new AddressQueryCache<>(); + private ProtonTransactionHandler transactionHandler; + public AMQPSessionCallback(AMQPConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection, @@ -690,6 +693,14 @@ public class AMQPSessionCallback implements SessionCallback { } } + @Override + public Transaction getCurrentTransaction() { + if (this.transactionHandler != null) { + return this.transactionHandler.getCurrentTransaction(); + } + return null; + } + public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException { return protonSPI.getTransaction(txid, remove); } @@ -740,6 +751,14 @@ public class AMQPSessionCallback implements SessionCallback { serverSession.removeProducer(name); } + public void setTransactionHandler(ProtonTransactionHandler transactionHandler) { + this.transactionHandler = transactionHandler; + } + + public ProtonTransactionHandler getTransactionHandler() { + return this.transactionHandler; + } + class AddressQueryCache { SimpleString address; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index 9ccc1964e3..78a5b33637 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction; import java.nio.ByteBuffer; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; @@ -47,6 +48,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { private final int amqpCredit; private final int amqpLowMark; + private Transaction currentTx; final AMQPSessionCallback sessionSPI; final AMQPConnectionContext connection; @@ -58,6 +60,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { this.connection = connection; this.amqpCredit = connection.getAmqpCredits(); this.amqpLowMark = connection.getAmqpLowCredits(); + this.sessionSPI.setTransactionHandler(this); } @Override @@ -100,6 +103,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { Binary txID = sessionSPI.newTransaction(); Declared declared = new Declared(); declared.setTxnId(txID); + currentTx = sessionSPI.getTransaction(txID, false); IOCallback ioAction = new IOCallback() { @Override public void done() { @@ -115,7 +119,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { @Override public void onError(int errorCode, String errorMessage) { - + currentTx = null; } }; sessionSPI.afterIO(ioAction); @@ -133,6 +137,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { try { delivery.settle(); delivery.disposition(new Accepted()); + currentTx = null; } finally { connection.unlock(); connection.flush(); @@ -192,4 +197,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { message.decode(encoded); return message; } + + public Transaction getCurrentTransaction() { + return currentTx; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index d13cd76014..7ab353a70d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1884,6 +1884,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener { return oper.getListOnConsumer(consumerId); } } else { + //amqp handles the transaction in callback + if (callback != null) { + Transaction transaction = callback.getCurrentTransaction(); + if (transaction != null) { + RefsOperation operation = (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION); + if (operation != null) { + return operation.getListOnConsumer(consumerId); + } + } + } return Collections.emptyList(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index c4a2dbe5a0..5577522aca 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; public interface SessionCallback { @@ -93,4 +94,8 @@ public interface SessionCallback { default void close(boolean failed) { } + + default Transaction getCurrentTransaction() { + return null; + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java new file mode 100644 index 0000000000..3be3e88c32 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.Map; + +public class JMXManagementTest extends JMSClientTestSupport { + + @Test + public void testListDeliveringMessages() throws Exception { + SimpleString queue = new SimpleString(getQueueName()); + + Connection connection1 = createConnection(); + Connection connection2 = createConnection(); + Session prodSession = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session consSession = connection2.createSession(true, Session.SESSION_TRANSACTED); + + javax.jms.Queue jmsQueue = prodSession.createQueue(queue.toString()); + + QueueControl queueControl = createManagementControl(queue, queue); + + MessageProducer producer = prodSession.createProducer(jmsQueue); + final int num = 20; + + for (int i = 0; i < num; i++) { + TextMessage message = prodSession.createTextMessage("hello" + i); + producer.send(message); + } + + connection2.start(); + MessageConsumer consumer = consSession.createConsumer(jmsQueue); + + for (int i = 0; i < num; i++) { + TextMessage msgRec = (TextMessage) consumer.receive(5000); + assertNotNull(msgRec); + assertEquals(msgRec.getText(), "hello" + i); + } + + //before commit + assertEquals(num, queueControl.getDeliveringCount()); + + Map[]> result = queueControl.listDeliveringMessages(); + assertEquals(1, result.size()); + + Map[] msgMaps = result.entrySet().iterator().next().getValue(); + + assertEquals(num, msgMaps.length); + + consSession.commit(); + result = queueControl.listDeliveringMessages(); + + assertEquals(0, result.size()); + + consSession.close(); + prodSession.close(); + + connection1.close(); + connection2.close(); + } + + protected QueueControl createManagementControl(final SimpleString address, + final SimpleString queue) throws Exception { + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST, this.mBeanServer); + + return queueControl; + } +}