ARTEMIS-3502 Auto delete & auto create leading to inconsistencies

This commit is contained in:
Clebert Suconic 2021-09-27 17:32:49 -04:00 committed by clebertsuconic
parent d6237cb4d8
commit 481b73c8ca
43 changed files with 778 additions and 181 deletions

View File

@ -155,6 +155,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>

View File

@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Pair;
@ -92,6 +93,7 @@ import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -122,6 +124,19 @@ public class ArtemisTest extends CliTestBase {
super.setup();
}
long timeBefore;
@Before
public void setupScanTimeout() throws Exception {
timeBefore = ActiveMQDefaultConfiguration.getDefaultAddressQueueScanPeriod();
org.apache.activemq.artemis.api.config.ActiveMQDefaultConfigurationTestAccessor.setDefaultAddressQueueScanPeriod(100);
}
@After
public void resetScanTimeout() throws Exception {
org.apache.activemq.artemis.api.config.ActiveMQDefaultConfigurationTestAccessor.setDefaultAddressQueueScanPeriod(timeBefore);
}
@Test
public void invalidCliDoesntThrowException() {
testCli("--silent", "create");

View File

@ -882,6 +882,11 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_ADDRESS_QUEUE_SCAN_PERIOD;
}
// FOR TESTING
static void setDefaultAddressQueueScanPeriod(long scanPeriod) {
DEFAULT_ADDRESS_QUEUE_SCAN_PERIOD = scanPeriod;
}
/**
* the size of the cache for pre-creating message ID's
*/

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.config;
public class ActiveMQDefaultConfigurationTestAccessor {
public static void setDefaultAddressQueueScanPeriod(long scanPeriod) {
ActiveMQDefaultConfiguration.setDefaultAddressQueueScanPeriod(scanPeriod);
}
}

View File

@ -52,8 +52,6 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
@ -69,7 +67,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
@ -1072,23 +1069,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
//this is ok, ActiveMQ 5 allows this and will actually do it quite often
ActiveMQServerLogger.LOGGER.debug("queue never existed");
}
} else {
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(new SimpleString(dest.getPhysicalName()));
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
Queue b = (Queue) binding.getBindable();
if (b.getConsumerCount() > 0) {
throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
}
if (b.isDurable()) {
throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
}
b.deleteQueue();
}
}
}
if (!AdvisorySupport.isAdvisoryTopic(dest)) {

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
/**
@ -82,4 +83,8 @@ public interface AddressManager {
void scanAddresses(MirrorController mirrorController) throws Exception;
boolean checkAutoRemoveAddress(SimpleString address,
AddressInfo addressInfo,
AddressSettings settings) throws Exception;
}

View File

@ -24,4 +24,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
public interface BindingsFactory {
Bindings createBindings(SimpleString address) throws Exception;
default boolean isAddressBound(SimpleString address) throws Exception {
return false;
}
}

View File

@ -220,4 +220,8 @@ public interface PostOffice extends ActiveMQComponent {
default void scanAddresses(MirrorController mirrorController) throws Exception {
}
default AddressManager getAddressManager() {
return null;
}
}

View File

@ -1843,47 +1843,75 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public void run() {
getLocalQueues().forEach(queue -> {
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) {
reapAddresses();
}
}
private static boolean queueWasUsed(Queue queue) {
return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() != -1;
}
/** To be used by the AddressQueueReaper.
* It is also exposed for tests through PostOfficeTestAccessor */
void reapAddresses() {
getLocalQueues().forEach(queue -> {
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) {
if (queue.isSwept()) {
if (logger.isDebugEnabled()) {
logger.debug("Removing queue " + queue.getName() + " after it being swept twice on reaping process");
}
QueueManagerImpl.performAutoDeleteQueue(server, queue);
} else {
queue.setSwept(true);
}
});
} else {
queue.setSwept(false);
}
});
Set<SimpleString> addresses = addressManager.getAddresses();
Set<SimpleString> addresses = addressManager.getAddresses();
for (SimpleString address : addresses) {
AddressInfo addressInfo = getAddressInfo(address);
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
for (SimpleString address : addresses) {
AddressInfo addressInfo = getAddressInfo(address);
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
try {
if (settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address) && addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay())) {
try {
if (addressManager.checkAutoRemoveAddress(address, addressInfo, settings)) {
if (addressInfo.isSwept()) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("deleting auto-created address \"" + address + ".\"");
}
server.removeAddressInfo(address, null);
}
} catch (ActiveMQShutdownException e) {
// the address and queue reaper is asynchronous so it may happen
// that the broker is shutting down while the reaper iterates
// through the addresses, next restart this operation will be retried
logger.debug(e.getMessage(), e);
} catch (Exception e) {
if (e instanceof ActiveMQAddressDoesNotExistException && getAddressInfo(address) == null) {
// the address and queue reaper is asynchronous so it may happen
// that the address is removed before the reaper removes it
logger.debug(e.getMessage(), e);
server.autoRemoveAddressInfo(address, null);
} else {
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, address);
if (logger.isDebugEnabled()) {
logger.debug("Sweeping address " + address);
}
addressInfo.setSwept(true);
}
} else {
if (addressInfo != null) {
addressInfo.setSwept(false);
}
}
} catch (ActiveMQShutdownException e) {
// the address and queue reaper is asynchronous so it may happen
// that the broker is shutting down while the reaper iterates
// through the addresses, next restart this operation will be retried
logger.debug(e.getMessage(), e);
} catch (Exception e) {
if (e instanceof ActiveMQAddressDoesNotExistException && getAddressInfo(address) == null) {
// the address and queue reaper is asynchronous so it may happen
// that the address is removed before the reaper removes it
logger.debug(e.getMessage(), e);
} else {
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedDestination(e, address, "address");
}
}
}
}
private boolean queueWasUsed(Queue queue) {
return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() != -1;
}
public boolean checkAutoRemoveAddress(SimpleString address,
AddressInfo addressInfo,
AddressSettings settings) throws Exception {
return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address) && addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay());
}
private Stream<Queue> getLocalQueues() {
@ -1965,7 +1993,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return bindings;
}
// For tests only
@Override
public AddressManager getAddressManager() {
return addressManager;
}

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.jboss.logging.Logger;
@ -364,6 +365,13 @@ public class SimpleAddressManager implements AddressManager {
}
}
@Override
public boolean checkAutoRemoveAddress(SimpleString address,
AddressInfo addressInfo,
AddressSettings settings) throws Exception {
return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(address) && addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay());
}
@Override
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
return addressInfoMap.remove(CompositeAddress.extractAddressName(address));

View File

@ -923,6 +923,7 @@ public interface ActiveMQServer extends ServiceComponent {
* @throws Exception
*/
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
/**
* Remove an {@code AddressInfo} from the broker.
*
@ -932,6 +933,15 @@ public interface ActiveMQServer extends ServiceComponent {
*/
void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
/**
* Remove an {@code AddressInfo} from the broker.
*
* @param address the {@code AddressInfo} to remove
* @param auth authorization information; {@code null} is valid
* @throws Exception
*/
void autoRemoveAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
/**
* Remove an {@code AddressInfo} from the broker.
*

View File

@ -2006,8 +2006,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void incompatibleWithHAPolicyChosen(String parameter);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224065, value = "Failed to remove auto-created queue {0}", format = Message.Format.MESSAGE_FORMAT)
void errorRemovingAutoCreatedQueue(@Cause Exception e, SimpleString bindingName);
@Message(id = 224065, value = "Failed to remove auto-created {1} {0}", format = Message.Format.MESSAGE_FORMAT)
void errorRemovingAutoCreatedDestination(@Cause Exception e, SimpleString bindingName, String destinationType);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224066, value = "Error opening context for LDAP", format = Message.Format.MESSAGE_FORMAT)
@ -2190,4 +2190,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224111, value = "Both 'whitelist' and 'allowlist' detected. Configuration 'whitelist' is deprecated, please use only the 'allowlist' configuration", format = Message.Format.MESSAGE_FORMAT)
void useOnlyAllowList();
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224112, value = "Auto removing Queue {0} with queueID={1} on address={2}", format = Message.Format.MESSAGE_FORMAT)
void autoRemoveQueue(String name, long queueID, String address);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224113, value = "Auto removing Address {0}", format = Message.Format.MESSAGE_FORMAT)
void autoRemoveAddress(String name);
}

View File

@ -88,6 +88,13 @@ public interface Queue extends Bindable,CriticalComponent {
boolean isAutoDelete();
default boolean isSwept() {
return false;
}
default void setSwept(boolean sweep) {
}
long getAutoDeleteDelay();
long getAutoDeleteMessageCount();

View File

@ -2396,13 +2396,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (hasBrokerQueuePlugins()) {
callBrokerQueuePlugins(plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, removeConsumers, autoDeleteAddress));
}
AddressInfo addressInfo = getAddressInfo(address);
if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address.toString()) && addressSettingsRepository.getMatch(address.toString()).getAutoDeleteAddressesDelay() == 0) {
try {
removeAddressInfo(address, session);
} catch (ActiveMQDeleteAddressException e) {
// Could be thrown if the address has bindings or is not deletable.
if (queue.isTemporary()) {
AddressInfo addressInfo = getAddressInfo(address);
if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address.toString()) && addressSettingsRepository.getMatch(address.toString()).getAutoDeleteAddressesDelay() == 0) {
try {
removeAddressInfo(address, session);
} catch (ActiveMQDeleteAddressException e) {
// Could be thrown if the address has bindings or is not deletable.
}
}
}
@ -3658,26 +3661,41 @@ public class ActiveMQServerImpl implements ActiveMQServer {
removeAddressInfo(address, auth, false);
}
@Override
public void autoRemoveAddressInfo(SimpleString address, SecurityAuth auth) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("deleting auto-created address \"" + address + ".\"");
}
ActiveMQServerLogger.LOGGER.autoRemoveAddress("" + address);
removeAddressInfo(address, auth);
}
@Override
public void removeAddressInfo(final SimpleString address, final SecurityAuth auth, boolean force) throws Exception {
if (auth != null) {
securityStore.check(address, CheckType.DELETE_ADDRESS, auth);
}
AddressInfo addressInfo = getAddressInfo(address);
if (postOffice.removeAddressInfo(address, force) == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
}
try {
AddressInfo addressInfo = getAddressInfo(address);
if (postOffice.removeAddressInfo(address, force) == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
}
if (addressInfo.getRepositoryChangeListener() != null) {
addressSettingsRepository.unRegisterListener(addressInfo.getRepositoryChangeListener());
addressInfo.setRepositoryChangeListener(null);
}
if (addressInfo.getRepositoryChangeListener() != null) {
addressSettingsRepository.unRegisterListener(addressInfo.getRepositoryChangeListener());
addressInfo.setRepositoryChangeListener(null);
}
long txID = storageManager.generateID();
storageManager.deleteAddressBinding(txID, addressInfo.getId());
storageManager.commitBindings(txID);
pagingManager.deletePageStore(address);
long txID = storageManager.generateID();
storageManager.deleteAddressBinding(txID, addressInfo.getId());
storageManager.commitBindings(txID);
pagingManager.deletePageStore(address);
} finally {
clearAddressCache();
}
}
@Override
@ -3753,102 +3771,105 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public Queue createQueue(final QueueConfiguration queueConfiguration, boolean ignoreIfExists) throws Exception {
if (queueConfiguration.getName() == null || queueConfiguration.getName().length() == 0) {
throw ActiveMQMessageBundle.BUNDLE.invalidQueueName(queueConfiguration.getName());
final PostOffice postOfficeInUse = postOffice;
if (postOfficeInUse == null) {
return null;
}
final Binding rawBinding = postOffice.getBinding(queueConfiguration.getName());
if (rawBinding != null) {
if (rawBinding.getType() != BindingType.LOCAL_QUEUE) {
throw ActiveMQMessageBundle.BUNDLE.bindingAlreadyExists(queueConfiguration.getName().toString(), rawBinding.toManagementString());
synchronized (postOfficeInUse) {
if (queueConfiguration.getName() == null || queueConfiguration.getName().length() == 0) {
throw ActiveMQMessageBundle.BUNDLE.invalidQueueName(queueConfiguration.getName());
}
final QueueBinding queueBinding = (QueueBinding) rawBinding;
if (ignoreIfExists) {
return queueBinding.getQueue();
} else {
throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueConfiguration.getName(), queueBinding.getAddress());
final Binding rawBinding = postOfficeInUse.getBinding(queueConfiguration.getName());
if (rawBinding != null) {
if (rawBinding.getType() != BindingType.LOCAL_QUEUE) {
throw ActiveMQMessageBundle.BUNDLE.bindingAlreadyExists(queueConfiguration.getName().toString(), rawBinding.toManagementString());
}
final QueueBinding queueBinding = (QueueBinding) rawBinding;
if (ignoreIfExists) {
return queueBinding.getQueue();
} else {
throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueConfiguration.getName(), queueBinding.getAddress());
}
}
}
QueueConfigurationUtils.applyDynamicQueueDefaults(queueConfiguration, addressSettingsRepository.getMatch(getRuntimeTempQueueNamespace(queueConfiguration.isTemporary()) + queueConfiguration.getAddress().toString()));
QueueConfigurationUtils.applyDynamicQueueDefaults(queueConfiguration, addressSettingsRepository.getMatch(getRuntimeTempQueueNamespace(queueConfiguration.isTemporary()) + queueConfiguration.getAddress().toString()));
AddressInfo info = postOffice.getAddressInfo(queueConfiguration.getAddress());
if (queueConfiguration.isAutoCreateAddress() || queueConfiguration.isTemporary()) {
if (info == null) {
addAddressInfo(new AddressInfo(queueConfiguration.getAddress(), queueConfiguration.getRoutingType())
.setAutoCreated(true)
.setTemporary(queueConfiguration.isTemporary())
.setInternal(queueConfiguration.isInternal()));
AddressInfo info = postOfficeInUse.getAddressInfo(queueConfiguration.getAddress());
if (queueConfiguration.isAutoCreateAddress() || queueConfiguration.isTemporary()) {
if (info == null) {
addAddressInfo(new AddressInfo(queueConfiguration.getAddress(), queueConfiguration.getRoutingType()).setAutoCreated(true).setTemporary(queueConfiguration.isTemporary()).setInternal(queueConfiguration.isInternal()));
} else if (!info.getRoutingTypes().contains(queueConfiguration.getRoutingType())) {
EnumSet<RoutingType> routingTypes = EnumSet.copyOf(info.getRoutingTypes());
routingTypes.add(queueConfiguration.getRoutingType());
updateAddressInfo(info.getName(), routingTypes);
}
} else if (info == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(queueConfiguration.getAddress());
} else if (!info.getRoutingTypes().contains(queueConfiguration.getRoutingType())) {
EnumSet<RoutingType> routingTypes = EnumSet.copyOf(info.getRoutingTypes());
routingTypes.add(queueConfiguration.getRoutingType());
updateAddressInfo(info.getName(), routingTypes);
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(queueConfiguration.getRoutingType(), info.getName().toString(), info.getRoutingTypes());
}
} else if (info == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(queueConfiguration.getAddress());
} else if (!info.getRoutingTypes().contains(queueConfiguration.getRoutingType())) {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(queueConfiguration.getRoutingType(), info.getName().toString(), info.getRoutingTypes());
}
if (hasBrokerQueuePlugins()) {
callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfiguration));
}
if (hasBrokerQueuePlugins()) {
callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfiguration));
}
if (mirrorControllerService != null) {
mirrorControllerService.createQueue(queueConfiguration);
}
if (mirrorControllerService != null) {
mirrorControllerService.createQueue(queueConfiguration);
}
queueConfiguration.setId(storageManager.generateID());
queueConfiguration.setId(storageManager.generateID());
// preemptive check to ensure the filterString is good
FilterImpl.createFilter(queueConfiguration.getFilterString());
// preemptive check to ensure the filterString is good
FilterImpl.createFilter(queueConfiguration.getFilterString());
final Queue queue = queueFactory.createQueueWith(queueConfiguration, pagingManager);
final Queue queue = queueFactory.createQueueWith(queueConfiguration, pagingManager);
final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
long txID = 0;
if (queue.isDurable()) {
txID = storageManager.generateID();
storageManager.addQueueBinding(txID, localQueueBinding);
}
try {
postOffice.addBinding(localQueueBinding);
long txID = 0;
if (queue.isDurable()) {
storageManager.commitBindings(txID);
txID = storageManager.generateID();
storageManager.addQueueBinding(txID, localQueueBinding);
}
} catch (Exception e) {
try {
if (queueConfiguration.isDurable()) {
storageManager.rollbackBindings(txID);
postOfficeInUse.addBinding(localQueueBinding);
if (queue.isDurable()) {
storageManager.commitBindings(txID);
}
} catch (Exception e) {
try {
queue.close();
} finally {
if (queue.getPageSubscription() != null) {
queue.getPageSubscription().destroy();
if (queueConfiguration.isDurable()) {
storageManager.rollbackBindings(txID);
}
try {
queue.close();
} finally {
if (queue.getPageSubscription() != null) {
queue.getPageSubscription().destroy();
}
}
} catch (Throwable ignored) {
logger.debug(ignored.getMessage(), ignored);
}
} catch (Throwable ignored) {
logger.debug(ignored.getMessage(), ignored);
throw e;
}
throw e;
if (!queueConfiguration.isInternal()) {
managementService.registerQueue(queue, queue.getAddress(), storageManager);
}
copyRetroactiveMessages(queue);
if (hasBrokerQueuePlugins()) {
callBrokerQueuePlugins(plugin -> plugin.afterCreateQueue(queue));
}
callPostQueueCreationCallbacks(queue.getName());
return queue;
}
if (!queueConfiguration.isInternal()) {
managementService.registerQueue(queue, queue.getAddress(), storageManager);
}
copyRetroactiveMessages(queue);
if (hasBrokerQueuePlugins()) {
callBrokerQueuePlugins(plugin -> plugin.afterCreateQueue(queue));
}
callPostQueueCreationCallbacks(queue.getName());
return queue;
}
public String getRuntimeTempQueueNamespace(boolean temporary) {

View File

@ -46,6 +46,7 @@ public class AddressInfo {
private long id;
private long pauseStatusRecord = -1;
private boolean swept;
private SimpleString name;
@ -75,6 +76,14 @@ public class AddressInfo {
private StorageManager storageManager;
private HierarchicalRepositoryChangeListener repositoryChangeListener;
public boolean isSwept() {
return swept;
}
public void setSwept(boolean swept) {
this.swept = swept;
}
/**
* Private constructor used on JSON decoding.
*/

View File

@ -333,6 +333,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final boolean autoDelete;
private volatile boolean swept;
private final long autoDeleteDelay;
private final long autoDeleteMessageCount;
@ -353,6 +355,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
public boolean isSwept() {
return swept;
}
@Override
public void setSwept(boolean swept) {
this.swept = swept;
}
/**
* This is to avoid multi-thread races on calculating direct delivery,
@ -1408,6 +1419,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.debug(this + " adding consumer " + consumer);
}
this.setSwept(false);
try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) {
synchronized (this) {
if (maxConsumers != MAX_CONSUMERS_UNLIMITED && getConsumerCount() >= maxConsumers) {

View File

@ -23,9 +23,12 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueManager;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
import org.jboss.logging.Logger;
public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManager {
private static final Logger logger = Logger.getLogger(QueueManagerImpl.class);
private final SimpleString queueName;
private final ActiveMQServer server;
@ -34,15 +37,13 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
Queue queue = server.locateQueue(queueName);
//the queue may already have been deleted and this is a result of that
if (queue == null) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("no queue to delete \"" + queueName + "\".");
if (logger.isDebugEnabled()) {
logger.debug("no queue to delete \"" + queueName + "\".");
}
return;
}
if (isAutoDelete(queue) && consumerCountCheck(queue) && delayCheck(queue) && messageCountCheck(queue)) {
performAutoDeleteQueue(server, queue);
} else if (queue.isPurgeOnNoConsumers()) {
if (queue.isPurgeOnNoConsumers()) {
purge(queue);
}
}
@ -51,8 +52,8 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
long consumerCount = queue.getConsumerCount();
long messageCount = queue.getMessageCount();
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queue.getName() + "\": consumerCount = " + consumerCount + "; messageCount = " + messageCount);
if (logger.isDebugEnabled()) {
logger.debug("purging queue \"" + queue.getName() + "\": consumerCount = " + consumerCount + "; messageCount = " + messageCount);
}
try {
queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
@ -64,14 +65,16 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
public static void performAutoDeleteQueue(ActiveMQServer server, Queue queue) {
SimpleString queueName = queue.getName();
AddressSettings settings = server.getAddressSettingsRepository().getMatch(queue.getAddress().toString());
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + "\": consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete());
if (logger.isDebugEnabled()) {
logger.debug("deleting auto-created queue \"" + queueName + "\": consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete());
}
ActiveMQServerLogger.LOGGER.autoRemoveQueue("" + queue.getName(), queue.getID(), "" + queue.getAddress());
try {
server.destroyQueue(queueName, null, true, false, settings.isAutoDeleteAddresses(), true);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedDestination(e, queueName, "queue");
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
public class PostOfficeTestAccessor {
public static void reapAddresses(PostOfficeImpl postOffice) {
postOffice.reapAddresses();
}
}

View File

@ -1448,7 +1448,7 @@ public abstract class ActiveMQTestBase extends Assert {
}
}
protected final ActiveMQServer createServer(final boolean realFiles,
protected ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize) {

View File

@ -30,11 +30,13 @@ import org.apache.activemq.artemis.cli.commands.queue.CreateQueue;
import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue;
import org.apache.activemq.artemis.cli.commands.queue.PurgeQueue;
import org.apache.activemq.artemis.cli.commands.queue.UpdateQueue;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;
@ -44,6 +46,12 @@ public class QueueCommandTest extends JMSTestBase {
private ByteArrayOutputStream output;
private ByteArrayOutputStream error;
@Override
protected void extraServerConfig(ActiveMQServer server) {
super.extraServerConfig(server);
server.getConfiguration().setAddressQueueScanPeriod(100);
}
@Before
@Override
public void setUp() throws Exception {
@ -236,7 +244,7 @@ public class QueueCommandTest extends JMSTestBase {
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(command);
assertNull(server.getAddressInfo(queueName));
Wait.assertTrue(() -> server.getAddressInfo(queueName) == null, 2000, 10);
}
@Test

View File

@ -294,7 +294,7 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
connection.close();
assertNull(server.getManagementService().getResource(ResourceNames.ADDRESS + topicName));
Wait.assertTrue(() -> server.getManagementService().getResource(ResourceNames.ADDRESS + topicName) == null);
}
@Test

View File

@ -0,0 +1,332 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AutoCreateTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(AutoCreateTest.class);
public final SimpleString addressA = new SimpleString("addressA");
public final SimpleString queueA = new SimpleString("queueA");
private ActiveMQServer server;
@After
public void clearLogg() {
AssertionLoggerHandler.stopCapture();
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(true, true);
AddressSettings settings = new AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true);
server.getConfiguration().getAddressesSettings().clear();
server.getConfiguration().getAddressesSettings().put("#", settings);
}
@Test
public void testAutoCreateDeleteRecreate() throws Exception {
// This test is about validating the behaviour of queues recreates with the default configuration
Assert.assertEquals("Supposed to use default configuration on this test", ActiveMQDefaultConfiguration.getDefaultAddressQueueScanPeriod(), server.getConfiguration().getAddressQueueScanPeriod());
server.start();
int THREADS = 40;
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
try {
String QUEUE_NAME = getName();
AtomicInteger errors = new AtomicInteger(0);
for (int i = 0; i < 50; i++) {
ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
logger.debug("*******************************************************************************************************************************");
logger.debug("run " + i);
CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
CountDownLatch done = new CountDownLatch(THREADS);
Runnable consumerThread = () -> {
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
barrier.await(10, TimeUnit.SECONDS);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
done.countDown();
}
};
for (int t = 0; t < THREADS; t++) {
executor.execute(consumerThread);
}
barrier.await(10, TimeUnit.SECONDS);
Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
Assert.assertEquals(0, errors.get());
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello"));
Assert.assertNotNull(consumer.receive(5000));
}
}
} finally {
executor.shutdownNow();
}
}
@Test
public void testSweep() throws Exception {
AssertionLoggerHandler.startCapture();
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
server.start();
String QUEUE_NAME = "autoCreateAndRecreate";
ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
}
try (Connection connection = cf.createConnection()) {
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(QUEUE_NAME);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertTrue(serverQueue.isSwept());
MessageConsumer consumer = session.createConsumer(queue);
// no need to wait reaping to wait reaping to set it false. A simple add consumer on the queue should clear this
Wait.assertFalse(serverQueue::isSwept);
connection.start();
}
AddressInfo info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
Assert.assertNotNull(info);
Assert.assertTrue(info.isAutoCreated());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112"));
Assert.assertTrue("Queue name should be mentioned on logs", AssertionLoggerHandler.findText(QUEUE_NAME));
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113")); // we need another sweep to remove it
}
@Test
public void testSweepAddress() throws Exception {
AssertionLoggerHandler.startCapture();
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
AddressSettings settings = new AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
server.getConfiguration().getAddressesSettings().clear();
server.getConfiguration().getAddressesSettings().put("#", settings);
server.start();
String ADDRESS_NAME = getName();
AddressInfo info = new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true);
server.getPostOffice().addAddressInfo(info);
info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME));
ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(ADDRESS_NAME);
session.createConsumer(topic);
}
{ // just a namespace area
final AddressInfo infoRef = info;
Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
}
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
Thread.sleep(50);
Assert.assertFalse(info.isSwept());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
Assert.assertTrue(info.isSwept());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113"));
}
@Test
public void testNegativeSweepAddress() throws Exception {
AssertionLoggerHandler.startCapture();
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
AddressSettings settings = new AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
server.getConfiguration().getAddressesSettings().clear();
server.getConfiguration().getAddressesSettings().put("#", settings);
server.start();
String ADDRESS_NAME = getName();
AddressInfo info = new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true);
server.getPostOffice().addAddressInfo(info);
info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME));
ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(ADDRESS_NAME);
session.createConsumer(topic);
}
{ // just a namespace area
final AddressInfo infoRef = info;
Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
}
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
Thread.sleep(50);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
Assert.assertTrue(info.isSwept());
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(ADDRESS_NAME);
session.createConsumer(topic);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
Assert.assertFalse(info.isSwept()); // it should be cleared because there is a consumer now
}
}
@Test
public void testNegativeSweepBecauseOfConsumer() throws Exception {
AssertionLoggerHandler.startCapture();
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
server.start();
String QUEUE_NAME = getName();
ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
}
AddressInfo info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
Assert.assertNotNull(info);
Assert.assertTrue(info.isAutoCreated());
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(QUEUE_NAME);
Assert.assertTrue(serverQueue.isSwept());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
MessageConsumer consumer = session.createConsumer(queue);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(serverQueue.isSwept());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); // we need another sweep to remove it
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
}
}
@Test
public void testNegativeSweepBecauseOfSend() throws Exception {
AssertionLoggerHandler.startCapture();
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
server.start();
String QUEUE_NAME = getName();
ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
}
AddressInfo info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
Assert.assertNotNull(info);
Assert.assertTrue(info.isAutoCreated());
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(QUEUE_NAME);
Assert.assertTrue(serverQueue.isSwept());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello"));
Wait.assertEquals(1, serverQueue::getMessageCount);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(serverQueue.isSwept());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); // we need another sweep to remove it
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
}
}
}

View File

@ -43,6 +43,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
super.setUp();
locator = createInVMNonHALocator();
server = createServer(false);
server.getConfiguration().setAddressQueueScanPeriod(10);
server.start();
cf = createSessionFactory(locator);

View File

@ -26,6 +26,7 @@ import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
@ -35,6 +36,12 @@ import org.junit.Test;
public class AutoDeleteJmsDestinationTest extends JMSTestBase {
@Override
protected void extraServerConfig(ActiveMQServer server) {
super.extraServerConfig(server);
server.getConfiguration().setAddressQueueScanPeriod(100);
}
@Test
public void testAutoDeleteQueue() throws Exception {
Connection connection = cf.createConnection();

View File

@ -111,6 +111,8 @@ public class ConsumerTest extends ActiveMQTestBase {
server = createServer(durable, isNetty());
server.getConfiguration().setAddressQueueScanPeriod(10);
server.start();
locator = createFactory(isNetty());

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@ -50,24 +51,23 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.SingleServerTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.jboss.logging.Logger;
import org.junit.Test;
public class TemporaryQueueTest extends SingleServerTestBase {
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(TemporaryQueueTest.class);
private static final long CONNECTION_TTL = 2000;
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = super.createServer();
server.getConfiguration().setAddressQueueScanPeriod(100);
return server;
}
@Test
public void testConsumeFromTemporaryQueue() throws Exception {
@ -108,7 +108,7 @@ public class TemporaryQueueTest extends SingleServerTestBase {
sf.close();
assertTrue(server.getAddressSettingsRepository().getCacheSize() < 10);
Wait.assertTrue("server.getAddressSettingsRepository().getCacheSize() = " + server.getAddressSettingsRepository().getCacheSize(), () -> server.getAddressSettingsRepository().getCacheSize() < 10);
}
@Test
@ -129,17 +129,14 @@ public class TemporaryQueueTest extends SingleServerTestBase {
assertNotNull(message);
message.acknowledge();
SimpleString[] storeNames = server.getPagingManager().getStoreNames();
assertTrue(Arrays.asList(storeNames).contains(address));
Wait.assertTrue(() -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(address));
consumer.close();
session.deleteQueue(queue);
session.close();
storeNames = server.getPagingManager().getStoreNames();
assertFalse(Arrays.asList(storeNames).contains(address));
Wait.assertFalse(() -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(address));
}
@Test

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.SingleServerTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
@ -32,6 +33,13 @@ import org.junit.Test;
public class TransientQueueTest extends SingleServerTestBase {
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = super.createServer();
server.getConfiguration().setAddressQueueScanPeriod(100);
return server;
}
@Test
public void testSimpleTransientQueue() throws Exception {
SimpleString queue = RandomUtil.randomSimpleString();
@ -177,6 +185,7 @@ public class TransientQueueTest extends SingleServerTestBase {
session.createConsumer(queue).close();
Wait.assertTrue(() -> server.locateQueue(queue) == null, 2000, 100);
Wait.assertTrue(() -> server.getAddressInfo(queue) == null, 2000, 100);
session.createSharedQueue(new QueueConfiguration(queue).setAddress(address).setFilterString(SimpleString.toSimpleString("q=1")).setDurable(false));

View File

@ -209,8 +209,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
public void testSecurityCacheSizes() throws Exception {
ActiveMQServerControl serverControl = createManagementControl();
Assert.assertEquals(usingCore() ? 1 : 0, serverControl.getAuthenticationCacheSize());
Assert.assertEquals(usingCore() ? 7 : 0, serverControl.getAuthorizationCacheSize());
Wait.assertEquals(usingCore() ? 1 : 0, serverControl::getAuthenticationCacheSize);
Wait.assertEquals(usingCore() ? 7 : 0, serverControl::getAuthorizationCacheSize);
ServerLocator loc = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(loc);
@ -228,7 +228,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
producer.send(m);
Assert.assertEquals(usingCore() ? 2 : 1, serverControl.getAuthenticationCacheSize());
Assert.assertEquals(usingCore() ? 8 : 1, serverControl.getAuthorizationCacheSize());
Wait.assertEquals(usingCore() ? 8 : 1, () -> serverControl.getAuthorizationCacheSize());
}
@Test
@ -827,7 +827,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertTrue(countBeforeCreate < serverControl.getAddressCount());
serverControl.destroyQueue(name.toString(), true, true);
Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
Wait.assertFalse(() -> ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
}
@Test
@ -849,7 +849,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertTrue(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
serverControl.destroyQueue(name.toString(), true, true);
Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
Wait.assertFalse(() -> ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
}
@Test
@ -4308,6 +4308,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), securityConfiguration);
conf.setJournalRetentionDirectory(conf.getJournalDirectory() + "_ret"); // needed for replay tests
server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer, securityManager, true));
server.getConfiguration().setAddressQueueScanPeriod(100);
server.start();
HashSet<Role> role = new HashSet<>();

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -91,11 +92,11 @@ public abstract class ManagementTestBase extends ActiveMQTestBase {
}
protected void checkNoResource(final ObjectName on) {
Assert.assertFalse("unexpected resource for " + on, mbeanServer.isRegistered(on));
Wait.assertFalse("unexpected resource for " + on, () -> mbeanServer.isRegistered(on));
}
protected void checkResource(final ObjectName on) {
Assert.assertTrue("no resource for " + on, mbeanServer.isRegistered(on));
Wait.assertTrue("no resource for " + on, () -> mbeanServer.isRegistered(on));
}
protected QueueControl createManagementControl(final String address, final String queue) throws Exception {

View File

@ -153,7 +153,7 @@ public class NotificationTest extends ActiveMQTestBase {
session.deleteQueue(queue);
//There will be 2 notifications, first is for binding removal, second is for address removal
ClientMessage[] notifications = NotificationTest.consumeMessages(2, notifConsumer);
ClientMessage[] notifications = NotificationTest.consumeMessages(2, notifConsumer, 5000);
Assert.assertEquals(BINDING_REMOVED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertEquals(queue.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME).toString());
Assert.assertEquals(address.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
@ -440,6 +440,7 @@ public class NotificationTest extends ActiveMQTestBase {
super.setUp();
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setMessageExpiryScanPeriod(100), false));
server.getConfiguration().setAddressQueueScanPeriod(100);
NotificationActiveMQServerPlugin notificationPlugin = new NotificationActiveMQServerPlugin();
notificationPlugin.setSendAddressNotifications(true);
notificationPlugin.setSendConnectionNotifications(true);

View File

@ -82,6 +82,14 @@ public class MQTTTest extends MQTTTestSupport {
private static final String AMQP_URI = "tcp://localhost:61616";
@Override
public void configureBroker() throws Exception {
super.configureBroker();
server.getConfiguration().setAddressQueueScanPeriod(100);
}
@Test
public void testConnectWithLargePassword() throws Exception {
for (String version : Arrays.asList("3.1", "3.1.1")) {
@ -2074,6 +2082,6 @@ public class MQTTTest extends MQTTTestSupport {
subscriptionProvider.disconnect();
assertNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar")));
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString("foo.bar")) == null);
}
}

View File

@ -63,6 +63,7 @@ import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -90,6 +91,12 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
private final String testProp = "BASE_DATE";
private final String propValue = "2017-11-01";
@Override
protected void extraServerConfig(Configuration configuration) {
super.extraServerConfig(configuration);
configuration.setAddressQueueScanPeriod(100);
}
@Override
@Before
public void setUp() throws Exception {

View File

@ -36,6 +36,7 @@ public class ProducerAutoCreateQueueTest extends BasicOpenWireTest {
@Override
protected void extraServerConfig(Configuration serverConfig) {
serverConfig.setAddressQueueScanPeriod(100);
String match = "#";
Map<String, AddressSettings> asMap = serverConfig.getAddressesSettings();
asMap.get(match).setAutoCreateAddresses(true).setAutoCreateQueues(true);

View File

@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
@ -105,6 +106,7 @@ public class GlobalPagingTest extends PagingTest {
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.getConfiguration().setGlobalMaxSize(-1);
server.getConfiguration().setAddressQueueScanPeriod(100);
server.start();
@ -143,12 +145,14 @@ public class GlobalPagingTest extends PagingTest {
serverImpl.getMonitor().tick();
AtomicInteger errors = new AtomicInteger(0);
Thread t = new Thread() {
@Override
public void run() {
try {
sendFewMessages(numberOfMessages, session, producer, body);
} catch (Exception e) {
errors.incrementAndGet();
e.printStackTrace();
}
}
@ -159,9 +163,12 @@ public class GlobalPagingTest extends PagingTest {
t.join(1000);
Assert.assertTrue(t.isAlive());
Assert.assertEquals(0, errors.get());
// releasing the disk
serverImpl.getMonitor().setMaxUsage(1).tick();
t.join(5000);
Assert.assertEquals(0, errors.get());
Assert.assertFalse(t.isAlive());
session.start();

View File

@ -6768,10 +6768,10 @@ public class PagingTest extends ActiveMQTestBase {
locator.close();
locator = null;
sf = null;
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
Wait.assertFalse(() -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
// Ensure pagingStore is physically deleted
server.getPagingManager().reloadStores();
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
Wait.assertFalse(() -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
server.stop();
server.start();
@ -7068,6 +7068,16 @@ public class PagingTest extends ActiveMQTestBase {
session.close();
}
@Override
protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize) {
ActiveMQServer server = super.createServer(realFiles, configuration, pageSize, maxAddressSize);
server.getConfiguration().setAddressQueueScanPeriod(100);
return server;
}
@Override
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);

View File

@ -68,6 +68,7 @@ public class MqttPluginTest extends MQTTTestSupport {
public void configureBroker() throws Exception {
super.configureBroker();
server.registerBrokerPlugin(verifier);
server.getConfiguration().setAddressQueueScanPeriod(100);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoDeleteQueues(true).setAutoDeleteAddresses(true);

View File

@ -117,6 +117,7 @@ public class StompPluginTest extends StompTestBase {
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = super.createServer();
server.getConfiguration().setAddressQueueScanPeriod(100);
server.registerBrokerPlugin(verifier);
server.registerBrokerPlugin(new ActiveMQServerPlugin() {

View File

@ -52,6 +52,7 @@ public class AutoCreateDeadLetterResourcesTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
server = createServer(false);
server.getConfiguration().setAddressQueueScanPeriod(100);
// set common address settings needed for all tests; make sure to use getMatch instead of addMatch in invidual tests or these will be overwritten
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateDeadLetterResources(true).setDeadLetterAddress(dla).setMaxDeliveryAttempts(1));

View File

@ -52,6 +52,7 @@ public class AutoCreateExpiryResourcesTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
server = createServer(false);
server.getConfiguration().setAddressQueueScanPeriod(100);
// set common address settings needed for all tests; make sure to use getMatch instead of addMatch in invidual tests or these will be overwritten
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY));

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@ -81,6 +82,13 @@ public class StompTest extends StompTestBase {
protected StompClientConnection conn;
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = super.createServer();
server.getConfiguration().setAddressQueueScanPeriod(100);
return server;
}
@Override
@Before
public void setUp() throws Exception {

View File

@ -134,6 +134,7 @@ public class JMSTestBase extends ActiveMQTestBase {
setTransactionTimeoutScanPeriod(100);
config.getConnectorConfigurations().put("netty", new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, usePersistence()));
extraServerConfig(server);
jmsServer = new JMSServerManagerImpl(server);
namingContext = new InVMNamingContext();
jmsServer.setRegistry(new JndiBindingRegistry(namingContext));
@ -142,6 +143,9 @@ public class JMSTestBase extends ActiveMQTestBase {
registerConnectionFactory();
}
protected void extraServerConfig(ActiveMQServer server) {
}
@Override
protected Configuration createDefaultConfig(boolean netty) throws Exception {
return super.createDefaultConfig(netty).setJMXManagementEnabled(true);

View File

@ -125,6 +125,11 @@ public class WildcardAddressManagerPerfTest {
class BindingFactoryFake implements BindingsFactory {
@Override
public boolean isAddressBound(SimpleString address) throws Exception {
return false;
}
@Override
public Bindings createBindings(SimpleString address) throws Exception {
return new BindingsImpl(address, null);

View File

@ -319,6 +319,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
static class BindingFactoryFake implements BindingsFactory {
@Override
public boolean isAddressBound(SimpleString address) throws Exception {
return false;
}
@Override
public Bindings createBindings(SimpleString address) {
return new BindingsFake(address);