This commit is contained in:
Clebert Suconic 2019-04-10 16:01:19 -04:00
commit 58bf52ac57
3 changed files with 77 additions and 2 deletions

View File

@ -86,6 +86,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeLis
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
@ -3023,8 +3024,42 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
acknowledge(tx, ref, AckReason.EXPIRED, null);
}
if (server != null && server.hasBrokerMessagePlugins()) {
ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
if (expiryLogger == null) {
expiryLogger = new ExpiryLogger();
tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger);
tx.addOperation(expiryLogger);
}
expiryLogger.addExpiry(address, ref);
}
}
private class ExpiryLogger extends TransactionOperationAbstract {
List<Pair<SimpleString, MessageReference>> expiries = new LinkedList<>();
public void addExpiry(SimpleString address, MessageReference ref) {
expiries.add(new Pair<>(address, ref));
}
@Override
public void afterCommit(Transaction tx) {
for (Pair<SimpleString, MessageReference> pair : expiries) {
try {
server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(pair.getB(), pair.getA(), null));
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
expiries.clear(); // just giving a hand to GC
}
}
@Override
public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());

View File

@ -33,4 +33,6 @@ public class TransactionPropertyIndexes {
public static final int PAGE_DELIVERY = 7;
public static final int PAGE_CURSOR_POSITIONS = 8;
public static final int EXPIRY_LOGGER = 9;
}

View File

@ -349,6 +349,38 @@ public class NotificationTest extends ActiveMQTestBase {
session.deleteQueue(queue);
}
@Test
public void testMessageExpiredWithoutConsumers() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession mySession = sf.createSession("myUser", "myPassword", false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
mySession.start();
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString address = RandomUtil.randomSimpleString();
boolean durable = RandomUtil.randomBoolean();
session.createQueue(address, queue, durable);
ClientProducer producer = mySession.createProducer(address);
NotificationTest.flush(notifConsumer);
ClientMessage msg = session.createMessage(false);
msg.putStringProperty("someKey", "someValue");
msg.setExpiration(1);
producer.send(msg);
Thread.sleep(500);
ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer, 5000);
Assert.assertEquals(MESSAGE_EXPIRED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_MESSAGE_ID));
Assert.assertEquals(address, notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS));
Assert.assertEquals(queue, notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME));
Assert.assertEquals(RoutingType.MULTICAST.getType(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE));
session.deleteQueue(queue);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@ -358,7 +390,7 @@ public class NotificationTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setMessageExpiryScanPeriod(100), false));
NotificationActiveMQServerPlugin notificationPlugin = new NotificationActiveMQServerPlugin();
notificationPlugin.setSendAddressNotifications(true);
notificationPlugin.setSendConnectionNotifications(true);
@ -392,11 +424,17 @@ public class NotificationTest extends ActiveMQTestBase {
protected static ClientMessage[] consumeMessages(final int expected,
final ClientConsumer consumer) throws Exception {
return consumeMessages(expected, consumer, 500);
}
protected static ClientMessage[] consumeMessages(final int expected,
final ClientConsumer consumer,
final int timeout) throws Exception {
ClientMessage[] messages = new ClientMessage[expected];
ClientMessage m = null;
for (int i = 0; i < expected; i++) {
m = consumer.receive(500);
m = consumer.receive(timeout);
if (m != null) {
for (SimpleString key : m.getPropertyNames()) {
System.out.println(key + "=" + m.getObjectProperty(key));