ARTEMIS-2909 revert ARTEMIS-2322

This reverts commit dbb3a90fe6.

The org.apache.activemq.artemis.core.server.Queue#getRate method is for
slow-consumer detection and is designed for internal use only.

Furthermore, it's too opaque to be trusted by a remote user as it only
returns the number of message added to the queue since *the last time
it was called*. The problem here is that the user calling it doesn't
know when it was invoked last. Therefore, they could be getting the
rate of messages added for the last 5 minutes or the last 5
milliseconds. This can lead to inconsistent and misleading results.

There are three main ways for users to track rates of message
production and consumption:

 1. Use a metrics plugin. This is the most feature-rich and flexible
way to track broker metrics, although it requires tools (e.g.
Prometheus) to store the metrics and display them (e.g. Grafana).

 2. Invoke the getMessageCount() and getMessagesAdded() management
methods and store the returned values along with the time they were
retrieved. A time-series database is a great tool for this job. This is
exactly what tools like Prometheus do. That data can then be used to
create informative graphs, etc. using tools like Grafana. Of course, one
can skip all the tools and just do some simple math to calculate rates
based on the last time the counts were retrieved.

 3. Use the broker's message counters. Message counters are the broker's
simple way of providing historical information about the queue. They
provide similar results to the previous solutions, but with less
flexibility since they only track data while the broker is up and
there's not really any good options for graphing.
This commit is contained in:
Justin Bertram 2020-09-21 10:10:22 -05:00 committed by Clebert Suconic
parent ddef3895fd
commit 246bf08391
4 changed files with 6 additions and 36 deletions

View File

@ -2299,14 +2299,6 @@ public interface AuditLogger extends BasicLogger {
@Message(id = 601267, value = "User {0} is creating a core session on target resource {1} {2}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 601267, value = "User {0} is creating a core session on target resource {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void createCoreSession(String user, Object source, Object... args); void createCoreSession(String user, Object source, Object... args);
static void getProducedRate(Object source) {
LOGGER.getProducedRate(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601268, value = "User {0} is getting produced message rate on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getProducedRate(String user, Object source, Object... args);
static void getAcknowledgeAttempts(Object source) { static void getAcknowledgeAttempts(Object source) {
LOGGER.getMessagesAcknowledged(getCaller(), source); LOGGER.getMessagesAcknowledged(getCaller(), source);
} }

View File

@ -105,12 +105,6 @@ public interface QueueControl {
@Attribute(desc = MESSAGE_COUNT_DESCRIPTION) @Attribute(desc = MESSAGE_COUNT_DESCRIPTION)
long getMessageCount(); long getMessageCount();
/**
* Returns the rate of writing messages to the queue.
*/
@Attribute(desc = "rate of writing messages to the queue currently (based on default window function)")
float getProducedRate();
/** /**
* Returns the persistent size of all messages currently in this queue. The persistent size of a message * Returns the persistent size of all messages currently in this queue. The persistent size of a message
* is the amount of space the message would take up on disk which is used to track how much data there * is the amount of space the message would take up on disk which is used to track how much data there

View File

@ -270,17 +270,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
} }
} }
@Override
public float getProducedRate() {
if (AuditLogger.isEnabled()) {
AuditLogger.getProducedRate(queue);
}
checkStarted();
// This is an attribute, no need to blockOnIO
return queue.getRate();
}
@Override @Override
public long getPersistentSize() { public long getPersistentSize() {
if (AuditLogger.isEnabled()) { if (AuditLogger.isEnabled()) {
@ -920,7 +909,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
AuditLogger.countMessages(queue, filterStr); AuditLogger.countMessages(queue, filterStr);
} }
Long value = internalCountMessages(filterStr, null).get(null); Long value = intenalCountMessages(filterStr, null).get(null);
return value == null ? 0 : value; return value == null ? 0 : value;
} }
@ -930,10 +919,10 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
AuditLogger.countMessages(queue, filterStr, groupByProperty); AuditLogger.countMessages(queue, filterStr, groupByProperty);
} }
return JsonUtil.toJsonObject(internalCountMessages(filterStr, groupByProperty)).toString(); return JsonUtil.toJsonObject(intenalCountMessages(filterStr, groupByProperty)).toString();
} }
private Map<String, Long> internalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception { private Map<String, Long> intenalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
checkStarted(); checkStarted();
clearIO(); clearIO();
@ -968,7 +957,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
AuditLogger.countDeliveringMessages(queue, filterStr); AuditLogger.countDeliveringMessages(queue, filterStr);
} }
Long value = internalCountDeliveryMessages(filterStr, null).get(null); Long value = intenalCountDeliveryMessages(filterStr, null).get(null);
return value == null ? 0 : value; return value == null ? 0 : value;
} }
@ -978,10 +967,10 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
AuditLogger.countDeliveringMessages(queue, filterStr, groupByProperty); AuditLogger.countDeliveringMessages(queue, filterStr, groupByProperty);
} }
return JsonUtil.toJsonObject(internalCountDeliveryMessages(filterStr, groupByProperty)).toString(); return JsonUtil.toJsonObject(intenalCountDeliveryMessages(filterStr, groupByProperty)).toString();
} }
private Map<String, Long> internalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception { private Map<String, Long> intenalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
checkStarted(); checkStarted();
clearIO(); clearIO();

View File

@ -259,11 +259,6 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Long) proxy.retrieveAttributeValue("messageCount", Long.class); return (Long) proxy.retrieveAttributeValue("messageCount", Long.class);
} }
@Override
public float getProducedRate() {
return (Long) proxy.retrieveAttributeValue("producedRate", Long.class);
}
@Override @Override
public long getMessagesAdded() { public long getMessagesAdded() {
return (Integer) proxy.retrieveAttributeValue("messagesAdded", Integer.class); return (Integer) proxy.retrieveAttributeValue("messagesAdded", Integer.class);