diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 4b4550da17..b653f8597f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.artemis.core.management.impl; -import org.apache.activemq.artemis.json.JsonArray; -import org.apache.activemq.artemis.json.JsonArrayBuilder; -import org.apache.activemq.artemis.json.JsonObjectBuilder; import javax.management.MBeanAttributeInfo; import javax.management.MBeanOperationInfo; import javax.management.openmbean.CompositeData; @@ -56,8 +53,11 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperation; -import org.apache.activemq.artemis.selector.filter.Filterable; +import org.apache.activemq.artemis.json.JsonArray; +import org.apache.activemq.artemis.json.JsonArrayBuilder; +import org.apache.activemq.artemis.json.JsonObjectBuilder; import org.apache.activemq.artemis.logs.AuditLogger; +import org.apache.activemq.artemis.selector.filter.Filterable; import org.apache.activemq.artemis.utils.JsonLoader; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.jboss.logging.Logger; @@ -545,12 +545,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { clearIO(); try { - AddressSettings addressSettings = addressSettingsRepository.getMatch(address); - - if (addressSettings != null && addressSettings.getDeadLetterAddress() != null) { - return addressSettings.getDeadLetterAddress().toString(); - } - return null; + return queue.getDeadLetterAddress() == null ? null : queue.getDeadLetterAddress().toString(); } finally { blockOnIO(); } @@ -565,13 +560,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { clearIO(); try { - AddressSettings addressSettings = addressSettingsRepository.getMatch(address); - - if (addressSettings != null && addressSettings.getExpiryAddress() != null) { - return addressSettings.getExpiryAddress().toString(); - } else { - return null; - } + return queue.getExpiryAddress() == null ? null : queue.getExpiryAddress().toString(); } finally { blockOnIO(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index dd1a90fd10..fc09776228 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -431,6 +431,8 @@ public interface Queue extends Bindable,CriticalComponent { SimpleString getExpiryAddress(); + SimpleString getDeadLetterAddress(); + /** * Pauses the queue. It will receive messages but won't give them to the consumers until resumed. * If a queue is paused, pausing it again will only throw a warning. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index f9ca299056..e91e5e3bb8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -253,7 +253,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final StorageManager storageManager; - private final HierarchicalRepository addressSettingsRepository; + private volatile AddressSettings addressSettings; private final ActiveMQServer server; @@ -283,8 +283,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private volatile Consumer exclusiveConsumer; - private volatile SimpleString expiryAddress; - private final ArtemisExecutor executor; private boolean internalQueue; @@ -705,8 +703,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.storageManager = storageManager; - this.addressSettingsRepository = addressSettingsRepository; - this.scheduledExecutor = scheduledExecutor; this.server = server; @@ -714,10 +710,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this); if (addressSettingsRepository != null) { - addressSettingsRepositoryListener = new AddressSettingsRepositoryListener(); + addressSettingsRepositoryListener = new AddressSettingsRepositoryListener(addressSettingsRepository); addressSettingsRepository.registerListener(addressSettingsRepositoryListener); + this.addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch()); } else { - expiryAddress = null; + this.addressSettings = new AddressSettings(); } if (pageSubscription != null) { @@ -1337,9 +1334,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } }); - if (addressSettingsRepository != null) { - addressSettingsRepository.unRegisterListener(addressSettingsRepositoryListener); - } + addressSettingsRepositoryListener.close(); } @Override @@ -1975,19 +1970,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception { - SimpleString messageExpiryAddress = expiryAddressFromMessageAddress(ref); - if (messageExpiryAddress == null) { - messageExpiryAddress = expiryAddressFromAddressSettings(ref); - } - - if (messageExpiryAddress != null) { + if (addressSettings.getExpiryAddress() != null) { createExpiryResources(); if (logger.isTraceEnabled()) { - logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName()); + logger.trace("moving expired reference " + ref + " to address = " + addressSettings.getExpiryAddress() + " from queue=" + this.getName()); } - move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED, consumer); + move(null, addressSettings.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer); } else { if (logger.isTraceEnabled()) { logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName()); @@ -1999,46 +1989,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { refCountForConsumers.check(); if (server != null && server.hasBrokerMessagePlugins()) { - final SimpleString expiryAddress = messageExpiryAddress; - server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer)); - } - } - - private SimpleString expiryAddressFromMessageAddress(MessageReference ref) { - SimpleString messageAddress = extractAddress(ref.getMessage()); - SimpleString expiryAddress = null; - - if (messageAddress == null || messageAddress.equals(getAddress())) { - expiryAddress = getExpiryAddress(); - } - - return expiryAddress; - } - - private SimpleString expiryAddressFromAddressSettings(MessageReference ref) { - SimpleString messageAddress = extractAddress(ref.getMessage()); - SimpleString expiryAddress = null; - - if (messageAddress != null) { - AddressSettings addressSettings = addressSettingsRepository.getMatch(messageAddress.toString()); - - expiryAddress = addressSettings.getExpiryAddress(); - } - - return expiryAddress; - } - - private SimpleString extractAddress(Message message) { - if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) { - return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString()); - } else { - return message.getAddressSimpleString(); + server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer)); } } @Override public SimpleString getExpiryAddress() { - return this.expiryAddress; + return this.addressSettings.getExpiryAddress(); + } + + @Override + public SimpleString getDeadLetterAddress() { + return this.addressSettings.getDeadLetterAddress(); } @Override @@ -2413,10 +2375,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } public boolean isExpirationRedundant() { - if (expiryAddress != null && expiryAddress.equals(this.address)) { + if (addressSettings.getExpiryAddress() != null && addressSettings.getExpiryAddress().equals(this.address)) { // check expire with itself would be silly (waste of time) if (logger.isTraceEnabled()) - logger.trace("Redundant expiration from " + address + " to " + expiryAddress); + logger.trace("Redundant expiration from " + address + " to " + addressSettings.getExpiryAddress()); return true; } @@ -3334,8 +3296,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { storageManager.updateDeliveryCount(reference); } - AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); - int maxDeliveries = addressSettings.getMaxDeliveryAttempts(); int deliveryCount = reference.getDeliveryCount(); @@ -3567,7 +3527,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } private void expire(final Transaction tx, final MessageReference ref) throws Exception { - SimpleString expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress(); + SimpleString expiryAddress = addressSettings.getExpiryAddress(); if (expiryAddress != null && expiryAddress.length() != 0) { @@ -3634,7 +3594,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception { - return sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress()); + return sendToDeadLetterAddress(tx, ref, addressSettings.getDeadLetterAddress()); } private boolean sendToDeadLetterAddress(final Transaction tx, @@ -4415,12 +4375,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return size; } - private void configureExpiry(final AddressSettings settings) { - this.expiryAddress = settings == null ? null : settings.getExpiryAddress(); - } - - private void configureSlowConsumerReaper(final AddressSettings settings) { - if (settings == null || settings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) { + private void configureSlowConsumerReaper() { + if (addressSettings == null || addressSettings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) { if (slowConsumerReaperFuture != null) { slowConsumerReaperFuture.cancel(false); slowConsumerReaperFuture = null; @@ -4431,13 +4387,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } else { if (slowConsumerReaperRunnable == null) { - scheduleSlowConsumerReaper(settings); - } else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != settings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) { + scheduleSlowConsumerReaper(addressSettings); + } else if (slowConsumerReaperRunnable.checkPeriod != addressSettings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != addressSettings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(addressSettings.getSlowConsumerPolicy())) { if (slowConsumerReaperFuture != null) { slowConsumerReaperFuture.cancel(false); slowConsumerReaperFuture = null; } - scheduleSlowConsumerReaper(settings); + scheduleSlowConsumerReaper(addressSettings); } } } @@ -4489,21 +4445,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener { + HierarchicalRepository addressSettingsRepository; + + AddressSettingsRepositoryListener(HierarchicalRepository addressSettingsRepository) { + this.addressSettingsRepository = addressSettingsRepository; + } + @Override public void onChange() { - AddressSettings settings = addressSettingsRepository.getMatch(((ActiveMQServerImpl)server).getRuntimeTempQueueNamespace(temporary) + address.toString()); - configureExpiry(settings); - checkDeadLetterAddressAndExpiryAddress(settings); - configureSlowConsumerReaper(settings); + addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch()); + checkDeadLetterAddressAndExpiryAddress(); + configureSlowConsumerReaper(); + } + + public void close() { + addressSettingsRepository.unRegisterListener(this); } } - private void checkDeadLetterAddressAndExpiryAddress(final AddressSettings settings) { + private String getAddressSettingsMatch() { + return ((ActiveMQServerImpl)server).getRuntimeTempQueueNamespace(temporary) + address.toString(); + } + + private void checkDeadLetterAddressAndExpiryAddress() { if (!Env.isTestEnv() && !internalQueue && !address.equals(server.getConfiguration().getManagementNotificationAddress())) { - if (settings.getDeadLetterAddress() == null) { + if (addressSettings.getDeadLetterAddress() == null) { ActiveMQServerLogger.LOGGER.AddressSettingsNoDLA(name); } - if (settings.getExpiryAddress() == null) { + if (addressSettings.getExpiryAddress() == null) { ActiveMQServerLogger.LOGGER.AddressSettingsNoExpiryAddress(name); } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 9204f49030..daf8b7bac5 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1533,6 +1533,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { return null; } + @Override + public SimpleString getDeadLetterAddress() { + return null; + } + @Override public void pause() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/TempQueueNamespaceTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/TempQueueNamespaceTest.java index db34ed1936..5bdb657f20 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/TempQueueNamespaceTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/TempQueueNamespaceTest.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.server; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.SingleServerTestBase; import org.apache.activemq.artemis.utils.RandomUtil; @@ -28,14 +30,21 @@ public class TempQueueNamespaceTest extends SingleServerTestBase { @Test public void testTempQueueNamespace() throws Exception { final String TEMP_QUEUE_NAMESPACE = "temp"; + final SimpleString DLA = RandomUtil.randomSimpleString(); + final SimpleString EA = RandomUtil.randomSimpleString(); + final int RING_SIZE = 10; + server.getConfiguration().setTemporaryQueueNamespace(TEMP_QUEUE_NAMESPACE); - server.getAddressSettingsRepository().addMatch(TEMP_QUEUE_NAMESPACE + ".#", new AddressSettings().setDefaultRingSize(10)); + server.getAddressSettingsRepository().addMatch(TEMP_QUEUE_NAMESPACE + ".#", new AddressSettings().setDefaultRingSize(RING_SIZE).setDeadLetterAddress(DLA).setExpiryAddress(EA)); SimpleString queue = RandomUtil.randomSimpleString(); SimpleString address = RandomUtil.randomSimpleString(); session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false).setTemporary(true)); - assertEquals(10, (long) server.locateQueue(queue).getQueueConfiguration().getRingSize()); + QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + queue); + assertEquals(RING_SIZE, queueControl.getRingSize()); + assertEquals(DLA.toString(), queueControl.getDeadLetterAddress()); + assertEquals(EA.toString(), queueControl.getExpiryAddress()); session.close(); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index adb79f8857..1ba14141fa 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -841,6 +841,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { return null; } + @Override + public SimpleString getDeadLetterAddress() { + return null; + } + @Override public void route(final Message message, final RoutingContext context) throws Exception { // no-op