diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index e9a63421c9..f249c751c3 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2859,4 +2859,32 @@ public interface AuditLogger extends BasicLogger { @LogMessage(level = Logger.Level.INFO) @Message(id = 601749, value = "User {0} is getting activation sequence on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT) void getActivationSequence(String user, Object source, Object... args); + + static void purge(Object source) { + RESOURCE_LOGGER.purge(getCaller(), source); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601750, value = "User {0} is purging target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT) + void purge(String user, Object source, Object... args); + + + static void purgeAddressSuccess(String queueName) { + RESOURCE_LOGGER.purgeAddressSuccess(getCaller(), queueName); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601751, value = "User {0} has purged address {1}", format = Message.Format.MESSAGE_FORMAT) + void purgeAddressSuccess(String user, String queueName); + + + static void purgeAddressFailure(String queueName) { + RESOURCE_LOGGER.purgeAddressFailure(getCaller(), queueName); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601752, value = "User {0} failed to purge address {1}", format = Message.Format.MESSAGE_FORMAT) + void purgeAddressFailure(String user, String queueName); + + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java index eab512ec62..361134c821 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java @@ -203,4 +203,11 @@ public interface AddressControl { @Attribute(desc = "whether this address is temporary") boolean isTemporary(); + /** + * Purge all the queues bound of this address. Returns the total number of messages purged. + * @throws java.lang.Exception + */ + @Operation(desc = "Purges the queues bound to this address. Returns the total number of messages purged.", impact = MBeanOperationInfo.ACTION) + long purge() throws Exception; + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index 02601a98c5..4f65e694f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -44,7 +44,9 @@ import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.logs.AuditLogger; @@ -530,9 +532,36 @@ public class AddressControlImpl extends AbstractControl implements AddressContro return addressInfo.isTemporary(); } - // Package protected --------------------------------------------- + @Override + public long purge() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.purge(this.addressInfo); + } + clearIO(); + long totalMsgs = 0; + try { + Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName()); + if (bindings != null) { + for (Binding binding : bindings.getBindings()) { + if (binding instanceof QueueBinding) { + totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED); + } + } + } + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.purgeAddressSuccess(addressInfo.getName().toString()); + } + } catch (Throwable t) { + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.purgeAddressFailure(addressInfo.getName().toString()); + } + throw new IllegalStateException(t.getMessage()); + } finally { + blockOnIO(); + } - // Protected ----------------------------------------------------- + return totalMsgs; + } // Private ------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index 55a9bad84d..026cd9f662 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -526,6 +526,31 @@ public class AddressControlTest extends ManagementTestBase { } } + @Test + public void testPurge() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + session.createAddress(address, RoutingType.ANYCAST, false); + + AddressControl addressControl = createManagementControl(address); + assertEquals(0, addressControl.getMessageCount()); + + ClientProducer producer = session.createProducer(address.toString()); + producer.send(session.createMessage(false)); + assertEquals(0, addressControl.getMessageCount()); + + session.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.ANYCAST)); + producer.send(session.createMessage(false)); + assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 1, 2000, 100)); + + session.createQueue(new QueueConfiguration(address.concat('2')).setAddress(address).setRoutingType(RoutingType.ANYCAST)); + producer.send(session.createMessage(false)); + assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100)); + + assertEquals(2L, addressControl.purge()); + + Wait.assertEquals(0L, () -> addressControl.getMessageCount(), 2000, 100); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java index 3b9d8026a8..5d2f5f5388 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java @@ -173,6 +173,11 @@ public class AddressControlUsingCoreTest extends AddressControlTest { return (boolean) proxy.retrieveAttributeValue("temporary"); } + @Override + public long purge() throws Exception { + return (long) proxy.invokeOperation("purge"); + } + @Override public String sendMessage(Map headers, int type,