From aa03f295f523ab00d2b19d54a36f4140bef18d0b Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 8 Apr 2020 14:57:49 +0100 Subject: [PATCH] AMQ-7464 - ensure message.copy before server session run dispatch --- .../org/apache/activemq/ActiveMQSession.java | 15 ++- .../apache/activemq/RedeliveryPolicyTest.java | 106 +++++++++++++++++- 2 files changed, 117 insertions(+), 4 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 5910634f68..bef6f4e21c 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -73,6 +73,7 @@ import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Command; +import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; @@ -882,7 +883,16 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta MessageDispatch messageDispatch; while ((messageDispatch = executor.dequeueNoWait()) != null) { final MessageDispatch md = messageDispatch; - final ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); + + // subset of org.apache.activemq.ActiveMQMessageConsumer.createActiveMQMessage + final ActiveMQMessage message = (ActiveMQMessage)md.getMessage().copy(); + if (message.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) { + ((ActiveMQBlobMessage)message).setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); + } + if (message.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) { + ((ActiveMQObjectMessage)message).setTrustAllPackages(getConnection().isTrustAllPackages()); + ((ActiveMQObjectMessage)message).setTrustedPackages(getConnection().getTrustedPackages()); + } MessageAck earlyAck = null; if (message.isExpired()) { @@ -951,7 +961,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta @Override public void afterRollback() throws Exception { if (LOG.isTraceEnabled()) { - LOG.trace("rollback {}", ack, new Throwable("here")); + LOG.trace("afterRollback {}", ack, new Throwable("here")); } // ensure we don't filter this as a duplicate connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); @@ -979,6 +989,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); ack.setFirstMessageId(md.getMessage().getMessageId()); ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy)); + LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack); asyncSendPacket(ack); } else { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index a0a1ca8e5d..5f325a4918 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -149,7 +149,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport { /** * @throws Exception */ - public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception { + public void testNormalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception { // Receive a message with the JMS API RedeliveryPolicy policy = connection.getRedeliveryPolicy(); @@ -742,7 +742,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport { public void onMessage(Message message) { try { ActiveMQTextMessage m = (ActiveMQTextMessage) message; - LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId()); + LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId() + ", redeliveryCount: " + m.getRedeliveryCounter()); assertEquals("1st", m.getText()); assertEquals(receivedCount.get(), m.getRedeliveryCounter()); receivedCount.incrementAndGet(); @@ -802,6 +802,108 @@ public class RedeliveryPolicyTest extends JmsTestSupport { } + + public void testRepeatedRedeliveryNoCommitForwardMessage() throws Exception { + + connection.start(); + Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = dlqSession.createProducer(destination); + + // Send the messages + producer.send(dlqSession.createTextMessage("1st")); + + dlqSession.commit(); + MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); + + final MessageProducer forwardingProducer = dlqSession.createProducer(new ActiveMQQueue("TEST_FORWARD")); + + // Send the messages + + final int maxRedeliveries = 2; + final AtomicInteger receivedCount = new AtomicInteger(0); + + for (int i=0;i<=maxRedeliveries+1;i++) { + connection = (ActiveMQConnection)factory.createConnection(userName, password); + connections.add(connection); + + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(maxRedeliveries); + + connection.start(); + final CountDownLatch done = new CountDownLatch(1); + + final ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED); + session.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + ActiveMQTextMessage m = (ActiveMQTextMessage) message; + LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId() + " ,Redelivery: " + m.getRedeliveryCounter()); + assertEquals("1st", m.getText()); + assertEquals(receivedCount.get(), m.getRedeliveryCounter()); + receivedCount.incrementAndGet(); + + // do a forward of the received message, will reset the messageID + forwardingProducer.send(message); + done.countDown(); + } catch (Exception ignored) { + ignored.printStackTrace(); + } + } + }); + + connection.createConnectionConsumer( + destination, + null, + new ServerSessionPool() { + @Override + public ServerSession getServerSession() throws JMSException { + return new ServerSession() { + @Override + public Session getSession() throws JMSException { + return session; + } + + @Override + public void start() throws JMSException { + } + }; + } + }, + 100, + false); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + session.run(); + return done.await(10, TimeUnit.MILLISECONDS); + } + }, 5000); + + if (i<=maxRedeliveries) { + assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS)); + } else { + // final redelivery gets poisoned before dispatch + assertFalse("listener not done @" + i, done.await(5, TimeUnit.SECONDS)); + } + connection.close(); + connections.remove(connection); + } + + // We should be able to get the message off the DLQ now. + TextMessage m = (TextMessage)dlqConsumer.receive(1000); + assertNotNull("Got message from DLQ", m); + assertEquals("1st", m.getText()); + String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy")); + dlqSession.commit(); + + } + public void testRedeliveryRollbackWithDelayBlocking() throws Exception { redeliveryRollbackWithDelay(true);