This commit is contained in:
Justin Bertram 2021-05-24 11:49:38 -05:00
commit a2db9d862b
14 changed files with 198 additions and 28 deletions

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
/** /**
* A Validators. * A Validators.
@ -193,6 +194,19 @@ public final class Validators {
} }
}; };
public static final Validator SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = new Validator() {
@Override
public void validate(final String name, final Object value) {
String val = (String) value;
if (val == null || !val.equals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND.toString()) &&
!val.equals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_MINUTE.toString()) &&
!val.equals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_HOUR.toString()) &&
!val.equals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_DAY.toString())) {
throw ActiveMQMessageBundle.BUNDLE.invalidSlowConsumerThresholdMeasurementUnit(val);
}
}
};
public static final Validator SLOW_CONSUMER_POLICY_TYPE = new Validator() { public static final Validator SLOW_CONSUMER_POLICY_TYPE = new Validator() {
@Override @Override
public void validate(final String name, final Object value) { public void validate(final String name, final Object value) {

View File

@ -94,6 +94,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
@ -240,6 +241,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String SLOW_CONSUMER_THRESHOLD_NODE_NAME = "slow-consumer-threshold"; private static final String SLOW_CONSUMER_THRESHOLD_NODE_NAME = "slow-consumer-threshold";
private static final String SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT_NODE_NAME = "slow-consumer-threshold-measurement-unit";
private static final String SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME = "slow-consumer-check-period"; private static final String SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME = "slow-consumer-check-period";
private static final String SLOW_CONSUMER_POLICY_NODE_NAME = "slow-consumer-policy"; private static final String SLOW_CONSUMER_POLICY_NODE_NAME = "slow-consumer-policy";
@ -1247,6 +1250,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
Validators.MINUS_ONE_OR_GT_ZERO.validate(SLOW_CONSUMER_THRESHOLD_NODE_NAME, slowConsumerThreshold); Validators.MINUS_ONE_OR_GT_ZERO.validate(SLOW_CONSUMER_THRESHOLD_NODE_NAME, slowConsumerThreshold);
addressSettings.setSlowConsumerThreshold(slowConsumerThreshold); addressSettings.setSlowConsumerThreshold(slowConsumerThreshold);
} else if (SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT_NODE_NAME.equalsIgnoreCase(name)) {
String slowConsumerThresholdMeasurementUnit = getTrimmedTextContent(child);
Validators.SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT.validate(SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT_NODE_NAME, slowConsumerThresholdMeasurementUnit);
addressSettings.setSlowConsumerThresholdMeasurementUnit(SlowConsumerThresholdMeasurementUnit.valueOf(slowConsumerThresholdMeasurementUnit));
} else if (SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME.equalsIgnoreCase(name)) { } else if (SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME.equalsIgnoreCase(name)) {
long slowConsumerCheckPeriod = XMLUtil.parseLong(child); long slowConsumerCheckPeriod = XMLUtil.parseLong(child);
Validators.GT_ZERO.validate(SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME, slowConsumerCheckPeriod); Validators.GT_ZERO.validate(SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME, slowConsumerCheckPeriod);

View File

@ -2934,6 +2934,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
.add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()) .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute())
.add("addressFullMessagePolicy", policy) .add("addressFullMessagePolicy", policy)
.add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()) .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold())
.add("slowConsumerThresholdMeasurementUnit", addressSettings.getSlowConsumerThresholdMeasurementUnit().toString())
.add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()) .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod())
.add("slowConsumerPolicy", consumerPolicy) .add("slowConsumerPolicy", consumerPolicy)
.add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues()) .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues())

View File

@ -498,4 +498,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229233, value = "Cannot set ActiveMQSecurityManager during startup or while started") @Message(id = 229233, value = "Cannot set ActiveMQSecurityManager during startup or while started")
IllegalStateException cannotSetSecurityManager(); IllegalStateException cannotSetSecurityManager();
@Message(id = 229234, value = "Invalid slow consumer threshold measurement unit {0}", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException invalidSlowConsumerThresholdMeasurementUnit(String val);
} }

View File

@ -94,6 +94,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener; import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
@ -4344,7 +4345,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} else { } else {
if (slowConsumerReaperRunnable == null) { if (slowConsumerReaperRunnable == null) {
scheduleSlowConsumerReaper(settings); scheduleSlowConsumerReaper(settings);
} else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.threshold != settings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) { } else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != settings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) {
if (slowConsumerReaperFuture != null) { if (slowConsumerReaperFuture != null) {
slowConsumerReaperFuture.cancel(false); slowConsumerReaperFuture.cancel(false);
slowConsumerReaperFuture = null; slowConsumerReaperFuture = null;
@ -4355,12 +4356,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
void scheduleSlowConsumerReaper(AddressSettings settings) { void scheduleSlowConsumerReaper(AddressSettings settings) {
slowConsumerReaperRunnable = new SlowConsumerReaperRunnable(settings.getSlowConsumerCheckPeriod(), settings.getSlowConsumerThreshold(), settings.getSlowConsumerPolicy()); slowConsumerReaperRunnable = new SlowConsumerReaperRunnable(settings.getSlowConsumerCheckPeriod(), settings.getSlowConsumerThreshold(), settings.getSlowConsumerThresholdMeasurementUnit(), settings.getSlowConsumerPolicy());
slowConsumerReaperFuture = scheduledExecutor.scheduleWithFixedDelay(slowConsumerReaperRunnable, settings.getSlowConsumerCheckPeriod(), settings.getSlowConsumerCheckPeriod(), TimeUnit.SECONDS); slowConsumerReaperFuture = scheduledExecutor.scheduleWithFixedDelay(slowConsumerReaperRunnable, settings.getSlowConsumerCheckPeriod(), settings.getSlowConsumerCheckPeriod(), TimeUnit.SECONDS);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Scheduled slow-consumer-reaper thread for queue \"" + getName() + "\"; slow-consumer-check-period=" + settings.getSlowConsumerCheckPeriod() + ", slow-consumer-threshold=" + settings.getSlowConsumerThreshold() + ", slow-consumer-policy=" + settings.getSlowConsumerPolicy()); logger.debug("Scheduled slow-consumer-reaper thread for queue \"" + getName() + "\"; slow-consumer-check-period=" + settings.getSlowConsumerCheckPeriod() + ", slow-consumer-threshold=" + settings.getSlowConsumerThreshold() + ", slow-consumer-threshold-measurement-unit=" + settings.getSlowConsumerThresholdMeasurementUnit().toString() + ", slow-consumer-policy=" + settings.getSlowConsumerPolicy());
} }
} }
@ -4424,13 +4425,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final class SlowConsumerReaperRunnable implements Runnable { private final class SlowConsumerReaperRunnable implements Runnable {
private final SlowConsumerPolicy policy; private final SlowConsumerPolicy policy;
private final float threshold; private final float thresholdInMsgPerSecond;
private final long checkPeriod; private final long checkPeriod;
private SlowConsumerReaperRunnable(long checkPeriod, float threshold, SlowConsumerPolicy policy) { private SlowConsumerReaperRunnable(long checkPeriod, float slowConsumerThreshold, SlowConsumerThresholdMeasurementUnit unit, SlowConsumerPolicy policy) {
this.checkPeriod = checkPeriod; this.checkPeriod = checkPeriod;
this.policy = policy; this.policy = policy;
this.threshold = threshold; this.thresholdInMsgPerSecond = slowConsumerThreshold / unit.getValue();
} }
@Override @Override
@ -4447,7 +4448,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.debug("There are no consumers, no need to check slow consumer's rate"); logger.debug("There are no consumers, no need to check slow consumer's rate");
return; return;
} else { } else {
float queueThreshold = threshold * consumers.size(); float queueThreshold = thresholdInMsgPerSecond * consumers.size();
if (queueRate < queueThreshold && queueMessages < queueThreshold) { if (queueRate < queueThreshold && queueMessages < queueThreshold) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -4462,7 +4463,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (consumer instanceof ServerConsumerImpl) { if (consumer instanceof ServerConsumerImpl) {
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
float consumerRate = serverConsumer.getRate(); float consumerRate = serverConsumer.getRate();
if (consumerRate < threshold) { if (consumerRate < thresholdInMsgPerSecond) {
RemotingConnection connection = null; RemotingConnection connection = null;
ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer(); ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
RemotingService remotingService = server.getRemotingService(); RemotingService remotingService = server.getRemotingService();
@ -4476,7 +4477,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
serverConsumer.fireSlowConsumer(); serverConsumer.fireSlowConsumer();
if (connection != null) { if (connection != null) {
ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate); ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(),
thresholdInMsgPerSecond, consumerRate);
if (policy.equals(SlowConsumerPolicy.KILL)) { if (policy.equals(SlowConsumerPolicy.KILL)) {
connection.killMessage(server.getNodeID()); connection.killMessage(server.getNodeID());
remotingService.removeConnection(connection.getID()); remotingService.removeConnection(connection.getID());

View File

@ -131,6 +131,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final int MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT = 256; public static final int MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT = 256;
public static final SlowConsumerThresholdMeasurementUnit DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND;
private AddressFullMessagePolicy addressFullMessagePolicy = null; private AddressFullMessagePolicy addressFullMessagePolicy = null;
private Long maxSizeBytes = null; private Long maxSizeBytes = null;
@ -185,6 +187,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Long slowConsumerThreshold = null; private Long slowConsumerThreshold = null;
private SlowConsumerThresholdMeasurementUnit slowConsumerThresholdMeasurementUnit = DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT;
private Long slowConsumerCheckPeriod = null; private Long slowConsumerCheckPeriod = null;
private SlowConsumerPolicy slowConsumerPolicy = null; private SlowConsumerPolicy slowConsumerPolicy = null;
@ -327,6 +331,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.defaultRingSize = other.defaultRingSize; this.defaultRingSize = other.defaultRingSize;
this.enableMetrics = other.enableMetrics; this.enableMetrics = other.enableMetrics;
this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit; this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit;
this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit;
} }
public AddressSettings() { public AddressSettings() {
@ -785,6 +790,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this; return this;
} }
public SlowConsumerThresholdMeasurementUnit getSlowConsumerThresholdMeasurementUnit() {
return slowConsumerThresholdMeasurementUnit != null ? slowConsumerThresholdMeasurementUnit : AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT;
}
public AddressSettings setSlowConsumerThresholdMeasurementUnit(final SlowConsumerThresholdMeasurementUnit slowConsumerThresholdMeasurementUnit) {
this.slowConsumerThresholdMeasurementUnit = slowConsumerThresholdMeasurementUnit;
return this;
}
public long getSlowConsumerCheckPeriod() { public long getSlowConsumerCheckPeriod() {
return slowConsumerCheckPeriod != null ? slowConsumerCheckPeriod : AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD; return slowConsumerCheckPeriod != null ? slowConsumerCheckPeriod : AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD;
} }
@ -1005,6 +1019,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (slowConsumerThreshold == null) { if (slowConsumerThreshold == null) {
slowConsumerThreshold = merged.slowConsumerThreshold; slowConsumerThreshold = merged.slowConsumerThreshold;
} }
if (slowConsumerThresholdMeasurementUnit == null) {
slowConsumerThresholdMeasurementUnit = merged.slowConsumerThresholdMeasurementUnit;
}
if (slowConsumerCheckPeriod == null) { if (slowConsumerCheckPeriod == null) {
slowConsumerCheckPeriod = merged.slowConsumerCheckPeriod; slowConsumerCheckPeriod = merged.slowConsumerCheckPeriod;
} }
@ -1354,6 +1371,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
managementMessageAttributeSizeLimit = BufferHelper.readNullableInteger(buffer); managementMessageAttributeSizeLimit = BufferHelper.readNullableInteger(buffer);
} }
if (buffer.readableBytes() > 0) {
Integer slowConsumerMeasurementUnitEnumValue = BufferHelper.readNullableInteger(buffer);
if (slowConsumerMeasurementUnitEnumValue != null) {
slowConsumerThresholdMeasurementUnit = SlowConsumerThresholdMeasurementUnit.valueOf(slowConsumerMeasurementUnitEnumValue);
}
}
} }
@Override @Override
@ -1418,7 +1441,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
SimpleString.sizeofNullableString(expiryQueueSuffix) + SimpleString.sizeofNullableString(expiryQueueSuffix) +
BufferHelper.sizeOfNullableBoolean(enableMetrics) + BufferHelper.sizeOfNullableBoolean(enableMetrics) +
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) + BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) +
BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit); BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit) +
BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue());
} }
@Override @Override
@ -1546,6 +1570,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch); BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch);
BufferHelper.writeNullableInteger(buffer, managementMessageAttributeSizeLimit); BufferHelper.writeNullableInteger(buffer, managementMessageAttributeSizeLimit);
BufferHelper.writeNullableInteger(buffer, slowConsumerThresholdMeasurementUnit == null ? null : slowConsumerThresholdMeasurementUnit.getValue());
} }
/* (non-Javadoc) /* (non-Javadoc)
@ -1619,6 +1645,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((expiryQueueSuffix == null) ? 0 : expiryQueueSuffix.hashCode()); result = prime * result + ((expiryQueueSuffix == null) ? 0 : expiryQueueSuffix.hashCode());
result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode()); result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode());
result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode()); result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode());
result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode());
return result; return result;
} }
@ -1976,6 +2003,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
} else if (!enableMetrics.equals(other.enableMetrics)) } else if (!enableMetrics.equals(other.enableMetrics))
return false; return false;
if (slowConsumerThresholdMeasurementUnit != other.slowConsumerThresholdMeasurementUnit)
return false;
return true; return true;
} }
@ -2031,6 +2061,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
sendToDLAOnNoRoute + sendToDLAOnNoRoute +
", slowConsumerThreshold=" + ", slowConsumerThreshold=" +
slowConsumerThreshold + slowConsumerThreshold +
", slowConsumerThresholdMeasurementUnit=" +
slowConsumerThresholdMeasurementUnit +
", slowConsumerCheckPeriod=" + ", slowConsumerCheckPeriod=" +
slowConsumerCheckPeriod + slowConsumerCheckPeriod +
", slowConsumerPolicy=" + ", slowConsumerPolicy=" +

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.settings.impl;
public enum SlowConsumerThresholdMeasurementUnit {
MESSAGES_PER_SECOND(1), MESSAGES_PER_MINUTE(60), MESSAGES_PER_HOUR(3600), MESSAGES_PER_DAY(3600 * 24);
private final int measurementUnitInSeconds;
SlowConsumerThresholdMeasurementUnit(int measurementUnitInSeconds) {
this.measurementUnitInSeconds = measurementUnitInSeconds;
}
public static SlowConsumerThresholdMeasurementUnit valueOf(int measurementUnitInSeconds) {
switch (measurementUnitInSeconds) {
case 1:
return MESSAGES_PER_SECOND;
case 60:
return MESSAGES_PER_MINUTE;
case 3600:
return MESSAGES_PER_HOUR;
case 3600 * 24:
return MESSAGES_PER_DAY;
default:
return null;
}
}
public int getValue() {
return measurementUnitInSeconds;
}
}

View File

@ -3685,12 +3685,29 @@
<xsd:element name="slow-consumer-threshold" type="xsd:long" maxOccurs="1" minOccurs="0"> <xsd:element name="slow-consumer-threshold" type="xsd:long" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
The minimum rate of message consumption allowed before a consumer is considered "slow." Measured The minimum rate of message consumption allowed before a consumer is considered "slow." Measurement
in messages-per-second. unit is defined by the slow-consumer-threshold-measurement-unit parameter. By default this is
messages-per-seconds
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="slow-consumer-threshold-measurement-unit" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The units used to measure the slow consumer threshold. Default is messages-per-second.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="MESSAGES_PER_SECOND"/>
<xsd:enumeration value="MESSAGES_PER_MINUTE"/>
<xsd:enumeration value="MESSAGES_PER_HOUR"/>
<xsd:enumeration value="MESSAGES_PER_DAY"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
<xsd:element name="slow-consumer-policy" default="NOTIFY" maxOccurs="1" minOccurs="0"> <xsd:element name="slow-consumer-policy" default="NOTIFY" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>

View File

@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -367,6 +368,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(10, conf.getAddressesSettings().get("a1").getPageCacheMaxSize()); assertEquals(10, conf.getAddressesSettings().get("a1").getPageCacheMaxSize());
assertEquals(4, conf.getAddressesSettings().get("a1").getMessageCounterHistoryDayLimit()); assertEquals(4, conf.getAddressesSettings().get("a1").getMessageCounterHistoryDayLimit());
assertEquals(10, conf.getAddressesSettings().get("a1").getSlowConsumerThreshold()); assertEquals(10, conf.getAddressesSettings().get("a1").getSlowConsumerThreshold());
assertEquals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_HOUR, conf.getAddressesSettings().get("a1").getSlowConsumerThresholdMeasurementUnit());
assertEquals(5, conf.getAddressesSettings().get("a1").getSlowConsumerCheckPeriod()); assertEquals(5, conf.getAddressesSettings().get("a1").getSlowConsumerCheckPeriod());
assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy()); assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy());
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsQueues()); assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsQueues());
@ -401,6 +403,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(20, conf.getAddressesSettings().get("a2").getPageCacheMaxSize()); assertEquals(20, conf.getAddressesSettings().get("a2").getPageCacheMaxSize());
assertEquals(8, conf.getAddressesSettings().get("a2").getMessageCounterHistoryDayLimit()); assertEquals(8, conf.getAddressesSettings().get("a2").getMessageCounterHistoryDayLimit());
assertEquals(20, conf.getAddressesSettings().get("a2").getSlowConsumerThreshold()); assertEquals(20, conf.getAddressesSettings().get("a2").getSlowConsumerThreshold());
assertEquals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_DAY, conf.getAddressesSettings().get("a2").getSlowConsumerThresholdMeasurementUnit());
assertEquals(15, conf.getAddressesSettings().get("a2").getSlowConsumerCheckPeriod()); assertEquals(15, conf.getAddressesSettings().get("a2").getSlowConsumerCheckPeriod());
assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy()); assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy());
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues()); assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues());

View File

@ -38,6 +38,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
Assert.assertEquals(AddressSettings.DEFAULT_REDELIVER_DELAY, addressSettings.getRedeliveryDelay()); Assert.assertEquals(AddressSettings.DEFAULT_REDELIVER_DELAY, addressSettings.getRedeliveryDelay());
Assert.assertEquals(AddressSettings.DEFAULT_REDELIVER_MULTIPLIER, addressSettings.getRedeliveryMultiplier(), 0.000001); Assert.assertEquals(AddressSettings.DEFAULT_REDELIVER_MULTIPLIER, addressSettings.getRedeliveryMultiplier(), 0.000001);
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD, addressSettings.getSlowConsumerThreshold()); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD, addressSettings.getSlowConsumerThreshold());
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT, addressSettings.getSlowConsumerThresholdMeasurementUnit());
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD, addressSettings.getSlowConsumerCheckPeriod()); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD, addressSettings.getSlowConsumerCheckPeriod());
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy()); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy());
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_JMS_QUEUES, addressSettings.isAutoCreateJmsQueues()); Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_JMS_QUEUES, addressSettings.isAutoCreateJmsQueues());

View File

@ -458,6 +458,7 @@
<slow-consumer-threshold>10</slow-consumer-threshold> <slow-consumer-threshold>10</slow-consumer-threshold>
<slow-consumer-check-period>5</slow-consumer-check-period> <slow-consumer-check-period>5</slow-consumer-check-period>
<slow-consumer-policy>NOTIFY</slow-consumer-policy> <slow-consumer-policy>NOTIFY</slow-consumer-policy>
<slow-consumer-threshold-measurement-unit>MESSAGES_PER_HOUR</slow-consumer-threshold-measurement-unit>
<auto-create-jms-queues>true</auto-create-jms-queues> <auto-create-jms-queues>true</auto-create-jms-queues>
<auto-delete-jms-queues>true</auto-delete-jms-queues> <auto-delete-jms-queues>true</auto-delete-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics> <auto-create-jms-topics>true</auto-create-jms-topics>
@ -488,6 +489,7 @@
<message-counter-history-day-limit>8</message-counter-history-day-limit> <message-counter-history-day-limit>8</message-counter-history-day-limit>
<slow-consumer-threshold>20</slow-consumer-threshold> <slow-consumer-threshold>20</slow-consumer-threshold>
<slow-consumer-check-period>15</slow-consumer-check-period> <slow-consumer-check-period>15</slow-consumer-check-period>
<slow-consumer-threshold-measurement-unit>MESSAGES_PER_DAY</slow-consumer-threshold-measurement-unit>
<slow-consumer-policy>KILL</slow-consumer-policy> <slow-consumer-policy>KILL</slow-consumer-policy>
<auto-create-jms-queues>false</auto-create-jms-queues> <auto-create-jms-queues>false</auto-create-jms-queues>
<auto-delete-jms-queues>false</auto-delete-jms-queues> <auto-delete-jms-queues>false</auto-delete-jms-queues>

View File

@ -30,6 +30,7 @@
<slow-consumer-threshold>10</slow-consumer-threshold> <slow-consumer-threshold>10</slow-consumer-threshold>
<slow-consumer-check-period>5</slow-consumer-check-period> <slow-consumer-check-period>5</slow-consumer-check-period>
<slow-consumer-policy>NOTIFY</slow-consumer-policy> <slow-consumer-policy>NOTIFY</slow-consumer-policy>
<slow-consumer-threshold-measurement-unit>MESSAGES_PER_HOUR</slow-consumer-threshold-measurement-unit>
<auto-create-jms-queues>true</auto-create-jms-queues> <auto-create-jms-queues>true</auto-create-jms-queues>
<auto-delete-jms-queues>true</auto-delete-jms-queues> <auto-delete-jms-queues>true</auto-delete-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics> <auto-create-jms-topics>true</auto-create-jms-topics>
@ -61,6 +62,7 @@
<slow-consumer-threshold>20</slow-consumer-threshold> <slow-consumer-threshold>20</slow-consumer-threshold>
<slow-consumer-check-period>15</slow-consumer-check-period> <slow-consumer-check-period>15</slow-consumer-check-period>
<slow-consumer-policy>KILL</slow-consumer-policy> <slow-consumer-policy>KILL</slow-consumer-policy>
<slow-consumer-threshold-measurement-unit>MESSAGES_PER_DAY</slow-consumer-threshold-measurement-unit>
<auto-create-jms-queues>false</auto-create-jms-queues> <auto-create-jms-queues>false</auto-create-jms-queues>
<auto-delete-jms-queues>false</auto-delete-jms-queues> <auto-delete-jms-queues>false</auto-delete-jms-queues>
<auto-create-jms-topics>false</auto-create-jms-topics> <auto-create-jms-topics>false</auto-create-jms-topics>

View File

@ -660,6 +660,7 @@ that would be found in the `broker.xml` file.
<redistribution-delay>0</redistribution-delay> <redistribution-delay>0</redistribution-delay>
<send-to-dla-on-no-route>true</send-to-dla-on-no-route> <send-to-dla-on-no-route>true</send-to-dla-on-no-route>
<slow-consumer-threshold>-1</slow-consumer-threshold> <slow-consumer-threshold>-1</slow-consumer-threshold>
<slow-consumer-threshold-measurement-unit>MESSAGES_PER_SECOND</slow-consumer-threshold-measurement-unit>
<slow-consumer-policy>NOTIFY</slow-consumer-policy> <slow-consumer-policy>NOTIFY</slow-consumer-policy>
<slow-consumer-check-period>5</slow-consumer-check-period> <slow-consumer-check-period>5</slow-consumer-check-period>
<auto-create-jms-queues>true</auto-create-jms-queues> <!-- deprecated! see auto-create-queues --> <auto-create-jms-queues>true</auto-create-jms-queues> <!-- deprecated! see auto-create-queues -->
@ -823,8 +824,19 @@ message will instead be sent to the `dead-letter-address` (DLA) for that
address, if it exists. address, if it exists.
`slow-consumer-threshold`. The minimum rate of message consumption allowed `slow-consumer-threshold`. The minimum rate of message consumption allowed
before a consumer is considered "slow." Measured in messages-per-second. before a consumer is considered "slow." Measured in units specified by the
Default is `-1` (i.e. disabled); any other valid value must be greater than 0. slow-consumer-threshold-measurement-unit configuration option. Default is `-1`
(i.e. disabled); any other valid value must be greater than 0.
Read more about [slow consumers](slow-consumers.md).
`slow-consumer-threshold-measurement-unit`. The units used to measure the
slow-consumer-threshold. Valid options are:
* MESSAGES_PER_SECOND
* MESSAGES_PER_MINUTE
* MESSAGES_PER_HOUR
* MESSAGES_PER_DAY
If no unit is specified the default MESSAGES_PER_SECOND will be used.
Read more about [slow consumers](slow-consumers.md). Read more about [slow consumers](slow-consumers.md).
`slow-consumer-policy`. What should happen when a slow consumer is detected. `slow-consumer-policy`. What should happen when a slow consumer is detected.
@ -834,7 +846,13 @@ CONSUMER\_SLOW management notification which an application could receive and
take action with. Read more about [slow consumers](slow-consumers.md). take action with. Read more about [slow consumers](slow-consumers.md).
`slow-consumer-check-period`. How often to check for slow consumers on a `slow-consumer-check-period`. How often to check for slow consumers on a
particular queue. Measured in *seconds*. Default is `5`. Read more about [slow particular queue. Measured in *seconds*. Default is `5`.
* Note: This should be at least 2x the maximum time it takes a consumer to process
1 message. For example, if the slow-consumer-threshold is set to 1 and the
slow-consumer-threshold-measurement-unit is set to MESSAGES_PER_MINUTE then this
should be set to at least 2 x 60s i.e. 120s.
Read more about [slow
consumers](slow-consumers.md). consumers](slow-consumers.md).
`auto-create-jms-queues` is **deprecated**. See `auto-create-queues`. Whether `auto-create-jms-queues` is **deprecated**. See `auto-create-queues`. Whether

View File

@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils; import org.apache.activemq.artemis.utils.TimeUtils;
@ -55,24 +56,42 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_MINUTE;
import static org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
public class SlowConsumerTest extends ActiveMQTestBase { public class SlowConsumerTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(SlowConsumerTest.class); private static final Logger logger = Logger.getLogger(SlowConsumerTest.class);
private int threshold = 10; private int threshold;
private float consumerRate; // The rate actual consumers will run in the test.
private SlowConsumerThresholdMeasurementUnit unit;
private long checkPeriod = 1; private long checkPeriod = 1;
private boolean isNetty = false; private boolean isNetty = false;
private boolean isPaging = false; private boolean isPaging = false;
// this will ensure that all tests in this class are run twice, // this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false" // once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "netty={0}, paging={1}") @Parameterized.Parameters(name = "netty={0}, paging={1}, threshold={2}, threshold_units={3}")
public static Collection getParameters() { public static Collection getParameters() {
return Arrays.asList(new Object[][]{{true, false}, {false, false}, {true, true}, {false, true}}); return Arrays.asList(new Object[][]{
{true, false, 10, MESSAGES_PER_SECOND},
{false, false, 10, MESSAGES_PER_SECOND},
{true, true, 10, MESSAGES_PER_SECOND},
{false, true, 10, MESSAGES_PER_SECOND},
{true, false, 1, MESSAGES_PER_MINUTE},
{false, false, 1, MESSAGES_PER_MINUTE},
{true, true, 1, MESSAGES_PER_MINUTE},
{false, true, 1, MESSAGES_PER_MINUTE}
});
} }
public SlowConsumerTest(boolean isNetty, boolean isPaging) { public SlowConsumerTest(boolean isNetty, boolean isPaging, int threshold, SlowConsumerThresholdMeasurementUnit unit) {
this.threshold = threshold;
this.unit = unit;
this.consumerRate = threshold / unit.getValue();
this.isNetty = isNetty; this.isNetty = isNetty;
this.isPaging = isPaging; this.isPaging = isPaging;
} }
@ -93,6 +112,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
AddressSettings addressSettings = new AddressSettings(); AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(checkPeriod); addressSettings.setSlowConsumerCheckPeriod(checkPeriod);
addressSettings.setSlowConsumerThreshold(threshold); addressSettings.setSlowConsumerThreshold(threshold);
addressSettings.setSlowConsumerThresholdMeasurementUnit(unit);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
if (isPaging) { if (isPaging) {
@ -442,14 +462,14 @@ public class SlowConsumerTest extends ActiveMQTestBase {
final int messages = 10 * threshold; final int messages = 10 * threshold;
FixedRateProducer producer = new FixedRateProducer(threshold * 2, sf1, QUEUE, messages); FixedRateProducer producer = new FixedRateProducer(threshold * 2, unit, sf1, QUEUE, messages);
final Set<FixedRateConsumer> consumers = new ConcurrentHashSet<>(); final Set<FixedRateConsumer> consumers = new ConcurrentHashSet<>();
final Set<ClientMessage> receivedMessages = new ConcurrentHashSet<>(); final Set<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf2, QUEUE, 1)); consumers.add(new FixedRateConsumer(threshold, unit, receivedMessages, sf2, QUEUE, 1));
consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf3, QUEUE, 2)); consumers.add(new FixedRateConsumer(threshold, unit, receivedMessages, sf3, QUEUE, 2));
consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf4, QUEUE, 3)); consumers.add(new FixedRateConsumer(threshold, unit, receivedMessages, sf4, QUEUE, 3));
try { try {
producer.start(); producer.start();
@ -481,8 +501,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
int messages; int messages;
ClientProducer producer; ClientProducer producer;
FixedRateProducer(int rate, ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException { FixedRateProducer(int rate, SlowConsumerThresholdMeasurementUnit unit, ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException {
super(sf, queue, rate); super(sf, queue, rate, unit);
this.messages = messages; this.messages = messages;
} }
@ -518,11 +538,12 @@ public class SlowConsumerTest extends ActiveMQTestBase {
int id; int id;
FixedRateConsumer(int rate, FixedRateConsumer(int rate,
SlowConsumerThresholdMeasurementUnit unit,
Set<ClientMessage> receivedMessages, Set<ClientMessage> receivedMessages,
ClientSessionFactory sf, ClientSessionFactory sf,
SimpleString queue, SimpleString queue,
int id) throws ActiveMQException { int id) throws ActiveMQException {
super(sf, queue, rate); super(sf, queue, rate, unit);
this.id = id; this.id = id;
this.receivedMessages = receivedMessages; this.receivedMessages = receivedMessages;
} }
@ -568,10 +589,10 @@ public class SlowConsumerTest extends ActiveMQTestBase {
protected volatile boolean working; protected volatile boolean working;
boolean failed; boolean failed;
FixedRateClient(ClientSessionFactory sf, SimpleString queue, int rate) throws ActiveMQException { FixedRateClient(ClientSessionFactory sf, SimpleString queue, int rate, SlowConsumerThresholdMeasurementUnit unit) throws ActiveMQException {
this.sf = sf; this.sf = sf;
this.queue = queue; this.queue = queue;
this.sleepTime = 1000 / rate; this.sleepTime = (int) (1000f / (unit.getValue() / rate));
} }
protected void prepareWork() throws ActiveMQException { protected void prepareWork() throws ActiveMQException {