ARTEMIS-3329 ability to purge all queues on address

This commit is contained in:
Justin Bertram 2021-06-04 08:20:31 -05:00 committed by clebertsuconic
parent d7f30e7a33
commit 82f0ece67c
5 changed files with 96 additions and 2 deletions

View File

@ -2859,4 +2859,32 @@ public interface AuditLogger extends BasicLogger {
@LogMessage(level = Logger.Level.INFO) @LogMessage(level = Logger.Level.INFO)
@Message(id = 601749, value = "User {0} is getting activation sequence on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 601749, value = "User {0} is getting activation sequence on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getActivationSequence(String user, Object source, Object... args); void getActivationSequence(String user, Object source, Object... args);
static void purge(Object source) {
RESOURCE_LOGGER.purge(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601750, value = "User {0} is purging target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void purge(String user, Object source, Object... args);
static void purgeAddressSuccess(String queueName) {
RESOURCE_LOGGER.purgeAddressSuccess(getCaller(), queueName);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601751, value = "User {0} has purged address {1}", format = Message.Format.MESSAGE_FORMAT)
void purgeAddressSuccess(String user, String queueName);
static void purgeAddressFailure(String queueName) {
RESOURCE_LOGGER.purgeAddressFailure(getCaller(), queueName);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601752, value = "User {0} failed to purge address {1}", format = Message.Format.MESSAGE_FORMAT)
void purgeAddressFailure(String user, String queueName);
} }

View File

@ -203,4 +203,11 @@ public interface AddressControl {
@Attribute(desc = "whether this address is temporary") @Attribute(desc = "whether this address is temporary")
boolean isTemporary(); boolean isTemporary();
/**
* Purge all the queues bound of this address. Returns the total number of messages purged.
* @throws java.lang.Exception
*/
@Operation(desc = "Purges the queues bound to this address. Returns the total number of messages purged.", impact = MBeanOperationInfo.ACTION)
long purge() throws Exception;
} }

View File

@ -44,7 +44,9 @@ import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.logs.AuditLogger;
@ -530,9 +532,36 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
return addressInfo.isTemporary(); return addressInfo.isTemporary();
} }
// Package protected --------------------------------------------- @Override
public long purge() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.purge(this.addressInfo);
}
clearIO();
long totalMsgs = 0;
try {
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
}
}
}
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
}
} catch (Throwable t) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
}
throw new IllegalStateException(t.getMessage());
} finally {
blockOnIO();
}
// Protected ----------------------------------------------------- return totalMsgs;
}
// Private ------------------------------------------------------- // Private -------------------------------------------------------

View File

@ -526,6 +526,31 @@ public class AddressControlTest extends ManagementTestBase {
} }
} }
@Test
public void testPurge() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
session.createAddress(address, RoutingType.ANYCAST, false);
AddressControl addressControl = createManagementControl(address);
assertEquals(0, addressControl.getMessageCount());
ClientProducer producer = session.createProducer(address.toString());
producer.send(session.createMessage(false));
assertEquals(0, addressControl.getMessageCount());
session.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.ANYCAST));
producer.send(session.createMessage(false));
assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 1, 2000, 100));
session.createQueue(new QueueConfiguration(address.concat('2')).setAddress(address).setRoutingType(RoutingType.ANYCAST));
producer.send(session.createMessage(false));
assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100));
assertEquals(2L, addressControl.purge());
Wait.assertEquals(0L, () -> addressControl.getMessageCount(), 2000, 100);
}
// Package protected --------------------------------------------- // Package protected ---------------------------------------------
// Protected ----------------------------------------------------- // Protected -----------------------------------------------------

View File

@ -173,6 +173,11 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
return (boolean) proxy.retrieveAttributeValue("temporary"); return (boolean) proxy.retrieveAttributeValue("temporary");
} }
@Override
public long purge() throws Exception {
return (long) proxy.invokeOperation("purge");
}
@Override @Override
public String sendMessage(Map<String, String> headers, public String sendMessage(Map<String, String> headers,
int type, int type,