From fa5514985d74ff2d0bfd7faebb28452a2a7eed2e Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 14 Sep 2016 18:23:52 -0400 Subject: [PATCH] NO-JIRA AMQP Test updates Adds support for doing sends and receives that are enrolled in a transaction created in a session other than the session that created the sender or receiver. Adds some tests that show this in action. --- .../transport/amqp/client/AmqpMessage.java | 19 +- .../transport/amqp/client/AmqpReceiver.java | 26 +++ .../transport/amqp/client/AmqpSender.java | 29 ++- .../transport/amqp/client/AmqpSession.java | 8 +- .../amqp/interop/AmqpTransactionTest.java | 194 ++++++++++++++++++ 5 files changed, 269 insertions(+), 7 deletions(-) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 99f4cfb867..8b378e1b9f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -139,6 +139,22 @@ public class AmqpMessage { receiver.accept(delivery); } + /** + * Accepts the message marking it as consumed on the remote peer. + * + * @param session + * The session that is used to manage acceptance of the message. + * + * @throws Exception if an error occurs during the accept. + */ + public void accept(AmqpSession txnSession) throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't accept non-received message."); + } + + receiver.accept(delivery, txnSession); + } + /** * Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here. * @@ -374,7 +390,7 @@ public class AmqpMessage { * @param key * the name used to lookup the property in the application properties. * - * @return the propety value or null if not set. + * @return the property value or null if not set. */ public Object getApplicationProperty(String key) { if (applicationPropertiesMap == null) { @@ -560,6 +576,7 @@ public class AmqpMessage { message.setHeader(new Header()); } } + private void lazyCreateProperties() { if (message.getProperties() == null) { message.setProperties(new Properties()); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 77a529d865..999e033f89 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -422,12 +422,38 @@ public class AmqpReceiver extends AmqpAbstractResource { * @throws IOException if an error occurs while sending the accept. */ public void accept(final Delivery delivery) throws IOException { + accept(delivery, this.session); + } + + /** + * Accepts a message that was dispatched under the given Delivery instance. + * + * This method allows for the session that is used in the accept to be specified by the + * caller. This allows for an accepted message to be involved in a transaction that is + * being managed by some other session other than the one that created this receiver. + * + * @param delivery + * the Delivery instance to accept. + * @param session + * the session under which the message is being accepted. + * + * @throws IOException if an error occurs while sending the accept. + */ + public void accept(final Delivery delivery, final AmqpSession session) throws IOException { checkClosed(); if (delivery == null) { throw new IllegalArgumentException("Delivery to accept cannot be null"); } + if (session == null) { + throw new IllegalArgumentException("Session given cannot be null"); + } + + if (session.getConnection() != this.session.getConnection()) { + throw new IllegalArgumentException("The session used for accept must originate from the connection that created this receiver."); + } + final ClientFuture request = new ClientFuture(); session.getScheduler().execute(new Runnable() { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index f9d64354c5..dd3a3719a4 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -126,6 +126,21 @@ public class AmqpSender extends AmqpAbstractResource { * @throws IOException if an error occurs during the send. */ public void send(final AmqpMessage message) throws IOException { + checkClosed(); + send(message, null); + } + + /** + * Sends the given message to this senders assigned address using the supplied transaction ID. + * + * @param message + * the message to send. + * @param txId + * the transaction ID to assign the outgoing send. + * + * @throws IOException if an error occurs during the send. + */ + public void send(final AmqpMessage message, final AmqpTransactionId txId) throws IOException { checkClosed(); final ClientFuture sendRequest = new ClientFuture(); @@ -134,7 +149,7 @@ public class AmqpSender extends AmqpAbstractResource { @Override public void run() { try { - doSend(message, sendRequest); + doSend(message, sendRequest, txId); session.pumpToProtonTransport(sendRequest); } catch (Exception e) { sendRequest.onFailure(e); @@ -319,7 +334,7 @@ public class AmqpSender extends AmqpAbstractResource { } } - private void doSend(AmqpMessage message, AsyncResult request) throws Exception { + private void doSend(AmqpMessage message, AsyncResult request, AmqpTransactionId txId) throws Exception { LOG.trace("Producer sending message: {}", message); Delivery delivery = null; @@ -332,8 +347,14 @@ public class AmqpSender extends AmqpAbstractResource { delivery.setContext(request); - if (session.isInTransaction()) { - Binary amqpTxId = session.getTransactionId().getRemoteTxId(); + Binary amqpTxId = null; + if (txId != null) { + amqpTxId = txId.getRemoteTxId(); + } else if (session.isInTransaction()) { + amqpTxId = session.getTransactionId().getRemoteTxId(); + } + + if (amqpTxId != null) { TransactionalState state = new TransactionalState(); state.setTxnId(amqpTxId); delivery.disposition(state); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index ae99f6541c..38046035e9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -464,8 +464,12 @@ public class AmqpSession extends AmqpAbstractResource { connection.pumpToProtonTransport(request); } - AmqpTransactionId getTransactionId() { - return txContext.getTransactionId(); + public AmqpTransactionId getTransactionId() { + if (txContext != null && txContext.isInTransaction()) { + return txContext.getTransactionId(); + } + + return null; } AmqpTransactionContext getTransactionContext() { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java index a9982906e6..97089a931f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java @@ -178,4 +178,198 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { sender.close(); connection.close(); } + + @Test(timeout = 60000) + public void testMultipleSessionReceiversInSingleTXNWithCommit() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Load up the Queue with some messages + { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("queue://" + getTestName()); + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + sender.send(message); + sender.send(message); + sender.close(); + } + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName()); + AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName()); + AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName()); + + final QueueViewMBean queue = getProxyToQueue(getTestName()); + assertEquals(3, queue.getQueueSize()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + assertTrue(txnSession.isInTransaction()); + + receiver1.flow(1); + receiver2.flow(1); + receiver3.flow(1); + + AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS); + AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS); + + message1.accept(txnSession); + message2.accept(txnSession); + message3.accept(txnSession); + + assertEquals(3, queue.getQueueSize()); + + txnSession.commit(); + + assertEquals(0, queue.getQueueSize()); + } + + @Test(timeout = 60000) + public void testMultipleSessionReceiversInSingleTXNWithRollback() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Load up the Queue with some messages + { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("queue://" + getTestName()); + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + sender.send(message); + sender.send(message); + sender.close(); + } + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName()); + AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName()); + AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName()); + + final QueueViewMBean queue = getProxyToQueue(getTestName()); + assertEquals(3, queue.getQueueSize()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + assertTrue(txnSession.isInTransaction()); + + receiver1.flow(1); + receiver2.flow(1); + receiver3.flow(1); + + AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS); + AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS); + + message1.accept(txnSession); + message2.accept(txnSession); + message3.accept(txnSession); + + assertEquals(3, queue.getQueueSize()); + + txnSession.rollback(); + + assertEquals(3, queue.getQueueSize()); + } + + @Test(timeout = 60000) + public void testMultipleSessionSendersInSingleTXNWithCommit() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpSender sender1 = session1.createSender("queue://" + getTestName()); + AmqpSender sender2 = session2.createSender("queue://" + getTestName()); + AmqpSender sender3 = session3.createSender("queue://" + getTestName()); + + final QueueViewMBean queue = getProxyToQueue(getTestName()); + assertEquals(0, queue.getQueueSize()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + + assertTrue(txnSession.isInTransaction()); + + sender1.send(message, txnSession.getTransactionId()); + sender2.send(message, txnSession.getTransactionId()); + sender3.send(message, txnSession.getTransactionId()); + + assertEquals(0, queue.getQueueSize()); + + txnSession.commit(); + + assertEquals(3, queue.getQueueSize()); + } + + @Test(timeout = 60000) + public void testMultipleSessionSendersInSingleTXNWithRollback() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpSender sender1 = session1.createSender("queue://" + getTestName()); + AmqpSender sender2 = session2.createSender("queue://" + getTestName()); + AmqpSender sender3 = session3.createSender("queue://" + getTestName()); + + final QueueViewMBean queue = getProxyToQueue(getTestName()); + assertEquals(0, queue.getQueueSize()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + + assertTrue(txnSession.isInTransaction()); + + sender1.send(message, txnSession.getTransactionId()); + sender2.send(message, txnSession.getTransactionId()); + sender3.send(message, txnSession.getTransactionId()); + + assertEquals(0, queue.getQueueSize()); + + txnSession.rollback(); + + assertEquals(0, queue.getQueueSize()); + } }