From 1ed7cc1efcf53183abc95a08ec6f27fb1cbe36ac Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 11 Mar 2022 09:36:22 -0600 Subject: [PATCH] ARTEMIS-3719 DLA and expiry incorrect w/temp-queue-namespace When using a temporary queue with a `temporary-queue-namespace` the `AddressSettings` lookup wasn't correct. This commit fixes that and refactors `QueueImpl` a bit so that it holds a copy of its `AddressSettings` rather than looking them up all the time. If any relevant `AddressSettings` changes the `HierarchicalRepositoryChangeListener` implementation will still refresh the `QueueImpl` appropriately. The `QueueControlImpl` was likewise changed to get the dead-letter address and expiry address directly from the `QueueImpl` rather than looking them up in the `AddressSettings` repository. I modified some code that came from ARTEMIS-734, but I ran the test that was associated with that Jira (i.e. `o.a.a.a.t.i.c.d.ExpireWhileLoadBalanceTest`) and it passed so I think that should be fine. There actually was no test included with the original commit. One was added later so it's hard to say for sure it exactly captures the original issue. --- .../management/impl/QueueControlImpl.java | 23 +--- .../activemq/artemis/core/server/Queue.java | 2 + .../artemis/core/server/impl/QueueImpl.java | 119 +++++++----------- .../impl/ScheduledDeliveryHandlerTest.java | 5 + .../server/TempQueueNamespaceTest.java | 13 +- .../unit/core/postoffice/impl/FakeQueue.java | 5 + 6 files changed, 73 insertions(+), 94 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 4b4550da17..b653f8597f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.artemis.core.management.impl; -import org.apache.activemq.artemis.json.JsonArray; -import org.apache.activemq.artemis.json.JsonArrayBuilder; -import org.apache.activemq.artemis.json.JsonObjectBuilder; import javax.management.MBeanAttributeInfo; import javax.management.MBeanOperationInfo; import javax.management.openmbean.CompositeData; @@ -56,8 +53,11 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperation; -import org.apache.activemq.artemis.selector.filter.Filterable; +import org.apache.activemq.artemis.json.JsonArray; +import org.apache.activemq.artemis.json.JsonArrayBuilder; +import org.apache.activemq.artemis.json.JsonObjectBuilder; import org.apache.activemq.artemis.logs.AuditLogger; +import org.apache.activemq.artemis.selector.filter.Filterable; import org.apache.activemq.artemis.utils.JsonLoader; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.jboss.logging.Logger; @@ -545,12 +545,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { clearIO(); try { - AddressSettings addressSettings = addressSettingsRepository.getMatch(address); - - if (addressSettings != null && addressSettings.getDeadLetterAddress() != null) { - return addressSettings.getDeadLetterAddress().toString(); - } - return null; + return queue.getDeadLetterAddress() == null ? null : queue.getDeadLetterAddress().toString(); } finally { blockOnIO(); } @@ -565,13 +560,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { clearIO(); try { - AddressSettings addressSettings = addressSettingsRepository.getMatch(address); - - if (addressSettings != null && addressSettings.getExpiryAddress() != null) { - return addressSettings.getExpiryAddress().toString(); - } else { - return null; - } + return queue.getExpiryAddress() == null ? null : queue.getExpiryAddress().toString(); } finally { blockOnIO(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index dd1a90fd10..fc09776228 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -431,6 +431,8 @@ public interface Queue extends Bindable,CriticalComponent { SimpleString getExpiryAddress(); + SimpleString getDeadLetterAddress(); + /** * Pauses the queue. It will receive messages but won't give them to the consumers until resumed. * If a queue is paused, pausing it again will only throw a warning. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index f9ca299056..e91e5e3bb8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -253,7 +253,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final StorageManager storageManager; - private final HierarchicalRepository addressSettingsRepository; + private volatile AddressSettings addressSettings; private final ActiveMQServer server; @@ -283,8 +283,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private volatile Consumer exclusiveConsumer; - private volatile SimpleString expiryAddress; - private final ArtemisExecutor executor; private boolean internalQueue; @@ -705,8 +703,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.storageManager = storageManager; - this.addressSettingsRepository = addressSettingsRepository; - this.scheduledExecutor = scheduledExecutor; this.server = server; @@ -714,10 +710,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this); if (addressSettingsRepository != null) { - addressSettingsRepositoryListener = new AddressSettingsRepositoryListener(); + addressSettingsRepositoryListener = new AddressSettingsRepositoryListener(addressSettingsRepository); addressSettingsRepository.registerListener(addressSettingsRepositoryListener); + this.addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch()); } else { - expiryAddress = null; + this.addressSettings = new AddressSettings(); } if (pageSubscription != null) { @@ -1337,9 +1334,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } }); - if (addressSettingsRepository != null) { - addressSettingsRepository.unRegisterListener(addressSettingsRepositoryListener); - } + addressSettingsRepositoryListener.close(); } @Override @@ -1975,19 +1970,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception { - SimpleString messageExpiryAddress = expiryAddressFromMessageAddress(ref); - if (messageExpiryAddress == null) { - messageExpiryAddress = expiryAddressFromAddressSettings(ref); - } - - if (messageExpiryAddress != null) { + if (addressSettings.getExpiryAddress() != null) { createExpiryResources(); if (logger.isTraceEnabled()) { - logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName()); + logger.trace("moving expired reference " + ref + " to address = " + addressSettings.getExpiryAddress() + " from queue=" + this.getName()); } - move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED, consumer); + move(null, addressSettings.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer); } else { if (logger.isTraceEnabled()) { logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName()); @@ -1999,46 +1989,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { refCountForConsumers.check(); if (server != null && server.hasBrokerMessagePlugins()) { - final SimpleString expiryAddress = messageExpiryAddress; - server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer)); - } - } - - private SimpleString expiryAddressFromMessageAddress(MessageReference ref) { - SimpleString messageAddress = extractAddress(ref.getMessage()); - SimpleString expiryAddress = null; - - if (messageAddress == null || messageAddress.equals(getAddress())) { - expiryAddress = getExpiryAddress(); - } - - return expiryAddress; - } - - private SimpleString expiryAddressFromAddressSettings(MessageReference ref) { - SimpleString messageAddress = extractAddress(ref.getMessage()); - SimpleString expiryAddress = null; - - if (messageAddress != null) { - AddressSettings addressSettings = addressSettingsRepository.getMatch(messageAddress.toString()); - - expiryAddress = addressSettings.getExpiryAddress(); - } - - return expiryAddress; - } - - private SimpleString extractAddress(Message message) { - if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) { - return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString()); - } else { - return message.getAddressSimpleString(); + server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer)); } } @Override public SimpleString getExpiryAddress() { - return this.expiryAddress; + return this.addressSettings.getExpiryAddress(); + } + + @Override + public SimpleString getDeadLetterAddress() { + return this.addressSettings.getDeadLetterAddress(); } @Override @@ -2413,10 +2375,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } public boolean isExpirationRedundant() { - if (expiryAddress != null && expiryAddress.equals(this.address)) { + if (addressSettings.getExpiryAddress() != null && addressSettings.getExpiryAddress().equals(this.address)) { // check expire with itself would be silly (waste of time) if (logger.isTraceEnabled()) - logger.trace("Redundant expiration from " + address + " to " + expiryAddress); + logger.trace("Redundant expiration from " + address + " to " + addressSettings.getExpiryAddress()); return true; } @@ -3334,8 +3296,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { storageManager.updateDeliveryCount(reference); } - AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); - int maxDeliveries = addressSettings.getMaxDeliveryAttempts(); int deliveryCount = reference.getDeliveryCount(); @@ -3567,7 +3527,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } private void expire(final Transaction tx, final MessageReference ref) throws Exception { - SimpleString expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress(); + SimpleString expiryAddress = addressSettings.getExpiryAddress(); if (expiryAddress != null && expiryAddress.length() != 0) { @@ -3634,7 +3594,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception { - return sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress()); + return sendToDeadLetterAddress(tx, ref, addressSettings.getDeadLetterAddress()); } private boolean sendToDeadLetterAddress(final Transaction tx, @@ -4415,12 +4375,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return size; } - private void configureExpiry(final AddressSettings settings) { - this.expiryAddress = settings == null ? null : settings.getExpiryAddress(); - } - - private void configureSlowConsumerReaper(final AddressSettings settings) { - if (settings == null || settings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) { + private void configureSlowConsumerReaper() { + if (addressSettings == null || addressSettings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) { if (slowConsumerReaperFuture != null) { slowConsumerReaperFuture.cancel(false); slowConsumerReaperFuture = null; @@ -4431,13 +4387,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } else { if (slowConsumerReaperRunnable == null) { - scheduleSlowConsumerReaper(settings); - } else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != settings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) { + scheduleSlowConsumerReaper(addressSettings); + } else if (slowConsumerReaperRunnable.checkPeriod != addressSettings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != addressSettings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(addressSettings.getSlowConsumerPolicy())) { if (slowConsumerReaperFuture != null) { slowConsumerReaperFuture.cancel(false); slowConsumerReaperFuture = null; } - scheduleSlowConsumerReaper(settings); + scheduleSlowConsumerReaper(addressSettings); } } } @@ -4489,21 +4445,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener { + HierarchicalRepository addressSettingsRepository; + + AddressSettingsRepositoryListener(HierarchicalRepository addressSettingsRepository) { + this.addressSettingsRepository = addressSettingsRepository; + } + @Override public void onChange() { - AddressSettings settings = addressSettingsRepository.getMatch(((ActiveMQServerImpl)server).getRuntimeTempQueueNamespace(temporary) + address.toString()); - configureExpiry(settings); - checkDeadLetterAddressAndExpiryAddress(settings); - configureSlowConsumerReaper(settings); + addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch()); + checkDeadLetterAddressAndExpiryAddress(); + configureSlowConsumerReaper(); + } + + public void close() { + addressSettingsRepository.unRegisterListener(this); } } - private void checkDeadLetterAddressAndExpiryAddress(final AddressSettings settings) { + private String getAddressSettingsMatch() { + return ((ActiveMQServerImpl)server).getRuntimeTempQueueNamespace(temporary) + address.toString(); + } + + private void checkDeadLetterAddressAndExpiryAddress() { if (!Env.isTestEnv() && !internalQueue && !address.equals(server.getConfiguration().getManagementNotificationAddress())) { - if (settings.getDeadLetterAddress() == null) { + if (addressSettings.getDeadLetterAddress() == null) { ActiveMQServerLogger.LOGGER.AddressSettingsNoDLA(name); } - if (settings.getExpiryAddress() == null) { + if (addressSettings.getExpiryAddress() == null) { ActiveMQServerLogger.LOGGER.AddressSettingsNoExpiryAddress(name); } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 9204f49030..daf8b7bac5 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1533,6 +1533,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { return null; } + @Override + public SimpleString getDeadLetterAddress() { + return null; + } + @Override public void pause() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/TempQueueNamespaceTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/TempQueueNamespaceTest.java index db34ed1936..5bdb657f20 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/TempQueueNamespaceTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/TempQueueNamespaceTest.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.server; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.SingleServerTestBase; import org.apache.activemq.artemis.utils.RandomUtil; @@ -28,14 +30,21 @@ public class TempQueueNamespaceTest extends SingleServerTestBase { @Test public void testTempQueueNamespace() throws Exception { final String TEMP_QUEUE_NAMESPACE = "temp"; + final SimpleString DLA = RandomUtil.randomSimpleString(); + final SimpleString EA = RandomUtil.randomSimpleString(); + final int RING_SIZE = 10; + server.getConfiguration().setTemporaryQueueNamespace(TEMP_QUEUE_NAMESPACE); - server.getAddressSettingsRepository().addMatch(TEMP_QUEUE_NAMESPACE + ".#", new AddressSettings().setDefaultRingSize(10)); + server.getAddressSettingsRepository().addMatch(TEMP_QUEUE_NAMESPACE + ".#", new AddressSettings().setDefaultRingSize(RING_SIZE).setDeadLetterAddress(DLA).setExpiryAddress(EA)); SimpleString queue = RandomUtil.randomSimpleString(); SimpleString address = RandomUtil.randomSimpleString(); session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false).setTemporary(true)); - assertEquals(10, (long) server.locateQueue(queue).getQueueConfiguration().getRingSize()); + QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + queue); + assertEquals(RING_SIZE, queueControl.getRingSize()); + assertEquals(DLA.toString(), queueControl.getDeadLetterAddress()); + assertEquals(EA.toString(), queueControl.getExpiryAddress()); session.close(); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index adb79f8857..1ba14141fa 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -841,6 +841,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { return null; } + @Override + public SimpleString getDeadLetterAddress() { + return null; + } + @Override public void route(final Message message, final RoutingContext context) throws Exception { // no-op