diff --git a/artemis-cli/pom.xml b/artemis-cli/pom.xml index d065060382..0e057825cc 100644 --- a/artemis-cli/pom.xml +++ b/artemis-cli/pom.xml @@ -155,6 +155,13 @@ ${project.version} test + + org.apache.activemq + artemis-core-client + ${project.version} + test + test-jar + diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java index cfd63a8e39..0b444633ac 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.Pair; @@ -92,6 +93,7 @@ import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; import org.apache.commons.configuration2.builder.fluent.Configurations; import org.jboss.logging.Logger; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -122,6 +124,19 @@ public class ArtemisTest extends CliTestBase { super.setup(); } + long timeBefore; + + @Before + public void setupScanTimeout() throws Exception { + timeBefore = ActiveMQDefaultConfiguration.getDefaultAddressQueueScanPeriod(); + org.apache.activemq.artemis.api.config.ActiveMQDefaultConfigurationTestAccessor.setDefaultAddressQueueScanPeriod(100); + } + + @After + public void resetScanTimeout() throws Exception { + org.apache.activemq.artemis.api.config.ActiveMQDefaultConfigurationTestAccessor.setDefaultAddressQueueScanPeriod(timeBefore); + } + @Test public void invalidCliDoesntThrowException() { testCli("--silent", "create"); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 41351a2ea3..092887ec06 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -882,6 +882,11 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_ADDRESS_QUEUE_SCAN_PERIOD; } + // FOR TESTING + static void setDefaultAddressQueueScanPeriod(long scanPeriod) { + DEFAULT_ADDRESS_QUEUE_SCAN_PERIOD = scanPeriod; + } + /** * the size of the cache for pre-creating message ID's */ diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfigurationTestAccessor.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfigurationTestAccessor.java new file mode 100644 index 0000000000..4a6c909844 --- /dev/null +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfigurationTestAccessor.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.config; + +public class ActiveMQDefaultConfigurationTestAccessor { + + public static void setDefaultAddressQueueScanPeriod(long scanPeriod) { + ActiveMQDefaultConfiguration.setDefaultAddressQueueScanPeriod(scanPeriod); + } + +} diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 1957a7dae8..9e8dee7eb5 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -52,8 +52,6 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.OperationContext; -import org.apache.activemq.artemis.core.postoffice.Binding; -import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; @@ -69,7 +67,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; @@ -1072,23 +1069,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se //this is ok, ActiveMQ 5 allows this and will actually do it quite often ActiveMQServerLogger.LOGGER.debug("queue never existed"); } - - - } else { - Bindings bindings = server.getPostOffice().lookupBindingsForAddress(new SimpleString(dest.getPhysicalName())); - - if (bindings != null) { - for (Binding binding : bindings.getBindings()) { - Queue b = (Queue) binding.getBindable(); - if (b.getConsumerCount() > 0) { - throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName()); - } - if (b.isDurable()) { - throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName()); - } - b.deleteQueue(); - } - } } if (!AdvisorySupport.isAdvisoryTopic(dest)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java index 9630f0ebae..f60b9a129d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; /** @@ -82,4 +83,8 @@ public interface AddressManager { void scanAddresses(MirrorController mirrorController) throws Exception; + boolean checkAutoRemoveAddress(SimpleString address, + AddressInfo addressInfo, + AddressSettings settings) throws Exception; + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/BindingsFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/BindingsFactory.java index 5b4c6c9a67..6d11961e2a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/BindingsFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/BindingsFactory.java @@ -24,4 +24,8 @@ import org.apache.activemq.artemis.api.core.SimpleString; public interface BindingsFactory { Bindings createBindings(SimpleString address) throws Exception; + + default boolean isAddressBound(SimpleString address) throws Exception { + return false; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index a6756e3486..8f82a172ea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -220,4 +220,8 @@ public interface PostOffice extends ActiveMQComponent { default void scanAddresses(MirrorController mirrorController) throws Exception { } + + default AddressManager getAddressManager() { + return null; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 5d5bcd9c31..3e51f23b49 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1843,47 +1843,75 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public void run() { - getLocalQueues().forEach(queue -> { - if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) { + reapAddresses(); + } + } + + private static boolean queueWasUsed(Queue queue) { + return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() != -1; + } + + /** To be used by the AddressQueueReaper. + * It is also exposed for tests through PostOfficeTestAccessor */ + void reapAddresses() { + getLocalQueues().forEach(queue -> { + if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) { + if (queue.isSwept()) { + if (logger.isDebugEnabled()) { + logger.debug("Removing queue " + queue.getName() + " after it being swept twice on reaping process"); + } QueueManagerImpl.performAutoDeleteQueue(server, queue); + } else { + queue.setSwept(true); } - }); + } else { + queue.setSwept(false); + } + }); - Set addresses = addressManager.getAddresses(); + Set addresses = addressManager.getAddresses(); - for (SimpleString address : addresses) { - AddressInfo addressInfo = getAddressInfo(address); - AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); + for (SimpleString address : addresses) { + AddressInfo addressInfo = getAddressInfo(address); + AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); - try { - if (settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address) && addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay())) { + try { + if (addressManager.checkAutoRemoveAddress(address, addressInfo, settings)) { + if (addressInfo.isSwept()) { - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("deleting auto-created address \"" + address + ".\""); - } - - server.removeAddressInfo(address, null); - } - } catch (ActiveMQShutdownException e) { - // the address and queue reaper is asynchronous so it may happen - // that the broker is shutting down while the reaper iterates - // through the addresses, next restart this operation will be retried - logger.debug(e.getMessage(), e); - } catch (Exception e) { - if (e instanceof ActiveMQAddressDoesNotExistException && getAddressInfo(address) == null) { - // the address and queue reaper is asynchronous so it may happen - // that the address is removed before the reaper removes it - logger.debug(e.getMessage(), e); + server.autoRemoveAddressInfo(address, null); } else { - ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, address); + if (logger.isDebugEnabled()) { + logger.debug("Sweeping address " + address); + } + addressInfo.setSwept(true); } + } else { + if (addressInfo != null) { + addressInfo.setSwept(false); + } + } + } catch (ActiveMQShutdownException e) { + // the address and queue reaper is asynchronous so it may happen + // that the broker is shutting down while the reaper iterates + // through the addresses, next restart this operation will be retried + logger.debug(e.getMessage(), e); + } catch (Exception e) { + if (e instanceof ActiveMQAddressDoesNotExistException && getAddressInfo(address) == null) { + // the address and queue reaper is asynchronous so it may happen + // that the address is removed before the reaper removes it + logger.debug(e.getMessage(), e); + } else { + ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedDestination(e, address, "address"); } } } + } - private boolean queueWasUsed(Queue queue) { - return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() != -1; - } + public boolean checkAutoRemoveAddress(SimpleString address, + AddressInfo addressInfo, + AddressSettings settings) throws Exception { + return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address) && addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay()); } private Stream getLocalQueues() { @@ -1965,7 +1993,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return bindings; } - // For tests only + @Override public AddressManager getAddressManager() { return addressManager; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index bbeb4eabd4..5a4a94a8bf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.CompositeAddress; import org.jboss.logging.Logger; @@ -364,6 +365,13 @@ public class SimpleAddressManager implements AddressManager { } } + @Override + public boolean checkAutoRemoveAddress(SimpleString address, + AddressInfo addressInfo, + AddressSettings settings) throws Exception { + return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(address) && addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay()); + } + @Override public AddressInfo removeAddressInfo(SimpleString address) throws Exception { return addressInfoMap.remove(CompositeAddress.extractAddressName(address)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index dc5e53311c..60681f58d1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -923,6 +923,7 @@ public interface ActiveMQServer extends ServiceComponent { * @throws Exception */ AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception; + /** * Remove an {@code AddressInfo} from the broker. * @@ -932,6 +933,15 @@ public interface ActiveMQServer extends ServiceComponent { */ void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception; + /** + * Remove an {@code AddressInfo} from the broker. + * + * @param address the {@code AddressInfo} to remove + * @param auth authorization information; {@code null} is valid + * @throws Exception + */ + void autoRemoveAddressInfo(SimpleString address, SecurityAuth auth) throws Exception; + /** * Remove an {@code AddressInfo} from the broker. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index a6a7187d80..bac17c05ee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -2006,8 +2006,8 @@ public interface ActiveMQServerLogger extends BasicLogger { void incompatibleWithHAPolicyChosen(String parameter); @LogMessage(level = Logger.Level.ERROR) - @Message(id = 224065, value = "Failed to remove auto-created queue {0}", format = Message.Format.MESSAGE_FORMAT) - void errorRemovingAutoCreatedQueue(@Cause Exception e, SimpleString bindingName); + @Message(id = 224065, value = "Failed to remove auto-created {1} {0}", format = Message.Format.MESSAGE_FORMAT) + void errorRemovingAutoCreatedDestination(@Cause Exception e, SimpleString bindingName, String destinationType); @LogMessage(level = Logger.Level.ERROR) @Message(id = 224066, value = "Error opening context for LDAP", format = Message.Format.MESSAGE_FORMAT) @@ -2190,4 +2190,12 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.WARN) @Message(id = 224111, value = "Both 'whitelist' and 'allowlist' detected. Configuration 'whitelist' is deprecated, please use only the 'allowlist' configuration", format = Message.Format.MESSAGE_FORMAT) void useOnlyAllowList(); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 224112, value = "Auto removing Queue {0} with queueID={1} on address={2}", format = Message.Format.MESSAGE_FORMAT) + void autoRemoveQueue(String name, long queueID, String address); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 224113, value = "Auto removing Address {0}", format = Message.Format.MESSAGE_FORMAT) + void autoRemoveAddress(String name); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 8bf2b31629..dd1a90fd10 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -88,6 +88,13 @@ public interface Queue extends Bindable,CriticalComponent { boolean isAutoDelete(); + default boolean isSwept() { + return false; + } + + default void setSwept(boolean sweep) { + } + long getAutoDeleteDelay(); long getAutoDeleteMessageCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 8039a58a25..b30b77fe8e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2396,13 +2396,16 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (hasBrokerQueuePlugins()) { callBrokerQueuePlugins(plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, removeConsumers, autoDeleteAddress)); } - AddressInfo addressInfo = getAddressInfo(address); - if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address.toString()) && addressSettingsRepository.getMatch(address.toString()).getAutoDeleteAddressesDelay() == 0) { - try { - removeAddressInfo(address, session); - } catch (ActiveMQDeleteAddressException e) { - // Could be thrown if the address has bindings or is not deletable. + if (queue.isTemporary()) { + AddressInfo addressInfo = getAddressInfo(address); + + if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address.toString()) && addressSettingsRepository.getMatch(address.toString()).getAutoDeleteAddressesDelay() == 0) { + try { + removeAddressInfo(address, session); + } catch (ActiveMQDeleteAddressException e) { + // Could be thrown if the address has bindings or is not deletable. + } } } @@ -3658,26 +3661,41 @@ public class ActiveMQServerImpl implements ActiveMQServer { removeAddressInfo(address, auth, false); } + @Override + public void autoRemoveAddressInfo(SimpleString address, SecurityAuth auth) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("deleting auto-created address \"" + address + ".\""); + } + + ActiveMQServerLogger.LOGGER.autoRemoveAddress("" + address); + + removeAddressInfo(address, auth); + } + @Override public void removeAddressInfo(final SimpleString address, final SecurityAuth auth, boolean force) throws Exception { if (auth != null) { securityStore.check(address, CheckType.DELETE_ADDRESS, auth); } - AddressInfo addressInfo = getAddressInfo(address); - if (postOffice.removeAddressInfo(address, force) == null) { - throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address); - } + try { + AddressInfo addressInfo = getAddressInfo(address); + if (postOffice.removeAddressInfo(address, force) == null) { + throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address); + } - if (addressInfo.getRepositoryChangeListener() != null) { - addressSettingsRepository.unRegisterListener(addressInfo.getRepositoryChangeListener()); - addressInfo.setRepositoryChangeListener(null); - } + if (addressInfo.getRepositoryChangeListener() != null) { + addressSettingsRepository.unRegisterListener(addressInfo.getRepositoryChangeListener()); + addressInfo.setRepositoryChangeListener(null); + } - long txID = storageManager.generateID(); - storageManager.deleteAddressBinding(txID, addressInfo.getId()); - storageManager.commitBindings(txID); - pagingManager.deletePageStore(address); + long txID = storageManager.generateID(); + storageManager.deleteAddressBinding(txID, addressInfo.getId()); + storageManager.commitBindings(txID); + pagingManager.deletePageStore(address); + } finally { + clearAddressCache(); + } } @Override @@ -3753,102 +3771,105 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public Queue createQueue(final QueueConfiguration queueConfiguration, boolean ignoreIfExists) throws Exception { - if (queueConfiguration.getName() == null || queueConfiguration.getName().length() == 0) { - throw ActiveMQMessageBundle.BUNDLE.invalidQueueName(queueConfiguration.getName()); + final PostOffice postOfficeInUse = postOffice; + if (postOfficeInUse == null) { + return null; } - - final Binding rawBinding = postOffice.getBinding(queueConfiguration.getName()); - if (rawBinding != null) { - if (rawBinding.getType() != BindingType.LOCAL_QUEUE) { - throw ActiveMQMessageBundle.BUNDLE.bindingAlreadyExists(queueConfiguration.getName().toString(), rawBinding.toManagementString()); + synchronized (postOfficeInUse) { + if (queueConfiguration.getName() == null || queueConfiguration.getName().length() == 0) { + throw ActiveMQMessageBundle.BUNDLE.invalidQueueName(queueConfiguration.getName()); } - final QueueBinding queueBinding = (QueueBinding) rawBinding; - if (ignoreIfExists) { - return queueBinding.getQueue(); - } else { - throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueConfiguration.getName(), queueBinding.getAddress()); + + final Binding rawBinding = postOfficeInUse.getBinding(queueConfiguration.getName()); + if (rawBinding != null) { + if (rawBinding.getType() != BindingType.LOCAL_QUEUE) { + throw ActiveMQMessageBundle.BUNDLE.bindingAlreadyExists(queueConfiguration.getName().toString(), rawBinding.toManagementString()); + } + final QueueBinding queueBinding = (QueueBinding) rawBinding; + if (ignoreIfExists) { + return queueBinding.getQueue(); + } else { + throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueConfiguration.getName(), queueBinding.getAddress()); + } } - } - QueueConfigurationUtils.applyDynamicQueueDefaults(queueConfiguration, addressSettingsRepository.getMatch(getRuntimeTempQueueNamespace(queueConfiguration.isTemporary()) + queueConfiguration.getAddress().toString())); + QueueConfigurationUtils.applyDynamicQueueDefaults(queueConfiguration, addressSettingsRepository.getMatch(getRuntimeTempQueueNamespace(queueConfiguration.isTemporary()) + queueConfiguration.getAddress().toString())); - AddressInfo info = postOffice.getAddressInfo(queueConfiguration.getAddress()); - if (queueConfiguration.isAutoCreateAddress() || queueConfiguration.isTemporary()) { - if (info == null) { - addAddressInfo(new AddressInfo(queueConfiguration.getAddress(), queueConfiguration.getRoutingType()) - .setAutoCreated(true) - .setTemporary(queueConfiguration.isTemporary()) - .setInternal(queueConfiguration.isInternal())); + AddressInfo info = postOfficeInUse.getAddressInfo(queueConfiguration.getAddress()); + if (queueConfiguration.isAutoCreateAddress() || queueConfiguration.isTemporary()) { + if (info == null) { + addAddressInfo(new AddressInfo(queueConfiguration.getAddress(), queueConfiguration.getRoutingType()).setAutoCreated(true).setTemporary(queueConfiguration.isTemporary()).setInternal(queueConfiguration.isInternal())); + } else if (!info.getRoutingTypes().contains(queueConfiguration.getRoutingType())) { + EnumSet routingTypes = EnumSet.copyOf(info.getRoutingTypes()); + routingTypes.add(queueConfiguration.getRoutingType()); + updateAddressInfo(info.getName(), routingTypes); + } + } else if (info == null) { + throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(queueConfiguration.getAddress()); } else if (!info.getRoutingTypes().contains(queueConfiguration.getRoutingType())) { - EnumSet routingTypes = EnumSet.copyOf(info.getRoutingTypes()); - routingTypes.add(queueConfiguration.getRoutingType()); - updateAddressInfo(info.getName(), routingTypes); + throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(queueConfiguration.getRoutingType(), info.getName().toString(), info.getRoutingTypes()); } - } else if (info == null) { - throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(queueConfiguration.getAddress()); - } else if (!info.getRoutingTypes().contains(queueConfiguration.getRoutingType())) { - throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(queueConfiguration.getRoutingType(), info.getName().toString(), info.getRoutingTypes()); - } - if (hasBrokerQueuePlugins()) { - callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfiguration)); - } + if (hasBrokerQueuePlugins()) { + callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfiguration)); + } - if (mirrorControllerService != null) { - mirrorControllerService.createQueue(queueConfiguration); - } + if (mirrorControllerService != null) { + mirrorControllerService.createQueue(queueConfiguration); + } - queueConfiguration.setId(storageManager.generateID()); + queueConfiguration.setId(storageManager.generateID()); - // preemptive check to ensure the filterString is good - FilterImpl.createFilter(queueConfiguration.getFilterString()); + // preemptive check to ensure the filterString is good + FilterImpl.createFilter(queueConfiguration.getFilterString()); - final Queue queue = queueFactory.createQueueWith(queueConfiguration, pagingManager); + final Queue queue = queueFactory.createQueueWith(queueConfiguration, pagingManager); - final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); + final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); - long txID = 0; - if (queue.isDurable()) { - txID = storageManager.generateID(); - storageManager.addQueueBinding(txID, localQueueBinding); - } - - try { - postOffice.addBinding(localQueueBinding); + long txID = 0; if (queue.isDurable()) { - storageManager.commitBindings(txID); + txID = storageManager.generateID(); + storageManager.addQueueBinding(txID, localQueueBinding); } - } catch (Exception e) { + try { - if (queueConfiguration.isDurable()) { - storageManager.rollbackBindings(txID); + postOfficeInUse.addBinding(localQueueBinding); + if (queue.isDurable()) { + storageManager.commitBindings(txID); } + } catch (Exception e) { try { - queue.close(); - } finally { - if (queue.getPageSubscription() != null) { - queue.getPageSubscription().destroy(); + if (queueConfiguration.isDurable()) { + storageManager.rollbackBindings(txID); } + try { + queue.close(); + } finally { + if (queue.getPageSubscription() != null) { + queue.getPageSubscription().destroy(); + } + } + } catch (Throwable ignored) { + logger.debug(ignored.getMessage(), ignored); } - } catch (Throwable ignored) { - logger.debug(ignored.getMessage(), ignored); + throw e; } - throw e; + + if (!queueConfiguration.isInternal()) { + managementService.registerQueue(queue, queue.getAddress(), storageManager); + } + + copyRetroactiveMessages(queue); + + if (hasBrokerQueuePlugins()) { + callBrokerQueuePlugins(plugin -> plugin.afterCreateQueue(queue)); + } + + callPostQueueCreationCallbacks(queue.getName()); + + return queue; } - - if (!queueConfiguration.isInternal()) { - managementService.registerQueue(queue, queue.getAddress(), storageManager); - } - - copyRetroactiveMessages(queue); - - if (hasBrokerQueuePlugins()) { - callBrokerQueuePlugins(plugin -> plugin.afterCreateQueue(queue)); - } - - callPostQueueCreationCallbacks(queue.getName()); - - return queue; } public String getRuntimeTempQueueNamespace(boolean temporary) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 257b4a38de..0da2946592 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -46,6 +46,7 @@ public class AddressInfo { private long id; private long pauseStatusRecord = -1; + private boolean swept; private SimpleString name; @@ -75,6 +76,14 @@ public class AddressInfo { private StorageManager storageManager; private HierarchicalRepositoryChangeListener repositoryChangeListener; + public boolean isSwept() { + return swept; + } + + public void setSwept(boolean swept) { + this.swept = swept; + } + /** * Private constructor used on JSON decoding. */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 5f0d5d2eb5..fef2d4b5e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -333,6 +333,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final boolean autoDelete; + private volatile boolean swept; + private final long autoDeleteDelay; private final long autoDeleteMessageCount; @@ -353,6 +355,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } + @Override + public boolean isSwept() { + return swept; + } + + @Override + public void setSwept(boolean swept) { + this.swept = swept; + } /** * This is to avoid multi-thread races on calculating direct delivery, @@ -1408,6 +1419,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { logger.debug(this + " adding consumer " + consumer); } + this.setSwept(false); + try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) { synchronized (this) { if (maxConsumers != MAX_CONSUMERS_UNLIMITED && getConsumerCount() >= maxConsumers) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java index 549686f8a6..d64f576cf1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java @@ -23,9 +23,12 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueManager; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.ReferenceCounterUtil; +import org.jboss.logging.Logger; public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManager { + private static final Logger logger = Logger.getLogger(QueueManagerImpl.class); + private final SimpleString queueName; private final ActiveMQServer server; @@ -34,15 +37,13 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag Queue queue = server.locateQueue(queueName); //the queue may already have been deleted and this is a result of that if (queue == null) { - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("no queue to delete \"" + queueName + "\"."); + if (logger.isDebugEnabled()) { + logger.debug("no queue to delete \"" + queueName + "\"."); } return; } - if (isAutoDelete(queue) && consumerCountCheck(queue) && delayCheck(queue) && messageCountCheck(queue)) { - performAutoDeleteQueue(server, queue); - } else if (queue.isPurgeOnNoConsumers()) { + if (queue.isPurgeOnNoConsumers()) { purge(queue); } } @@ -51,8 +52,8 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag long consumerCount = queue.getConsumerCount(); long messageCount = queue.getMessageCount(); - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queue.getName() + "\": consumerCount = " + consumerCount + "; messageCount = " + messageCount); + if (logger.isDebugEnabled()) { + logger.debug("purging queue \"" + queue.getName() + "\": consumerCount = " + consumerCount + "; messageCount = " + messageCount); } try { queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED); @@ -64,14 +65,16 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag public static void performAutoDeleteQueue(ActiveMQServer server, Queue queue) { SimpleString queueName = queue.getName(); AddressSettings settings = server.getAddressSettingsRepository().getMatch(queue.getAddress().toString()); - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + "\": consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete()); + if (logger.isDebugEnabled()) { + logger.debug("deleting auto-created queue \"" + queueName + "\": consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete()); } + ActiveMQServerLogger.LOGGER.autoRemoveQueue("" + queue.getName(), queue.getID(), "" + queue.getAddress()); + try { server.destroyQueue(queueName, null, true, false, settings.isAutoDeleteAddresses(), true); } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName); + ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedDestination(e, queueName, "queue"); } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeTestAccessor.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeTestAccessor.java new file mode 100644 index 0000000000..6236de9b08 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeTestAccessor.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.postoffice.impl; + +public class PostOfficeTestAccessor { + + public static void reapAddresses(PostOfficeImpl postOffice) { + postOffice.reapAddresses(); + } + +} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 330b55af89..f9a43b3a2d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -1448,7 +1448,7 @@ public abstract class ActiveMQTestBase extends Assert { } } - protected final ActiveMQServer createServer(final boolean realFiles, + protected ActiveMQServer createServer(final boolean realFiles, final Configuration configuration, final int pageSize, final long maxAddressSize) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/QueueCommandTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/QueueCommandTest.java index d4a9009ec3..fd6ae46bb8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/QueueCommandTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/QueueCommandTest.java @@ -30,11 +30,13 @@ import org.apache.activemq.artemis.cli.commands.queue.CreateQueue; import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue; import org.apache.activemq.artemis.cli.commands.queue.PurgeQueue; import org.apache.activemq.artemis.cli.commands.queue.UpdateQueue; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Before; import org.junit.Test; @@ -44,6 +46,12 @@ public class QueueCommandTest extends JMSTestBase { private ByteArrayOutputStream output; private ByteArrayOutputStream error; + @Override + protected void extraServerConfig(ActiveMQServer server) { + super.extraServerConfig(server); + server.getConfiguration().setAddressQueueScanPeriod(100); + } + @Before @Override public void setUp() throws Exception { @@ -236,7 +244,7 @@ public class QueueCommandTest extends JMSTestBase { delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); checkExecutionPassed(command); - assertNull(server.getAddressInfo(queueName)); + Wait.assertTrue(() -> server.getAddressInfo(queueName) == null, 2000, 10); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java index a7918dbea0..d62ca79f00 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java @@ -294,7 +294,7 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase { connection.close(); - assertNull(server.getManagementService().getResource(ResourceNames.ADDRESS + topicName)); + Wait.assertTrue(() -> server.getManagementService().getResource(ResourceNames.ADDRESS + topicName) == null); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java new file mode 100644 index 0000000000..fad1357af7 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; +import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.jboss.logging.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class AutoCreateTest extends ActiveMQTestBase { + private static final Logger logger = Logger.getLogger(AutoCreateTest.class); + + public final SimpleString addressA = new SimpleString("addressA"); + public final SimpleString queueA = new SimpleString("queueA"); + + private ActiveMQServer server; + + @After + public void clearLogg() { + AssertionLoggerHandler.stopCapture(); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, true); + AddressSettings settings = new AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true); + + server.getConfiguration().getAddressesSettings().clear(); + server.getConfiguration().getAddressesSettings().put("#", settings); + } + + @Test + public void testAutoCreateDeleteRecreate() throws Exception { + // This test is about validating the behaviour of queues recreates with the default configuration + Assert.assertEquals("Supposed to use default configuration on this test", ActiveMQDefaultConfiguration.getDefaultAddressQueueScanPeriod(), server.getConfiguration().getAddressQueueScanPeriod()); + server.start(); + int THREADS = 40; + ExecutorService executor = Executors.newFixedThreadPool(THREADS); + try { + + String QUEUE_NAME = getName(); + + AtomicInteger errors = new AtomicInteger(0); + + for (int i = 0; i < 50; i++) { + ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + logger.debug("*******************************************************************************************************************************"); + logger.debug("run " + i); + CyclicBarrier barrier = new CyclicBarrier(THREADS + 1); + CountDownLatch done = new CountDownLatch(THREADS); + Runnable consumerThread = () -> { + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + barrier.await(10, TimeUnit.SECONDS); + Queue queue = session.createQueue(QUEUE_NAME); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + done.countDown(); + } + }; + + for (int t = 0; t < THREADS; t++) { + executor.execute(consumerThread); + } + + barrier.await(10, TimeUnit.SECONDS); + Assert.assertTrue(done.await(10, TimeUnit.SECONDS)); + Assert.assertEquals(0, errors.get()); + + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("hello")); + + Assert.assertNotNull(consumer.receive(5000)); + } + } + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testSweep() throws Exception { + + AssertionLoggerHandler.startCapture(); + server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually + server.start(); + String QUEUE_NAME = "autoCreateAndRecreate"; + + ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + } + + try (Connection connection = cf.createConnection()) { + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(QUEUE_NAME); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertTrue(serverQueue.isSwept()); + MessageConsumer consumer = session.createConsumer(queue); + // no need to wait reaping to wait reaping to set it false. A simple add consumer on the queue should clear this + Wait.assertFalse(serverQueue::isSwept); + connection.start(); + } + + AddressInfo info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME)); + Assert.assertNotNull(info); + Assert.assertTrue(info.isAutoCreated()); + + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112")); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112")); + Assert.assertTrue("Queue name should be mentioned on logs", AssertionLoggerHandler.findText(QUEUE_NAME)); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113")); // we need another sweep to remove it + } + + @Test + public void testSweepAddress() throws Exception { + AssertionLoggerHandler.startCapture(); + server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually + AddressSettings settings = new AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10); + server.getConfiguration().getAddressesSettings().clear(); + server.getConfiguration().getAddressesSettings().put("#", settings); + server.start(); + String ADDRESS_NAME = getName(); + + AddressInfo info = new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true); + server.getPostOffice().addAddressInfo(info); + info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME)); + + ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(ADDRESS_NAME); + session.createConsumer(topic); + } + + { // just a namespace area + final AddressInfo infoRef = info; + Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1); + } + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); + Thread.sleep(50); + Assert.assertFalse(info.isSwept()); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); + Assert.assertTrue(info.isSwept()); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113")); + } + + + @Test + public void testNegativeSweepAddress() throws Exception { + AssertionLoggerHandler.startCapture(); + server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually + AddressSettings settings = new AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10); + server.getConfiguration().getAddressesSettings().clear(); + server.getConfiguration().getAddressesSettings().put("#", settings); + server.start(); + String ADDRESS_NAME = getName(); + + AddressInfo info = new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true); + server.getPostOffice().addAddressInfo(info); + info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME)); + + ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(ADDRESS_NAME); + session.createConsumer(topic); + } + + { // just a namespace area + final AddressInfo infoRef = info; + Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1); + } + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); + Thread.sleep(50); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); + Assert.assertTrue(info.isSwept()); + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(ADDRESS_NAME); + session.createConsumer(topic); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); + Assert.assertFalse(info.isSwept()); // it should be cleared because there is a consumer now + } + } + + @Test + public void testNegativeSweepBecauseOfConsumer() throws Exception { + + AssertionLoggerHandler.startCapture(); + server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually + server.start(); + String QUEUE_NAME = getName(); + + ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + } + + AddressInfo info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME)); + Assert.assertNotNull(info); + Assert.assertTrue(info.isAutoCreated()); + + + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(QUEUE_NAME); + Assert.assertTrue(serverQueue.isSwept()); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112")); + MessageConsumer consumer = session.createConsumer(queue); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertFalse(serverQueue.isSwept()); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); // we need another sweep to remove it + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112")); + } + } + + @Test + public void testNegativeSweepBecauseOfSend() throws Exception { + + AssertionLoggerHandler.startCapture(); + server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually + server.start(); + String QUEUE_NAME = getName(); + + ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + } + + AddressInfo info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME)); + Assert.assertNotNull(info); + Assert.assertTrue(info.isAutoCreated()); + + + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(QUEUE_NAME); + Assert.assertTrue(serverQueue.isSwept()); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112")); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("hello")); + Wait.assertEquals(1, serverQueue::getMessageCount); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertFalse(serverQueue.isSwept()); + PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice()); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); // we need another sweep to remove it + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112")); + } + } + + + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java index 6b82935b7b..0079997893 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java @@ -43,6 +43,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase { super.setUp(); locator = createInVMNonHALocator(); server = createServer(false); + server.getConfiguration().setAddressQueueScanPeriod(10); server.start(); cf = createSessionFactory(locator); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java index 149d120929..f05a3dc963 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java @@ -26,6 +26,7 @@ import javax.jms.TextMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.JMSTestBase; @@ -35,6 +36,12 @@ import org.junit.Test; public class AutoDeleteJmsDestinationTest extends JMSTestBase { + @Override + protected void extraServerConfig(ActiveMQServer server) { + super.extraServerConfig(server); + server.getConfiguration().setAddressQueueScanPeriod(100); + } + @Test public void testAutoDeleteQueue() throws Exception { Connection connection = cf.createConnection(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 6b558a5683..abdf7e0d09 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -111,6 +111,8 @@ public class ConsumerTest extends ActiveMQTestBase { server = createServer(durable, isNetty()); + server.getConfiguration().setAddressQueueScanPeriod(10); + server.start(); locator = createFactory(isNetty()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java index 76904100c0..1d778a2a24 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; import org.apache.activemq.artemis.core.remoting.CloseListener; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; @@ -50,24 +51,23 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.SingleServerTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.RandomUtil; import org.jboss.logging.Logger; import org.junit.Test; public class TemporaryQueueTest extends SingleServerTestBase { - // Constants ----------------------------------------------------- private static final Logger log = Logger.getLogger(TemporaryQueueTest.class); private static final long CONNECTION_TTL = 2000; - // Attributes ---------------------------------------------------- - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- + @Override + protected ActiveMQServer createServer() throws Exception { + ActiveMQServer server = super.createServer(); + server.getConfiguration().setAddressQueueScanPeriod(100); + return server; + } @Test public void testConsumeFromTemporaryQueue() throws Exception { @@ -108,7 +108,7 @@ public class TemporaryQueueTest extends SingleServerTestBase { sf.close(); - assertTrue(server.getAddressSettingsRepository().getCacheSize() < 10); + Wait.assertTrue("server.getAddressSettingsRepository().getCacheSize() = " + server.getAddressSettingsRepository().getCacheSize(), () -> server.getAddressSettingsRepository().getCacheSize() < 10); } @Test @@ -129,17 +129,14 @@ public class TemporaryQueueTest extends SingleServerTestBase { assertNotNull(message); message.acknowledge(); - SimpleString[] storeNames = server.getPagingManager().getStoreNames(); - assertTrue(Arrays.asList(storeNames).contains(address)); + Wait.assertTrue(() -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(address)); consumer.close(); session.deleteQueue(queue); session.close(); - storeNames = server.getPagingManager().getStoreNames(); - assertFalse(Arrays.asList(storeNames).contains(address)); - + Wait.assertFalse(() -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(address)); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TransientQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TransientQueueTest.java index a0417e7bec..cf1a09e99b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TransientQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TransientQueueTest.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.tests.util.SingleServerTestBase; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.Wait; @@ -32,6 +33,13 @@ import org.junit.Test; public class TransientQueueTest extends SingleServerTestBase { + @Override + protected ActiveMQServer createServer() throws Exception { + ActiveMQServer server = super.createServer(); + server.getConfiguration().setAddressQueueScanPeriod(100); + return server; + } + @Test public void testSimpleTransientQueue() throws Exception { SimpleString queue = RandomUtil.randomSimpleString(); @@ -177,6 +185,7 @@ public class TransientQueueTest extends SingleServerTestBase { session.createConsumer(queue).close(); Wait.assertTrue(() -> server.locateQueue(queue) == null, 2000, 100); + Wait.assertTrue(() -> server.getAddressInfo(queue) == null, 2000, 100); session.createSharedQueue(new QueueConfiguration(queue).setAddress(address).setFilterString(SimpleString.toSimpleString("q=1")).setDurable(false)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index 874637baba..127b2ef3c1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -209,8 +209,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { public void testSecurityCacheSizes() throws Exception { ActiveMQServerControl serverControl = createManagementControl(); - Assert.assertEquals(usingCore() ? 1 : 0, serverControl.getAuthenticationCacheSize()); - Assert.assertEquals(usingCore() ? 7 : 0, serverControl.getAuthorizationCacheSize()); + Wait.assertEquals(usingCore() ? 1 : 0, serverControl::getAuthenticationCacheSize); + Wait.assertEquals(usingCore() ? 7 : 0, serverControl::getAuthorizationCacheSize); ServerLocator loc = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(loc); @@ -228,7 +228,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { producer.send(m); Assert.assertEquals(usingCore() ? 2 : 1, serverControl.getAuthenticationCacheSize()); - Assert.assertEquals(usingCore() ? 8 : 1, serverControl.getAuthorizationCacheSize()); + Wait.assertEquals(usingCore() ? 8 : 1, () -> serverControl.getAuthorizationCacheSize()); } @Test @@ -827,7 +827,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { Assert.assertTrue(countBeforeCreate < serverControl.getAddressCount()); serverControl.destroyQueue(name.toString(), true, true); - Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); + Wait.assertFalse(() -> ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); } @Test @@ -849,7 +849,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { Assert.assertTrue(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); serverControl.destroyQueue(name.toString(), true, true); - Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); + Wait.assertFalse(() -> ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); } @Test @@ -4308,6 +4308,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), securityConfiguration); conf.setJournalRetentionDirectory(conf.getJournalDirectory() + "_ret"); // needed for replay tests server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer, securityManager, true)); + server.getConfiguration().setAddressQueueScanPeriod(100); server.start(); HashSet role = new HashSet<>(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java index baaaab3a78..83aeeec1bd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -91,11 +92,11 @@ public abstract class ManagementTestBase extends ActiveMQTestBase { } protected void checkNoResource(final ObjectName on) { - Assert.assertFalse("unexpected resource for " + on, mbeanServer.isRegistered(on)); + Wait.assertFalse("unexpected resource for " + on, () -> mbeanServer.isRegistered(on)); } protected void checkResource(final ObjectName on) { - Assert.assertTrue("no resource for " + on, mbeanServer.isRegistered(on)); + Wait.assertTrue("no resource for " + on, () -> mbeanServer.isRegistered(on)); } protected QueueControl createManagementControl(final String address, final String queue) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java index 7b5fe289d9..e151a1f8f1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java @@ -153,7 +153,7 @@ public class NotificationTest extends ActiveMQTestBase { session.deleteQueue(queue); //There will be 2 notifications, first is for binding removal, second is for address removal - ClientMessage[] notifications = NotificationTest.consumeMessages(2, notifConsumer); + ClientMessage[] notifications = NotificationTest.consumeMessages(2, notifConsumer, 5000); Assert.assertEquals(BINDING_REMOVED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); Assert.assertEquals(queue.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME).toString()); Assert.assertEquals(address.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString()); @@ -440,6 +440,7 @@ public class NotificationTest extends ActiveMQTestBase { super.setUp(); server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setMessageExpiryScanPeriod(100), false)); + server.getConfiguration().setAddressQueueScanPeriod(100); NotificationActiveMQServerPlugin notificationPlugin = new NotificationActiveMQServerPlugin(); notificationPlugin.setSendAddressNotifications(true); notificationPlugin.setSendConnectionNotifications(true); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index a3839fec93..89479a65b3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -82,6 +82,14 @@ public class MQTTTest extends MQTTTestSupport { private static final String AMQP_URI = "tcp://localhost:61616"; + + @Override + public void configureBroker() throws Exception { + super.configureBroker(); + server.getConfiguration().setAddressQueueScanPeriod(100); + } + + @Test public void testConnectWithLargePassword() throws Exception { for (String version : Arrays.asList("3.1", "3.1.1")) { @@ -2074,6 +2082,6 @@ public class MQTTTest extends MQTTTestSupport { subscriptionProvider.disconnect(); - assertNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar"))); + Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString("foo.bar")) == null); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index d911e715ac..7777710361 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -63,6 +63,7 @@ import org.apache.activemq.ActiveMQSession; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.api.core.RoutingType; @@ -90,6 +91,12 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { private final String testProp = "BASE_DATE"; private final String propValue = "2017-11-01"; + @Override + protected void extraServerConfig(Configuration configuration) { + super.extraServerConfig(configuration); + configuration.setAddressQueueScanPeriod(100); + } + @Override @Before public void setUp() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java index 5697ce5b7d..f9ed2acd51 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java @@ -36,6 +36,7 @@ public class ProducerAutoCreateQueueTest extends BasicOpenWireTest { @Override protected void extraServerConfig(Configuration serverConfig) { + serverConfig.setAddressQueueScanPeriod(100); String match = "#"; Map asMap = serverConfig.getAddressesSettings(); asMap.get(match).setAutoCreateAddresses(true).setAutoCreateQueues(true); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java index 3dee2b295e..1eb90394e6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; @@ -105,6 +106,7 @@ public class GlobalPagingTest extends PagingTest { server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); server.getConfiguration().setGlobalMaxSize(-1); + server.getConfiguration().setAddressQueueScanPeriod(100); server.start(); @@ -143,12 +145,14 @@ public class GlobalPagingTest extends PagingTest { serverImpl.getMonitor().tick(); + AtomicInteger errors = new AtomicInteger(0); Thread t = new Thread() { @Override public void run() { try { sendFewMessages(numberOfMessages, session, producer, body); } catch (Exception e) { + errors.incrementAndGet(); e.printStackTrace(); } } @@ -159,9 +163,12 @@ public class GlobalPagingTest extends PagingTest { t.join(1000); Assert.assertTrue(t.isAlive()); + Assert.assertEquals(0, errors.get()); + // releasing the disk serverImpl.getMonitor().setMaxUsage(1).tick(); t.join(5000); + Assert.assertEquals(0, errors.get()); Assert.assertFalse(t.isAlive()); session.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 8b6b09460f..29a66944b5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -6768,10 +6768,10 @@ public class PagingTest extends ActiveMQTestBase { locator.close(); locator = null; sf = null; - assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); + Wait.assertFalse(() -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); // Ensure pagingStore is physically deleted server.getPagingManager().reloadStores(); - assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); + Wait.assertFalse(() -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); server.stop(); server.start(); @@ -7068,6 +7068,16 @@ public class PagingTest extends ActiveMQTestBase { session.close(); } + @Override + protected final ActiveMQServer createServer(final boolean realFiles, + final Configuration configuration, + final int pageSize, + final long maxAddressSize) { + ActiveMQServer server = super.createServer(realFiles, configuration, pageSize, maxAddressSize); + server.getConfiguration().setAddressQueueScanPeriod(100); + return server; + } + @Override protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception { Configuration configuration = super.createDefaultConfig(serverID, netty); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java index 3e90e052b1..e6b3dbb5cc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java @@ -68,6 +68,7 @@ public class MqttPluginTest extends MQTTTestSupport { public void configureBroker() throws Exception { super.configureBroker(); server.registerBrokerPlugin(verifier); + server.getConfiguration().setAddressQueueScanPeriod(100); AddressSettings addressSettings = new AddressSettings(); addressSettings.setAutoDeleteQueues(true).setAutoDeleteAddresses(true); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java index eb1bf57d5e..3c4551695c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java @@ -117,6 +117,7 @@ public class StompPluginTest extends StompTestBase { @Override protected ActiveMQServer createServer() throws Exception { ActiveMQServer server = super.createServer(); + server.getConfiguration().setAddressQueueScanPeriod(100); server.registerBrokerPlugin(verifier); server.registerBrokerPlugin(new ActiveMQServerPlugin() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java index 9651b0e6c4..b533b845a9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java @@ -52,6 +52,7 @@ public class AutoCreateDeadLetterResourcesTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); server = createServer(false); + server.getConfiguration().setAddressQueueScanPeriod(100); // set common address settings needed for all tests; make sure to use getMatch instead of addMatch in invidual tests or these will be overwritten server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateDeadLetterResources(true).setDeadLetterAddress(dla).setMaxDeliveryAttempts(1)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateExpiryResourcesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateExpiryResourcesTest.java index 29a99916da..ee88bc6b15 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateExpiryResourcesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateExpiryResourcesTest.java @@ -52,6 +52,7 @@ public class AutoCreateExpiryResourcesTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); server = createServer(false); + server.getConfiguration().setAddressQueueScanPeriod(100); // set common address settings needed for all tests; make sure to use getMatch instead of addMatch in invidual tests or these will be overwritten server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 69428e0bc6..a48f553952 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -81,6 +82,13 @@ public class StompTest extends StompTestBase { protected StompClientConnection conn; + @Override + protected ActiveMQServer createServer() throws Exception { + ActiveMQServer server = super.createServer(); + server.getConfiguration().setAddressQueueScanPeriod(100); + return server; + } + @Override @Before public void setUp() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java index 50074e97f6..0480b32980 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java @@ -134,6 +134,7 @@ public class JMSTestBase extends ActiveMQTestBase { setTransactionTimeoutScanPeriod(100); config.getConnectorConfigurations().put("netty", new TransportConfiguration(NETTY_CONNECTOR_FACTORY)); server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, usePersistence())); + extraServerConfig(server); jmsServer = new JMSServerManagerImpl(server); namingContext = new InVMNamingContext(); jmsServer.setRegistry(new JndiBindingRegistry(namingContext)); @@ -142,6 +143,9 @@ public class JMSTestBase extends ActiveMQTestBase { registerConnectionFactory(); } + protected void extraServerConfig(ActiveMQServer server) { + } + @Override protected Configuration createDefaultConfig(boolean netty) throws Exception { return super.createDefaultConfig(netty).setJMXManagementEnabled(true); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java index 6ab742298d..013f545683 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java @@ -125,6 +125,11 @@ public class WildcardAddressManagerPerfTest { class BindingFactoryFake implements BindingsFactory { + @Override + public boolean isAddressBound(SimpleString address) throws Exception { + return false; + } + @Override public Bindings createBindings(SimpleString address) throws Exception { return new BindingsImpl(address, null); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java index 1a6f92f369..3195d8e516 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java @@ -319,6 +319,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { static class BindingFactoryFake implements BindingsFactory { + @Override + public boolean isAddressBound(SimpleString address) throws Exception { + return false; + } + @Override public Bindings createBindings(SimpleString address) { return new BindingsFake(address);