diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java index 52ec5db299..a6e04a89e1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit; /** * A Validators. @@ -193,6 +194,19 @@ public final class Validators { } }; + public static final Validator SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = new Validator() { + @Override + public void validate(final String name, final Object value) { + String val = (String) value; + if (val == null || !val.equals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND.toString()) && + !val.equals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_MINUTE.toString()) && + !val.equals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_HOUR.toString()) && + !val.equals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_DAY.toString())) { + throw ActiveMQMessageBundle.BUNDLE.invalidSlowConsumerThresholdMeasurementUnit(val); + } + } + }; + public static final Validator SLOW_CONSUMER_POLICY_TYPE = new Validator() { @Override public void validate(final String name, final Object value) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 494829bca7..2dda424f18 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -94,6 +94,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; @@ -240,6 +241,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String SLOW_CONSUMER_THRESHOLD_NODE_NAME = "slow-consumer-threshold"; + private static final String SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT_NODE_NAME = "slow-consumer-threshold-measurement-unit"; + private static final String SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME = "slow-consumer-check-period"; private static final String SLOW_CONSUMER_POLICY_NODE_NAME = "slow-consumer-policy"; @@ -1247,6 +1250,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { Validators.MINUS_ONE_OR_GT_ZERO.validate(SLOW_CONSUMER_THRESHOLD_NODE_NAME, slowConsumerThreshold); addressSettings.setSlowConsumerThreshold(slowConsumerThreshold); + } else if (SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT_NODE_NAME.equalsIgnoreCase(name)) { + String slowConsumerThresholdMeasurementUnit = getTrimmedTextContent(child); + Validators.SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT.validate(SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT_NODE_NAME, slowConsumerThresholdMeasurementUnit); + + addressSettings.setSlowConsumerThresholdMeasurementUnit(SlowConsumerThresholdMeasurementUnit.valueOf(slowConsumerThresholdMeasurementUnit)); } else if (SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME.equalsIgnoreCase(name)) { long slowConsumerCheckPeriod = XMLUtil.parseLong(child); Validators.GT_ZERO.validate(SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME, slowConsumerCheckPeriod); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 1ed32661ce..2003f10b77 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -2934,6 +2934,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()) .add("addressFullMessagePolicy", policy) .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()) + .add("slowConsumerThresholdMeasurementUnit", addressSettings.getSlowConsumerThresholdMeasurementUnit().toString()) .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()) .add("slowConsumerPolicy", consumerPolicy) .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues()) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 6a375156f1..332e7bdcad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -498,4 +498,7 @@ public interface ActiveMQMessageBundle { @Message(id = 229233, value = "Cannot set ActiveMQSecurityManager during startup or while started") IllegalStateException cannotSetSecurityManager(); + + @Message(id = 229234, value = "Invalid slow consumer threshold measurement unit {0}", format = Message.Format.MESSAGE_FORMAT) + IllegalArgumentException invalidSlowConsumerThresholdMeasurementUnit(String val); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index d9a93a6ee8..118ef69433 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -94,6 +94,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; @@ -4344,7 +4345,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } else { if (slowConsumerReaperRunnable == null) { scheduleSlowConsumerReaper(settings); - } else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.threshold != settings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) { + } else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != settings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) { if (slowConsumerReaperFuture != null) { slowConsumerReaperFuture.cancel(false); slowConsumerReaperFuture = null; @@ -4355,12 +4356,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } void scheduleSlowConsumerReaper(AddressSettings settings) { - slowConsumerReaperRunnable = new SlowConsumerReaperRunnable(settings.getSlowConsumerCheckPeriod(), settings.getSlowConsumerThreshold(), settings.getSlowConsumerPolicy()); + slowConsumerReaperRunnable = new SlowConsumerReaperRunnable(settings.getSlowConsumerCheckPeriod(), settings.getSlowConsumerThreshold(), settings.getSlowConsumerThresholdMeasurementUnit(), settings.getSlowConsumerPolicy()); slowConsumerReaperFuture = scheduledExecutor.scheduleWithFixedDelay(slowConsumerReaperRunnable, settings.getSlowConsumerCheckPeriod(), settings.getSlowConsumerCheckPeriod(), TimeUnit.SECONDS); if (logger.isDebugEnabled()) { - logger.debug("Scheduled slow-consumer-reaper thread for queue \"" + getName() + "\"; slow-consumer-check-period=" + settings.getSlowConsumerCheckPeriod() + ", slow-consumer-threshold=" + settings.getSlowConsumerThreshold() + ", slow-consumer-policy=" + settings.getSlowConsumerPolicy()); + logger.debug("Scheduled slow-consumer-reaper thread for queue \"" + getName() + "\"; slow-consumer-check-period=" + settings.getSlowConsumerCheckPeriod() + ", slow-consumer-threshold=" + settings.getSlowConsumerThreshold() + ", slow-consumer-threshold-measurement-unit=" + settings.getSlowConsumerThresholdMeasurementUnit().toString() + ", slow-consumer-policy=" + settings.getSlowConsumerPolicy()); } } @@ -4424,13 +4425,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final class SlowConsumerReaperRunnable implements Runnable { private final SlowConsumerPolicy policy; - private final float threshold; + private final float thresholdInMsgPerSecond; private final long checkPeriod; - private SlowConsumerReaperRunnable(long checkPeriod, float threshold, SlowConsumerPolicy policy) { + private SlowConsumerReaperRunnable(long checkPeriod, float slowConsumerThreshold, SlowConsumerThresholdMeasurementUnit unit, SlowConsumerPolicy policy) { this.checkPeriod = checkPeriod; this.policy = policy; - this.threshold = threshold; + this.thresholdInMsgPerSecond = slowConsumerThreshold / unit.getValue(); } @Override @@ -4447,7 +4448,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { logger.debug("There are no consumers, no need to check slow consumer's rate"); return; } else { - float queueThreshold = threshold * consumers.size(); + float queueThreshold = thresholdInMsgPerSecond * consumers.size(); if (queueRate < queueThreshold && queueMessages < queueThreshold) { if (logger.isDebugEnabled()) { @@ -4462,7 +4463,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (consumer instanceof ServerConsumerImpl) { ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; float consumerRate = serverConsumer.getRate(); - if (consumerRate < threshold) { + if (consumerRate < thresholdInMsgPerSecond) { RemotingConnection connection = null; ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer(); RemotingService remotingService = server.getRemotingService(); @@ -4476,7 +4477,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { serverConsumer.fireSlowConsumer(); if (connection != null) { - ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate); + ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), + thresholdInMsgPerSecond, consumerRate); if (policy.equals(SlowConsumerPolicy.KILL)) { connection.killMessage(server.getNodeID()); remotingService.removeConnection(connection.getID()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 6467dead49..8f26b4e1dd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -131,6 +131,8 @@ public class AddressSettings implements Mergeable, Serializable public static final int MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT = 256; + public static final SlowConsumerThresholdMeasurementUnit DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND; + private AddressFullMessagePolicy addressFullMessagePolicy = null; private Long maxSizeBytes = null; @@ -185,6 +187,8 @@ public class AddressSettings implements Mergeable, Serializable private Long slowConsumerThreshold = null; + private SlowConsumerThresholdMeasurementUnit slowConsumerThresholdMeasurementUnit = DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT; + private Long slowConsumerCheckPeriod = null; private SlowConsumerPolicy slowConsumerPolicy = null; @@ -327,6 +331,7 @@ public class AddressSettings implements Mergeable, Serializable this.defaultRingSize = other.defaultRingSize; this.enableMetrics = other.enableMetrics; this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit; + this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit; } public AddressSettings() { @@ -785,6 +790,15 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public SlowConsumerThresholdMeasurementUnit getSlowConsumerThresholdMeasurementUnit() { + return slowConsumerThresholdMeasurementUnit != null ? slowConsumerThresholdMeasurementUnit : AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT; + } + + public AddressSettings setSlowConsumerThresholdMeasurementUnit(final SlowConsumerThresholdMeasurementUnit slowConsumerThresholdMeasurementUnit) { + this.slowConsumerThresholdMeasurementUnit = slowConsumerThresholdMeasurementUnit; + return this; + } + public long getSlowConsumerCheckPeriod() { return slowConsumerCheckPeriod != null ? slowConsumerCheckPeriod : AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD; } @@ -1005,6 +1019,9 @@ public class AddressSettings implements Mergeable, Serializable if (slowConsumerThreshold == null) { slowConsumerThreshold = merged.slowConsumerThreshold; } + if (slowConsumerThresholdMeasurementUnit == null) { + slowConsumerThresholdMeasurementUnit = merged.slowConsumerThresholdMeasurementUnit; + } if (slowConsumerCheckPeriod == null) { slowConsumerCheckPeriod = merged.slowConsumerCheckPeriod; } @@ -1354,6 +1371,12 @@ public class AddressSettings implements Mergeable, Serializable managementMessageAttributeSizeLimit = BufferHelper.readNullableInteger(buffer); } + if (buffer.readableBytes() > 0) { + Integer slowConsumerMeasurementUnitEnumValue = BufferHelper.readNullableInteger(buffer); + if (slowConsumerMeasurementUnitEnumValue != null) { + slowConsumerThresholdMeasurementUnit = SlowConsumerThresholdMeasurementUnit.valueOf(slowConsumerMeasurementUnitEnumValue); + } + } } @Override @@ -1418,7 +1441,8 @@ public class AddressSettings implements Mergeable, Serializable SimpleString.sizeofNullableString(expiryQueueSuffix) + BufferHelper.sizeOfNullableBoolean(enableMetrics) + BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) + - BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit); + BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit) + + BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue()); } @Override @@ -1546,6 +1570,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch); BufferHelper.writeNullableInteger(buffer, managementMessageAttributeSizeLimit); + + BufferHelper.writeNullableInteger(buffer, slowConsumerThresholdMeasurementUnit == null ? null : slowConsumerThresholdMeasurementUnit.getValue()); } /* (non-Javadoc) @@ -1619,6 +1645,7 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((expiryQueueSuffix == null) ? 0 : expiryQueueSuffix.hashCode()); result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode()); result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode()); + result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode()); return result; } @@ -1976,6 +2003,9 @@ public class AddressSettings implements Mergeable, Serializable } else if (!enableMetrics.equals(other.enableMetrics)) return false; + if (slowConsumerThresholdMeasurementUnit != other.slowConsumerThresholdMeasurementUnit) + return false; + return true; } @@ -2031,6 +2061,8 @@ public class AddressSettings implements Mergeable, Serializable sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + + ", slowConsumerThresholdMeasurementUnit=" + + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/SlowConsumerThresholdMeasurementUnit.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/SlowConsumerThresholdMeasurementUnit.java new file mode 100644 index 0000000000..ea84ae6cbe --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/SlowConsumerThresholdMeasurementUnit.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.settings.impl; + +public enum SlowConsumerThresholdMeasurementUnit { + MESSAGES_PER_SECOND(1), MESSAGES_PER_MINUTE(60), MESSAGES_PER_HOUR(3600), MESSAGES_PER_DAY(3600 * 24); + + private final int measurementUnitInSeconds; + + SlowConsumerThresholdMeasurementUnit(int measurementUnitInSeconds) { + this.measurementUnitInSeconds = measurementUnitInSeconds; + } + + public static SlowConsumerThresholdMeasurementUnit valueOf(int measurementUnitInSeconds) { + switch (measurementUnitInSeconds) { + case 1: + return MESSAGES_PER_SECOND; + case 60: + return MESSAGES_PER_MINUTE; + case 3600: + return MESSAGES_PER_HOUR; + case 3600 * 24: + return MESSAGES_PER_DAY; + default: + return null; + } + } + + public int getValue() { + return measurementUnitInSeconds; + } +} diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 7698008d9a..c9e44e64ca 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -3685,12 +3685,29 @@ - The minimum rate of message consumption allowed before a consumer is considered "slow." Measured - in messages-per-second. + The minimum rate of message consumption allowed before a consumer is considered "slow." Measurement + unit is defined by the slow-consumer-threshold-measurement-unit parameter. By default this is + messages-per-seconds + + + + The units used to measure the slow consumer threshold. Default is messages-per-second. + + + + + + + + + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 926a78bab2..ea1b4d8890 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.junit.AfterClass; @@ -367,6 +368,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(10, conf.getAddressesSettings().get("a1").getPageCacheMaxSize()); assertEquals(4, conf.getAddressesSettings().get("a1").getMessageCounterHistoryDayLimit()); assertEquals(10, conf.getAddressesSettings().get("a1").getSlowConsumerThreshold()); + assertEquals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_HOUR, conf.getAddressesSettings().get("a1").getSlowConsumerThresholdMeasurementUnit()); assertEquals(5, conf.getAddressesSettings().get("a1").getSlowConsumerCheckPeriod()); assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy()); assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsQueues()); @@ -401,6 +403,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(20, conf.getAddressesSettings().get("a2").getPageCacheMaxSize()); assertEquals(8, conf.getAddressesSettings().get("a2").getMessageCounterHistoryDayLimit()); assertEquals(20, conf.getAddressesSettings().get("a2").getSlowConsumerThreshold()); + assertEquals(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_DAY, conf.getAddressesSettings().get("a2").getSlowConsumerThresholdMeasurementUnit()); assertEquals(15, conf.getAddressesSettings().get("a2").getSlowConsumerCheckPeriod()); assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy()); assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java index 0497dd7fa3..aca86950f9 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java @@ -38,6 +38,7 @@ public class AddressSettingsTest extends ActiveMQTestBase { Assert.assertEquals(AddressSettings.DEFAULT_REDELIVER_DELAY, addressSettings.getRedeliveryDelay()); Assert.assertEquals(AddressSettings.DEFAULT_REDELIVER_MULTIPLIER, addressSettings.getRedeliveryMultiplier(), 0.000001); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD, addressSettings.getSlowConsumerThreshold()); + Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT, addressSettings.getSlowConsumerThresholdMeasurementUnit()); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD, addressSettings.getSlowConsumerCheckPeriod()); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy()); Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_JMS_QUEUES, addressSettings.isAutoCreateJmsQueues()); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index a576035f18..fa41ef390e 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -458,6 +458,7 @@ 10 5 NOTIFY + MESSAGES_PER_HOUR true true true @@ -488,6 +489,7 @@ 8 20 15 + MESSAGES_PER_DAY KILL false false diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml index 5be0f08c01..846c0e5a5b 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml @@ -30,6 +30,7 @@ 10 5 NOTIFY + MESSAGES_PER_HOUR true true true @@ -61,6 +62,7 @@ 20 15 KILL + MESSAGES_PER_DAY false false false diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md index c665f6b679..ce04547c4e 100644 --- a/docs/user-manual/en/address-model.md +++ b/docs/user-manual/en/address-model.md @@ -660,6 +660,7 @@ that would be found in the `broker.xml` file. 0 true -1 + MESSAGES_PER_SECOND NOTIFY 5 true @@ -823,8 +824,19 @@ message will instead be sent to the `dead-letter-address` (DLA) for that address, if it exists. `slow-consumer-threshold`. The minimum rate of message consumption allowed -before a consumer is considered "slow." Measured in messages-per-second. -Default is `-1` (i.e. disabled); any other valid value must be greater than 0. +before a consumer is considered "slow." Measured in units specified by the +slow-consumer-threshold-measurement-unit configuration option. Default is `-1` + (i.e. disabled); any other valid value must be greater than 0. + Read more about [slow consumers](slow-consumers.md). + +`slow-consumer-threshold-measurement-unit`. The units used to measure the +slow-consumer-threshold. Valid options are: +* MESSAGES_PER_SECOND +* MESSAGES_PER_MINUTE +* MESSAGES_PER_HOUR +* MESSAGES_PER_DAY + +If no unit is specified the default MESSAGES_PER_SECOND will be used. Read more about [slow consumers](slow-consumers.md). `slow-consumer-policy`. What should happen when a slow consumer is detected. @@ -834,7 +846,13 @@ CONSUMER\_SLOW management notification which an application could receive and take action with. Read more about [slow consumers](slow-consumers.md). `slow-consumer-check-period`. How often to check for slow consumers on a -particular queue. Measured in *seconds*. Default is `5`. Read more about [slow +particular queue. Measured in *seconds*. Default is `5`. + +* Note: This should be at least 2x the maximum time it takes a consumer to process +1 message. For example, if the slow-consumer-threshold is set to 1 and the +slow-consumer-threshold-measurement-unit is set to MESSAGES_PER_MINUTE then this +should be set to at least 2 x 60s i.e. 120s. +Read more about [slow consumers](slow-consumers.md). `auto-create-jms-queues` is **deprecated**. See `auto-create-queues`. Whether diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java index a03d5ec5c5..4665ae2b2b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.TimeUtils; @@ -55,24 +56,42 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_MINUTE; +import static org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND; + @RunWith(value = Parameterized.class) public class SlowConsumerTest extends ActiveMQTestBase { private static final Logger logger = Logger.getLogger(SlowConsumerTest.class); - private int threshold = 10; + private int threshold; + private float consumerRate; // The rate actual consumers will run in the test. + + private SlowConsumerThresholdMeasurementUnit unit; private long checkPeriod = 1; private boolean isNetty = false; private boolean isPaging = false; // this will ensure that all tests in this class are run twice, // once with "true" passed to the class' constructor and once with "false" - @Parameterized.Parameters(name = "netty={0}, paging={1}") + @Parameterized.Parameters(name = "netty={0}, paging={1}, threshold={2}, threshold_units={3}") public static Collection getParameters() { - return Arrays.asList(new Object[][]{{true, false}, {false, false}, {true, true}, {false, true}}); + return Arrays.asList(new Object[][]{ + {true, false, 10, MESSAGES_PER_SECOND}, + {false, false, 10, MESSAGES_PER_SECOND}, + {true, true, 10, MESSAGES_PER_SECOND}, + {false, true, 10, MESSAGES_PER_SECOND}, + {true, false, 1, MESSAGES_PER_MINUTE}, + {false, false, 1, MESSAGES_PER_MINUTE}, + {true, true, 1, MESSAGES_PER_MINUTE}, + {false, true, 1, MESSAGES_PER_MINUTE} + }); } - public SlowConsumerTest(boolean isNetty, boolean isPaging) { + public SlowConsumerTest(boolean isNetty, boolean isPaging, int threshold, SlowConsumerThresholdMeasurementUnit unit) { + this.threshold = threshold; + this.unit = unit; + this.consumerRate = threshold / unit.getValue(); this.isNetty = isNetty; this.isPaging = isPaging; } @@ -93,6 +112,7 @@ public class SlowConsumerTest extends ActiveMQTestBase { AddressSettings addressSettings = new AddressSettings(); addressSettings.setSlowConsumerCheckPeriod(checkPeriod); addressSettings.setSlowConsumerThreshold(threshold); + addressSettings.setSlowConsumerThresholdMeasurementUnit(unit); addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); if (isPaging) { @@ -442,14 +462,14 @@ public class SlowConsumerTest extends ActiveMQTestBase { final int messages = 10 * threshold; - FixedRateProducer producer = new FixedRateProducer(threshold * 2, sf1, QUEUE, messages); + FixedRateProducer producer = new FixedRateProducer(threshold * 2, unit, sf1, QUEUE, messages); final Set consumers = new ConcurrentHashSet<>(); final Set receivedMessages = new ConcurrentHashSet<>(); - consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf2, QUEUE, 1)); - consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf3, QUEUE, 2)); - consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf4, QUEUE, 3)); + consumers.add(new FixedRateConsumer(threshold, unit, receivedMessages, sf2, QUEUE, 1)); + consumers.add(new FixedRateConsumer(threshold, unit, receivedMessages, sf3, QUEUE, 2)); + consumers.add(new FixedRateConsumer(threshold, unit, receivedMessages, sf4, QUEUE, 3)); try { producer.start(); @@ -481,8 +501,8 @@ public class SlowConsumerTest extends ActiveMQTestBase { int messages; ClientProducer producer; - FixedRateProducer(int rate, ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException { - super(sf, queue, rate); + FixedRateProducer(int rate, SlowConsumerThresholdMeasurementUnit unit, ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException { + super(sf, queue, rate, unit); this.messages = messages; } @@ -518,11 +538,12 @@ public class SlowConsumerTest extends ActiveMQTestBase { int id; FixedRateConsumer(int rate, + SlowConsumerThresholdMeasurementUnit unit, Set receivedMessages, ClientSessionFactory sf, SimpleString queue, int id) throws ActiveMQException { - super(sf, queue, rate); + super(sf, queue, rate, unit); this.id = id; this.receivedMessages = receivedMessages; } @@ -568,10 +589,10 @@ public class SlowConsumerTest extends ActiveMQTestBase { protected volatile boolean working; boolean failed; - FixedRateClient(ClientSessionFactory sf, SimpleString queue, int rate) throws ActiveMQException { + FixedRateClient(ClientSessionFactory sf, SimpleString queue, int rate, SlowConsumerThresholdMeasurementUnit unit) throws ActiveMQException { this.sf = sf; this.queue = queue; - this.sleepTime = 1000 / rate; + this.sleepTime = (int) (1000f / (unit.getValue() / rate)); } protected void prepareWork() throws ActiveMQException {