From b8b7cc899f49bd1ce89d20058aa4990f34ae1c5a Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 10 Apr 2019 10:08:12 -0400 Subject: [PATCH] ARTEMIS-2300 Expiry notifications are not called from scanner --- .../artemis/core/server/impl/QueueImpl.java | 35 ++++++++++++++++ .../TransactionPropertyIndexes.java | 2 + .../management/NotificationTest.java | 42 ++++++++++++++++++- 3 files changed, 77 insertions(+), 2 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index cf74815da8..9d04f5b3a4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -86,6 +86,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeLis import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; @@ -3023,8 +3024,42 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { acknowledge(tx, ref, AckReason.EXPIRED, null); } + + if (server != null && server.hasBrokerMessagePlugins()) { + ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER); + if (expiryLogger == null) { + expiryLogger = new ExpiryLogger(); + tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger); + tx.addOperation(expiryLogger); + } + + expiryLogger.addExpiry(address, ref); + } + } + private class ExpiryLogger extends TransactionOperationAbstract { + + List> expiries = new LinkedList<>(); + + public void addExpiry(SimpleString address, MessageReference ref) { + expiries.add(new Pair<>(address, ref)); + } + + @Override + public void afterCommit(Transaction tx) { + for (Pair pair : expiries) { + try { + server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(pair.getB(), pair.getA(), null)); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + } + expiries.clear(); // just giving a hand to GC + } + } + + @Override public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception { sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java index 19ae77775c..4ba31eeb47 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java @@ -33,4 +33,6 @@ public class TransactionPropertyIndexes { public static final int PAGE_DELIVERY = 7; public static final int PAGE_CURSOR_POSITIONS = 8; + + public static final int EXPIRY_LOGGER = 9; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java index 196e939a2a..ba229bfc7b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java @@ -349,6 +349,38 @@ public class NotificationTest extends ActiveMQTestBase { session.deleteQueue(queue); } + @Test + public void testMessageExpiredWithoutConsumers() throws Exception { + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession mySession = sf.createSession("myUser", "myPassword", false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize()); + + mySession.start(); + + SimpleString queue = RandomUtil.randomSimpleString(); + SimpleString address = RandomUtil.randomSimpleString(); + boolean durable = RandomUtil.randomBoolean(); + + session.createQueue(address, queue, durable); + ClientProducer producer = mySession.createProducer(address); + + NotificationTest.flush(notifConsumer); + + ClientMessage msg = session.createMessage(false); + msg.putStringProperty("someKey", "someValue"); + msg.setExpiration(1); + producer.send(msg); + Thread.sleep(500); + + ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer, 5000); + Assert.assertEquals(MESSAGE_EXPIRED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_MESSAGE_ID)); + Assert.assertEquals(address, notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS)); + Assert.assertEquals(queue, notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME)); + Assert.assertEquals(RoutingType.MULTICAST.getType(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE)); + + session.deleteQueue(queue); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -358,7 +390,7 @@ public class NotificationTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); - server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false)); + server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setMessageExpiryScanPeriod(100), false)); NotificationActiveMQServerPlugin notificationPlugin = new NotificationActiveMQServerPlugin(); notificationPlugin.setSendAddressNotifications(true); notificationPlugin.setSendConnectionNotifications(true); @@ -392,11 +424,17 @@ public class NotificationTest extends ActiveMQTestBase { protected static ClientMessage[] consumeMessages(final int expected, final ClientConsumer consumer) throws Exception { + return consumeMessages(expected, consumer, 500); + } + + protected static ClientMessage[] consumeMessages(final int expected, + final ClientConsumer consumer, + final int timeout) throws Exception { ClientMessage[] messages = new ClientMessage[expected]; ClientMessage m = null; for (int i = 0; i < expected; i++) { - m = consumer.receive(500); + m = consumer.receive(timeout); if (m != null) { for (SimpleString key : m.getPropertyNames()) { System.out.println(key + "=" + m.getObjectProperty(key));