ARTEMIS-3552 NPE on message expiration

This commit is contained in:
Justin Bertram 2021-11-03 14:47:35 -05:00 committed by clebertsuconic
parent ff0e97150e
commit 5fe42dd0c4
3 changed files with 71 additions and 3 deletions

View File

@ -1050,7 +1050,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
void errorFlushingExecutorsOnQueue(); void errorFlushingExecutorsOnQueue();
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 222145, value = "Error expiring reference {0} 0n queue", format = Message.Format.MESSAGE_FORMAT) @Message(id = 222145, value = "Error expiring reference {0} on queue", format = Message.Format.MESSAGE_FORMAT)
void errorExpiringReferencesOnQueue(@Cause Exception e, MessageReference ref); void errorExpiringReferencesOnQueue(@Cause Exception e, MessageReference ref);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)

View File

@ -3391,8 +3391,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
copyMessage.setAddress(toAddress); copyMessage.setAddress(toAddress);
if (ref.getMessage().getAnnotationString(Message.HDR_ORIG_ROUTING_TYPE) != null) { Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
copyMessage.setRoutingType(RoutingType.getType(ref.getMessage().getByteProperty(Message.HDR_ORIG_ROUTING_TYPE))); if (originalRoutingType != null && originalRoutingType instanceof Byte) {
copyMessage.setRoutingType(RoutingType.getType((Byte) originalRoutingType));
} }
if (queueIDs != null && queueIDs.length > 0) { if (queueIDs != null && queueIDs.length > 0) {

View File

@ -27,12 +27,18 @@ import java.util.HashMap;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -538,6 +544,67 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
} }
} }
@Test(timeout = 60000)
public void testExpirationAfterDivert() throws Throwable {
final String FORWARDING_ADDRESS = RandomUtil.randomString();
server.createQueue(new QueueConfiguration(FORWARDING_ADDRESS).setRoutingType(RoutingType.ANYCAST));
server.deployDivert(new DivertConfiguration()
.setName(RandomUtil.randomString())
.setAddress(getQueueName())
.setForwardingAddress(FORWARDING_ADDRESS)
.setTransformerConfiguration(new TransformerConfiguration(MyTransformer.class.getName()))
.setExclusive(true));
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
try {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setDurable(true);
message.setText("Test-Message");
message.setDeliveryAnnotation("shouldDisappear", 1);
message.setMessageAnnotation("x-opt-routing-type", (byte) 1);
sender.send(message);
Queue forward = getProxyToQueue(FORWARDING_ADDRESS);
assertTrue("Message not diverted", Wait.waitFor(() -> forward.getMessageCount() > 0, 7000, 500));
Queue dlq = getProxyToQueue(getDeadLetterAddress());
assertTrue("Message not moved to DLQ", Wait.waitFor(() -> dlq.getMessageCount() > 0, 7000, 500));
connection.close();
connection = client.connect();
session = connection.createSession();
// Read all messages from the Queue
AmqpReceiver receiver = session.createReceiver(getDeadLetterAddress());
receiver.flow(20);
message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(FORWARDING_ADDRESS, message.getMessageAnnotation("x-opt-ORIG-QUEUE"));
assertNull(message.getDeliveryAnnotation("shouldDisappear"));
assertNull(receiver.receiveNoWait());
} finally {
connection.close();
}
}
public static class MyTransformer implements Transformer {
public MyTransformer() {
}
@Override
public org.apache.activemq.artemis.api.core.Message transform(org.apache.activemq.artemis.api.core.Message message) {
return message.setExpiration(System.currentTimeMillis() + 250);
}
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testDLQdMessageCanBeRedeliveredMultipleTimes() throws Throwable { public void testDLQdMessageCanBeRedeliveredMultipleTimes() throws Throwable {
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();