This closes #3833
This commit is contained in:
commit
e41ec90a66
|
@ -1090,7 +1090,7 @@ public interface ActiveMQServerControl {
|
||||||
@Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
|
@Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
|
||||||
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
|
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
|
||||||
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers,
|
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers,
|
||||||
@Parameter(name = "autoDeleteAddress", desc = "Automatically delete the address if this was the last queue") boolean autoDeleteAddress) throws Exception;
|
@Parameter(name = "forceAutoDeleteAddress", desc = "Automatically delete the address if this was the last queue") boolean autoDeleteAddress) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enables message counters for this server.
|
* Enables message counters for this server.
|
||||||
|
|
|
@ -817,7 +817,7 @@ public class JMSServerManagerImpl extends CleaningActivateCallback implements JM
|
||||||
|
|
||||||
// We can't remove the remote binding. As this would be the bridge associated with the topic on this case
|
// We can't remove the remote binding. As this would be the bridge associated with the topic on this case
|
||||||
if (binding.getType() != BindingType.REMOTE_QUEUE) {
|
if (binding.getType() != BindingType.REMOTE_QUEUE) {
|
||||||
server.destroyQueue(SimpleString.toSimpleString(queueName), null, !removeConsumers, removeConsumers, true);
|
server.destroyQueue(SimpleString.toSimpleString(queueName), null, !removeConsumers, removeConsumers, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1563,9 +1563,9 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void destroyQueue(final String name, final boolean removeConsumers, final boolean autoDeleteAddress) throws Exception {
|
public void destroyQueue(final String name, final boolean removeConsumers, final boolean forceAutoDeleteAddress) throws Exception {
|
||||||
if (AuditLogger.isBaseLoggingEnabled()) {
|
if (AuditLogger.isBaseLoggingEnabled()) {
|
||||||
AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, autoDeleteAddress);
|
AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, forceAutoDeleteAddress);
|
||||||
}
|
}
|
||||||
checkStarted();
|
checkStarted();
|
||||||
|
|
||||||
|
@ -1573,7 +1573,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
try {
|
try {
|
||||||
SimpleString queueName = new SimpleString(name);
|
SimpleString queueName = new SimpleString(name);
|
||||||
try {
|
try {
|
||||||
server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, autoDeleteAddress);
|
server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, forceAutoDeleteAddress);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (AuditLogger.isResourceLoggingEnabled()) {
|
if (AuditLogger.isResourceLoggingEnabled()) {
|
||||||
AuditLogger.destroyQueueFailure(name);
|
AuditLogger.destroyQueueFailure(name);
|
||||||
|
|
|
@ -672,13 +672,13 @@ public interface ActiveMQServer extends ServiceComponent {
|
||||||
SecurityAuth session,
|
SecurityAuth session,
|
||||||
boolean checkConsumerCount,
|
boolean checkConsumerCount,
|
||||||
boolean removeConsumers,
|
boolean removeConsumers,
|
||||||
boolean autoDeleteAddress) throws Exception;
|
boolean forceAutoDeleteAddress) throws Exception;
|
||||||
|
|
||||||
void destroyQueue(SimpleString queueName,
|
void destroyQueue(SimpleString queueName,
|
||||||
SecurityAuth session,
|
SecurityAuth session,
|
||||||
boolean checkConsumerCount,
|
boolean checkConsumerCount,
|
||||||
boolean removeConsumers,
|
boolean removeConsumers,
|
||||||
boolean autoDeleteAddress,
|
boolean forceAutoDeleteAddress,
|
||||||
boolean checkMessageCount) throws Exception;
|
boolean checkMessageCount) throws Exception;
|
||||||
|
|
||||||
String destroyConnectionWithSessionMetadata(String metaKey, String metaValue) throws Exception;
|
String destroyConnectionWithSessionMetadata(String metaKey, String metaValue) throws Exception;
|
||||||
|
|
|
@ -2329,9 +2329,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
|
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
String address = binding.getAddress().toString();
|
destroyQueue(queueName, session, checkConsumerCount, removeConsumers, false);
|
||||||
|
|
||||||
destroyQueue(queueName, session, checkConsumerCount, removeConsumers, addressSettingsRepository.getMatch(address).isAutoDeleteAddresses());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2339,8 +2337,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
final SecurityAuth session,
|
final SecurityAuth session,
|
||||||
final boolean checkConsumerCount,
|
final boolean checkConsumerCount,
|
||||||
final boolean removeConsumers,
|
final boolean removeConsumers,
|
||||||
final boolean autoDeleteAddress) throws Exception {
|
final boolean forceAutoDeleteAddress) throws Exception {
|
||||||
destroyQueue(queueName, session, checkConsumerCount, removeConsumers, autoDeleteAddress, false);
|
destroyQueue(queueName, session, checkConsumerCount, removeConsumers, forceAutoDeleteAddress, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2348,7 +2346,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
final SecurityAuth session,
|
final SecurityAuth session,
|
||||||
final boolean checkConsumerCount,
|
final boolean checkConsumerCount,
|
||||||
final boolean removeConsumers,
|
final boolean removeConsumers,
|
||||||
final boolean autoDeleteAddress,
|
final boolean forceAutoDeleteAddress,
|
||||||
final boolean checkMessageCount) throws Exception {
|
final boolean checkMessageCount) throws Exception {
|
||||||
if (postOffice == null) {
|
if (postOffice == null) {
|
||||||
return;
|
return;
|
||||||
|
@ -2384,7 +2382,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasBrokerQueuePlugins()) {
|
if (hasBrokerQueuePlugins()) {
|
||||||
callBrokerQueuePlugins(plugin -> plugin.beforeDestroyQueue(queue, session, checkConsumerCount, removeConsumers, autoDeleteAddress));
|
callBrokerQueuePlugins(plugin -> plugin.beforeDestroyQueue(queue, session, checkConsumerCount, removeConsumers, forceAutoDeleteAddress));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mirrorControllerService != null) {
|
if (mirrorControllerService != null) {
|
||||||
|
@ -2394,13 +2392,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
queue.deleteQueue(removeConsumers);
|
queue.deleteQueue(removeConsumers);
|
||||||
|
|
||||||
if (hasBrokerQueuePlugins()) {
|
if (hasBrokerQueuePlugins()) {
|
||||||
callBrokerQueuePlugins(plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, removeConsumers, autoDeleteAddress));
|
callBrokerQueuePlugins(plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, removeConsumers, forceAutoDeleteAddress));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (queue.isTemporary()) {
|
if (forceAutoDeleteAddress) {
|
||||||
AddressInfo addressInfo = getAddressInfo(address);
|
AddressInfo addressInfo = getAddressInfo(address);
|
||||||
|
|
||||||
if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address.toString()) && addressSettingsRepository.getMatch(address.toString()).getAutoDeleteAddressesDelay() == 0) {
|
if (postOffice != null && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address.toString()) && addressSettingsRepository.getMatch(address.toString()).getAutoDeleteAddressesDelay() == 0) {
|
||||||
try {
|
try {
|
||||||
removeAddressInfo(address, session);
|
removeAddressInfo(address, session);
|
||||||
} catch (ActiveMQDeleteAddressException e) {
|
} catch (ActiveMQDeleteAddressException e) {
|
||||||
|
|
|
@ -72,7 +72,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
|
||||||
ActiveMQServerLogger.LOGGER.autoRemoveQueue("" + queue.getName(), queue.getID(), "" + queue.getAddress());
|
ActiveMQServerLogger.LOGGER.autoRemoveQueue("" + queue.getName(), queue.getID(), "" + queue.getAddress());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server.destroyQueue(queueName, null, true, false, settings.isAutoDeleteAddresses(), true);
|
server.destroyQueue(queueName, null, true, false, false, true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedDestination(e, queueName, "queue");
|
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedDestination(e, queueName, "queue");
|
||||||
}
|
}
|
||||||
|
|
|
@ -1129,7 +1129,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
logger.debug("deleting temporary queue " + bindingName);
|
logger.debug("deleting temporary queue " + bindingName);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
server.destroyQueue(bindingName, null, false);
|
server.destroyQueue(bindingName, null, false, false, true);
|
||||||
if (observer != null) {
|
if (observer != null) {
|
||||||
observer.tempQueueDeleted(bindingName);
|
observer.tempQueueDeleted(bindingName);
|
||||||
}
|
}
|
||||||
|
@ -1177,7 +1177,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
throw new ActiveMQNonExistentQueueException();
|
throw new ActiveMQNonExistentQueueException();
|
||||||
}
|
}
|
||||||
|
|
||||||
server.destroyQueue(unPrefixedQueueName, this, true);
|
server.destroyQueue(unPrefixedQueueName, this, true, false, true);
|
||||||
|
|
||||||
TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(unPrefixedQueueName);
|
TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(unPrefixedQueueName);
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.utils.Wait;
|
import org.apache.activemq.artemis.utils.Wait;
|
||||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -47,6 +48,11 @@ import static org.hamcrest.CoreMatchers.is;
|
||||||
|
|
||||||
public class TopicDurableTests extends JMSClientTestSupport {
|
public class TopicDurableTests extends JMSClientTestSupport {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void addConfiguration(ActiveMQServer server) {
|
||||||
|
server.getConfiguration().setAddressQueueScanPeriod(100);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMessageDurableSubscription() throws Exception {
|
public void testMessageDurableSubscription() throws Exception {
|
||||||
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient");
|
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient");
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.artemis.tests.integration.jms.client;
|
package org.apache.activemq.artemis.tests.integration.jms.client;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
|
@ -35,7 +36,9 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||||
|
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -278,6 +281,8 @@ public class TemporaryDestinationTest extends JMSTestBase {
|
||||||
for (ServerSession serverSession : server.getSessions()) {
|
for (ServerSession serverSession : server.getSessions()) {
|
||||||
assertFalse(((ServerSessionImpl)serverSession).cloneTargetAddresses().containsKey(SimpleString.toSimpleString(temporaryQueue.getQueueName())));
|
assertFalse(((ServerSessionImpl)serverSession).cloneTargetAddresses().containsKey(SimpleString.toSimpleString(temporaryQueue.getQueueName())));
|
||||||
}
|
}
|
||||||
|
Wait.assertTrue(() -> server.locateQueue(temporaryQueue.getQueueName()) == null, 1000, 100);
|
||||||
|
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(temporaryQueue.getQueueName())) == null, 1000, 100);
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) {
|
if (conn != null) {
|
||||||
conn.close();
|
conn.close();
|
||||||
|
@ -285,6 +290,39 @@ public class TemporaryDestinationTest extends JMSTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTemporaryQueueConnectionClosedRemovedAMQP() throws Exception {
|
||||||
|
testTemporaryQueueConnectionClosedRemoved("AMQP");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTemporaryQueueConnectionClosedRemovedCORE() throws Exception {
|
||||||
|
testTemporaryQueueConnectionClosedRemoved("CORE");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTemporaryQueueConnectionClosedRemovedOpenWire() throws Exception {
|
||||||
|
testTemporaryQueueConnectionClosedRemoved("OPENWIRE");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testTemporaryQueueConnectionClosedRemoved(String protocol) throws Exception {
|
||||||
|
ConnectionFactory factory = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
|
||||||
|
final TemporaryQueue temporaryQueue;
|
||||||
|
try (Connection conn = factory.createConnection()) {
|
||||||
|
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
temporaryQueue = s.createTemporaryQueue();
|
||||||
|
MessageProducer producer = s.createProducer(temporaryQueue);
|
||||||
|
producer.send(s.createMessage());
|
||||||
|
// These next two assertions are here to validate the test itself
|
||||||
|
// The queue and address should be found on the server while they still exist on the connection
|
||||||
|
Wait.assertFalse(() -> server.locateQueue(temporaryQueue.getQueueName()) == null, 1000, 100);
|
||||||
|
Wait.assertFalse(() -> server.getAddressInfo(SimpleString.toSimpleString(temporaryQueue.getQueueName())) == null, 1000, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.assertTrue(() -> server.locateQueue(temporaryQueue.getQueueName()) == null, 1000, 100);
|
||||||
|
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(temporaryQueue.getQueueName())) == null, 1000, 100);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testForTempQueueTargetInfosSizeLimit() throws Exception {
|
public void testForTempQueueTargetInfosSizeLimit() throws Exception {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -81,6 +81,7 @@ import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
|
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||||
|
@ -506,6 +507,35 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
checkNoResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
|
checkNoResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateAndDestroyQueueWithAutoDeleteAddress() throws Exception {
|
||||||
|
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteAddresses(false));
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString name = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
ActiveMQServerControl serverControl = createManagementControl();
|
||||||
|
|
||||||
|
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
|
||||||
|
if (legacyCreateQueue) {
|
||||||
|
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, true);
|
||||||
|
} else {
|
||||||
|
serverControl.createQueue(new QueueConfiguration(name).setAddress(address).setRoutingType(RoutingType.ANYCAST).setAutoCreateAddress(true).toJSON());
|
||||||
|
}
|
||||||
|
|
||||||
|
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
|
||||||
|
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
|
||||||
|
Assert.assertEquals(address.toString(), queueControl.getAddress());
|
||||||
|
Assert.assertEquals(name.toString(), queueControl.getName());
|
||||||
|
Assert.assertNull(queueControl.getFilter());
|
||||||
|
Assert.assertEquals(true, queueControl.isDurable());
|
||||||
|
Assert.assertEquals(false, queueControl.isTemporary());
|
||||||
|
|
||||||
|
serverControl.destroyQueue(name.toString(), false, true);
|
||||||
|
|
||||||
|
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
|
||||||
|
checkNoResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveQueueFilter() throws Exception {
|
public void testRemoveQueueFilter() throws Exception {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue