From e1d8f42cc18e16f13fcc5d1c0ab0f4d3b176ef9c Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 9 Jun 2020 12:19:01 -0500 Subject: [PATCH] ARTEMIS-2798 expiration for AMQP msgs not reloaded --- .../protocol/amqp/broker/AMQPMessage.java | 1 + .../client/MessageExpirationTest.java | 57 ++++++++++++++++++- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 073da32037..c89ba5c6f4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -958,6 +958,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. @Override public final long getExpiration() { + ensureMessageDataScanned(); return expiration; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java index f4b4a35b0a..e15abffb8d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java @@ -16,8 +16,15 @@ */ package org.apache.activemq.artemis.tests.integration.client; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -29,7 +36,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -70,6 +79,50 @@ public class MessageExpirationTest extends ActiveMQTestBase { session.deleteQueue(queue); } + @Test + public void testAmqpJmsReloaded() throws Exception { + SimpleString queue = RandomUtil.randomSimpleString(); + SimpleString expiry = RandomUtil.randomSimpleString(); + + server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(expiry)); + server.getAddressSettingsRepository().addMatch(queue.toString(), new AddressSettings().setExpiryAddress(expiry)); + + ConnectionFactory cf = new JmsConnectionFactory("amqp://localhost:61616"); + Connection connection = cf.createConnection(); + Session session = connection.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(queue.toString())); + producer.setTimeToLive(EXPIRATION); + + for (int i = 0; i < 20; i++) { + javax.jms.Message message = session.createMessage(); + producer.send(message); + } + connection.close(); + Wait.assertEquals(20L, () -> server.locateQueue(queue).getMessageCount(), 2000, 100); + Wait.assertEquals(0L, () -> server.locateQueue(expiry).getMessageCount(), 2000, 100); + + server.stop(); + server.start(); + + Thread.sleep(EXPIRATION * 2); + + Wait.assertEquals(0L, () -> server.locateQueue(queue).getMessageCount(), 2000, 100); + Wait.assertEquals(20L, () -> server.locateQueue(expiry).getMessageCount(), 2000, 100); + + connection = cf.createConnection(); + session = connection.createSession(); + MessageConsumer consumer = session.createConsumer(session.createQueue(queue.toString())); + connection.start(); + + for (int i = 0; i < 20; i++) { + javax.jms.Message message2 = consumer.receiveNoWait(); + Assert.assertNull(message2); + } + + consumer.close(); + } + @Test public void testMessageExpiredWithoutExpiryAddressWithExpiryDelayOverride() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); @@ -235,7 +288,9 @@ public class MessageExpirationTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); - server = createServer(false); + server = createServer(true); + server.getConfiguration().addAcceptorConfiguration("amqp", "tcp://127.0.0.1:61616"); + server.getConfiguration().setMessageExpiryScanPeriod(200); server.start(); locator = createInVMNonHALocator(); sf = createSessionFactory(locator);