From aa32a0f7925c4981aca9a4369b5e95f3336cde94 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 28 Sep 2016 14:56:36 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6444 Ensure that unsettled TX messages remain acquired and not redelivered to the receiver. Add several tests that demonstrate that a received message can be released, rejected, accepted or modified after a TX rollback if it was not settled. (cherry picked from commit 0dd806f43f3bee9372ee9b9481089d417c265dfe) --- .../amqp/protocol/AmqpAbstractReceiver.java | 5 +- .../transport/amqp/protocol/AmqpLink.java | 11 +- .../transport/amqp/protocol/AmqpSender.java | 64 ++++--- .../transport/amqp/protocol/AmqpSession.java | 15 +- .../protocol/AmqpTransactionCoordinator.java | 10 +- .../amqp/interop/AmqpTransactionTest.java | 178 ++++++++++++++++-- .../JMSMappingOutboundTransformerTest.java | 10 +- 7 files changed, 232 insertions(+), 61 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java index 7ed2f92269..9ed465a2c8 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp.protocol; +import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.transport.amqp.AmqpProtocolException; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; @@ -78,11 +79,11 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink { } @Override - public void commit() throws Exception { + public void commit(LocalTransactionId txnId) throws Exception { } @Override - public void rollback() throws Exception { + public void rollback(LocalTransactionId txnId) throws Exception { } @Override diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java index d2457699ef..0c75f48933 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.protocol; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.LocalTransactionId; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; @@ -60,17 +61,23 @@ public interface AmqpLink extends AmqpResource { * Handle work necessary on commit of transacted resources associated with * this Link instance. * + * @param txnId + * The Transaction ID being committed. + * * @throws Exception if an error occurs while performing the commit. */ - void commit() throws Exception; + void commit(LocalTransactionId txnId) throws Exception; /** * Handle work necessary on rollback of transacted resources associated with * this Link instance. * + * @param txnId + * The Transaction ID being rolled back. + * * @throws Exception if an error occurs while performing the rollback. */ - void rollback() throws Exception; + void rollback(LocalTransactionId txnId) throws Exception; /** * @return the ActiveMQDestination that this link is servicing. diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 2531c1aadf..149b2e8599 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -78,7 +78,7 @@ public class AmqpSender extends AmqpAbstractLink { private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(); private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator(); private final LinkedList outbound = new LinkedList(); - private final LinkedList dispatchedInTx = new LinkedList(); + private final LinkedList dispatchedInTx = new LinkedList(); private final ConsumerInfo consumerInfo; private AbstractSubscription subscription; @@ -208,26 +208,26 @@ public class AmqpSender extends AmqpAbstractLink { } else if (endpointCredit >= 0) { if (endpointCredit == 0 && currentCreditRequest != 0) { - prefetchExtension.set(0); currentCreditRequest = 0; logicalDeliveryCount = 0; LOG.trace("Flow: credit 0 for sub:" + subscription); - } else { - int deltaToAdd = endpointCredit; int logicalCredit = currentCreditRequest - logicalDeliveryCount; if (logicalCredit > 0) { deltaToAdd -= logicalCredit; } else { - // reset delivery counter - dispatch from broker concurrent with credit=0 flow can go negative + // reset delivery counter - dispatch from broker concurrent with credit=0 + // flow can go negative logicalDeliveryCount = 0; } + if (deltaToAdd > 0) { currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd); subscription.wakeupDestinationsForDispatch(); - // force dispatch of matched/pending for topics (pending messages accumulate in the sub and are dispatched on update of prefetch) + // force dispatch of matched/pending for topics (pending messages accumulate + // in the sub and are dispatched on update of prefetch) subscription.setPrefetchSize(0); LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription); } @@ -246,14 +246,20 @@ public class AmqpSender extends AmqpAbstractLink { if (txState.getOutcome() != null) { Outcome outcome = txState.getOutcome(); if (outcome instanceof Accepted) { + TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId())); + + // Store the message sent in this TX we might need to re-send on rollback + // and we need to ACK it on commit. + session.enlist(txId); + dispatchedInTx.addFirst(delivery); + if (!delivery.remotelySettled()) { TransactionalState txAccepted = new TransactionalState(); txAccepted.setOutcome(Accepted.getInstance()); - txAccepted.setTxnId(((TransactionalState) state).getTxnId()); + txAccepted.setTxnId(txState.getTxnId()); delivery.disposition(txAccepted); } - settle(delivery, MessageAck.DELIVERED_ACK_TYPE); } } } else { @@ -294,12 +300,14 @@ public class AmqpSender extends AmqpAbstractLink { } @Override - public void commit() throws Exception { + public void commit(LocalTransactionId txnId) throws Exception { if (!dispatchedInTx.isEmpty()) { - for (MessageDispatch md : dispatchedInTx) { - MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); - pendingTxAck.setFirstMessageId(md.getMessage().getMessageId()); - pendingTxAck.setTransactionId(md.getMessage().getTransactionId()); + for (final Delivery delivery : dispatchedInTx) { + MessageDispatch dispatch = (MessageDispatch) delivery.getContext(); + + MessageAck pendingTxAck = new MessageAck(dispatch, MessageAck.INDIVIDUAL_ACK_TYPE, 1); + pendingTxAck.setFirstMessageId(dispatch.getMessage().getMessageId()); + pendingTxAck.setTransactionId(txnId); LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck); @@ -310,6 +318,8 @@ public class AmqpSender extends AmqpAbstractLink { Throwable exception = ((ExceptionResponse) response).getException(); exception.printStackTrace(); getEndpoint().close(); + } else { + delivery.settle(); } session.pumpProtonToSocket(); } @@ -321,15 +331,22 @@ public class AmqpSender extends AmqpAbstractLink { } @Override - public void rollback() throws Exception { + public void rollback(LocalTransactionId txnId) throws Exception { synchronized (outbound) { LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size()); - for (MessageDispatch dispatch : dispatchedInTx) { - dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1); + for (Delivery delivery : dispatchedInTx) { + // Only settled deliveries should be re-dispatched, unsettled deliveries + // remain acquired on the remote end and can be accepted again in a new + // TX or released or rejected etc. + MessageDispatch dispatch = (MessageDispatch) delivery.getContext(); dispatch.getMessage().setTransactionId(null); - outbound.addFirst(dispatch); + + if (delivery.remotelySettled()) { + dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1); + outbound.addFirst(dispatch); + } } dispatchedInTx.clear(); @@ -507,19 +524,6 @@ public class AmqpSender extends AmqpAbstractLink { ack.setMessageCount(1); ack.setAckType((byte) ackType); ack.setDestination(md.getDestination()); - - DeliveryState remoteState = delivery.getRemoteState(); - if (remoteState != null && remoteState instanceof TransactionalState) { - TransactionalState txState = (TransactionalState) remoteState; - TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId())); - ack.setTransactionId(txId); - - // Store the message sent in this TX we might need to re-send on rollback - session.enlist(txId); - md.getMessage().setTransactionId(txId); - dispatchedInTx.addFirst(md); - } - LOG.trace("Sending Ack to ActiveMQ: {}", ack); sendToActiveMQ(ack, new ResponseHandler() { diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java index 4cb5f37941..1c91962082 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java @@ -35,6 +35,7 @@ import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveInfo; @@ -123,11 +124,14 @@ public class AmqpSession implements AmqpResource { /** * Commits all pending work for all resources managed under this session. * + * @param txId + * The specific TransactionId that is being committed. + * * @throws Exception if an error occurs while attempting to commit work. */ - public void commit() throws Exception { + public void commit(LocalTransactionId txId) throws Exception { for (AmqpSender consumer : consumers.values()) { - consumer.commit(); + consumer.commit(txId); } enlisted = false; @@ -136,11 +140,14 @@ public class AmqpSession implements AmqpResource { /** * Rolls back any pending work being down under this session. * + * @param txId + * The specific TransactionId that is being rolled back. + * * @throws Exception if an error occurs while attempting to roll back work. */ - public void rollback() throws Exception { + public void rollback(LocalTransactionId txId) throws Exception { for (AmqpSender consumer : consumers.values()) { - consumer.rollback(); + consumer.rollback(txId); } enlisted = false; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java index 40bcda5d47..95cd5e372a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java @@ -98,7 +98,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { TransactionInfo txInfo = new TransactionInfo(connectionId, txId, TransactionInfo.BEGIN); session.getConnection().registerTransaction(txId, this); sendToActiveMQ(txInfo, null); - LOG.trace("started transaction {}", txId.getValue()); + LOG.trace("started transaction {}", txId); Declared declared = new Declared(); declared.setTxnId(new Binary(toBytes(txId.getValue()))); @@ -110,18 +110,18 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { final byte operation; if (discharge.getFail()) { - LOG.trace("rollback transaction {}", txId.getValue()); + LOG.trace("rollback transaction {}", txId); operation = TransactionInfo.ROLLBACK; } else { - LOG.trace("commit transaction {}", txId.getValue()); + LOG.trace("commit transaction {}", txId); operation = TransactionInfo.COMMIT_ONE_PHASE; } for (AmqpSession txSession : txSessions) { if (operation == TransactionInfo.ROLLBACK) { - txSession.rollback(); + txSession.rollback(txId); } else { - txSession.commit(); + txSession.commit(txId); } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java index 0815f8ab9a..f61cbc3fbc 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java @@ -32,7 +32,11 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; -import org.junit.Ignore; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Outcome; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Released; import org.junit.Test; /** @@ -89,7 +93,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(1, queue.getQueueSize()); - sender.close(); connection.close(); } @@ -114,7 +117,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(0, queue.getQueueSize()); - sender.close(); connection.close(); } @@ -146,7 +148,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(0, queue.getQueueSize()); - sender.close(); connection.close(); } @@ -194,7 +195,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { connection.close(); } - @Test(timeout = 60000) public void testReceiveMessageWithRollback() throws Exception { AmqpClient client = createAmqpClient(); @@ -223,7 +223,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(1, queue.getQueueSize()); - sender.close(); connection.close(); } @@ -421,6 +420,163 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(0, queue.getQueueSize()); } + @Test(timeout = 60000) + public void testAcceptedButNotSettledInTXRemainsAquiredCanBeAccepted() throws Exception { + doTestAcceptedButNotSettledInTXRemainsAquired(Accepted.getInstance()); + } + + @Test(timeout = 60000) + public void testAcceptedButNotSettledInTXRemainsAquiredCanBeReleased() throws Exception { + doTestAcceptedButNotSettledInTXRemainsAquired(Released.getInstance()); + } + + @Test(timeout = 60000) + public void testAcceptedButNotSettledInTXRemainsAquiredCanBeRejected() throws Exception { + doTestAcceptedButNotSettledInTXRemainsAquired(new Rejected()); + } + + @Test(timeout = 60000) + public void testAcceptedButNotSettledInTXRemainsAquiredCanBeModifiedAsFailed() throws Exception { + Modified outcome = new Modified(); + outcome.setDeliveryFailed(true); + doTestAcceptedButNotSettledInTXRemainsAquired(outcome); + } + + @Test(timeout = 60000) + public void testAcceptedButNotSettledInTXRemainsAquiredCanBeModifiedAsUndeliverable() throws Exception { + Modified outcome = new Modified(); + outcome.setDeliveryFailed(true); + outcome.setUndeliverableHere(true); + doTestAcceptedButNotSettledInTXRemainsAquired(outcome); + } + + private void doTestAcceptedButNotSettledInTXRemainsAquired(Outcome outcome) throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + + final QueueViewMBean queue = getProxyToQueue(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + session.begin(); + + receiver.flow(10); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(false); + + session.rollback(); + + // Message should remain acquired an not be redelivered. + assertEquals(1, queue.getQueueSize()); + assertNull(receiver.receive(2, TimeUnit.SECONDS)); + + if (outcome instanceof Released || outcome instanceof Rejected) { + // Receiver should be able to release the still acquired message and the + // broker should redispatch it to the client again. + received.release(); + received = receiver.receive(3, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + received = receiver.receive(2, TimeUnit.SECONDS); + assertNull(received); + assertEquals(0, queue.getQueueSize()); + } else if (outcome instanceof Accepted) { + // Receiver should be able to accept the still acquired message and the + // broker should then mark it as consumed. + received.accept(); + received = receiver.receive(2, TimeUnit.SECONDS); + assertNull(received); + assertEquals(0, queue.getQueueSize()); + } else if (outcome instanceof Modified) { + // Depending on the undeliverable here state the message will either be + // redelivered or DLQ'd + Modified modified = (Modified) outcome; + received.modified(Boolean.TRUE.equals(modified.getDeliveryFailed()), Boolean.TRUE.equals(modified.getUndeliverableHere())); + if (Boolean.TRUE.equals(modified.getUndeliverableHere())) { + received = receiver.receive(2, TimeUnit.SECONDS); + assertNull(received); + assertEquals(0, queue.getQueueSize()); + } else { + received = receiver.receive(3, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + received = receiver.receive(2, TimeUnit.SECONDS); + assertNull(received); + assertEquals(0, queue.getQueueSize()); + } + } + + connection.close(); + } + + @Test(timeout = 60000) + public void testTransactionallyAcquiredMessageCanBeTransactionallyConsumed() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + + final QueueViewMBean queue = getProxyToQueue(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + session.begin(); + + receiver.flow(10); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(false); + + session.rollback(); + + // Message should remain acquired an not be redelivered. + assertEquals(1, queue.getQueueSize()); + assertNull(receiver.receive(1, TimeUnit.SECONDS)); + + // Consume under TX but settle this time + session.begin(); + received.accept(false); + session.rollback(); + + // Should still be acquired + assertEquals(1, queue.getQueueSize()); + assertNull(receiver.receive(1, TimeUnit.SECONDS)); + + // Consume under TX and settle but rollback, message should be redelivered. + session.begin(); + received.accept(); + session.rollback(); + + assertEquals(1, queue.getQueueSize()); + received = receiver.receive(1, TimeUnit.SECONDS); + assertNotNull(received); + + // Consume under TX and commit it this time. + session.begin(); + received.accept(false); + session.commit(); + + // Check that it is now consumed and no more message available + assertTrue(received.getWrappedDelivery().remotelySettled()); + assertEquals(0, queue.getQueueSize()); + assertNull(receiver.receive(1, TimeUnit.SECONDS)); + + connection.close(); + } + //----- Tests Ported from AmqpNetLite client -----------------------------// @Test(timeout = 60000) @@ -621,9 +777,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { connection.close(); } - // TODO - Direct ports of the AmqpNetLite client tests that don't currently with this broker. - - @Ignore("Fails due to no support for TX enrollment without settlement.") @Test(timeout = 60000) public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception { final int NUM_MESSAGES = 10; @@ -701,7 +854,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { connection.close(); } - @Ignore("Fails due to no support for TX enrollment without settlement.") @Test(timeout = 60000) public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception { final int NUM_MESSAGES = 10; @@ -756,12 +908,12 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { message2.release(); - // Should be two message available for dispatch given that we sent and committed one, and + // Should be ten message available for dispatch given that we sent and committed one, and // releases another we had previously received. - receiver.flow(2); + receiver.flow(10); for (int i = 1; i <= NUM_MESSAGES; ++i) { AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(message); + assertNotNull("Expected a message for: " + i, message); assertEquals(i, message.getApplicationProperty("msgId")); message.accept(); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java index d0d31ccfcc..ee69650487 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java @@ -480,7 +480,7 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception { - ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID()); + ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE); outbound.onSend(); outbound.storeContent(); @@ -502,7 +502,7 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception { - ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID()); + ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE); outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN); outbound.onSend(); outbound.storeContent(); @@ -525,7 +525,7 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception { - ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID()); + ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE); outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.onSend(); outbound.storeContent(); @@ -571,7 +571,7 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception { - ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true); + ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true); outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN); outbound.onSend(); outbound.storeContent(); @@ -594,7 +594,7 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception { - ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true); + ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true); outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.onSend(); outbound.storeContent();