From 5fe42dd0c478f7f7433ef37fb77ea5f6b2332a71 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Wed, 3 Nov 2021 14:47:35 -0500 Subject: [PATCH] ARTEMIS-3552 NPE on message expiration --- .../core/server/ActiveMQServerLogger.java | 2 +- .../artemis/core/server/impl/QueueImpl.java | 5 +- .../amqp/AmqpExpiredMessageTest.java | 67 +++++++++++++++++++ 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 27d3d3d7f4..657cb66dcf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1050,7 +1050,7 @@ public interface ActiveMQServerLogger extends BasicLogger { void errorFlushingExecutorsOnQueue(); @LogMessage(level = Logger.Level.WARN) - @Message(id = 222145, value = "Error expiring reference {0} 0n queue", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 222145, value = "Error expiring reference {0} on queue", format = Message.Format.MESSAGE_FORMAT) void errorExpiringReferencesOnQueue(@Cause Exception e, MessageReference ref); @LogMessage(level = Logger.Level.WARN) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 90d43e509c..13f01407f7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3391,8 +3391,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { copyMessage.setAddress(toAddress); - if (ref.getMessage().getAnnotationString(Message.HDR_ORIG_ROUTING_TYPE) != null) { - copyMessage.setRoutingType(RoutingType.getType(ref.getMessage().getByteProperty(Message.HDR_ORIG_ROUTING_TYPE))); + Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE); + if (originalRoutingType != null && originalRoutingType instanceof Byte) { + copyMessage.setRoutingType(RoutingType.getType((Byte) originalRoutingType)); } if (queueIDs != null && queueIDs.length > 0) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java index a9c5d5c222..f98ea8067c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -27,12 +27,18 @@ import java.util.HashMap; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.config.TransformerConfiguration; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -538,6 +544,67 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { } } + @Test(timeout = 60000) + public void testExpirationAfterDivert() throws Throwable { + final String FORWARDING_ADDRESS = RandomUtil.randomString(); + server.createQueue(new QueueConfiguration(FORWARDING_ADDRESS).setRoutingType(RoutingType.ANYCAST)); + server.deployDivert(new DivertConfiguration() + .setName(RandomUtil.randomString()) + .setAddress(getQueueName()) + .setForwardingAddress(FORWARDING_ADDRESS) + .setTransformerConfiguration(new TransformerConfiguration(MyTransformer.class.getName())) + .setExclusive(true)); + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + try { + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getQueueName()); + + AmqpMessage message = new AmqpMessage(); + message.setDurable(true); + message.setText("Test-Message"); + message.setDeliveryAnnotation("shouldDisappear", 1); + message.setMessageAnnotation("x-opt-routing-type", (byte) 1); + sender.send(message); + + Queue forward = getProxyToQueue(FORWARDING_ADDRESS); + assertTrue("Message not diverted", Wait.waitFor(() -> forward.getMessageCount() > 0, 7000, 500)); + + Queue dlq = getProxyToQueue(getDeadLetterAddress()); + assertTrue("Message not moved to DLQ", Wait.waitFor(() -> dlq.getMessageCount() > 0, 7000, 500)); + + connection.close(); + + connection = client.connect(); + session = connection.createSession(); + + // Read all messages from the Queue + AmqpReceiver receiver = session.createReceiver(getDeadLetterAddress()); + receiver.flow(20); + + message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + assertEquals(FORWARDING_ADDRESS, message.getMessageAnnotation("x-opt-ORIG-QUEUE")); + assertNull(message.getDeliveryAnnotation("shouldDisappear")); + assertNull(receiver.receiveNoWait()); + } finally { + connection.close(); + } + } + + public static class MyTransformer implements Transformer { + public MyTransformer() { + } + + @Override + public org.apache.activemq.artemis.api.core.Message transform(org.apache.activemq.artemis.api.core.Message message) { + return message.setExpiration(System.currentTimeMillis() + 250); + } + } + @Test(timeout = 60000) public void testDLQdMessageCanBeRedeliveredMultipleTimes() throws Throwable { AmqpClient client = createAmqpClient();