From 113c0c9360197ef3467f3907a604fa527247c858 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 15 Sep 2016 15:28:07 -0400 Subject: [PATCH] ARTEMIS-738 Improving TX support on AMQP https://issues.apache.org/jira/browse/ARTEMIS-738 --- .../ActiveMQProtonConnectionCallback.java | 71 +- .../ProtonSessionIntegrationCallback.java | 84 ++- .../proton/plug/AMQPConnectionCallback.java | 11 + .../org/proton/plug/AMQPSessionCallback.java | 18 +- .../context/AbstractProtonSessionContext.java | 1 - .../context/ProtonTransactionHandler.java | 8 +- .../server/ProtonServerReceiverContext.java | 10 +- .../server/ProtonServerSenderContext.java | 7 +- .../server/ProtonServerSessionContext.java | 6 + .../ActiveMQAMQPProtocolMessageBundle.java | 3 + .../AbstractConnectionContextTest.java | 17 + .../proton/plug/test/invm/ProtonINVMSPI.java | 33 + .../test/minimalclient/AMQPClientSPI.java | 18 + .../minimalserver/MinimalConnectionSPI.java | 18 + .../test/minimalserver/MinimalSessionSPI.java | 22 +- .../artemis/core/server/ServerSession.java | 2 + .../core/server/impl/ServerSessionImpl.java | 18 +- .../transport/amqp/client/AmqpConnection.java | 9 +- .../transport/amqp/client/AmqpMessage.java | 129 +++- .../transport/amqp/client/AmqpReceiver.java | 28 + .../transport/amqp/client/AmqpSender.java | 27 +- .../transport/amqp/client/AmqpSession.java | 12 +- .../amqp/AmqpClientTestSupport.java | 194 ++++++ .../integration/amqp/AmqpTransactionTest.java | 625 ++++++++++++++++++ .../tests/integration/proton/ProtonTest.java | 3 +- .../integration/proton/ProtonTestBase.java | 1 - .../proton/ProtonTestForHeader.java | 3 - 27 files changed, 1263 insertions(+), 115 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java index ea66b01f59..d5b2ff7385 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java @@ -21,6 +21,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,8 +39,13 @@ import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.jboss.logging.Logger; @@ -47,7 +54,9 @@ import org.proton.plug.AMQPConnectionContext; import org.proton.plug.AMQPSessionCallback; import org.proton.plug.SASLResult; import org.proton.plug.ServerSASL; +import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.handler.ExtCapability; +import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle; import org.proton.plug.sasl.AnonymousServerSASL; import static org.proton.plug.AmqpSupport.CONTAINER_ID; @@ -55,8 +64,11 @@ import static org.proton.plug.AmqpSupport.INVALID_FIELD; import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED; public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback, FailureListener, CloseListener { + private static final Logger logger = Logger.getLogger(ActiveMQProtonConnectionCallback.class); private static final List connectedContainers = Collections.synchronizedList(new ArrayList()); + private ConcurrentMap transactions = new ConcurrentHashMap<>(); + private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class); private final ProtonProtocolManager manager; @@ -117,11 +129,23 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback, @Override public void close() { - if (registeredConnectionId.getAndSet(false)) { - server.removeClientConnection(remoteContainerId); + try { + if (registeredConnectionId.getAndSet(false)) { + server.removeClientConnection(remoteContainerId); + } + connection.close(); + amqpConnection.close(); + } + finally { + for (Transaction tx : transactions.values()) { + try { + tx.rollback(); + } + catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } } - connection.close(); - amqpConnection.close(); } public Executor getExeuctor() { @@ -219,4 +243,43 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback, public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { close(); } + + @Override + public Binary newTransaction() { + XidImpl xid = newXID(); + Transaction transaction = new TransactionImpl(xid, server.getStorageManager(), -1); + transactions.put(xid, transaction); + return new Binary(xid.getGlobalTransactionId()); + } + + @Override + public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { + XidImpl xid = newXID(txid.getArray()); + Transaction tx = transactions.get(xid); + + if (tx == null) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString()); + } + + return tx; + } + + @Override + public void removeTransaction(Binary txid) { + XidImpl xid = newXID(txid.getArray()); + transactions.remove(xid); + } + + + protected XidImpl newXID() { + return newXID(UUIDGenerator.getInstance().generateStringUUID().getBytes()); + } + + protected XidImpl newXID(byte[] bytes) { + return new XidImpl("amqp".getBytes(), 1, bytes); + } + + + + } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 153d033c1e..da9dd9cf74 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -42,7 +42,6 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; -import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SelectorTranslator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; @@ -61,6 +60,7 @@ import org.proton.plug.AMQPSessionCallback; import org.proton.plug.AMQPSessionContext; import org.proton.plug.SASLResult; import org.proton.plug.context.ProtonPlugSender; +import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException; import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.proton.plug.sasl.PlainSASLResult; @@ -281,46 +281,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount); } - @Override - public Binary getCurrentTXID() { - Transaction tx = serverSession.getCurrentTransaction(); - if (tx == null) { - tx = serverSession.newTransaction(); - serverSession.resetTX(tx); - } - return new Binary(ByteUtil.longToBytes(tx.getID())); - } - @Override public String tempQueueName() { return UUIDGenerator.getInstance().generateStringUUID(); } - @Override - public void commitCurrentTX() throws Exception { - recoverContext(); - try { - serverSession.commit(); - } - finally { - resetContext(); - } - } - - @Override - public void rollbackCurrentTX(boolean lastMessageDelivered) throws Exception { - //need to check here as this can be called if init fails - if (serverSession != null) { - recoverContext(); - try { - serverSession.rollback(lastMessageDelivered); - } - finally { - resetContext(); - } - } - } - @Override public void close() throws Exception { //need to check here as this can be called if init fails @@ -336,10 +301,13 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } @Override - public void ack(Object brokerConsumer, Object message) throws Exception { + public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception { + if (transaction == null) { + transaction = serverSession.getCurrentTransaction(); + } recoverContext(); try { - ((ServerConsumer) brokerConsumer).individualAcknowledge(serverSession.getCurrentTransaction(), ((ServerMessage) message).getMessageID()); + ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID()); } finally { resetContext(); @@ -363,7 +331,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } @Override - public void serverSend(final Receiver receiver, + public void serverSend(final Transaction transaction, + final Receiver receiver, final Delivery delivery, String address, int messageFormat, @@ -382,10 +351,10 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se if (store.isRejectingMessages()) { // We drop pre-settled messages (and abort any associated Tx) if (delivery.remotelySettled()) { - if (serverSession.getCurrentTransaction() != null) { + if (transaction != null) { String amqpAddress = delivery.getLink().getTarget().getAddress(); ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress); - serverSession.getCurrentTransaction().markAsRollbackOnly(e); + transaction.markAsRollbackOnly(e); } } else { @@ -393,7 +362,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } } else { - serverSend(message, delivery, receiver); + serverSend(transaction, message, delivery, receiver); } } @@ -406,11 +375,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se connection.flush(); } - private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception { + private void serverSend(final Transaction transaction, final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception { try { message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer()); - serverSession.send(message, false); + serverSession.send(transaction, message, false, false); // FIXME Potential race here... manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { @@ -543,4 +512,31 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se return false; } } + + @Override + public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { + return protonSPI.getTransaction(txid); + } + + @Override + public Binary newTransaction() { + return protonSPI.newTransaction(); + } + + + @Override + public void commitTX(Binary txid) throws Exception { + Transaction tx = protonSPI.getTransaction(txid); + tx.commit(true); + protonSPI.removeTransaction(txid); + } + + @Override + public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception { + Transaction tx = protonSPI.getTransaction(txid); + tx.rollback(); + protonSPI.removeTransaction(txid); + + } + } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java index 15a3246fec..f4ed64cd9b 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java @@ -17,7 +17,10 @@ package org.proton.plug; import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.engine.Connection; +import org.proton.plug.exceptions.ActiveMQAMQPException; public interface AMQPConnectionCallback { @@ -44,4 +47,12 @@ public interface AMQPConnectionCallback { void sendSASLSupported(); boolean validateConnection(Connection connection, SASLResult saslResult); + + Binary newTransaction(); + + Transaction getTransaction(Binary txid) throws ActiveMQAMQPException; + + void removeTransaction(Binary txid); + + } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java index b6acd3f933..5f3b6dd57b 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java @@ -18,11 +18,13 @@ package org.proton.plug; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.ProtonJMessage; import org.proton.plug.context.ProtonPlugSender; +import org.proton.plug.exceptions.ActiveMQAMQPException; /** * These are methods where the Proton Plug component will call your server @@ -67,17 +69,20 @@ public interface AMQPSessionCallback { // This one can be a lot improved ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception; - Binary getCurrentTXID(); - String tempQueueName(); - void commitCurrentTX() throws Exception; - void rollbackCurrentTX(boolean lastMessageReceived) throws Exception; + Transaction getTransaction(Binary txid) throws ActiveMQAMQPException; + + Binary newTransaction(); + + void commitTX(Binary txid) throws Exception; + + void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception; void close() throws Exception; - void ack(Object brokerConsumer, Object message) throws Exception; + void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception; /** * @param brokerConsumer @@ -96,7 +101,8 @@ public interface AMQPSessionCallback { * @param messageFormat * @param messageEncoded a Heap Buffer ByteBuffer (safe to convert into byte[]) */ - void serverSend(Receiver receiver, + void serverSend(Transaction transaction, + Receiver receiver, Delivery delivery, String address, int messageFormat, diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java index 96f74131e9..5c0a626f68 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java @@ -140,7 +140,6 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i senders.clear(); try { if (sessionSPI != null) { - sessionSPI.rollbackCurrentTX(false); sessionSPI.close(); } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java index 597b5e45ad..263d3e686b 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java @@ -72,7 +72,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { Object action = ((AmqpValue) msg.getBody()).getValue(); if (action instanceof Declare) { - Binary txID = sessionSPI.getCurrentTXID(); + Binary txID = sessionSPI.newTransaction(); Declared declared = new Declared(); declared.setTxnId(txID); delivery.disposition(declared); @@ -80,9 +80,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { } else if (action instanceof Discharge) { Discharge discharge = (Discharge) action; + + Binary txID = discharge.getTxnId(); if (discharge.getFail()) { try { - sessionSPI.rollbackCurrentTX(true); + sessionSPI.rollbackTX(txID, true); delivery.disposition(new Accepted()); } catch (Exception e) { @@ -91,7 +93,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { } else { try { - sessionSPI.commitCurrentTX(); + sessionSPI.commitTX(txID); delivery.disposition(new Accepted()); } catch (ActiveMQAMQPException amqpE) { diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java index c564a9e610..173ff286be 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java @@ -18,8 +18,10 @@ package org.proton.plug.context.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; @@ -130,7 +132,13 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { receiver.advance(); - sessionSPI.serverSend(receiver, delivery, address, delivery.getMessageFormat(), buffer); + Transaction tx = null; + if (delivery.getRemoteState() instanceof TransactionalState) { + + TransactionalState txState = (TransactionalState) delivery.getRemoteState(); + tx = this.sessionSPI.getTransaction(txState.getTxnId()); + } + sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), buffer); flow(maxCreditAllocation, minCreditRefresh); } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java index 2d91f3767a..e9bd1236e3 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java @@ -21,6 +21,7 @@ import java.util.Objects; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; @@ -339,7 +340,9 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple if (remoteState != null) { // If we are transactional then we need ack if the msg has been accepted if (remoteState instanceof TransactionalState) { + TransactionalState txState = (TransactionalState) remoteState; + Transaction tx = this.sessionSPI.getTransaction(txState.getTxnId()); if (txState.getOutcome() != null) { Outcome outcome = txState.getOutcome(); if (outcome instanceof Accepted) { @@ -353,7 +356,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order // from dealer, a perf hit but a must try { - sessionSPI.ack(brokerConsumer, message); + sessionSPI.ack(tx, brokerConsumer, message); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); @@ -365,7 +368,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order // from dealer, a perf hit but a must try { - sessionSPI.ack(brokerConsumer, message); + sessionSPI.ack(null, brokerConsumer, message); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java index 46178a9ba1..983fa4e902 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java @@ -19,6 +19,7 @@ package org.proton.plug.context.server; import java.util.HashMap; import java.util.Map; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transaction.Coordinator; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Receiver; @@ -60,6 +61,11 @@ public class ProtonServerSessionContext extends AbstractProtonSessionContext { public void addTransactionHandler(Coordinator coordinator, Receiver receiver) { ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI); + + coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), + Symbol.getSymbol("amqp:multi-txns-per-ssn"), + Symbol.getSymbol("amqp:multi-ssns-per-txn")); + receiver.setContext(transactionHandler); receiver.open(); receiver.flow(100); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java index 8817310983..576e61ae5a 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java @@ -74,4 +74,7 @@ public interface ActiveMQAMQPProtocolMessageBundle { @Message(id = 219013, value = "error committing coordinator: {0}", format = Message.Format.MESSAGE_FORMAT) ActiveMQAMQPIllegalStateException errorCommittingCoordinator(String message); + @Message(id = 219014, value = "Transaction not found: xid={0}", format = Message.Format.MESSAGE_FORMAT) + ActiveMQAMQPIllegalStateException txNotFound(String xidToString); + } diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java index da7b61734f..825b98709b 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java @@ -20,7 +20,9 @@ import java.util.concurrent.Executors; import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Session; @@ -77,6 +79,21 @@ public class AbstractConnectionContextTest { } + @Override + public Binary newTransaction() { + return null; + } + + @Override + public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { + return null; + } + + @Override + public void removeTransaction(Binary txid) { + + } + @Override public void onTransport(ByteBuf bytes, AMQPConnectionContext connection) { diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java index 5de6e9d76d..a35e8ac13f 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java @@ -20,7 +20,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.engine.Connection; import org.jboss.logging.Logger; import org.proton.plug.AMQPConnectionContext; @@ -29,6 +31,7 @@ import org.proton.plug.AMQPSessionCallback; import org.proton.plug.SASLResult; import org.proton.plug.ServerSASL; import org.proton.plug.context.server.ProtonServerConnectionContext; +import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.sasl.AnonymousServerSASL; import org.proton.plug.sasl.ServerSASLPlain; import org.proton.plug.test.minimalserver.MinimalSessionSPI; @@ -132,6 +135,21 @@ public class ProtonINVMSPI implements AMQPConnectionCallback { return null; } + @Override + public Binary newTransaction() { + return null; + } + + @Override + public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { + return null; + } + + @Override + public void removeTransaction(Binary txid) { + + } + class ReturnSPI implements AMQPConnectionCallback { @Override @@ -139,6 +157,21 @@ public class ProtonINVMSPI implements AMQPConnectionCallback { } + @Override + public Binary newTransaction() { + return null; + } + + @Override + public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { + return null; + } + + @Override + public void removeTransaction(Binary txid) { + + } + @Override public ServerSASL[] getSASLMechnisms() { return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()}; diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java index be1571cc66..85e4c0247e 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java @@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.engine.Connection; import org.jboss.logging.Logger; import org.proton.plug.AMQPConnectionContext; @@ -29,6 +31,7 @@ import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPSessionCallback; import org.proton.plug.SASLResult; import org.proton.plug.ServerSASL; +import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.sasl.AnonymousServerSASL; import org.proton.plug.sasl.ServerSASLPlain; import org.proton.plug.util.ByteUtil; @@ -74,6 +77,21 @@ public class AMQPClientSPI implements AMQPConnectionCallback { } + @Override + public Binary newTransaction() { + return null; + } + + @Override + public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { + return null; + } + + @Override + public void removeTransaction(Binary txid) { + + } + @Override public boolean validateConnection(Connection connection, SASLResult saslResult) { return true; diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java index 1b9c9195ae..6325ad7b0f 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java @@ -24,7 +24,9 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.engine.Connection; import org.jboss.logging.Logger; import org.proton.plug.AMQPConnectionContext; @@ -32,6 +34,7 @@ import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPSessionCallback; import org.proton.plug.SASLResult; import org.proton.plug.ServerSASL; +import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.sasl.AnonymousServerSASL; import org.proton.plug.sasl.ServerSASLPlain; import org.proton.plug.util.ByteUtil; @@ -87,6 +90,21 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback { return true; } + @Override + public Binary newTransaction() { + return null; + } + + @Override + public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { + return null; + } + + @Override + public void removeTransaction(Binary txid) { + + } + @Override public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) { final int bufferSize = bytes.writerIndex(); diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java index f9a3533030..d366c5b2dc 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java @@ -22,7 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; @@ -123,16 +126,23 @@ public class MinimalSessionSPI implements AMQPSessionCallback { } @Override - public Binary getCurrentTXID() { - return new Binary(new byte[]{1}); + public Transaction getTransaction(Binary txid) { + return new TransactionImpl(new NullStorageManager()); } @Override - public void commitCurrentTX() { + public Binary newTransaction() { + return null; } @Override - public void rollbackCurrentTX(boolean lastMessage) { + public void commitTX(Binary txid) throws Exception { + + } + + @Override + public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception { + } @Override @@ -141,7 +151,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback { } @Override - public void ack(Object brokerConsumer, Object message) { + public void ack(Transaction tx, Object brokerConsumer, Object message) { } @@ -157,7 +167,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback { } @Override - public void serverSend(Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf buffer) { + public void serverSend(Transaction tx, Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf buffer) { ProtonServerMessage serverMessage = new ProtonServerMessage(); serverMessage.decode(buffer.nioBuffer()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 953de1f5c9..3521d71764 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -133,6 +133,8 @@ public interface ServerSession extends SecurityAuth { void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception; + RoutingStatus send(Transaction tx, ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception; + RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception; RoutingStatus send(ServerMessage message, boolean direct) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index d20fa43d4d..3ccfd16f95 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1256,6 +1256,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public RoutingStatus send(final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception { + return send(getCurrentTransaction(), message, direct, noAutoCreateQueue); + } + + public RoutingStatus send(Transaction tx, final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception { // If the protocol doesn't support flow control, we have no choice other than fail the communication if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { @@ -1308,10 +1312,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (message.getAddress().equals(managementAddress)) { // It's a management message - handleManagementMessage(message, direct); + handleManagementMessage(tx, message, direct); } else { - result = doSend(message, direct, noAutoCreateQueue); + result = doSend(tx, message, direct, noAutoCreateQueue); } return result; } @@ -1337,7 +1341,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize); } - doSend(currentLargeMessage, false, false); + doSend(tx, currentLargeMessage, false, false); currentLargeMessage = null; } @@ -1526,7 +1530,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { started = s; } - private void handleManagementMessage(final ServerMessage message, final boolean direct) throws Exception { + private RoutingStatus handleManagementMessage(final Transaction tx, final ServerMessage message, final boolean direct) throws Exception { try { securityCheck(message.getAddress(), CheckType.MANAGE, this); } @@ -1544,8 +1548,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (replyTo != null) { reply.setAddress(replyTo); - doSend(reply, direct, false); + doSend(tx, reply, direct, false); } + + return RoutingStatus.OK; } private void doRollback(final boolean clientFailed, @@ -1600,7 +1606,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { theTx.rollback(); } - protected RoutingStatus doSend(final ServerMessage msg, final boolean direct, final boolean noAutoCreateQueue) throws Exception { + public RoutingStatus doSend(final Transaction tx, final ServerMessage msg, final boolean direct, final boolean noAutoCreateQueue) throws Exception { RoutingStatus result = RoutingStatus.OK; // check the user has write access to this address. try { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 1454dd9a0a..3e2c5d74e5 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -228,7 +228,14 @@ public class AmqpConnection extends AmqpAbstractResource implements } } - serializer.shutdown(); + serializer.shutdownNow(); + try { + if (!serializer.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.warn("Serializer didn't shutdown cleanly"); + } + } + catch (InterruptedException e) { + } } } } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 320d174710..060fc4ea1b 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -60,7 +60,8 @@ public class AmqpMessage { * Creates a new AmqpMessage that wraps the information necessary to handle * an outgoing message. * - * @param message the Proton message that is to be sent. + * @param message + * the Proton message that is to be sent. */ public AmqpMessage(Message message) { this(null, message, null); @@ -70,9 +71,12 @@ public class AmqpMessage { * Creates a new AmqpMessage that wraps the information necessary to handle * an incoming delivery. * - * @param receiver the AmqpReceiver that received this message. - * @param message the Proton message that was received. - * @param delivery the Delivery instance that produced this message. + * @param receiver + * the AmqpReceiver that received this message. + * @param message + * the Proton message that was received. + * @param delivery + * the Delivery instance that produced this message. */ @SuppressWarnings("unchecked") public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) { @@ -135,11 +139,30 @@ public class AmqpMessage { receiver.accept(delivery); } + /** + * Accepts the message marking it as consumed on the remote peer. + * + * @param session + * The session that is used to manage acceptance of the message. + * + * @throws Exception if an error occurs during the accept. + */ + public void accept(AmqpSession txnSession) throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't accept non-received message."); + } + + receiver.accept(delivery, txnSession); + } + /** * Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here. * - * @param deliveryFailed indicates that the delivery failed for some reason. - * @param undeliverableHere marks the delivery as not being able to be process by link it was sent to. + * @param deliveryFailed + * indicates that the delivery failed for some reason. + * @param undeliverableHere + * marks the delivery as not being able to be process by link it was sent to. + * * @throws Exception if an error occurs during the process. */ public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception { @@ -165,10 +188,36 @@ public class AmqpMessage { //----- Convenience methods for constructing outbound messages -----------// + /** + * Sets the address which is applied to the AMQP message To field in the message properties + * + * @param address + * The address that should be applied in the Message To field. + */ + public void setAddress(String address) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setAddress(address); + } + + /** + * Return the set address that was set in the Message To field. + * + * @return the set address String form or null if not set. + */ + public String getAddress() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getTo(); + } + /** * Sets the MessageId property on an outbound message using the provided String * - * @param messageId the String message ID value to set. + * @param messageId + * the String message ID value to set. */ public void setMessageId(String messageId) { checkReadOnly(); @@ -207,7 +256,8 @@ public class AmqpMessage { /** * Sets the MessageId property on an outbound message using the provided value * - * @param messageId the message ID value to set. + * @param messageId + * the message ID value to set. */ public void setRawMessageId(Object messageId) { checkReadOnly(); @@ -218,7 +268,8 @@ public class AmqpMessage { /** * Sets the CorrelationId property on an outbound message using the provided String * - * @param correlationId the String Correlation ID value to set. + * @param correlationId + * the String Correlation ID value to set. */ public void setCorrelationId(String correlationId) { checkReadOnly(); @@ -257,7 +308,8 @@ public class AmqpMessage { /** * Sets the CorrelationId property on an outbound message using the provided value * - * @param correlationId the correlation ID value to set. + * @param correlationId + * the correlation ID value to set. */ public void setRawCorrelationId(Object correlationId) { checkReadOnly(); @@ -268,7 +320,8 @@ public class AmqpMessage { /** * Sets the GroupId property on an outbound message using the provided String * - * @param groupId the String Group ID value to set. + * @param messageId + * the String Group ID value to set. */ public void setGroupId(String groupId) { checkReadOnly(); @@ -293,7 +346,8 @@ public class AmqpMessage { /** * Sets the durable header on the outgoing message. * - * @param durable the boolean durable value to set. + * @param durable + * the boolean durable value to set. */ public void setDurable(boolean durable) { checkReadOnly(); @@ -318,8 +372,10 @@ public class AmqpMessage { /** * Sets a given application property on an outbound message. * - * @param key the name to assign the new property. - * @param value the value to set for the named property. + * @param key + * the name to assign the new property. + * @param value + * the value to set for the named property. */ public void setApplicationProperty(String key, Object value) { checkReadOnly(); @@ -331,8 +387,10 @@ public class AmqpMessage { * Gets the application property that is mapped to the given name or null * if no property has been set with that name. * - * @param key the name used to lookup the property in the application properties. - * @return the propety value or null if not set. + * @param key + * the name used to lookup the property in the application properties. + * + * @return the property value or null if not set. */ public Object getApplicationProperty(String key) { if (applicationPropertiesMap == null) { @@ -346,8 +404,10 @@ public class AmqpMessage { * Perform a proper annotation set on the AMQP Message based on a Symbol key and * the target value to append to the current annotations. * - * @param key The name of the Symbol whose value is being set. - * @param value The new value to set in the annotations of this message. + * @param key + * The name of the Symbol whose value is being set. + * @param value + * The new value to set in the annotations of this message. */ public void setMessageAnnotation(String key, Object value) { checkReadOnly(); @@ -360,7 +420,9 @@ public class AmqpMessage { * that annotation name. If the message annotations have not been created yet * then this method will always return null. * - * @param key the Symbol name that should be looked up in the message annotations. + * @param key + * the Symbol name that should be looked up in the message annotations. + * * @return the value of the annotation if it exists, or null if not set or not accessible. */ public Object getMessageAnnotation(String key) { @@ -375,8 +437,10 @@ public class AmqpMessage { * Perform a proper delivery annotation set on the AMQP Message based on a Symbol * key and the target value to append to the current delivery annotations. * - * @param key The name of the Symbol whose value is being set. - * @param value The new value to set in the delivery annotations of this message. + * @param key + * The name of the Symbol whose value is being set. + * @param value + * The new value to set in the delivery annotations of this message. */ public void setDeliveryAnnotation(String key, Object value) { checkReadOnly(); @@ -389,7 +453,9 @@ public class AmqpMessage { * that annotation name. If the message annotations have not been created yet * then this method will always return null. * - * @param key the Symbol name that should be looked up in the message annotations. + * @param key + * the Symbol name that should be looked up in the message annotations. + * * @return the value of the annotation if it exists, or null if not set or not accessible. */ public Object getDeliveryAnnotation(String key) { @@ -406,7 +472,9 @@ public class AmqpMessage { * Sets a String value into the body of an outgoing Message, throws * an exception if this is an incoming message instance. * - * @param value the String value to store in the Message body. + * @param value + * the String value to store in the Message body. + * * @throws IllegalStateException if the message is read only. */ public void setText(String value) throws IllegalStateException { @@ -419,7 +487,9 @@ public class AmqpMessage { * Sets a byte array value into the body of an outgoing Message, throws * an exception if this is an incoming message instance. * - * @param bytes the byte array value to store in the Message body. + * @param value + * the byte array value to store in the Message body. + * * @throws IllegalStateException if the message is read only. */ public void setBytes(byte[] bytes) throws IllegalStateException { @@ -432,7 +502,9 @@ public class AmqpMessage { * Sets a byte array value into the body of an outgoing Message, throws * an exception if this is an incoming message instance. * - * @param described the byte array value to store in the Message body. + * @param value + * the byte array value to store in the Message body. + * * @throws IllegalStateException if the message is read only. */ public void setDescribedType(DescribedType described) throws IllegalStateException { @@ -445,6 +517,7 @@ public class AmqpMessage { * Attempts to retrieve the message body as an DescribedType instance. * * @return an DescribedType instance if one is stored in the message body. + * * @throws NoSuchElementException if the body does not contain a DescribedType. */ public DescribedType getDescribedType() throws NoSuchElementException { @@ -482,21 +555,21 @@ public class AmqpMessage { private void lazyCreateMessageAnnotations() { if (messageAnnotationsMap == null) { - messageAnnotationsMap = new HashMap<>(); + messageAnnotationsMap = new HashMap(); message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap)); } } private void lazyCreateDeliveryAnnotations() { if (deliveryAnnotationsMap == null) { - deliveryAnnotationsMap = new HashMap<>(); + deliveryAnnotationsMap = new HashMap(); message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap)); } } private void lazyCreateApplicationProperties() { if (applicationPropertiesMap == null) { - applicationPropertiesMap = new HashMap<>(); + applicationPropertiesMap = new HashMap(); message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap)); } } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 9f3bff20fb..2802751ccc 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -208,6 +208,7 @@ public class AmqpReceiver extends AmqpAbstractResource { * it is returned immediately otherwise this methods return null without waiting. * * @return a newly received message or null if there is no currently available message. + * * @throws Exception if an error occurs during the receive attempt. */ public AmqpMessage receiveNoWait() throws Exception { @@ -219,6 +220,7 @@ public class AmqpReceiver extends AmqpAbstractResource { * Request a remote peer send a Message to this client waiting until one arrives. * * @return the pulled AmqpMessage or null if none was pulled from the remote. + * * @throws IOException if an error occurs */ public AmqpMessage pull() throws IOException { @@ -402,12 +404,38 @@ public class AmqpReceiver extends AmqpAbstractResource { * @throws IOException if an error occurs while sending the accept. */ public void accept(final Delivery delivery) throws IOException { + accept(delivery, this.session); + } + + /** + * Accepts a message that was dispatched under the given Delivery instance. + * + * This method allows for the session that is used in the accept to be specified by the + * caller. This allows for an accepted message to be involved in a transaction that is + * being managed by some other session other than the one that created this receiver. + * + * @param delivery + * the Delivery instance to accept. + * @param session + * the session under which the message is being accepted. + * + * @throws IOException if an error occurs while sending the accept. + */ + public void accept(final Delivery delivery, final AmqpSession session) throws IOException { checkClosed(); if (delivery == null) { throw new IllegalArgumentException("Delivery to accept cannot be null"); } + if (session == null) { + throw new IllegalArgumentException("Session given cannot be null"); + } + + if (session.getConnection() != this.session.getConnection()) { + throw new IllegalArgumentException("The session used for accept must originate from the connection that created this receiver."); + } + final ClientFuture request = new ClientFuture(); session.getScheduler().execute(new Runnable() { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 5ae2948547..ed83e02070 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -117,6 +117,18 @@ public class AmqpSender extends AmqpAbstractResource { * @throws IOException if an error occurs during the send. */ public void send(final AmqpMessage message) throws IOException { + checkClosed(); + send(message, null); + } + + /** + * Sends the given message to this senders assigned address using the supplied transaction ID. + * + * @param message the message to send. + * @param txId the transaction ID to assign the outgoing send. + * @throws IOException if an error occurs during the send. + */ + public void send(final AmqpMessage message, final AmqpTransactionId txId) throws IOException { checkClosed(); final ClientFuture sendRequest = new ClientFuture(); @@ -125,7 +137,7 @@ public class AmqpSender extends AmqpAbstractResource { @Override public void run() { try { - doSend(message, sendRequest); + doSend(message, sendRequest, txId); session.pumpToProtonTransport(sendRequest); } catch (Exception e) { @@ -316,7 +328,7 @@ public class AmqpSender extends AmqpAbstractResource { } } - private void doSend(AmqpMessage message, AsyncResult request) throws Exception { + private void doSend(AmqpMessage message, AsyncResult request, AmqpTransactionId txId) throws Exception { LOG.trace("Producer sending message: {}", message); Delivery delivery = null; @@ -330,8 +342,15 @@ public class AmqpSender extends AmqpAbstractResource { delivery.setContext(request); - if (session.isInTransaction()) { - Binary amqpTxId = session.getTransactionId().getRemoteTxId(); + Binary amqpTxId = null; + if (txId != null) { + amqpTxId = txId.getRemoteTxId(); + } + else if (session.isInTransaction()) { + amqpTxId = session.getTransactionId().getRemoteTxId(); + } + + if (amqpTxId != null) { TransactionalState state = new TransactionalState(); state.setTxnId(amqpTxId); delivery.disposition(state); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 82b6aec95b..755ecf804a 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -408,11 +408,15 @@ public class AmqpSession extends AmqpAbstractResource { connection.pumpToProtonTransport(request); } - AmqpTransactionId getTransactionId() { - return txContext.getTransactionId(); + public AmqpTransactionId getTransactionId() { + if (txContext != null && txContext.isInTransaction()) { + return txContext.getTransactionId(); + } + + return null; } - public AmqpTransactionContext getTransactionContext() { + AmqpTransactionContext getTransactionContext() { return txContext; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java new file mode 100644 index 0000000000..8fa140a13d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.net.URI; +import java.util.LinkedList; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.junit.After; +import org.junit.Before; + +/** + * Test support class for tests that will be using the AMQP Proton wrapper client. + * This is to make it easier to migrate tests from ActiveMQ5 + */ +public class AmqpClientTestSupport extends ActiveMQTestBase { + + + ActiveMQServer server; + + LinkedList connections = new LinkedList<>(); + + + protected AmqpConnection addConnection(AmqpConnection connection) { + connections.add(connection); + return connection; + } + + + @Before + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, true); + server.start(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + + for (AmqpConnection conn: connections) { + try { + conn.close(); + } + catch (Throwable ignored) { + ignored.printStackTrace(); + } + } + server.stop(); + } + + public Queue getProxyToQueue(String queueName) { + return server.locateQueue(SimpleString.toSimpleString(queueName)); + } + + private String connectorScheme = "amqp"; + private boolean useSSL; + + public String getTestName() { + return "jms.queue." + getName(); + } + + public AmqpClientTestSupport() { + } + + public AmqpClientTestSupport(String connectorScheme, boolean useSSL) { + this.connectorScheme = connectorScheme; + this.useSSL = useSSL; + } + + public String getConnectorScheme() { + return connectorScheme; + } + + public boolean isUseSSL() { + return useSSL; + } + + public String getAmqpConnectionURIOptions() { + return ""; + } + + protected boolean isUseTcpConnector() { + return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws"); + } + + protected boolean isUseSslConnector() { + return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss"); + } + + protected boolean isUseNioConnector() { + return !isUseSSL() && connectorScheme.contains("nio"); + } + + protected boolean isUseNioPlusSslConnector() { + return isUseSSL() && connectorScheme.contains("nio"); + } + + protected boolean isUseWsConnector() { + return !isUseSSL() && connectorScheme.contains("ws"); + } + + protected boolean isUseWssConnector() { + return isUseSSL() && connectorScheme.contains("wss"); + } + + public URI getBrokerAmqpConnectionURI() { + boolean webSocket = false; + + try { + int port = 61616; + + String uri = null; + + if (isUseSSL()) { + if (webSocket) { + uri = "wss://127.0.0.1:" + port; + } + else { + uri = "ssl://127.0.0.1:" + port; + } + } + else { + if (webSocket) { + uri = "ws://127.0.0.1:" + port; + } + else { + uri = "tcp://127.0.0.1:" + port; + } + } + + if (!getAmqpConnectionURIOptions().isEmpty()) { + uri = uri + "?" + getAmqpConnectionURIOptions(); + } + + return new URI(uri); + } + catch (Exception e) { + throw new RuntimeException(); + } + } + + public AmqpConnection createAmqpConnection() throws Exception { + return createAmqpConnection(getBrokerAmqpConnectionURI()); + } + + public AmqpConnection createAmqpConnection(String username, String password) throws Exception { + return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password); + } + + public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception { + return createAmqpConnection(brokerURI, null, null); + } + + public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception { + return createAmqpClient(brokerURI, username, password).connect(); + } + + public AmqpClient createAmqpClient() throws Exception { + return createAmqpClient(getBrokerAmqpConnectionURI(), null, null); + } + + public AmqpClient createAmqpClient(URI brokerURI) throws Exception { + return createAmqpClient(brokerURI, null, null); + } + + public AmqpClient createAmqpClient(String username, String password) throws Exception { + return createAmqpClient(getBrokerAmqpConnectionURI(), username, password); + } + + public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception { + return new AmqpClient(brokerURI, username, password); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java new file mode 100644 index 0000000000..e84534f476 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -0,0 +1,625 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Before; +import org.junit.Test; + +/** + * Test various aspects of Transaction support. + */ +public class AmqpTransactionTest extends AmqpClientTestSupport { + + @Before + public void createQueue() throws Exception { + server.createQueue(SimpleString.toSimpleString(getTestName()), SimpleString.toSimpleString(getTestName()), null, true, false); + } + + @Test(timeout = 30000) + public void testBeginAndCommitTransaction() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + assertNotNull(session); + + session.begin(); + assertTrue(session.isInTransaction()); + session.commit(); + + connection.close(); + } + + @Test(timeout = 30000) + public void testBeginAndRollbackTransaction() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + assertNotNull(session); + + session.begin(); + assertTrue(session.isInTransaction()); + session.rollback(); + + connection.close(); + + System.err.println("Closed"); + } + + @Test(timeout = 60000) + public void testSendMessageToQueueWithCommit() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + final Queue queue = getProxyToQueue(getTestName()); + + session.begin(); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + assertEquals(0, queue.getMessageCount()); + + session.commit(); + + assertEquals(1, queue.getMessageCount()); + + sender.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageToQueueWithRollback() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + final Queue queue = getProxyToQueue(getTestName()); + + session.begin(); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + assertEquals(0, queue.getMessageCount()); + + session.rollback(); + + assertEquals(0, queue.getMessageCount()); + + sender.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiveMessageWithCommit() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + final Queue queue = getProxyToQueue(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + assertEquals(1, queue.getMessageCount()); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + session.begin(); + + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + session.commit(); + + assertEquals(0, queue.getMessageCount()); + + sender.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiveAfterConnectionClose() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + final Queue queue = getProxyToQueue(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + assertEquals(1, queue.getMessageCount()); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + session.begin(); + + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + // this will force a rollback on the TX (It should at least) + connection.close(); + + connection = addConnection(client.connect()); + session = connection.createSession(); + receiver = session.createReceiver(getTestName()); + session.begin(); + receiver.flow(1); + + received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + session.commit(); + + assertEquals(0, queue.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiveMessageWithRollback() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + final Queue queue = getProxyToQueue(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + assertEquals(1, queue.getMessageCount()); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + session.begin(); + + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + session.rollback(); + + assertEquals(1, queue.getMessageCount()); + + sender.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testMultipleSessionReceiversInSingleTXNWithCommit() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + // Load up the Queue with some messages + { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getTestName()); + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + sender.send(message); + sender.send(message); + sender.close(); + } + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpReceiver receiver1 = session1.createReceiver(getTestName()); + AmqpReceiver receiver2 = session2.createReceiver(getTestName()); + AmqpReceiver receiver3 = session3.createReceiver(getTestName()); + + final Queue queue = getProxyToQueue(getTestName()); + assertEquals(3, queue.getMessageCount()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + assertTrue(txnSession.isInTransaction()); + + receiver1.flow(1); + receiver2.flow(1); + receiver3.flow(1); + + AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS); + AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS); + + message1.accept(txnSession); + message2.accept(txnSession); + message3.accept(txnSession); + + assertEquals(3, queue.getMessageCount()); + + txnSession.commit(); + + assertEquals(0, queue.getMessageCount()); + } + + @Test(timeout = 60000) + public void testMultipleSessionReceiversInSingleTXNWithRollback() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + // Load up the Queue with some messages + { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getTestName()); + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + sender.send(message); + sender.send(message); + sender.close(); + } + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpReceiver receiver1 = session1.createReceiver(getTestName()); + AmqpReceiver receiver2 = session2.createReceiver(getTestName()); + AmqpReceiver receiver3 = session3.createReceiver(getTestName()); + + final Queue queue = getProxyToQueue(getTestName()); + assertEquals(3, queue.getMessageCount()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + assertTrue(txnSession.isInTransaction()); + + receiver1.flow(1); + receiver2.flow(1); + receiver3.flow(1); + + AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS); + AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS); + + message1.accept(txnSession); + message2.accept(txnSession); + message3.accept(txnSession); + + assertEquals(3, queue.getMessageCount()); + + txnSession.rollback(); + + assertEquals(3, queue.getMessageCount()); + } + + @Test(timeout = 60000) + public void testMultipleSessionSendersInSingleTXNWithCommit() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpSender sender1 = session1.createSender(getTestName()); + AmqpSender sender2 = session2.createSender(getTestName()); + AmqpSender sender3 = session3.createSender(getTestName()); + + final Queue queue = getProxyToQueue(getTestName()); + assertEquals(0, queue.getMessageCount()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + + assertTrue(txnSession.isInTransaction()); + + sender1.send(message, txnSession.getTransactionId()); + sender2.send(message, txnSession.getTransactionId()); + sender3.send(message, txnSession.getTransactionId()); + + assertEquals(0, queue.getMessageCount()); + + txnSession.commit(); + + assertEquals(3, queue.getMessageCount()); + } + + @Test(timeout = 60000) + public void testMultipleSessionSendersInSingleTXNWithRollback() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpSender sender1 = session1.createSender(getTestName()); + AmqpSender sender2 = session2.createSender(getTestName()); + AmqpSender sender3 = session3.createSender(getTestName()); + + final Queue queue = getProxyToQueue(getTestName()); + assertEquals(0, queue.getMessageCount()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + + assertTrue(txnSession.isInTransaction()); + + sender1.send(message, txnSession.getTransactionId()); + sender2.send(message, txnSession.getTransactionId()); + sender3.send(message, txnSession.getTransactionId()); + + assertEquals(0, queue.getMessageCount()); + + txnSession.rollback(); + + assertEquals(0, queue.getMessageCount()); + } + + //----- Tests Ported from AmqpNetLite client -----------------------------// + + @Test(timeout = 60000) + public void testSendersCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception { + final int NUM_MESSAGES = 5; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Commit TXN work from a sender. + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message, txnSession.getTransactionId()); + } + txnSession.commit(); + + // Rollback an additional batch of TXN work from a sender. + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message, txnSession.getTransactionId()); + } + txnSession.rollback(); + + // Commit more TXN work from a sender. + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message, txnSession.getTransactionId()); + } + txnSession.commit(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(NUM_MESSAGES * 2); + for (int i = 0; i < NUM_MESSAGES * 2; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(txnSession); + } + + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception { + final int NUM_MESSAGES = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getTestName()); + + for (int i = 0; i < NUM_MESSAGES + 1; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver(getTestName()); + ArrayList messages = new ArrayList<>(NUM_MESSAGES); + receiver.flow((NUM_MESSAGES + 2) * 2); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + messages.add(message); + } + + // Commit half the consumed messages + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES / 2; ++i) { + messages.get(i).accept(txnSession); + } + txnSession.commit(); + + // Rollback the other half the consumed messages + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + messages.get(i).accept(txnSession); + } + txnSession.rollback(); + + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.release(); + } + + // Commit the other half the consumed messages + // This is a variation from the .NET client tests which doesn't settle the + // messages in the TX until commit is called but on ActiveMQ they will be + // redispatched regardless and not stay in the acquired state. + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + } + txnSession.commit(); + + // The final message should still be pending. + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + receiver.flow(1); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.release(); + } + + connection.close(); + } + + @Test(timeout = 60000) + public void testCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception { + final int NUM_MESSAGES = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getTestName()); + + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(2); + AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS); + + // Accept the first one in a TXN and send a new message in that TXN as well + txnSession.begin(); + { + message1.accept(txnSession); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", NUM_MESSAGES); + + sender.send(message, txnSession.getTransactionId()); + } + txnSession.commit(); + + // Accept the second one in a TXN and send a new message in that TXN as well but rollback + txnSession.begin(); + { + message2.accept(txnSession); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", NUM_MESSAGES + 1); + sender.send(message, txnSession.getTransactionId()); + } + txnSession.rollback(); + + // Variation here from .NET code, the client settles the accepted message where + // the .NET client does not and instead releases here to have it redelivered. + + receiver.flow(NUM_MESSAGES); + for (int i = 1; i <= NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + assertEquals(i, message.getApplicationProperty("msgId")); + message.accept(); + } + + // Should be nothing left. + assertNull(receiver.receive(1, TimeUnit.SECONDS)); + + connection.close(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 193b46b9be..984459ed20 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -815,7 +815,8 @@ public class ProtonTest extends ProtonTestBase { request.setText("[]"); sender.send(request); - AmqpMessage response = receiver.receive(); + AmqpMessage response = receiver.receive(50, TimeUnit.SECONDS); + Assert.assertNotNull(response); assertNotNull(response); Object section = response.getWrappedMessage().getBody(); assertTrue(section instanceof AmqpValue); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java index 0acd4aef94..cec59da7bb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java @@ -41,7 +41,6 @@ public class ProtonTestBase extends ActiveMQTestBase { @Before public void setUp() throws Exception { super.setUp(); - disableCheckThread(); server = this.createServer(true, true); HashMap params = new HashMap<>(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java index b4a6a34743..d23bd5b636 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java @@ -43,7 +43,6 @@ public class ProtonTestForHeader extends ActiveMQTestBase { @Before public void setUp() throws Exception { super.setUp(); - disableCheckThread(); server = this.createServer(true, true); HashMap params = new HashMap<>(); params.put(TransportConstants.PORT_PROP_NAME, "5672"); @@ -61,8 +60,6 @@ public class ProtonTestForHeader extends ActiveMQTestBase { @After public void tearDown() throws Exception { try { - Thread.sleep(250); - server.stop(); } finally {