ARTEMIS-3719 DLA and expiry incorrect w/temp-queue-namespace

When using a temporary queue with a `temporary-queue-namespace` the
`AddressSettings` lookup wasn't correct. This commit fixes that and
refactors `QueueImpl` a bit so that it holds a copy of its
`AddressSettings` rather than looking them up all the time. If any
relevant `AddressSettings` changes the
`HierarchicalRepositoryChangeListener` implementation will still
refresh the `QueueImpl` appropriately.

The `QueueControlImpl` was likewise changed to get the dead-letter
address and expiry address directly from the `QueueImpl` rather than
looking them up in the `AddressSettings` repository.

I modified some code that came from ARTEMIS-734, but I ran the test that
was associated with that Jira (i.e.
`o.a.a.a.t.i.c.d.ExpireWhileLoadBalanceTest`) and it passed so I think
that should be fine. There actually was no test included with the
original commit. One was added later so it's hard to say for sure it
exactly captures the original issue.
This commit is contained in:
Justin Bertram 2022-03-11 09:36:22 -06:00 committed by clebertsuconic
parent 36dcb30cda
commit 1ed7cc1efc
6 changed files with 73 additions and 94 deletions

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.artemis.core.management.impl;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import org.apache.activemq.artemis.json.JsonObjectBuilder;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import javax.management.openmbean.CompositeData;
@ -56,8 +53,11 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import org.apache.activemq.artemis.json.JsonObjectBuilder;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.jboss.logging.Logger;
@ -545,12 +545,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
clearIO();
try {
AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
if (addressSettings != null && addressSettings.getDeadLetterAddress() != null) {
return addressSettings.getDeadLetterAddress().toString();
}
return null;
return queue.getDeadLetterAddress() == null ? null : queue.getDeadLetterAddress().toString();
} finally {
blockOnIO();
}
@ -565,13 +560,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
clearIO();
try {
AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
if (addressSettings != null && addressSettings.getExpiryAddress() != null) {
return addressSettings.getExpiryAddress().toString();
} else {
return null;
}
return queue.getExpiryAddress() == null ? null : queue.getExpiryAddress().toString();
} finally {
blockOnIO();
}

View File

@ -431,6 +431,8 @@ public interface Queue extends Bindable,CriticalComponent {
SimpleString getExpiryAddress();
SimpleString getDeadLetterAddress();
/**
* Pauses the queue. It will receive messages but won't give them to the consumers until resumed.
* If a queue is paused, pausing it again will only throw a warning.

View File

@ -253,7 +253,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final StorageManager storageManager;
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
private volatile AddressSettings addressSettings;
private final ActiveMQServer server;
@ -283,8 +283,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile Consumer exclusiveConsumer;
private volatile SimpleString expiryAddress;
private final ArtemisExecutor executor;
private boolean internalQueue;
@ -705,8 +703,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.storageManager = storageManager;
this.addressSettingsRepository = addressSettingsRepository;
this.scheduledExecutor = scheduledExecutor;
this.server = server;
@ -714,10 +710,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this);
if (addressSettingsRepository != null) {
addressSettingsRepositoryListener = new AddressSettingsRepositoryListener();
addressSettingsRepositoryListener = new AddressSettingsRepositoryListener(addressSettingsRepository);
addressSettingsRepository.registerListener(addressSettingsRepositoryListener);
this.addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch());
} else {
expiryAddress = null;
this.addressSettings = new AddressSettings();
}
if (pageSubscription != null) {
@ -1337,9 +1334,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
});
if (addressSettingsRepository != null) {
addressSettingsRepository.unRegisterListener(addressSettingsRepositoryListener);
}
addressSettingsRepositoryListener.close();
}
@Override
@ -1975,19 +1970,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception {
SimpleString messageExpiryAddress = expiryAddressFromMessageAddress(ref);
if (messageExpiryAddress == null) {
messageExpiryAddress = expiryAddressFromAddressSettings(ref);
}
if (messageExpiryAddress != null) {
if (addressSettings.getExpiryAddress() != null) {
createExpiryResources();
if (logger.isTraceEnabled()) {
logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName());
logger.trace("moving expired reference " + ref + " to address = " + addressSettings.getExpiryAddress() + " from queue=" + this.getName());
}
move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED, consumer);
move(null, addressSettings.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer);
} else {
if (logger.isTraceEnabled()) {
logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
@ -1999,46 +1989,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
refCountForConsumers.check();
if (server != null && server.hasBrokerMessagePlugins()) {
final SimpleString expiryAddress = messageExpiryAddress;
server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer));
}
}
private SimpleString expiryAddressFromMessageAddress(MessageReference ref) {
SimpleString messageAddress = extractAddress(ref.getMessage());
SimpleString expiryAddress = null;
if (messageAddress == null || messageAddress.equals(getAddress())) {
expiryAddress = getExpiryAddress();
}
return expiryAddress;
}
private SimpleString expiryAddressFromAddressSettings(MessageReference ref) {
SimpleString messageAddress = extractAddress(ref.getMessage());
SimpleString expiryAddress = null;
if (messageAddress != null) {
AddressSettings addressSettings = addressSettingsRepository.getMatch(messageAddress.toString());
expiryAddress = addressSettings.getExpiryAddress();
}
return expiryAddress;
}
private SimpleString extractAddress(Message message) {
if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) {
return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString());
} else {
return message.getAddressSimpleString();
server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer));
}
}
@Override
public SimpleString getExpiryAddress() {
return this.expiryAddress;
return this.addressSettings.getExpiryAddress();
}
@Override
public SimpleString getDeadLetterAddress() {
return this.addressSettings.getDeadLetterAddress();
}
@Override
@ -2413,10 +2375,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
public boolean isExpirationRedundant() {
if (expiryAddress != null && expiryAddress.equals(this.address)) {
if (addressSettings.getExpiryAddress() != null && addressSettings.getExpiryAddress().equals(this.address)) {
// check expire with itself would be silly (waste of time)
if (logger.isTraceEnabled())
logger.trace("Redundant expiration from " + address + " to " + expiryAddress);
logger.trace("Redundant expiration from " + address + " to " + addressSettings.getExpiryAddress());
return true;
}
@ -3334,8 +3296,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
storageManager.updateDeliveryCount(reference);
}
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
int deliveryCount = reference.getDeliveryCount();
@ -3567,7 +3527,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
private void expire(final Transaction tx, final MessageReference ref) throws Exception {
SimpleString expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();
SimpleString expiryAddress = addressSettings.getExpiryAddress();
if (expiryAddress != null && expiryAddress.length() != 0) {
@ -3634,7 +3594,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
return sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
return sendToDeadLetterAddress(tx, ref, addressSettings.getDeadLetterAddress());
}
private boolean sendToDeadLetterAddress(final Transaction tx,
@ -4415,12 +4375,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return size;
}
private void configureExpiry(final AddressSettings settings) {
this.expiryAddress = settings == null ? null : settings.getExpiryAddress();
}
private void configureSlowConsumerReaper(final AddressSettings settings) {
if (settings == null || settings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) {
private void configureSlowConsumerReaper() {
if (addressSettings == null || addressSettings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) {
if (slowConsumerReaperFuture != null) {
slowConsumerReaperFuture.cancel(false);
slowConsumerReaperFuture = null;
@ -4431,13 +4387,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
} else {
if (slowConsumerReaperRunnable == null) {
scheduleSlowConsumerReaper(settings);
} else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != settings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) {
scheduleSlowConsumerReaper(addressSettings);
} else if (slowConsumerReaperRunnable.checkPeriod != addressSettings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != addressSettings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(addressSettings.getSlowConsumerPolicy())) {
if (slowConsumerReaperFuture != null) {
slowConsumerReaperFuture.cancel(false);
slowConsumerReaperFuture = null;
}
scheduleSlowConsumerReaper(settings);
scheduleSlowConsumerReaper(addressSettings);
}
}
}
@ -4489,21 +4445,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener {
HierarchicalRepository<AddressSettings> addressSettingsRepository;
AddressSettingsRepositoryListener(HierarchicalRepository addressSettingsRepository) {
this.addressSettingsRepository = addressSettingsRepository;
}
@Override
public void onChange() {
AddressSettings settings = addressSettingsRepository.getMatch(((ActiveMQServerImpl)server).getRuntimeTempQueueNamespace(temporary) + address.toString());
configureExpiry(settings);
checkDeadLetterAddressAndExpiryAddress(settings);
configureSlowConsumerReaper(settings);
addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch());
checkDeadLetterAddressAndExpiryAddress();
configureSlowConsumerReaper();
}
public void close() {
addressSettingsRepository.unRegisterListener(this);
}
}
private void checkDeadLetterAddressAndExpiryAddress(final AddressSettings settings) {
private String getAddressSettingsMatch() {
return ((ActiveMQServerImpl)server).getRuntimeTempQueueNamespace(temporary) + address.toString();
}
private void checkDeadLetterAddressAndExpiryAddress() {
if (!Env.isTestEnv() && !internalQueue && !address.equals(server.getConfiguration().getManagementNotificationAddress())) {
if (settings.getDeadLetterAddress() == null) {
if (addressSettings.getDeadLetterAddress() == null) {
ActiveMQServerLogger.LOGGER.AddressSettingsNoDLA(name);
}
if (settings.getExpiryAddress() == null) {
if (addressSettings.getExpiryAddress() == null) {
ActiveMQServerLogger.LOGGER.AddressSettingsNoExpiryAddress(name);
}
}

View File

@ -1533,6 +1533,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return null;
}
@Override
public SimpleString getDeadLetterAddress() {
return null;
}
@Override
public void pause() {

View File

@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.server;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.SingleServerTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
@ -28,14 +30,21 @@ public class TempQueueNamespaceTest extends SingleServerTestBase {
@Test
public void testTempQueueNamespace() throws Exception {
final String TEMP_QUEUE_NAMESPACE = "temp";
final SimpleString DLA = RandomUtil.randomSimpleString();
final SimpleString EA = RandomUtil.randomSimpleString();
final int RING_SIZE = 10;
server.getConfiguration().setTemporaryQueueNamespace(TEMP_QUEUE_NAMESPACE);
server.getAddressSettingsRepository().addMatch(TEMP_QUEUE_NAMESPACE + ".#", new AddressSettings().setDefaultRingSize(10));
server.getAddressSettingsRepository().addMatch(TEMP_QUEUE_NAMESPACE + ".#", new AddressSettings().setDefaultRingSize(RING_SIZE).setDeadLetterAddress(DLA).setExpiryAddress(EA));
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString address = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false).setTemporary(true));
assertEquals(10, (long) server.locateQueue(queue).getQueueConfiguration().getRingSize());
QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + queue);
assertEquals(RING_SIZE, queueControl.getRingSize());
assertEquals(DLA.toString(), queueControl.getDeadLetterAddress());
assertEquals(EA.toString(), queueControl.getExpiryAddress());
session.close();
}

View File

@ -841,6 +841,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return null;
}
@Override
public SimpleString getDeadLetterAddress() {
return null;
}
@Override
public void route(final Message message, final RoutingContext context) throws Exception {
// no-op