From 2c506cc52a8eca3fb0d7ee3a5a4e5bf4f3b82d72 Mon Sep 17 00:00:00 2001 From: Michael Pearce Date: Tue, 4 Aug 2020 18:50:49 +0100 Subject: [PATCH] [ARTEMIS-2863] Add support to pause dispatch when group rebalance Add test case Add implementation Add docs --- .../artemis/api/core/QueueAttributes.java | 15 ++ .../artemis/api/core/QueueConfiguration.java | 18 +++ .../activemq/artemis/logs/AuditLogger.java | 8 + .../config/ActiveMQDefaultConfiguration.java | 6 + .../api/core/client/ClientSession.java | 2 + .../api/core/management/QueueControl.java | 6 + .../core/client/impl/QueueQueryImpl.java | 11 +- .../core/impl/ActiveMQSessionContext.java | 2 +- .../wireformat/CreateQueueMessage_V2.java | 26 +++ .../CreateSharedQueueMessage_V2.java | 24 +++ .../SessionQueueQueryResponseMessage_V3.java | 30 +++- .../artemis/core/server/QueueQueryResult.java | 9 ++ .../impl/FileConfigurationParser.java | 8 + .../impl/ActiveMQServerControlImpl.java | 1 + .../management/impl/QueueControlImpl.java | 15 ++ .../core/management/impl/view/QueueView.java | 3 + .../core/persistence/QueueBindingInfo.java | 2 + .../AbstractJournalStorageManager.java | 2 +- .../codec/PersistentQueueBindingEncoding.java | 19 +++ .../core/postoffice/impl/PostOfficeImpl.java | 4 + .../activemq/artemis/core/server/Queue.java | 4 + .../core/server/impl/ActiveMQServerImpl.java | 7 +- .../server/impl/QueueConfigurationUtils.java | 1 + .../core/server/impl/QueueConsumers.java | 3 + .../core/server/impl/QueueConsumersImpl.java | 6 + .../artemis/core/server/impl/QueueImpl.java | 63 ++++++-- .../core/settings/impl/AddressSettings.java | 41 ++++- .../schema/artemis-configuration.xsd | 9 ++ .../journal/QueueBindingEncodingTest.java | 5 + .../impl/ScheduledDeliveryHandlerTest.java | 10 ++ .../test/resources/artemis-configuration.xsd | 9 ++ docs/user-manual/en/message-grouping.md | 11 +- .../integration/jms/client/GroupingTest.java | 149 ++++++++++++++++++ .../management/QueueControlUsingCoreTest.java | 5 + .../unit/core/postoffice/impl/FakeQueue.java | 10 ++ 35 files changed, 517 insertions(+), 27 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java index ca4d4d8c34..13dc12e3a6 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java @@ -28,6 +28,7 @@ public class QueueAttributes implements Serializable { public static final String MAX_CONSUMERS = "max-consumers"; public static final String EXCLUSIVE = "exclusive"; public static final String GROUP_REBALANCE = "group-rebalance"; + public static final String GROUP_REBALANCE_PAUSE_DISPATCH = "group-rebalance-pause-dispatch"; public static final String GROUP_BUCKETS = "group-buckets"; public static final String GROUP_FIRST_KEY = "group-first-key"; public static final String LAST_VALUE = "last-value"; @@ -49,6 +50,7 @@ public class QueueAttributes implements Serializable { private Integer maxConsumers; private Boolean exclusive; private Boolean groupRebalance; + private Boolean groupRebalancePauseDispatch; private Integer groupBuckets; private SimpleString groupFirstKey; private Boolean lastValue; @@ -93,6 +95,8 @@ public class QueueAttributes implements Serializable { setConsumerPriority(Integer.valueOf(value)); } else if (key.equals(GROUP_REBALANCE)) { setGroupRebalance(Boolean.valueOf(value)); + } else if (key.equals(GROUP_REBALANCE_PAUSE_DISPATCH)) { + setGroupRebalancePauseDispatch(Boolean.valueOf(value)); } else if (key.equals(GROUP_BUCKETS)) { setGroupBuckets(Integer.valueOf(value)); } else if (key.equals(GROUP_FIRST_KEY)) { @@ -119,6 +123,7 @@ public class QueueAttributes implements Serializable { .setRingSize(this.getRingSize()) .setEnabled(this.isEnabled()) .setGroupRebalance(this.getGroupRebalance()) + .setGroupRebalancePauseDispatch(this.getGroupRebalancePauseDispatch()) .setNonDestructive(this.getNonDestructive()) .setLastValue(this.getLastValue()) .setFilterString(this.getFilterString()) @@ -146,6 +151,7 @@ public class QueueAttributes implements Serializable { .setRingSize(queueConfiguration.getRingSize()) .setEnabled(queueConfiguration.isEnabled()) .setGroupRebalance(queueConfiguration.isGroupRebalance()) + .setGroupRebalancePauseDispatch(queueConfiguration.isGroupRebalancePauseDispatch()) .setNonDestructive(queueConfiguration.isNonDestructive()) .setLastValue(queueConfiguration.isLastValue()) .setFilterString(queueConfiguration.getFilterString()) @@ -280,6 +286,15 @@ public class QueueAttributes implements Serializable { return this; } + public Boolean getGroupRebalancePauseDispatch() { + return groupRebalancePauseDispatch; + } + + public QueueAttributes setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) { + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; + return this; + } + public Integer getGroupBuckets() { return groupBuckets; } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java index afe438e7ca..2257721458 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java @@ -56,6 +56,7 @@ public class QueueConfiguration implements Serializable { public static final String MAX_CONSUMERS = "max-consumers"; public static final String EXCLUSIVE = "exclusive"; public static final String GROUP_REBALANCE = "group-rebalance"; + public static final String GROUP_REBALANCE_PAUSE_DISPATCH = "group-rebalance-pause-dispatch"; public static final String GROUP_BUCKETS = "group-buckets"; public static final String GROUP_FIRST_KEY = "group-first-key"; public static final String LAST_VALUE = "last-value"; @@ -87,6 +88,7 @@ public class QueueConfiguration implements Serializable { private Integer maxConsumers; private Boolean exclusive; private Boolean groupRebalance; + private Boolean groupRebalancePauseDispatch; private Integer groupBuckets; private SimpleString groupFirstKey; private Boolean lastValue; @@ -459,6 +461,15 @@ public class QueueConfiguration implements Serializable { return this; } + public Boolean isGroupRebalancePauseDispatch() { + return groupRebalancePauseDispatch; + } + + public QueueConfiguration setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) { + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; + return this; + } + public Integer getGroupBuckets() { return groupBuckets; } @@ -630,6 +641,9 @@ public class QueueConfiguration implements Serializable { if (isGroupRebalance() != null) { builder.add(GROUP_REBALANCE, isGroupRebalance()); } + if (isGroupRebalancePauseDispatch() != null) { + builder.add(GROUP_REBALANCE_PAUSE_DISPATCH, isGroupRebalancePauseDispatch()); + } if (getGroupBuckets() != null) { builder.add(GROUP_BUCKETS, getGroupBuckets()); } @@ -746,6 +760,8 @@ public class QueueConfiguration implements Serializable { return false; if (!Objects.equals(groupRebalance, that.groupRebalance)) return false; + if (!Objects.equals(groupRebalancePauseDispatch, that.groupRebalancePauseDispatch)) + return false; if (!Objects.equals(groupBuckets, that.groupBuckets)) return false; if (!Objects.equals(groupFirstKey, that.groupFirstKey)) @@ -802,6 +818,7 @@ public class QueueConfiguration implements Serializable { result = 31 * result + Objects.hashCode(maxConsumers); result = 31 * result + Objects.hashCode(exclusive); result = 31 * result + Objects.hashCode(groupRebalance); + result = 31 * result + Objects.hashCode(groupRebalancePauseDispatch); result = 31 * result + Objects.hashCode(groupBuckets); result = 31 * result + Objects.hashCode(groupFirstKey); result = 31 * result + Objects.hashCode(lastValue); @@ -838,6 +855,7 @@ public class QueueConfiguration implements Serializable { + ", maxConsumers=" + maxConsumers + ", exclusive=" + exclusive + ", groupRebalance=" + groupRebalance + + ", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch + ", groupBuckets=" + groupBuckets + ", groupFirstKey=" + groupFirstKey + ", lastValue=" + lastValue diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index 209402d9f3..7fcc0ed007 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2729,4 +2729,12 @@ public interface AuditLogger extends BasicLogger { @LogMessage(level = Logger.Level.INFO) @Message(id = 601734, value = "User {0} failed to resume address {1}", format = Message.Format.MESSAGE_FORMAT) void resumeAddressFailure(String user, String queueName); + + static void isGroupRebalancePauseDispatch(Object source) { + LOGGER.isGroupRebalancePauseDispatch(getCaller(), source); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601735, value = "User {0} is getting group rebalance pause dispatch property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT) + void isGroupRebalancePauseDispatch(String user, Object source, Object... args); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 60656e5533..f7d33ffb68 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -534,6 +534,8 @@ public final class ActiveMQDefaultConfiguration { public static final boolean DEFAULT_GROUP_REBALANCE = false; + public static final boolean DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH = false; + public static final SimpleString DEFAULT_GROUP_FIRST_KEY = null; public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST; @@ -1503,6 +1505,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_GROUP_REBALANCE; } + public static boolean getDefaultGroupRebalancePauseDispatch() { + return DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH; + } + public static SimpleString getDefaultGroupFirstKey() { return DEFAULT_GROUP_FIRST_KEY; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index 5db79aa57e..d2c0f431b2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -162,6 +162,8 @@ public interface ClientSession extends XAResource, AutoCloseable { Boolean isGroupRebalance(); + Boolean isGroupRebalancePauseDispatch(); + Integer getGroupBuckets(); SimpleString getGroupFirstKey(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index bbb27c09c3..b07cc3c89a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -701,6 +701,12 @@ public interface QueueControl { @Attribute(desc = "whether the groups of this queue are automatically rebalanced") boolean isGroupRebalance(); + /** + * Returns whether the dispatch is paused when groups of this queue are automatically rebalanced. + */ + @Attribute(desc = "whether the dispatch is paused when groups of this queue are automatically rebalanced") + boolean isGroupRebalancePauseDispatch(); + /** * Will return the group buckets. */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java index d28f4f4d84..d532dc01cf 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java @@ -52,6 +52,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { private final Boolean groupRebalance; + private final Boolean groupRebalancePauseDispatch; + private final Integer groupBuckets; private final SimpleString groupFirstKey; @@ -162,7 +164,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { final Long autoDeleteDelay, final Long autoDeleteMessageCount, final Integer defaultConsumerWindowSize) { - this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null); + this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null); } public QueueQueryImpl(final boolean durable, @@ -180,6 +182,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { final RoutingType routingType, final Boolean exclusive, final Boolean groupRebalance, + final Boolean groupRebalancePauseDispatch, final Integer groupBuckets, final SimpleString groupFirstKey, final Boolean lastValue, @@ -208,6 +211,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { this.routingType = routingType; this.exclusive = exclusive; this.groupRebalance = groupRebalance; + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; this.groupBuckets = groupBuckets; this.groupFirstKey = groupFirstKey; this.lastValue = lastValue; @@ -328,6 +332,11 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { return groupRebalance; } + @Override + public Boolean isGroupRebalancePauseDispatch() { + return groupRebalancePauseDispatch; + } + @Override public Integer getGroupBuckets() { return groupBuckets; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index a4aa8abec2..b4c1dcdb9d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -905,7 +905,7 @@ public class ActiveMQSessionContext extends SessionContext { // We try to recreate any non-durable or auto-created queues, since they might not be there on failover/reconnect. // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover/reconnection if (!queueInfo.isDurable() || queueInfo.isAutoCreated()) { - CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), queueInfo.isNonDestructive(), queueInfo.getConsumersBeforeDispatch(), queueInfo.getDelayBeforeDispatch(), queueInfo.isAutoDelete(), queueInfo.getAutoDeleteDelay(), queueInfo.getAutoDeleteMessageCount(), queueInfo.getRingSize(), queueInfo.isEnabled()); + CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.isGroupRebalancePauseDispatch(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), queueInfo.isNonDestructive(), queueInfo.getConsumersBeforeDispatch(), queueInfo.getDelayBeforeDispatch(), queueInfo.isAutoDelete(), queueInfo.getAutoDeleteDelay(), queueInfo.getAutoDeleteMessageCount(), queueInfo.getRingSize(), queueInfo.isEnabled()); sendPacketWithoutLock(sessionChannel, createQueueRequest); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java index 3fe20a77ae..aeebbe4b69 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java @@ -37,6 +37,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { private Boolean groupRebalance; + private Boolean groupRebalancePauseDispatch; + private Integer groupBuckets; private SimpleString groupFirstKey; @@ -81,6 +83,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { requiresResponse, queueAttributes.getExclusive(), queueAttributes.getGroupRebalance(), + queueAttributes.getGroupRebalancePauseDispatch(), queueAttributes.getGroupBuckets(), queueAttributes.getGroupFirstKey(), queueAttributes.getLastValue(), @@ -111,6 +114,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { requiresResponse, queueConfiguration.isExclusive(), queueConfiguration.isGroupRebalance(), + queueConfiguration.isGroupRebalancePauseDispatch(), queueConfiguration.getGroupBuckets(), queueConfiguration.getGroupFirstKey(), queueConfiguration.isLastValue(), @@ -138,6 +142,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { final boolean requiresResponse, final Boolean exclusive, final Boolean groupRebalance, + final Boolean groupRebalancePauseDispatch, final Integer groupBuckets, final SimpleString groupFirstKey, final Boolean lastValue, @@ -164,6 +169,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { this.purgeOnNoConsumers = purgeOnNoConsumers; this.exclusive = exclusive; this.groupRebalance = groupRebalance; + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; this.groupBuckets = groupBuckets; this.groupFirstKey = groupFirstKey; this.lastValue = lastValue; @@ -191,6 +197,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { .setRoutingType(routingType) .setExclusive(exclusive) .setGroupRebalance(groupRebalance) + .setGroupRebalancePauseDispatch(groupRebalancePauseDispatch) .setNonDestructive(nonDestructive) .setLastValue(lastValue) .setFilterString(filterString) @@ -219,6 +226,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { buff.append(", purgeOnNoConsumers=" + purgeOnNoConsumers); buff.append(", exclusive=" + exclusive); buff.append(", groupRebalance=" + groupRebalance); + buff.append(", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch); buff.append(", groupBuckets=" + groupBuckets); buff.append(", groupFirstKey=" + groupFirstKey); buff.append(", lastValue=" + lastValue); @@ -324,6 +332,14 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { this.groupRebalance = groupRebalance; } + public Boolean isGroupRebalancePauseDispatch() { + return groupRebalancePauseDispatch; + } + + public void setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) { + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; + } + public Integer getGroupBuckets() { return groupBuckets; } @@ -401,6 +417,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { buffer.writeNullableSimpleString(groupFirstKey); BufferHelper.writeNullableLong(buffer, ringSize); BufferHelper.writeNullableBoolean(buffer, enabled); + BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch); } @Override @@ -434,6 +451,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { if (buffer.readableBytes() > 0) { enabled = BufferHelper.readNullableBoolean(buffer); } + if (buffer.readableBytes() > 0) { + groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer); + } } @Override @@ -446,6 +466,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { result = prime * result + (purgeOnNoConsumers ? 1231 : 1237); result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237); result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237); + result = prime * result + (groupRebalancePauseDispatch == null ? 0 : groupRebalancePauseDispatch ? 1231 : 1237); result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode()); result = prime * result + (groupFirstKey == null ? 0 : groupFirstKey.hashCode()); result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237); @@ -486,6 +507,11 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { return false; } else if (!groupRebalance.equals(other.groupRebalance)) return false; + if (groupRebalancePauseDispatch == null) { + if (other.groupRebalancePauseDispatch != null) + return false; + } else if (!groupRebalancePauseDispatch.equals(other.groupRebalancePauseDispatch)) + return false; if (groupBuckets == null) { if (other.groupBuckets != null) return false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java index 7588e74bf3..17bd1adc48 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java @@ -29,6 +29,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { private Boolean purgeOnNoConsumers; private Boolean exclusive; private Boolean groupRebalance; + private Boolean groupRebalancePauseDispatch; private Integer groupBuckets; private SimpleString groupFirstKey; private Boolean lastValue; @@ -53,6 +54,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { queueConfiguration.isPurgeOnNoConsumers(), queueConfiguration.isExclusive(), queueConfiguration.isGroupRebalance(), + queueConfiguration.isGroupRebalancePauseDispatch(), queueConfiguration.getGroupBuckets(), queueConfiguration.getGroupFirstKey(), queueConfiguration.isLastValue(), @@ -78,6 +80,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { final Boolean purgeOnNoConsumers, final Boolean exclusive, final Boolean groupRebalance, + final Boolean groupRebalancePauseDispatch, final Integer groupBuckets, final SimpleString groupFirstKey, final Boolean lastValue, @@ -102,6 +105,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { this.purgeOnNoConsumers = purgeOnNoConsumers; this.exclusive = exclusive; this.groupRebalance = groupRebalance; + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; this.groupBuckets = groupBuckets; this.groupFirstKey = groupFirstKey; this.lastValue = lastValue; @@ -201,6 +205,14 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { this.groupRebalance = groupRebalance; } + public Boolean isGroupRebalancePauseDispatch() { + return groupRebalancePauseDispatch; + } + + public void setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) { + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; + } + public Integer getGroupBuckets() { return groupBuckets; } @@ -264,6 +276,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { .setRoutingType(routingType) .setExclusive(exclusive) .setGroupRebalance(groupRebalance) + .setGroupRebalancePauseDispatch(groupRebalancePauseDispatch) .setNonDestructive(nonDestructive) .setLastValue(lastValue) .setFilterString(filterString) @@ -293,6 +306,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { buff.append(", purgeOnNoConsumers=" + purgeOnNoConsumers); buff.append(", exclusive=" + exclusive); buff.append(", groupRebalance=" + groupRebalance); + buff.append(", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch); buff.append(", groupBuckets=" + groupBuckets); buff.append(", groupFirstKey=" + groupFirstKey); buff.append(", lastValue=" + lastValue); @@ -334,6 +348,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { buffer.writeNullableSimpleString(groupFirstKey); BufferHelper.writeNullableLong(buffer, ringSize); BufferHelper.writeNullableBoolean(buffer, enabled); + BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch); } @Override @@ -370,6 +385,9 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { if (buffer.readableBytes() > 0) { enabled = buffer.readNullableBoolean(); } + if (buffer.readableBytes() > 0) { + groupRebalancePauseDispatch = buffer.readNullableBoolean(); + } } @Override @@ -386,6 +404,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { result = prime * result + (purgeOnNoConsumers == null ? 0 : purgeOnNoConsumers ? 1231 : 1237); result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237); result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237); + result = prime * result + (groupRebalancePauseDispatch == null ? 0 : groupRebalancePauseDispatch ? 1231 : 1237); result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode()); result = prime * result + (groupFirstKey == null ? 0 : groupFirstKey.hashCode()); result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237); @@ -451,6 +470,11 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { return false; } else if (!groupRebalance.equals(other.groupRebalance)) return false; + if (groupRebalancePauseDispatch == null) { + if (other.groupRebalancePauseDispatch != null) + return false; + } else if (!groupRebalancePauseDispatch.equals(other.groupRebalancePauseDispatch)) + return false; if (groupBuckets == null) { if (other.groupBuckets != null) return false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java index 5f09e96c1d..bf8322217f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java @@ -38,6 +38,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon protected Boolean groupRebalance; + protected Boolean groupRebalancePauseDispatch; + protected Integer groupBuckets; protected SimpleString groupFirstKey; @@ -65,11 +67,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon private Boolean enabled; public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) { - this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize(), result.getRingSize(), result.isEnabled()); + this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize(), result.getRingSize(), result.isEnabled()); } public SessionQueueQueryResponseMessage_V3() { - this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private SessionQueueQueryResponseMessage_V3(final SimpleString name, @@ -87,6 +89,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon final int maxConsumers, final Boolean exclusive, final Boolean groupRebalance, + final Boolean groupRebalancePauseDispatch, final Integer groupBuckets, final SimpleString groupFirstKey, final Boolean lastValue, @@ -132,6 +135,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon this.groupRebalance = groupRebalance; + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; + this.groupBuckets = groupBuckets; this.groupFirstKey = groupFirstKey; @@ -255,6 +260,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon this.groupRebalance = groupRebalance; } + public Boolean isGroupRebalancePauseDispatch() { + return groupRebalancePauseDispatch; + } + + public void setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) { + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; + } + public Integer getGroupBuckets() { return groupBuckets; } @@ -321,6 +334,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon buffer.writeNullableSimpleString(groupFirstKey); BufferHelper.writeNullableLong(buffer, ringSize); BufferHelper.writeNullableBoolean(buffer, enabled); + BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch); } @@ -358,6 +372,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon if (buffer.readableBytes() > 0) { enabled = BufferHelper.readNullableBoolean(buffer); } + if (buffer.readableBytes() > 0) { + groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer); + } } @Override @@ -370,6 +387,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon result = prime * result + maxConsumers; result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237); result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237); + result = prime * result + (groupRebalancePauseDispatch == null ? 0 : groupRebalancePauseDispatch ? 1231 : 1237); result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode()); result = prime * result + (groupFirstKey == null ? 0 : groupFirstKey.hashCode()); result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237); @@ -402,6 +420,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon buff.append(", maxConsumers=" + maxConsumers); buff.append(", exclusive=" + exclusive); buff.append(", groupRebalance=" + groupRebalance); + buff.append(", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch); buff.append(", groupBuckets=" + groupBuckets); buff.append(", groupFirstKey=" + groupFirstKey); buff.append(", lastValue=" + lastValue); @@ -420,7 +439,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon @Override public ClientSession.QueueQuery toQueueQuery() { - return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize(), isEnabled()); + return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize(), isEnabled()); } @Override @@ -446,6 +465,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon return false; } else if (!groupRebalance.equals(other.groupRebalance)) return false; + if (groupRebalancePauseDispatch == null) { + if (other.groupRebalancePauseDispatch != null) + return false; + } else if (!groupRebalancePauseDispatch.equals(other.groupRebalancePauseDispatch)) + return false; if (groupBuckets == null) { if (other.groupBuckets != null) return false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java index 3ae86dcfcd..798f4175d6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java @@ -51,6 +51,8 @@ public class QueueQueryResult { private Boolean groupRebalance; + private Boolean groupRebalancePauseDispatch; + private Integer groupBuckets; private SimpleString groupFirstKey; @@ -92,6 +94,7 @@ public class QueueQueryResult { final int maxConsumers, final Boolean exclusive, final Boolean groupRebalance, + final Boolean groupRebalancePauseDispatch, final Integer groupBuckets, final SimpleString groupFirstKey, final Boolean lastValue, @@ -135,6 +138,8 @@ public class QueueQueryResult { this.groupRebalance = groupRebalance; + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; + this.groupBuckets = groupBuckets; this.groupFirstKey = groupFirstKey; @@ -250,6 +255,10 @@ public class QueueQueryResult { return groupRebalance; } + public Boolean isGroupRebalancePauseDispatch() { + return groupRebalancePauseDispatch; + } + public Integer getGroupBuckets() { return groupBuckets; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index c42085b3b5..95cae11e01 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -216,6 +216,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String DEFAULT_GROUP_REBALANCE = "default-group-rebalance"; + private static final String DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH = "default-group-rebalance-pause-dispatch"; + private static final String DEFAULT_GROUP_BUCKETS = "default-group-buckets"; private static final String DEFAULT_GROUP_FIRST_KEY = "default-group-first-key"; @@ -1149,6 +1151,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setDefaultExclusiveQueue(XMLUtil.parseBoolean(child)); } else if (DEFAULT_GROUP_REBALANCE.equalsIgnoreCase(name)) { addressSettings.setDefaultGroupRebalance(XMLUtil.parseBoolean(child)); + } else if (DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH.equalsIgnoreCase(name)) { + addressSettings.setDefaultGroupRebalancePauseDispatch(XMLUtil.parseBoolean(child)); } else if (DEFAULT_GROUP_BUCKETS.equalsIgnoreCase(name)) { addressSettings.setDefaultGroupBuckets(XMLUtil.parseInt(child)); } else if (DEFAULT_GROUP_FIRST_KEY.equalsIgnoreCase(name)) { @@ -1294,6 +1298,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { String user = null; Boolean exclusive = null; Boolean groupRebalance = null; + Boolean groupRebalancePauseDispatch = null; Integer groupBuckets = null; String groupFirstKey = null; Boolean lastValue = null; @@ -1316,6 +1321,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { exclusive = Boolean.parseBoolean(item.getNodeValue()); } else if (item.getNodeName().equals("group-rebalance")) { groupRebalance = Boolean.parseBoolean(item.getNodeValue()); + } else if (item.getNodeName().equals("group-rebalance-pause-dispatch")) { + groupRebalancePauseDispatch = Boolean.parseBoolean(item.getNodeValue()); } else if (item.getNodeName().equals("group-buckets")) { groupBuckets = Integer.parseInt(item.getNodeValue()); } else if (item.getNodeName().equals("group-first-key")) { @@ -1361,6 +1368,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { .setUser(user) .setExclusive(exclusive) .setGroupRebalance(groupRebalance) + .setGroupRebalancePauseDispatch(groupRebalancePauseDispatch) .setGroupBuckets(groupBuckets) .setGroupFirstKey(groupFirstKey) .setLastValue(lastValue) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 9084bebac4..e6493af522 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -2881,6 +2881,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active .add("defaultNonDestructive", addressSettings.isDefaultNonDestructive()) .add("defaultExclusiveQueue", addressSettings.isDefaultExclusiveQueue()) .add("defaultGroupRebalance", addressSettings.isDefaultGroupRebalance()) + .add("defaultGroupRebalancePauseDispatch", addressSettings.isDefaultGroupRebalancePauseDispatch()) .add("defaultGroupBuckets", addressSettings.getDefaultGroupBuckets()) .add("defaultGroupFirstKey", addressSettings.getDefaultGroupFirstKey() == null ? "" : addressSettings.getDefaultGroupFirstKey().toString()) .add("defaultMaxConsumers", addressSettings.getDefaultMaxConsumers()) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 0ce51b8c4d..33d241f35d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -1807,6 +1807,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override + public boolean isGroupRebalancePauseDispatch() { + if (AuditLogger.isEnabled()) { + AuditLogger.isGroupRebalancePauseDispatch(queue); + } + checkStarted(); + + clearIO(); + try { + return queue.isGroupRebalancePauseDispatch(); + } finally { + blockOnIO(); + } + } + @Override public int getGroupBuckets() { if (AuditLogger.isEnabled()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java index c84b09b809..deaa0bf7cd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java @@ -65,6 +65,7 @@ public class QueueView extends ActiveMQAbstractView { .add("lastValue", toString(queue.isLastValue())) .add("scheduledCount", toString(queue.getScheduledCount())) .add("groupRebalance", toString(queue.isGroupRebalance())) + .add("groupRebalancePauseDispatch", toString(queue.isGroupRebalancePauseDispatch())) .add("groupBuckets", toString(queue.getGroupBuckets())) .add("groupFirstKey", toString(queue.getGroupFirstKey())); return obj; @@ -122,6 +123,8 @@ public class QueueView extends ActiveMQAbstractView { return q.getScheduledCount(); case "groupRebalance": return queue.isGroupRebalance(); + case "groupRebalancePauseDispatch": + return queue.isGroupRebalancePauseDispatch(); case "groupBuckets": return queue.getGroupBuckets(); case "groupFirstKey": diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java index 62ed7afa2b..9d3c96f797 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java @@ -92,6 +92,8 @@ public interface QueueBindingInfo { boolean isGroupRebalance(); + boolean isGroupRebalancePauseDispatch(); + int getGroupBuckets(); SimpleString getGroupFirstKey(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 551c64f776..8a4b0b9d2b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1307,7 +1307,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp SimpleString filterString = filter == null ? null : filter.getFilterString(); - PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize()); + PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize()); readLock(); try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java index e753395434..1dc7b119fe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java @@ -66,6 +66,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin public boolean groupRebalance; + public boolean groupRebalancePauseDispatch; + public int groupBuckets; public SimpleString groupFirstKey; @@ -118,6 +120,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin configurationManaged + ", groupRebalance=" + groupRebalance + + ", groupRebalancePauseDispatch=" + + groupRebalancePauseDispatch + ", groupBuckets=" + groupBuckets + ", groupFirstKey=" + @@ -141,6 +145,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin final boolean enabled, final boolean exclusive, final boolean groupRebalance, + final boolean groupRebalancePauseDispatch, final int groupBuckets, final SimpleString groupFirstKey, final boolean lastValue, @@ -161,6 +166,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin this.autoCreated = autoCreated; this.maxConsumers = maxConsumers; this.purgeOnNoConsumers = purgeOnNoConsumers; + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; this.enabled = enabled; this.exclusive = exclusive; this.groupRebalance = groupRebalance; @@ -346,6 +352,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin return groupRebalance; } + @Override + public boolean isGroupRebalancePauseDispatch() { + return groupRebalancePauseDispatch; + } + @Override public int getGroupBuckets() { return groupBuckets; @@ -482,6 +493,12 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin } else { enabled = ActiveMQDefaultConfiguration.getDefaultEnabled(); } + + if (buffer.readableBytes() > 0) { + groupRebalancePauseDispatch = buffer.readBoolean(); + } else { + groupRebalancePauseDispatch = ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch(); + } } @Override @@ -509,6 +526,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin buffer.writeNullableSimpleString(groupFirstKey); buffer.writeLong(ringSize); buffer.writeBoolean(enabled); + buffer.writeBoolean(groupRebalancePauseDispatch); } @Override @@ -533,6 +551,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(groupFirstKey) + DataConstants.SIZE_LONG + + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_BOOLEAN; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 9e063fad27..a3576b37a7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -682,6 +682,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding changed = true; queue.setGroupRebalance(queueConfiguration.isGroupRebalance()); } + if ((forceUpdate || queueConfiguration.isGroupRebalancePauseDispatch() != null) && !Objects.equals(queue.isGroupRebalancePauseDispatch(), queueConfiguration.isGroupRebalancePauseDispatch())) { + changed = true; + queue.setGroupRebalancePauseDispatch(queueConfiguration.isGroupRebalancePauseDispatch()); + } if ((forceUpdate || queueConfiguration.getGroupBuckets() != null) && !Objects.equals(queue.getGroupBuckets(), queueConfiguration.getGroupBuckets())) { changed = true; queue.setGroupBuckets(queueConfiguration.getGroupBuckets()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index be7b1550f9..0f2a071da5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -136,6 +136,10 @@ public interface Queue extends Bindable,CriticalComponent { void setGroupRebalance(boolean groupRebalance); + boolean isGroupRebalancePauseDispatch(); + + void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach); + SimpleString getGroupFirstKey(); void setGroupFirstKey(SimpleString groupFirstKey); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 3384912f82..4fe0ff4f2e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -969,6 +969,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { long defaultDelayBeforeDispatch = addressSettings.getDefaultDelayBeforeDispatch(); int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize(); boolean defaultGroupRebalance = addressSettings.isDefaultGroupRebalance(); + boolean defaultGroupRebalancePauseDispatch = addressSettings.isDefaultGroupRebalancePauseDispatch(); int defaultGroupBuckets = addressSettings.getDefaultGroupBuckets(); SimpleString defaultGroupFirstKey = addressSettings.getDefaultGroupFirstKey(); long autoDeleteQueuesDelay = addressSettings.getAutoDeleteQueuesDelay(); @@ -985,12 +986,12 @@ public class ActiveMQServerImpl implements ActiveMQServer { SimpleString filterString = filter == null ? null : filter.getFilterString(); - response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled()); + response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled()); } else if (realName.equals(managementAddress)) { // make an exception for the management address (see HORNETQ-29) - response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null); + response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null); } else { - response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled); + response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled); } return response; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java index 0b3b1ce254..5bbf7c29ed 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java @@ -27,6 +27,7 @@ public class QueueConfigurationUtils { config.setMaxConsumers(config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers()); config.setExclusive(config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive()); config.setGroupRebalance(config.isGroupRebalance() == null ? as.isDefaultGroupRebalance() : config.isGroupRebalance()); + config.setGroupRebalancePauseDispatch(config.isGroupRebalancePauseDispatch() == null ? as.isDefaultGroupRebalancePauseDispatch() : config.isGroupRebalancePauseDispatch()); config.setGroupBuckets(config.getGroupBuckets() == null ? as.getDefaultGroupBuckets() : config.getGroupBuckets()); config.setGroupFirstKey(config.getGroupFirstKey() == null ? as.getDefaultGroupFirstKey() : config.getGroupFirstKey()); config.setLastValue(config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java index 895b1ee06b..fb8f096430 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java @@ -21,6 +21,7 @@ import org.apache.activemq.artemis.utils.collections.RepeatableIterator; import org.apache.activemq.artemis.utils.collections.ResettableIterator; import java.util.Set; +import java.util.stream.Stream; public interface QueueConsumers extends Iterable, RepeatableIterator, ResettableIterator { @@ -34,4 +35,6 @@ public interface QueueConsumers extends Iterable, Re boolean isEmpty(); + Stream stream(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java index 0495c5172e..1afca46a6a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.Spliterator; import java.util.concurrent.CopyOnWriteArraySet; import java.util.function.Consumer; +import java.util.stream.Stream; /** * This class's purpose is to hold the consumers. @@ -104,6 +105,11 @@ public class QueueConsumersImpl implements QueueConsume return consumers.isEmpty(); } + @Override + public Stream stream() { + return unmodifiableConsumers.stream(); + } + @Override public Iterator iterator() { return unmodifiableConsumers.iterator(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 9105fc2049..b44d1e0849 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -254,6 +254,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private volatile boolean groupRebalance; + private volatile boolean groupRebalancePauseDispatch; + private volatile int groupBuckets; private volatile SimpleString groupFirstKey; @@ -641,6 +643,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.groupRebalance = queueConfiguration.isGroupRebalance() == null ? ActiveMQDefaultConfiguration.getDefaultGroupRebalance() : queueConfiguration.isGroupRebalance(); + this.groupRebalancePauseDispatch = queueConfiguration.isGroupRebalancePauseDispatch() == null ? ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch() : queueConfiguration.isGroupRebalancePauseDispatch(); + this.groupBuckets = queueConfiguration.getGroupBuckets() == null ? ActiveMQDefaultConfiguration.getDefaultGroupBuckets() : queueConfiguration.getGroupBuckets(); this.groups = groupMap(this.groupBuckets); @@ -917,6 +921,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.groupRebalance = groupRebalance; } + @Override + public boolean isGroupRebalancePauseDispatch() { + return groupRebalancePauseDispatch; + } + + @Override + public synchronized void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDispatch) { + this.groupRebalancePauseDispatch = groupRebalancePauseDispatch; + } + @Override public SimpleString getGroupFirstKey() { return groupFirstKey; @@ -1330,16 +1344,32 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (canDispatch) { return true; } else { + + //Dont change that we can dispatch until inflight's are handled avoids issues with out of order messages. + if (inFlightMessages()) { + return false; + } + + if (consumers.size() >= consumersBeforeDispatch) { + if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(false), BooleanUtil.toInt(true))) { + dispatchStartTimeUpdater.set(this, System.currentTimeMillis()); + } + return true; + } + long currentDispatchStartTime = dispatchStartTimeUpdater.get(this); if (currentDispatchStartTime != -1 && currentDispatchStartTime < System.currentTimeMillis()) { dispatchingUpdater.set(this, BooleanUtil.toInt(true)); return true; - } else { - return false; } + return false; } } + private boolean inFlightMessages() { + return consumers.stream().mapToInt(c -> c.consumer().getDeliveringMessages().size()).sum() != 0; + } + @Override public void addConsumer(final Consumer consumer) throws Exception { if (logger.isDebugEnabled()) { @@ -1362,21 +1392,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } cancelRedistributor(); + + if (groupRebalance) { + if (groupRebalancePauseDispatch) { + stopDispatch(); + } + groups.removeAll(); + } + ConsumerHolder newConsumerHolder = new ConsumerHolder<>(consumer); if (consumers.add(newConsumerHolder)) { - int currentConsumerCount = consumers.size(); if (delayBeforeDispatch >= 0) { dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis()); } - if (currentConsumerCount >= consumersBeforeDispatch) { - if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(false), BooleanUtil.toInt(true))) { - dispatchStartTimeUpdater.set(this, System.currentTimeMillis()); - } - } - } - if (groupRebalance) { - groups.removeAll(); } if (refCountForConsumers != null) { @@ -1423,9 +1452,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (consumerRemoved) { consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis()); - boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(consumers.size() != 0)); - if (stopped) { - dispatchStartTimeUpdater.set(this, -1); + if (consumers.size() == 0) { + stopDispatch(); } } @@ -1446,6 +1474,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } + private void stopDispatch() { + boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(false)); + if (stopped) { + dispatchStartTimeUpdater.set(this, -1); + } + } + private boolean checkConsumerDirectDeliver() { if (consumers.isEmpty()) { return false; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 4c9205c686..e713b08f54 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -169,6 +169,8 @@ public class AddressSettings implements Mergeable, Serializable private Boolean defaultGroupRebalance = null; + private Boolean defaultGroupRebalancePauseDispatch = null; + private Integer defaultGroupBuckets = null; private SimpleString defaultGroupFirstKey = null; @@ -311,6 +313,7 @@ public class AddressSettings implements Mergeable, Serializable this.defaultAddressRoutingType = other.defaultAddressRoutingType; this.defaultConsumerWindowSize = other.defaultConsumerWindowSize; this.defaultGroupRebalance = other.defaultGroupRebalance; + this.defaultGroupRebalancePauseDispatch = other.defaultGroupRebalancePauseDispatch; this.defaultGroupBuckets = other.defaultGroupBuckets; this.defaultGroupFirstKey = other.defaultGroupFirstKey; this.defaultRingSize = other.defaultRingSize; @@ -839,6 +842,21 @@ public class AddressSettings implements Mergeable, Serializable return this; } + /** + * @return the defaultGroupRebalancePauseDispatch + */ + public boolean isDefaultGroupRebalancePauseDispatch() { + return defaultGroupRebalancePauseDispatch != null ? defaultGroupRebalancePauseDispatch : ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch(); + } + + /** + * @param defaultGroupRebalancePauseDispatch the defaultGroupBuckets to set + */ + public AddressSettings setDefaultGroupRebalancePauseDispatch(boolean defaultGroupRebalancePauseDispatch) { + this.defaultGroupRebalancePauseDispatch = defaultGroupRebalancePauseDispatch; + return this; + } + /** * @return the defaultGroupBuckets */ @@ -1053,6 +1071,9 @@ public class AddressSettings implements Mergeable, Serializable if (defaultGroupRebalance == null) { defaultGroupRebalance = merged.defaultGroupRebalance; } + if (defaultGroupRebalancePauseDispatch == null) { + defaultGroupRebalancePauseDispatch = merged.defaultGroupRebalancePauseDispatch; + } if (defaultGroupBuckets == null) { defaultGroupBuckets = merged.defaultGroupBuckets; } @@ -1294,6 +1315,11 @@ public class AddressSettings implements Mergeable, Serializable if (buffer.readableBytes() > 0) { enableMetrics = BufferHelper.readNullableBoolean(buffer); } + + if (buffer.readableBytes() > 0) { + defaultGroupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer); + } + } @Override @@ -1356,7 +1382,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.sizeOfNullableBoolean(autoCreateExpiryResources) + SimpleString.sizeofNullableString(expiryQueuePrefix) + SimpleString.sizeofNullableString(expiryQueueSuffix) + - BufferHelper.sizeOfNullableBoolean(enableMetrics); + BufferHelper.sizeOfNullableBoolean(enableMetrics) + + BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch); } @Override @@ -1480,6 +1507,9 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableLong(buffer, maxExpiryDelay); BufferHelper.writeNullableBoolean(buffer, enableMetrics); + + BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch); + } /* (non-Javadoc) @@ -1539,6 +1569,7 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode()); result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode()); result = prime * result + ((defaultGroupRebalance == null) ? 0 : defaultGroupRebalance.hashCode()); + result = prime * result + ((defaultGroupRebalancePauseDispatch == null) ? 0 : defaultGroupRebalancePauseDispatch.hashCode()); result = prime * result + ((defaultGroupBuckets == null) ? 0 : defaultGroupBuckets.hashCode()); result = prime * result + ((defaultGroupFirstKey == null) ? 0 : defaultGroupFirstKey.hashCode()); result = prime * result + ((defaultRingSize == null) ? 0 : defaultRingSize.hashCode()); @@ -1825,6 +1856,12 @@ public class AddressSettings implements Mergeable, Serializable } else if (!defaultGroupRebalance.equals(other.defaultGroupRebalance)) return false; + if (defaultGroupRebalancePauseDispatch == null) { + if (other.defaultGroupRebalancePauseDispatch != null) + return false; + } else if (!defaultGroupRebalancePauseDispatch.equals(other.defaultGroupRebalancePauseDispatch)) + return false; + if (defaultGroupBuckets == null) { if (other.defaultGroupBuckets != null) return false; @@ -1996,6 +2033,8 @@ public class AddressSettings implements Mergeable, Serializable defaultConsumerWindowSize + ", defaultGroupRebalance=" + defaultGroupRebalance + + ", defaultGroupRebalancePauseDispatch=" + + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 05c91fd87e..6387a69185 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -526,6 +526,7 @@ + @@ -3365,6 +3366,14 @@ + + + + whether to pause dispatch when rebalancing groups + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java index 2ac3a2ada9..92ee1eb937 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java @@ -51,6 +51,7 @@ public class QueueBindingEncodingTest extends Assert { final boolean configurationManaged = RandomUtil.randomBoolean(); final long ringSize = RandomUtil.randomLong(); final boolean enabled = RandomUtil.randomBoolean(); + final boolean groupRebalancePauseDispatch = RandomUtil.randomBoolean(); PersistentQueueBindingEncoding encoding = new PersistentQueueBindingEncoding(name, address, @@ -62,6 +63,7 @@ public class QueueBindingEncodingTest extends Assert { enabled, exclusive, groupRebalance, + groupRebalancePauseDispatch, groupBuckets, groupFirstKey, lastValue, @@ -105,5 +107,8 @@ public class QueueBindingEncodingTest extends Assert { assertEquals(routingType, decoding.getRoutingType()); assertEquals(configurationManaged, decoding.isConfigurationManaged()); assertEquals(ringSize, decoding.getRingSize()); + + assertEquals(groupRebalancePauseDispatch, decoding.isGroupRebalancePauseDispatch()); + } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 40c472c4da..00b205efc7 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -926,6 +926,16 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public boolean isGroupRebalancePauseDispatch() { + return false; + } + + @Override + public void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach) { + + } + @Override public SimpleString getGroupFirstKey() { return null; diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd index a3e5812cad..5a96eef392 100644 --- a/artemis-tools/src/test/resources/artemis-configuration.xsd +++ b/artemis-tools/src/test/resources/artemis-configuration.xsd @@ -518,6 +518,7 @@ + @@ -3158,6 +3159,14 @@ + + + + whether to pause dispatch when rebalancing groups + + + + diff --git a/docs/user-manual/en/message-grouping.md b/docs/user-manual/en/message-grouping.md index 405b86878f..e71efc7f3e 100644 --- a/docs/user-manual/en/message-grouping.md +++ b/docs/user-manual/en/message-grouping.md @@ -152,10 +152,13 @@ via the management API or managment console by invoking `resetAllGroups` By setting `group-rebalance` to `true` at the queue level, every time a consumer is added it will trigger a rebalance/reset of the groups. +As noted above, when group rebalance is done, there is a risk you may have inflight messages being processed, by default the broker will continue to dispatch whilst rebalance is occuring. To ensure that inflight messages are processed before dispatch of new messages post rebalance, +to different consumers, you can set `group-rebalance-pause-dispatch` to `true` which will cause the dispatch to pause whilst rebalance occurs, until all inflight messages are processed. + ```xml
- +
``` @@ -164,8 +167,8 @@ Or on auto-create when using the JMS Client by using address parameters when creating the destination used by the consumer. ```java -Queue queue = session.createQueue("my.destination.name?group-rebalance=true"); -Topic topic = session.createTopic("my.destination.name?group-rebalance=true"); +Queue queue = session.createQueue("my.destination.name?group-rebalance=true&group-rebalance-pause-dispatch=true"); +Topic topic = session.createTopic("my.destination.name?group-rebalance=true&group-rebalance-pause-dispatch=true"); ``` Also the default for all queues under and address can be defaulted using the @@ -174,11 +177,13 @@ Also the default for all queues under and address can be defaulted using the ```xml true + true ``` By default, `default-group-rebalance` is `false` meaning this is disabled/off. +By default, `default-group-rebalance-pause-dispatch` is `false` meaning this is disabled/off. #### Group Buckets diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java index 284d0e84d8..7b033b1abb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java @@ -518,6 +518,155 @@ public class GroupingTest extends JMSTestBase { ctx.close(); } + /** + * This tests ensures that when we have group rebalance and pause dispatch, + * the broker pauses dispatch of new messages to consumers whilst rebalance and awaits existing inflight messages to be handled before restarting dispatch with new reblanced group allocations, + * this allows us to provide a guarantee of message ordering even with rebalance, at the expense that during rebalance dispatch will pause till all consumers with inflight messages are handled. + * + * @throws Exception + */ + @Test + public void testGroupRebalancePauseDispatch() throws Exception { + ConnectionFactory fact = getCF(); + Assume.assumeFalse("only makes sense withOUT auto-group", ((ActiveMQConnectionFactory) fact).isAutoGroup()); + Assume.assumeTrue("only makes sense withOUT explicit group-id", ((ActiveMQConnectionFactory) fact).getGroupID() == null); + String testQueueName = getName() + "_group_rebalance"; + + server.createQueue(new QueueConfiguration(testQueueName).setRoutingType(RoutingType.ANYCAST).setGroupRebalance(true).setGroupRebalancePauseDispatch(true)); + + JMSContext ctx = addContext(getCF().createContext(JMSContext.SESSION_TRANSACTED)); + + Queue testQueue = ctx.createQueue(testQueueName); + + + final String groupID1 = "groupA"; + final String groupID2 = "groupB"; + final String groupID3 = "groupC"; + + + JMSProducer producer1 = ctx.createProducer().setProperty("JMSXGroupID", groupID1); + JMSProducer producer2 = ctx.createProducer().setProperty("JMSXGroupID", groupID2); + JMSProducer producer3 = ctx.createProducer().setProperty("JMSXGroupID", groupID3); + + JMSConsumer consumer1 = ctx.createConsumer(testQueue); + JMSConsumer consumer2 = ctx.createConsumer(testQueue); + + ctx.start(); + + for (int j = 0; j < 10; j++) { + send(ctx, testQueue, groupID1, producer1, j); + } + for (int j = 10; j < 20; j++) { + send(ctx, testQueue, groupID2, producer2, j); + } + for (int j = 20; j < 30; j++) { + send(ctx, testQueue, groupID3, producer3, j); + } + + ctx.commit(); + + //First set of msgs should go to the first consumer only + for (int j = 0; j < 10; j++) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + assertNotNull(tm); + tm.acknowledge(); + assertEquals("Message" + j, tm.getText()); + assertEquals(tm.getStringProperty("JMSXGroupID"), groupID1); + } + ctx.commit(); + + + //Second set of msgs should go to the second consumers only + for (int j = 10; j < 20; j++) { + TextMessage tm = (TextMessage) consumer2.receive(10000); + + assertNotNull(tm); + + tm.acknowledge(); + + assertEquals("Message" + j, tm.getText()); + + assertEquals(tm.getStringProperty("JMSXGroupID"), groupID2); + } + ctx.commit(); + + + //Add new consumer but where third set we have not consumed so should inflight, that should cause rebalance + JMSConsumer consumer3 = ctx.createConsumer(testQueue); + + //Send next set of messages + for (int j = 0; j < 10; j++) { + send(ctx, testQueue, groupID1, producer1, j); + } + for (int j = 10; j < 20; j++) { + send(ctx, testQueue, groupID2, producer2, j); + } + for (int j = 20; j < 30; j++) { + send(ctx, testQueue, groupID3, producer3, j); + } + ctx.commit(); + + //Ensure we dont get anything on the other consumers, whilst we rebalance and there is inflight messages. - e.g. ensure ordering guarentee. + assertNull(consumer2.receiveNoWait()); + assertNull(consumer3.receiveNoWait()); + + //Ensure the inflight set of msgs should go to the first consumer only + for (int j = 20; j < 30; j++) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + tm.acknowledge(); + + assertEquals("Message" + j, tm.getText()); + + assertEquals(tm.getStringProperty("JMSXGroupID"), groupID3); + } + ctx.commit(); + + //Now we cleared the "inflightm messages" expect that consumers 1,2 and 3 are rebalanced and the messages sent earlier are received. + + //First set of msgs should go to the first consumer only + for (int j = 0; j < 10; j++) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + assertNotNull(tm); + tm.acknowledge(); + assertEquals("Message" + j, tm.getText()); + assertEquals(tm.getStringProperty("JMSXGroupID"), groupID1); + } + + //Second set of msgs should go to the second consumers only + for (int j = 10; j < 20; j++) { + TextMessage tm = (TextMessage) consumer2.receive(10000); + + assertNotNull(tm); + + tm.acknowledge(); + + assertEquals("Message" + j, tm.getText()); + + assertEquals(tm.getStringProperty("JMSXGroupID"), groupID2); + } + + //Third set of msgs should now go to the third consumer now + for (int j = 20; j < 30; j++) { + TextMessage tm = (TextMessage) consumer3.receive(10000); + + assertNotNull(tm); + + tm.acknowledge(); + + assertEquals("Message" + j, tm.getText()); + + assertEquals(tm.getStringProperty("JMSXGroupID"), groupID3); + } + + ctx.commit(); + + ctx.close(); + } + + @Test public void testGroupFirstKey() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index d969998f82..663954ba71 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -95,6 +95,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (Boolean) proxy.retrieveAttributeValue("groupRebalance"); } + @Override + public boolean isGroupRebalancePauseDispatch() { + return (Boolean) proxy.retrieveAttributeValue("groupRebalancePauseDispatch"); + } + @Override public int getGroupBuckets() { return (Integer) proxy.retrieveAttributeValue("groupBuckets", Integer.class); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index b1896bdea8..1ab01c1634 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -187,6 +187,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } + @Override + public boolean isGroupRebalancePauseDispatch() { + return false; + } + + @Override + public void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach) { + + } + @Override public SimpleString getGroupFirstKey() { return null;