ARTEMIS-1964 fix and deprecate getNumberOfMessages() on AddressControl

AddressControl has 2 methods to get same metric. Both
getNumberOfMessages() and getMessageCount() return the same metric
albeit in different ways.

Also, getNumberOfMessages() inspects both "local" and "remote" queue
bindings which is wrong.

This commit fixes these issues via the following changes:

 - Deprecate getNumberOfMessages().
 - Change getNumberOfMessages() to invoke getMessageCount().
 - Add a test to ensure getNumberOfMessages() does not count remote
queue bindings.
 - Simplify getMessageCount(DurabilityType).
This commit is contained in:
Justin Bertram 2022-07-16 21:55:23 -05:00 committed by clebertsuconic
parent 8e54a65227
commit ff770d540d
5 changed files with 69 additions and 57 deletions

View File

@ -73,26 +73,27 @@ public interface AddressControl {
/** /**
* Returns the sum of messages on queue(s), including messages in delivery. * Returns the sum of messages on queue(s), including messages in delivery.
*/ */
@Attribute(desc = "the sum of messages on queue(s), including messages in delivery") @Deprecated
long getNumberOfMessages() throws Exception; @Attribute(desc = "the sum of messages on queue(s), including messages in delivery; DEPRECATED: use getMessageCount() instead")
long getNumberOfMessages();
/** /**
* Returns the names of the remote queue(s) bound to this address. * Returns the names of the remote queue(s) bound to this address.
*/ */
@Attribute(desc = "names of the remote queue(s) bound to this address") @Attribute(desc = "names of the remote queue(s) bound to this address")
String[] getRemoteQueueNames() throws Exception; String[] getRemoteQueueNames();
/** /**
* Returns the names of the local queue(s) bound to this address. * Returns the names of the local queue(s) bound to this address.
*/ */
@Attribute(desc = "names of the local queue(s) bound to this address") @Attribute(desc = "names of the local queue(s) bound to this address")
String[] getQueueNames() throws Exception; String[] getQueueNames();
/** /**
* Returns the names of both the local and remote queue(s) bound to this address. * Returns the names of both the local and remote queue(s) bound to this address.
*/ */
@Attribute(desc = "names of both the local & remote queue(s) bound to this address") @Attribute(desc = "names of both the local & remote queue(s) bound to this address")
String[] getAllQueueNames() throws Exception; String[] getAllQueueNames();
/** /**
* Returns the number of pages used by this address. * Returns the number of pages used by this address.

View File

@ -16,12 +16,10 @@
*/ */
package org.apache.activemq.artemis.core.management.impl; package org.apache.activemq.artemis.core.management.impl;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import javax.management.MBeanAttributeInfo; import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo; import javax.management.MBeanOperationInfo;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
@ -30,7 +28,6 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
@ -45,6 +42,7 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@ -52,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.replay.ReplayManager; import org.apache.activemq.artemis.core.server.replay.ReplayManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.utils.JsonLoader; import org.apache.activemq.artemis.utils.JsonLoader;
@ -130,17 +129,17 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
} }
@Override @Override
public String[] getRemoteQueueNames() throws Exception { public String[] getRemoteQueueNames() {
return getQueueNames(SearchType.REMOTE); return getQueueNames(SearchType.REMOTE);
} }
@Override @Override
public String[] getQueueNames() throws Exception { public String[] getQueueNames() {
return getQueueNames(SearchType.LOCAL); return getQueueNames(SearchType.LOCAL);
} }
@Override @Override
public String[] getAllQueueNames() throws Exception { public String[] getAllQueueNames() {
return getQueueNames(SearchType.ALL); return getQueueNames(SearchType.ALL);
} }
@ -148,7 +147,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
LOCAL, REMOTE, ALL LOCAL, REMOTE, ALL
} }
private String[] getQueueNames(SearchType searchType) throws Exception { private String[] getQueueNames(SearchType searchType) {
if (AuditLogger.isBaseLoggingEnabled()) { if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getQueueNames(this.addressInfo, searchType); AuditLogger.getQueueNames(this.addressInfo, searchType);
} }
@ -301,22 +300,14 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
} }
@Override @Override
public long getNumberOfMessages() throws Exception { @Deprecated
public long getNumberOfMessages() {
if (AuditLogger.isBaseLoggingEnabled()) { if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getNumberOfMessages(this.addressInfo); AuditLogger.getNumberOfMessages(this.addressInfo);
} }
clearIO(); clearIO();
long totalMsgs = 0;
try { try {
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName()); return getMessageCount();
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
totalMsgs += ((QueueBinding) binding).getQueue().getMessageCount();
}
}
}
return totalMsgs;
} catch (Throwable t) { } catch (Throwable t) {
throw new IllegalStateException(t.getMessage()); throw new IllegalStateException(t.getMessage());
} finally { } finally {
@ -668,42 +659,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
private long getMessageCount(final DurabilityType durability) { private long getMessageCount(final DurabilityType durability) {
List<QueueControl> queues = getQueues(durability);
long count = 0; long count = 0;
for (QueueControl queue : queues) { for (String queueName : getQueueNames()) {
count += queue.getMessageCount(); Queue queue = server.locateQueue(queueName);
if (queue != null &&
(durability == DurabilityType.ALL ||
(durability == DurabilityType.DURABLE && queue.isDurable()) ||
(durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) {
count += queue.getMessageCount();
}
} }
return count; return count;
} }
private List<QueueControl> getQueues(final DurabilityType durability) {
try {
List<QueueControl> matchingQueues = new ArrayList<>();
String[] queues = getQueueNames();
for (String queue : queues) {
QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.QUEUE + queue);
// Ignore the "special" subscription
if (coreQueueControl != null) {
if (durability == DurabilityType.ALL || durability == DurabilityType.DURABLE && coreQueueControl.isDurable() ||
durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()) {
matchingQueues.add(coreQueueControl);
}
}
}
return matchingQueues;
} catch (Exception e) {
return Collections.emptyList();
}
}
private void checkStarted() { private void checkStarted() {
if (!server.getPostOffice().isStarted()) { if (!server.getPostOffice().isStarted()) {
throw new IllegalStateException("Broker is not started. Queues can not be managed yet"); throw new IllegalStateException("Broker is not started. Queues can not be managed yet");
} }
} }
private enum DurabilityType { private enum DurabilityType {
ALL, DURABLE, NON_DURABLE ALL, DURABLE, NON_DURABLE
} }

View File

@ -455,6 +455,12 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -22,14 +22,10 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonString;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet;
import java.util.Date; import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -51,15 +47,19 @@ import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.core.management.RoleInfo; import org.apache.activemq.artemis.api.core.management.RoleInfo;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl; import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonString;
import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.Base64;
@ -67,6 +67,7 @@ import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import static org.apache.activemq.artemis.tests.util.RandomUtil.randomString; import static org.apache.activemq.artemis.tests.util.RandomUtil.randomString;
@ -451,6 +452,36 @@ public class AddressControlTest extends ManagementTestBase {
assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100)); assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100));
} }
@Test
public void testNumberOfMessages() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
session.createAddress(address, RoutingType.ANYCAST, false);
AddressControl addressControl = createManagementControl(address);
assertEquals(0, addressControl.getNumberOfMessages());
ClientProducer producer = session.createProducer(address.toString());
producer.send(session.createMessage(false));
assertEquals(0, addressControl.getNumberOfMessages());
session.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.ANYCAST));
producer.send(session.createMessage(false));
Wait.assertTrue(() -> addressControl.getNumberOfMessages() == 1, 2000, 100);
RemoteQueueBinding binding = Mockito.mock(RemoteQueueBinding.class);
Mockito.when(binding.getAddress()).thenReturn(address);
Queue queue = Mockito.mock(Queue.class);
Mockito.when(queue.getMessageCount()).thenReturn((long) 999);
Mockito.when(binding.getQueue()).thenReturn(queue);
Mockito.when(binding.getUniqueName()).thenReturn(RandomUtil.randomSimpleString());
Mockito.when(binding.getRoutingName()).thenReturn(RandomUtil.randomSimpleString());
Mockito.when(binding.getClusterName()).thenReturn(RandomUtil.randomSimpleString());
Mockito.when(binding.getType()).thenReturn(BindingType.REMOTE_QUEUE);
server.getPostOffice().addBinding(binding);
assertEquals(1, addressControl.getNumberOfMessages());
}
@Test @Test
public void testGetRoutedMessageCounts() throws Exception { public void testGetRoutedMessageCounts() throws Exception {
SimpleString address = RandomUtil.randomSimpleString(); SimpleString address = RandomUtil.randomSimpleString();

View File

@ -62,22 +62,22 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
} }
@Override @Override
public long getNumberOfMessages() throws Exception { public long getNumberOfMessages() {
return (long) proxy.retrieveAttributeValue("numberOfMessages"); return (long) proxy.retrieveAttributeValue("numberOfMessages");
} }
@Override @Override
public String[] getRemoteQueueNames() throws Exception { public String[] getRemoteQueueNames() {
return (String[]) proxy.retrieveAttributeValue("remoteQueueNames", String.class); return (String[]) proxy.retrieveAttributeValue("remoteQueueNames", String.class);
} }
@Override @Override
public String[] getAllQueueNames() throws Exception { public String[] getAllQueueNames() {
return (String[]) proxy.retrieveAttributeValue("allQueueNames", String.class); return (String[]) proxy.retrieveAttributeValue("allQueueNames", String.class);
} }
@Override @Override
public String[] getQueueNames() throws Exception { public String[] getQueueNames() {
return (String[]) proxy.retrieveAttributeValue("queueNames", String.class); return (String[]) proxy.retrieveAttributeValue("queueNames", String.class);
} }