diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java index 499321dbd4..65552212c1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java @@ -73,26 +73,27 @@ public interface AddressControl { /** * Returns the sum of messages on queue(s), including messages in delivery. */ - @Attribute(desc = "the sum of messages on queue(s), including messages in delivery") - long getNumberOfMessages() throws Exception; + @Deprecated + @Attribute(desc = "the sum of messages on queue(s), including messages in delivery; DEPRECATED: use getMessageCount() instead") + long getNumberOfMessages(); /** * Returns the names of the remote queue(s) bound to this address. */ @Attribute(desc = "names of the remote queue(s) bound to this address") - String[] getRemoteQueueNames() throws Exception; + String[] getRemoteQueueNames(); /** * Returns the names of the local queue(s) bound to this address. */ @Attribute(desc = "names of the local queue(s) bound to this address") - String[] getQueueNames() throws Exception; + String[] getQueueNames(); /** * Returns the names of both the local and remote queue(s) bound to this address. */ @Attribute(desc = "names of both the local & remote queue(s) bound to this address") - String[] getAllQueueNames() throws Exception; + String[] getAllQueueNames(); /** * Returns the number of pages used by this address. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index 2dce5a2b4d..2bf527daca 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -16,12 +16,10 @@ */ package org.apache.activemq.artemis.core.management.impl; -import org.apache.activemq.artemis.json.JsonArrayBuilder; import javax.management.MBeanAttributeInfo; import javax.management.MBeanOperationInfo; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.List; @@ -30,7 +28,6 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.management.AddressControl; -import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -45,6 +42,7 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -52,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.replay.ReplayManager; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.json.JsonArrayBuilder; import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.utils.JsonLoader; @@ -130,17 +129,17 @@ public class AddressControlImpl extends AbstractControl implements AddressContro } @Override - public String[] getRemoteQueueNames() throws Exception { + public String[] getRemoteQueueNames() { return getQueueNames(SearchType.REMOTE); } @Override - public String[] getQueueNames() throws Exception { + public String[] getQueueNames() { return getQueueNames(SearchType.LOCAL); } @Override - public String[] getAllQueueNames() throws Exception { + public String[] getAllQueueNames() { return getQueueNames(SearchType.ALL); } @@ -148,7 +147,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro LOCAL, REMOTE, ALL } - private String[] getQueueNames(SearchType searchType) throws Exception { + private String[] getQueueNames(SearchType searchType) { if (AuditLogger.isBaseLoggingEnabled()) { AuditLogger.getQueueNames(this.addressInfo, searchType); } @@ -301,22 +300,14 @@ public class AddressControlImpl extends AbstractControl implements AddressContro } @Override - public long getNumberOfMessages() throws Exception { + @Deprecated + public long getNumberOfMessages() { if (AuditLogger.isBaseLoggingEnabled()) { AuditLogger.getNumberOfMessages(this.addressInfo); } clearIO(); - long totalMsgs = 0; try { - Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName()); - if (bindings != null) { - for (Binding binding : bindings.getBindings()) { - if (binding instanceof QueueBinding) { - totalMsgs += ((QueueBinding) binding).getQueue().getMessageCount(); - } - } - } - return totalMsgs; + return getMessageCount(); } catch (Throwable t) { throw new IllegalStateException(t.getMessage()); } finally { @@ -668,42 +659,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro private long getMessageCount(final DurabilityType durability) { - List queues = getQueues(durability); long count = 0; - for (QueueControl queue : queues) { - count += queue.getMessageCount(); + for (String queueName : getQueueNames()) { + Queue queue = server.locateQueue(queueName); + if (queue != null && + (durability == DurabilityType.ALL || + (durability == DurabilityType.DURABLE && queue.isDurable()) || + (durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) { + count += queue.getMessageCount(); + } } return count; } - private List getQueues(final DurabilityType durability) { - try { - List matchingQueues = new ArrayList<>(); - String[] queues = getQueueNames(); - for (String queue : queues) { - QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.QUEUE + queue); - - // Ignore the "special" subscription - if (coreQueueControl != null) { - if (durability == DurabilityType.ALL || durability == DurabilityType.DURABLE && coreQueueControl.isDurable() || - durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()) { - matchingQueues.add(coreQueueControl); - } - } - } - return matchingQueues; - } catch (Exception e) { - return Collections.emptyList(); - } - } - private void checkStarted() { if (!server.getPostOffice().isStarted()) { throw new IllegalStateException("Broker is not started. Queues can not be managed yet"); } } - private enum DurabilityType { ALL, DURABLE, NON_DURABLE } diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index ec6754bea3..7e383eda83 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -455,6 +455,12 @@ test + + org.mockito + mockito-core + test + + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index 56c8671b2a..6e4e72bf37 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -22,14 +22,10 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; - -import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.json.JsonArray; -import org.apache.activemq.artemis.json.JsonString; import java.text.SimpleDateFormat; import java.util.Arrays; -import java.util.EnumSet; import java.util.Date; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -51,15 +47,19 @@ import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.RoleInfo; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.json.JsonArray; +import org.apache.activemq.artemis.json.JsonString; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.Base64; @@ -67,6 +67,7 @@ import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import static org.apache.activemq.artemis.tests.util.RandomUtil.randomString; @@ -451,6 +452,36 @@ public class AddressControlTest extends ManagementTestBase { assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100)); } + @Test + public void testNumberOfMessages() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + session.createAddress(address, RoutingType.ANYCAST, false); + + AddressControl addressControl = createManagementControl(address); + assertEquals(0, addressControl.getNumberOfMessages()); + + ClientProducer producer = session.createProducer(address.toString()); + producer.send(session.createMessage(false)); + assertEquals(0, addressControl.getNumberOfMessages()); + + session.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.ANYCAST)); + producer.send(session.createMessage(false)); + Wait.assertTrue(() -> addressControl.getNumberOfMessages() == 1, 2000, 100); + + RemoteQueueBinding binding = Mockito.mock(RemoteQueueBinding.class); + Mockito.when(binding.getAddress()).thenReturn(address); + Queue queue = Mockito.mock(Queue.class); + Mockito.when(queue.getMessageCount()).thenReturn((long) 999); + Mockito.when(binding.getQueue()).thenReturn(queue); + Mockito.when(binding.getUniqueName()).thenReturn(RandomUtil.randomSimpleString()); + Mockito.when(binding.getRoutingName()).thenReturn(RandomUtil.randomSimpleString()); + Mockito.when(binding.getClusterName()).thenReturn(RandomUtil.randomSimpleString()); + Mockito.when(binding.getType()).thenReturn(BindingType.REMOTE_QUEUE); + server.getPostOffice().addBinding(binding); + + assertEquals(1, addressControl.getNumberOfMessages()); + } + @Test public void testGetRoutedMessageCounts() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java index 8aaace56bb..4ff0d43768 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java @@ -62,22 +62,22 @@ public class AddressControlUsingCoreTest extends AddressControlTest { } @Override - public long getNumberOfMessages() throws Exception { + public long getNumberOfMessages() { return (long) proxy.retrieveAttributeValue("numberOfMessages"); } @Override - public String[] getRemoteQueueNames() throws Exception { + public String[] getRemoteQueueNames() { return (String[]) proxy.retrieveAttributeValue("remoteQueueNames", String.class); } @Override - public String[] getAllQueueNames() throws Exception { + public String[] getAllQueueNames() { return (String[]) proxy.retrieveAttributeValue("allQueueNames", String.class); } @Override - public String[] getQueueNames() throws Exception { + public String[] getQueueNames() { return (String[]) proxy.retrieveAttributeValue("queueNames", String.class); }