[ARTEMIS-2863] Add support to pause dispatch when group rebalance

Add test case
Add implementation
Add docs
This commit is contained in:
Michael Pearce 2020-08-04 18:50:49 +01:00 committed by Clebert Suconic
parent c63c1f40ca
commit 2c506cc52a
35 changed files with 517 additions and 27 deletions

View File

@ -28,6 +28,7 @@ public class QueueAttributes implements Serializable {
public static final String MAX_CONSUMERS = "max-consumers";
public static final String EXCLUSIVE = "exclusive";
public static final String GROUP_REBALANCE = "group-rebalance";
public static final String GROUP_REBALANCE_PAUSE_DISPATCH = "group-rebalance-pause-dispatch";
public static final String GROUP_BUCKETS = "group-buckets";
public static final String GROUP_FIRST_KEY = "group-first-key";
public static final String LAST_VALUE = "last-value";
@ -49,6 +50,7 @@ public class QueueAttributes implements Serializable {
private Integer maxConsumers;
private Boolean exclusive;
private Boolean groupRebalance;
private Boolean groupRebalancePauseDispatch;
private Integer groupBuckets;
private SimpleString groupFirstKey;
private Boolean lastValue;
@ -93,6 +95,8 @@ public class QueueAttributes implements Serializable {
setConsumerPriority(Integer.valueOf(value));
} else if (key.equals(GROUP_REBALANCE)) {
setGroupRebalance(Boolean.valueOf(value));
} else if (key.equals(GROUP_REBALANCE_PAUSE_DISPATCH)) {
setGroupRebalancePauseDispatch(Boolean.valueOf(value));
} else if (key.equals(GROUP_BUCKETS)) {
setGroupBuckets(Integer.valueOf(value));
} else if (key.equals(GROUP_FIRST_KEY)) {
@ -119,6 +123,7 @@ public class QueueAttributes implements Serializable {
.setRingSize(this.getRingSize())
.setEnabled(this.isEnabled())
.setGroupRebalance(this.getGroupRebalance())
.setGroupRebalancePauseDispatch(this.getGroupRebalancePauseDispatch())
.setNonDestructive(this.getNonDestructive())
.setLastValue(this.getLastValue())
.setFilterString(this.getFilterString())
@ -146,6 +151,7 @@ public class QueueAttributes implements Serializable {
.setRingSize(queueConfiguration.getRingSize())
.setEnabled(queueConfiguration.isEnabled())
.setGroupRebalance(queueConfiguration.isGroupRebalance())
.setGroupRebalancePauseDispatch(queueConfiguration.isGroupRebalancePauseDispatch())
.setNonDestructive(queueConfiguration.isNonDestructive())
.setLastValue(queueConfiguration.isLastValue())
.setFilterString(queueConfiguration.getFilterString())
@ -280,6 +286,15 @@ public class QueueAttributes implements Serializable {
return this;
}
public Boolean getGroupRebalancePauseDispatch() {
return groupRebalancePauseDispatch;
}
public QueueAttributes setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) {
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
return this;
}
public Integer getGroupBuckets() {
return groupBuckets;
}

View File

@ -56,6 +56,7 @@ public class QueueConfiguration implements Serializable {
public static final String MAX_CONSUMERS = "max-consumers";
public static final String EXCLUSIVE = "exclusive";
public static final String GROUP_REBALANCE = "group-rebalance";
public static final String GROUP_REBALANCE_PAUSE_DISPATCH = "group-rebalance-pause-dispatch";
public static final String GROUP_BUCKETS = "group-buckets";
public static final String GROUP_FIRST_KEY = "group-first-key";
public static final String LAST_VALUE = "last-value";
@ -87,6 +88,7 @@ public class QueueConfiguration implements Serializable {
private Integer maxConsumers;
private Boolean exclusive;
private Boolean groupRebalance;
private Boolean groupRebalancePauseDispatch;
private Integer groupBuckets;
private SimpleString groupFirstKey;
private Boolean lastValue;
@ -459,6 +461,15 @@ public class QueueConfiguration implements Serializable {
return this;
}
public Boolean isGroupRebalancePauseDispatch() {
return groupRebalancePauseDispatch;
}
public QueueConfiguration setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) {
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
return this;
}
public Integer getGroupBuckets() {
return groupBuckets;
}
@ -630,6 +641,9 @@ public class QueueConfiguration implements Serializable {
if (isGroupRebalance() != null) {
builder.add(GROUP_REBALANCE, isGroupRebalance());
}
if (isGroupRebalancePauseDispatch() != null) {
builder.add(GROUP_REBALANCE_PAUSE_DISPATCH, isGroupRebalancePauseDispatch());
}
if (getGroupBuckets() != null) {
builder.add(GROUP_BUCKETS, getGroupBuckets());
}
@ -746,6 +760,8 @@ public class QueueConfiguration implements Serializable {
return false;
if (!Objects.equals(groupRebalance, that.groupRebalance))
return false;
if (!Objects.equals(groupRebalancePauseDispatch, that.groupRebalancePauseDispatch))
return false;
if (!Objects.equals(groupBuckets, that.groupBuckets))
return false;
if (!Objects.equals(groupFirstKey, that.groupFirstKey))
@ -802,6 +818,7 @@ public class QueueConfiguration implements Serializable {
result = 31 * result + Objects.hashCode(maxConsumers);
result = 31 * result + Objects.hashCode(exclusive);
result = 31 * result + Objects.hashCode(groupRebalance);
result = 31 * result + Objects.hashCode(groupRebalancePauseDispatch);
result = 31 * result + Objects.hashCode(groupBuckets);
result = 31 * result + Objects.hashCode(groupFirstKey);
result = 31 * result + Objects.hashCode(lastValue);
@ -838,6 +855,7 @@ public class QueueConfiguration implements Serializable {
+ ", maxConsumers=" + maxConsumers
+ ", exclusive=" + exclusive
+ ", groupRebalance=" + groupRebalance
+ ", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch
+ ", groupBuckets=" + groupBuckets
+ ", groupFirstKey=" + groupFirstKey
+ ", lastValue=" + lastValue

View File

@ -2729,4 +2729,12 @@ public interface AuditLogger extends BasicLogger {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601734, value = "User {0} failed to resume address {1}", format = Message.Format.MESSAGE_FORMAT)
void resumeAddressFailure(String user, String queueName);
static void isGroupRebalancePauseDispatch(Object source) {
LOGGER.isGroupRebalancePauseDispatch(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601735, value = "User {0} is getting group rebalance pause dispatch property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void isGroupRebalancePauseDispatch(String user, Object source, Object... args);
}

View File

@ -534,6 +534,8 @@ public final class ActiveMQDefaultConfiguration {
public static final boolean DEFAULT_GROUP_REBALANCE = false;
public static final boolean DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH = false;
public static final SimpleString DEFAULT_GROUP_FIRST_KEY = null;
public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST;
@ -1503,6 +1505,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_GROUP_REBALANCE;
}
public static boolean getDefaultGroupRebalancePauseDispatch() {
return DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH;
}
public static SimpleString getDefaultGroupFirstKey() {
return DEFAULT_GROUP_FIRST_KEY;
}

View File

@ -162,6 +162,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
Boolean isGroupRebalance();
Boolean isGroupRebalancePauseDispatch();
Integer getGroupBuckets();
SimpleString getGroupFirstKey();

View File

@ -701,6 +701,12 @@ public interface QueueControl {
@Attribute(desc = "whether the groups of this queue are automatically rebalanced")
boolean isGroupRebalance();
/**
* Returns whether the dispatch is paused when groups of this queue are automatically rebalanced.
*/
@Attribute(desc = "whether the dispatch is paused when groups of this queue are automatically rebalanced")
boolean isGroupRebalancePauseDispatch();
/**
* Will return the group buckets.
*/

View File

@ -52,6 +52,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
private final Boolean groupRebalance;
private final Boolean groupRebalancePauseDispatch;
private final Integer groupBuckets;
private final SimpleString groupFirstKey;
@ -162,7 +164,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize) {
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null);
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null);
}
public QueueQueryImpl(final boolean durable,
@ -180,6 +182,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final RoutingType routingType,
final Boolean exclusive,
final Boolean groupRebalance,
final Boolean groupRebalancePauseDispatch,
final Integer groupBuckets,
final SimpleString groupFirstKey,
final Boolean lastValue,
@ -208,6 +211,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
this.routingType = routingType;
this.exclusive = exclusive;
this.groupRebalance = groupRebalance;
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
this.groupBuckets = groupBuckets;
this.groupFirstKey = groupFirstKey;
this.lastValue = lastValue;
@ -328,6 +332,11 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
return groupRebalance;
}
@Override
public Boolean isGroupRebalancePauseDispatch() {
return groupRebalancePauseDispatch;
}
@Override
public Integer getGroupBuckets() {
return groupBuckets;

View File

@ -905,7 +905,7 @@ public class ActiveMQSessionContext extends SessionContext {
// We try to recreate any non-durable or auto-created queues, since they might not be there on failover/reconnect.
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover/reconnection
if (!queueInfo.isDurable() || queueInfo.isAutoCreated()) {
CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), queueInfo.isNonDestructive(), queueInfo.getConsumersBeforeDispatch(), queueInfo.getDelayBeforeDispatch(), queueInfo.isAutoDelete(), queueInfo.getAutoDeleteDelay(), queueInfo.getAutoDeleteMessageCount(), queueInfo.getRingSize(), queueInfo.isEnabled());
CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.isGroupRebalancePauseDispatch(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), queueInfo.isNonDestructive(), queueInfo.getConsumersBeforeDispatch(), queueInfo.getDelayBeforeDispatch(), queueInfo.isAutoDelete(), queueInfo.getAutoDeleteDelay(), queueInfo.getAutoDeleteMessageCount(), queueInfo.getRingSize(), queueInfo.isEnabled());
sendPacketWithoutLock(sessionChannel, createQueueRequest);
}

View File

@ -37,6 +37,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
private Boolean groupRebalance;
private Boolean groupRebalancePauseDispatch;
private Integer groupBuckets;
private SimpleString groupFirstKey;
@ -81,6 +83,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
requiresResponse,
queueAttributes.getExclusive(),
queueAttributes.getGroupRebalance(),
queueAttributes.getGroupRebalancePauseDispatch(),
queueAttributes.getGroupBuckets(),
queueAttributes.getGroupFirstKey(),
queueAttributes.getLastValue(),
@ -111,6 +114,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
requiresResponse,
queueConfiguration.isExclusive(),
queueConfiguration.isGroupRebalance(),
queueConfiguration.isGroupRebalancePauseDispatch(),
queueConfiguration.getGroupBuckets(),
queueConfiguration.getGroupFirstKey(),
queueConfiguration.isLastValue(),
@ -138,6 +142,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
final boolean requiresResponse,
final Boolean exclusive,
final Boolean groupRebalance,
final Boolean groupRebalancePauseDispatch,
final Integer groupBuckets,
final SimpleString groupFirstKey,
final Boolean lastValue,
@ -164,6 +169,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
this.purgeOnNoConsumers = purgeOnNoConsumers;
this.exclusive = exclusive;
this.groupRebalance = groupRebalance;
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
this.groupBuckets = groupBuckets;
this.groupFirstKey = groupFirstKey;
this.lastValue = lastValue;
@ -191,6 +197,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
.setRoutingType(routingType)
.setExclusive(exclusive)
.setGroupRebalance(groupRebalance)
.setGroupRebalancePauseDispatch(groupRebalancePauseDispatch)
.setNonDestructive(nonDestructive)
.setLastValue(lastValue)
.setFilterString(filterString)
@ -219,6 +226,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
buff.append(", purgeOnNoConsumers=" + purgeOnNoConsumers);
buff.append(", exclusive=" + exclusive);
buff.append(", groupRebalance=" + groupRebalance);
buff.append(", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch);
buff.append(", groupBuckets=" + groupBuckets);
buff.append(", groupFirstKey=" + groupFirstKey);
buff.append(", lastValue=" + lastValue);
@ -324,6 +332,14 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
this.groupRebalance = groupRebalance;
}
public Boolean isGroupRebalancePauseDispatch() {
return groupRebalancePauseDispatch;
}
public void setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) {
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
}
public Integer getGroupBuckets() {
return groupBuckets;
}
@ -401,6 +417,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
buffer.writeNullableSimpleString(groupFirstKey);
BufferHelper.writeNullableLong(buffer, ringSize);
BufferHelper.writeNullableBoolean(buffer, enabled);
BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch);
}
@Override
@ -434,6 +451,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
if (buffer.readableBytes() > 0) {
enabled = BufferHelper.readNullableBoolean(buffer);
}
if (buffer.readableBytes() > 0) {
groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
}
}
@Override
@ -446,6 +466,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
result = prime * result + (purgeOnNoConsumers ? 1231 : 1237);
result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237);
result = prime * result + (groupRebalancePauseDispatch == null ? 0 : groupRebalancePauseDispatch ? 1231 : 1237);
result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode());
result = prime * result + (groupFirstKey == null ? 0 : groupFirstKey.hashCode());
result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
@ -486,6 +507,11 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
return false;
} else if (!groupRebalance.equals(other.groupRebalance))
return false;
if (groupRebalancePauseDispatch == null) {
if (other.groupRebalancePauseDispatch != null)
return false;
} else if (!groupRebalancePauseDispatch.equals(other.groupRebalancePauseDispatch))
return false;
if (groupBuckets == null) {
if (other.groupBuckets != null)
return false;

View File

@ -29,6 +29,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
private Boolean purgeOnNoConsumers;
private Boolean exclusive;
private Boolean groupRebalance;
private Boolean groupRebalancePauseDispatch;
private Integer groupBuckets;
private SimpleString groupFirstKey;
private Boolean lastValue;
@ -53,6 +54,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
queueConfiguration.isPurgeOnNoConsumers(),
queueConfiguration.isExclusive(),
queueConfiguration.isGroupRebalance(),
queueConfiguration.isGroupRebalancePauseDispatch(),
queueConfiguration.getGroupBuckets(),
queueConfiguration.getGroupFirstKey(),
queueConfiguration.isLastValue(),
@ -78,6 +80,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
final Boolean purgeOnNoConsumers,
final Boolean exclusive,
final Boolean groupRebalance,
final Boolean groupRebalancePauseDispatch,
final Integer groupBuckets,
final SimpleString groupFirstKey,
final Boolean lastValue,
@ -102,6 +105,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
this.purgeOnNoConsumers = purgeOnNoConsumers;
this.exclusive = exclusive;
this.groupRebalance = groupRebalance;
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
this.groupBuckets = groupBuckets;
this.groupFirstKey = groupFirstKey;
this.lastValue = lastValue;
@ -201,6 +205,14 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
this.groupRebalance = groupRebalance;
}
public Boolean isGroupRebalancePauseDispatch() {
return groupRebalancePauseDispatch;
}
public void setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) {
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
}
public Integer getGroupBuckets() {
return groupBuckets;
}
@ -264,6 +276,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
.setRoutingType(routingType)
.setExclusive(exclusive)
.setGroupRebalance(groupRebalance)
.setGroupRebalancePauseDispatch(groupRebalancePauseDispatch)
.setNonDestructive(nonDestructive)
.setLastValue(lastValue)
.setFilterString(filterString)
@ -293,6 +306,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
buff.append(", purgeOnNoConsumers=" + purgeOnNoConsumers);
buff.append(", exclusive=" + exclusive);
buff.append(", groupRebalance=" + groupRebalance);
buff.append(", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch);
buff.append(", groupBuckets=" + groupBuckets);
buff.append(", groupFirstKey=" + groupFirstKey);
buff.append(", lastValue=" + lastValue);
@ -334,6 +348,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
buffer.writeNullableSimpleString(groupFirstKey);
BufferHelper.writeNullableLong(buffer, ringSize);
BufferHelper.writeNullableBoolean(buffer, enabled);
BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch);
}
@Override
@ -370,6 +385,9 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
if (buffer.readableBytes() > 0) {
enabled = buffer.readNullableBoolean();
}
if (buffer.readableBytes() > 0) {
groupRebalancePauseDispatch = buffer.readNullableBoolean();
}
}
@Override
@ -386,6 +404,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
result = prime * result + (purgeOnNoConsumers == null ? 0 : purgeOnNoConsumers ? 1231 : 1237);
result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237);
result = prime * result + (groupRebalancePauseDispatch == null ? 0 : groupRebalancePauseDispatch ? 1231 : 1237);
result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode());
result = prime * result + (groupFirstKey == null ? 0 : groupFirstKey.hashCode());
result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
@ -451,6 +470,11 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
return false;
} else if (!groupRebalance.equals(other.groupRebalance))
return false;
if (groupRebalancePauseDispatch == null) {
if (other.groupRebalancePauseDispatch != null)
return false;
} else if (!groupRebalancePauseDispatch.equals(other.groupRebalancePauseDispatch))
return false;
if (groupBuckets == null) {
if (other.groupBuckets != null)
return false;

View File

@ -38,6 +38,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
protected Boolean groupRebalance;
protected Boolean groupRebalancePauseDispatch;
protected Integer groupBuckets;
protected SimpleString groupFirstKey;
@ -65,11 +67,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
private Boolean enabled;
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize(), result.getRingSize(), result.isEnabled());
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize(), result.getRingSize(), result.isEnabled());
}
public SessionQueueQueryResponseMessage_V3() {
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null);
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null);
}
private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@ -87,6 +89,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
final int maxConsumers,
final Boolean exclusive,
final Boolean groupRebalance,
final Boolean groupRebalancePauseDispatch,
final Integer groupBuckets,
final SimpleString groupFirstKey,
final Boolean lastValue,
@ -132,6 +135,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.groupRebalance = groupRebalance;
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
this.groupBuckets = groupBuckets;
this.groupFirstKey = groupFirstKey;
@ -255,6 +260,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.groupRebalance = groupRebalance;
}
public Boolean isGroupRebalancePauseDispatch() {
return groupRebalancePauseDispatch;
}
public void setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) {
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
}
public Integer getGroupBuckets() {
return groupBuckets;
}
@ -321,6 +334,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buffer.writeNullableSimpleString(groupFirstKey);
BufferHelper.writeNullableLong(buffer, ringSize);
BufferHelper.writeNullableBoolean(buffer, enabled);
BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch);
}
@ -358,6 +372,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
if (buffer.readableBytes() > 0) {
enabled = BufferHelper.readNullableBoolean(buffer);
}
if (buffer.readableBytes() > 0) {
groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
}
}
@Override
@ -370,6 +387,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
result = prime * result + maxConsumers;
result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237);
result = prime * result + (groupRebalancePauseDispatch == null ? 0 : groupRebalancePauseDispatch ? 1231 : 1237);
result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode());
result = prime * result + (groupFirstKey == null ? 0 : groupFirstKey.hashCode());
result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
@ -402,6 +420,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buff.append(", maxConsumers=" + maxConsumers);
buff.append(", exclusive=" + exclusive);
buff.append(", groupRebalance=" + groupRebalance);
buff.append(", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch);
buff.append(", groupBuckets=" + groupBuckets);
buff.append(", groupFirstKey=" + groupFirstKey);
buff.append(", lastValue=" + lastValue);
@ -420,7 +439,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
@Override
public ClientSession.QueueQuery toQueueQuery() {
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize(), isEnabled());
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize(), isEnabled());
}
@Override
@ -446,6 +465,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
return false;
} else if (!groupRebalance.equals(other.groupRebalance))
return false;
if (groupRebalancePauseDispatch == null) {
if (other.groupRebalancePauseDispatch != null)
return false;
} else if (!groupRebalancePauseDispatch.equals(other.groupRebalancePauseDispatch))
return false;
if (groupBuckets == null) {
if (other.groupBuckets != null)
return false;

View File

@ -51,6 +51,8 @@ public class QueueQueryResult {
private Boolean groupRebalance;
private Boolean groupRebalancePauseDispatch;
private Integer groupBuckets;
private SimpleString groupFirstKey;
@ -92,6 +94,7 @@ public class QueueQueryResult {
final int maxConsumers,
final Boolean exclusive,
final Boolean groupRebalance,
final Boolean groupRebalancePauseDispatch,
final Integer groupBuckets,
final SimpleString groupFirstKey,
final Boolean lastValue,
@ -135,6 +138,8 @@ public class QueueQueryResult {
this.groupRebalance = groupRebalance;
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
this.groupBuckets = groupBuckets;
this.groupFirstKey = groupFirstKey;
@ -250,6 +255,10 @@ public class QueueQueryResult {
return groupRebalance;
}
public Boolean isGroupRebalancePauseDispatch() {
return groupRebalancePauseDispatch;
}
public Integer getGroupBuckets() {
return groupBuckets;
}

View File

@ -216,6 +216,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String DEFAULT_GROUP_REBALANCE = "default-group-rebalance";
private static final String DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH = "default-group-rebalance-pause-dispatch";
private static final String DEFAULT_GROUP_BUCKETS = "default-group-buckets";
private static final String DEFAULT_GROUP_FIRST_KEY = "default-group-first-key";
@ -1149,6 +1151,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setDefaultExclusiveQueue(XMLUtil.parseBoolean(child));
} else if (DEFAULT_GROUP_REBALANCE.equalsIgnoreCase(name)) {
addressSettings.setDefaultGroupRebalance(XMLUtil.parseBoolean(child));
} else if (DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH.equalsIgnoreCase(name)) {
addressSettings.setDefaultGroupRebalancePauseDispatch(XMLUtil.parseBoolean(child));
} else if (DEFAULT_GROUP_BUCKETS.equalsIgnoreCase(name)) {
addressSettings.setDefaultGroupBuckets(XMLUtil.parseInt(child));
} else if (DEFAULT_GROUP_FIRST_KEY.equalsIgnoreCase(name)) {
@ -1294,6 +1298,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
String user = null;
Boolean exclusive = null;
Boolean groupRebalance = null;
Boolean groupRebalancePauseDispatch = null;
Integer groupBuckets = null;
String groupFirstKey = null;
Boolean lastValue = null;
@ -1316,6 +1321,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
exclusive = Boolean.parseBoolean(item.getNodeValue());
} else if (item.getNodeName().equals("group-rebalance")) {
groupRebalance = Boolean.parseBoolean(item.getNodeValue());
} else if (item.getNodeName().equals("group-rebalance-pause-dispatch")) {
groupRebalancePauseDispatch = Boolean.parseBoolean(item.getNodeValue());
} else if (item.getNodeName().equals("group-buckets")) {
groupBuckets = Integer.parseInt(item.getNodeValue());
} else if (item.getNodeName().equals("group-first-key")) {
@ -1361,6 +1368,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
.setUser(user)
.setExclusive(exclusive)
.setGroupRebalance(groupRebalance)
.setGroupRebalancePauseDispatch(groupRebalancePauseDispatch)
.setGroupBuckets(groupBuckets)
.setGroupFirstKey(groupFirstKey)
.setLastValue(lastValue)

View File

@ -2881,6 +2881,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
.add("defaultNonDestructive", addressSettings.isDefaultNonDestructive())
.add("defaultExclusiveQueue", addressSettings.isDefaultExclusiveQueue())
.add("defaultGroupRebalance", addressSettings.isDefaultGroupRebalance())
.add("defaultGroupRebalancePauseDispatch", addressSettings.isDefaultGroupRebalancePauseDispatch())
.add("defaultGroupBuckets", addressSettings.getDefaultGroupBuckets())
.add("defaultGroupFirstKey", addressSettings.getDefaultGroupFirstKey() == null ? "" : addressSettings.getDefaultGroupFirstKey().toString())
.add("defaultMaxConsumers", addressSettings.getDefaultMaxConsumers())

View File

@ -1807,6 +1807,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public boolean isGroupRebalancePauseDispatch() {
if (AuditLogger.isEnabled()) {
AuditLogger.isGroupRebalancePauseDispatch(queue);
}
checkStarted();
clearIO();
try {
return queue.isGroupRebalancePauseDispatch();
} finally {
blockOnIO();
}
}
@Override
public int getGroupBuckets() {
if (AuditLogger.isEnabled()) {

View File

@ -65,6 +65,7 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
.add("lastValue", toString(queue.isLastValue()))
.add("scheduledCount", toString(queue.getScheduledCount()))
.add("groupRebalance", toString(queue.isGroupRebalance()))
.add("groupRebalancePauseDispatch", toString(queue.isGroupRebalancePauseDispatch()))
.add("groupBuckets", toString(queue.getGroupBuckets()))
.add("groupFirstKey", toString(queue.getGroupFirstKey()));
return obj;
@ -122,6 +123,8 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
return q.getScheduledCount();
case "groupRebalance":
return queue.isGroupRebalance();
case "groupRebalancePauseDispatch":
return queue.isGroupRebalancePauseDispatch();
case "groupBuckets":
return queue.getGroupBuckets();
case "groupFirstKey":

View File

@ -92,6 +92,8 @@ public interface QueueBindingInfo {
boolean isGroupRebalance();
boolean isGroupRebalancePauseDispatch();
int getGroupBuckets();
SimpleString getGroupFirstKey();

View File

@ -1307,7 +1307,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
SimpleString filterString = filter == null ? null : filter.getFilterString();
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize());
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize());
readLock();
try {

View File

@ -66,6 +66,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public boolean groupRebalance;
public boolean groupRebalancePauseDispatch;
public int groupBuckets;
public SimpleString groupFirstKey;
@ -118,6 +120,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
configurationManaged +
", groupRebalance=" +
groupRebalance +
", groupRebalancePauseDispatch=" +
groupRebalancePauseDispatch +
", groupBuckets=" +
groupBuckets +
", groupFirstKey=" +
@ -141,6 +145,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final boolean enabled,
final boolean exclusive,
final boolean groupRebalance,
final boolean groupRebalancePauseDispatch,
final int groupBuckets,
final SimpleString groupFirstKey,
final boolean lastValue,
@ -161,6 +166,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.autoCreated = autoCreated;
this.maxConsumers = maxConsumers;
this.purgeOnNoConsumers = purgeOnNoConsumers;
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
this.enabled = enabled;
this.exclusive = exclusive;
this.groupRebalance = groupRebalance;
@ -346,6 +352,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
return groupRebalance;
}
@Override
public boolean isGroupRebalancePauseDispatch() {
return groupRebalancePauseDispatch;
}
@Override
public int getGroupBuckets() {
return groupBuckets;
@ -482,6 +493,12 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
} else {
enabled = ActiveMQDefaultConfiguration.getDefaultEnabled();
}
if (buffer.readableBytes() > 0) {
groupRebalancePauseDispatch = buffer.readBoolean();
} else {
groupRebalancePauseDispatch = ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch();
}
}
@Override
@ -509,6 +526,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeNullableSimpleString(groupFirstKey);
buffer.writeLong(ringSize);
buffer.writeBoolean(enabled);
buffer.writeBoolean(groupRebalancePauseDispatch);
}
@Override
@ -533,6 +551,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
DataConstants.SIZE_LONG +
SimpleString.sizeofNullableString(groupFirstKey) +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN;
}

View File

@ -682,6 +682,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
changed = true;
queue.setGroupRebalance(queueConfiguration.isGroupRebalance());
}
if ((forceUpdate || queueConfiguration.isGroupRebalancePauseDispatch() != null) && !Objects.equals(queue.isGroupRebalancePauseDispatch(), queueConfiguration.isGroupRebalancePauseDispatch())) {
changed = true;
queue.setGroupRebalancePauseDispatch(queueConfiguration.isGroupRebalancePauseDispatch());
}
if ((forceUpdate || queueConfiguration.getGroupBuckets() != null) && !Objects.equals(queue.getGroupBuckets(), queueConfiguration.getGroupBuckets())) {
changed = true;
queue.setGroupBuckets(queueConfiguration.getGroupBuckets());

View File

@ -136,6 +136,10 @@ public interface Queue extends Bindable,CriticalComponent {
void setGroupRebalance(boolean groupRebalance);
boolean isGroupRebalancePauseDispatch();
void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach);
SimpleString getGroupFirstKey();
void setGroupFirstKey(SimpleString groupFirstKey);

View File

@ -969,6 +969,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
long defaultDelayBeforeDispatch = addressSettings.getDefaultDelayBeforeDispatch();
int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
boolean defaultGroupRebalance = addressSettings.isDefaultGroupRebalance();
boolean defaultGroupRebalancePauseDispatch = addressSettings.isDefaultGroupRebalancePauseDispatch();
int defaultGroupBuckets = addressSettings.getDefaultGroupBuckets();
SimpleString defaultGroupFirstKey = addressSettings.getDefaultGroupFirstKey();
long autoDeleteQueuesDelay = addressSettings.getAutoDeleteQueuesDelay();
@ -985,12 +986,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled());
response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled());
} else if (realName.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null);
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null);
} else {
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled);
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled);
}
return response;

View File

@ -27,6 +27,7 @@ public class QueueConfigurationUtils {
config.setMaxConsumers(config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers());
config.setExclusive(config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive());
config.setGroupRebalance(config.isGroupRebalance() == null ? as.isDefaultGroupRebalance() : config.isGroupRebalance());
config.setGroupRebalancePauseDispatch(config.isGroupRebalancePauseDispatch() == null ? as.isDefaultGroupRebalancePauseDispatch() : config.isGroupRebalancePauseDispatch());
config.setGroupBuckets(config.getGroupBuckets() == null ? as.getDefaultGroupBuckets() : config.getGroupBuckets());
config.setGroupFirstKey(config.getGroupFirstKey() == null ? as.getDefaultGroupFirstKey() : config.getGroupFirstKey());
config.setLastValue(config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue());

View File

@ -21,6 +21,7 @@ import org.apache.activemq.artemis.utils.collections.RepeatableIterator;
import org.apache.activemq.artemis.utils.collections.ResettableIterator;
import java.util.Set;
import java.util.stream.Stream;
public interface QueueConsumers<T extends PriorityAware> extends Iterable<T>, RepeatableIterator<T>, ResettableIterator<T> {
@ -34,4 +35,6 @@ public interface QueueConsumers<T extends PriorityAware> extends Iterable<T>, Re
boolean isEmpty();
Stream<T> stream();
}

View File

@ -27,6 +27,7 @@ import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
import java.util.stream.Stream;
/**
* This class's purpose is to hold the consumers.
@ -104,6 +105,11 @@ public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsume
return consumers.isEmpty();
}
@Override
public Stream<T> stream() {
return unmodifiableConsumers.stream();
}
@Override
public Iterator<T> iterator() {
return unmodifiableConsumers.iterator();

View File

@ -254,6 +254,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile boolean groupRebalance;
private volatile boolean groupRebalancePauseDispatch;
private volatile int groupBuckets;
private volatile SimpleString groupFirstKey;
@ -641,6 +643,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.groupRebalance = queueConfiguration.isGroupRebalance() == null ? ActiveMQDefaultConfiguration.getDefaultGroupRebalance() : queueConfiguration.isGroupRebalance();
this.groupRebalancePauseDispatch = queueConfiguration.isGroupRebalancePauseDispatch() == null ? ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch() : queueConfiguration.isGroupRebalancePauseDispatch();
this.groupBuckets = queueConfiguration.getGroupBuckets() == null ? ActiveMQDefaultConfiguration.getDefaultGroupBuckets() : queueConfiguration.getGroupBuckets();
this.groups = groupMap(this.groupBuckets);
@ -917,6 +921,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.groupRebalance = groupRebalance;
}
@Override
public boolean isGroupRebalancePauseDispatch() {
return groupRebalancePauseDispatch;
}
@Override
public synchronized void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDispatch) {
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
}
@Override
public SimpleString getGroupFirstKey() {
return groupFirstKey;
@ -1330,16 +1344,32 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (canDispatch) {
return true;
} else {
//Dont change that we can dispatch until inflight's are handled avoids issues with out of order messages.
if (inFlightMessages()) {
return false;
}
if (consumers.size() >= consumersBeforeDispatch) {
if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(false), BooleanUtil.toInt(true))) {
dispatchStartTimeUpdater.set(this, System.currentTimeMillis());
}
return true;
}
long currentDispatchStartTime = dispatchStartTimeUpdater.get(this);
if (currentDispatchStartTime != -1 && currentDispatchStartTime < System.currentTimeMillis()) {
dispatchingUpdater.set(this, BooleanUtil.toInt(true));
return true;
} else {
return false;
}
return false;
}
}
private boolean inFlightMessages() {
return consumers.stream().mapToInt(c -> c.consumer().getDeliveringMessages().size()).sum() != 0;
}
@Override
public void addConsumer(final Consumer consumer) throws Exception {
if (logger.isDebugEnabled()) {
@ -1362,21 +1392,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
cancelRedistributor();
if (groupRebalance) {
if (groupRebalancePauseDispatch) {
stopDispatch();
}
groups.removeAll();
}
ConsumerHolder<Consumer> newConsumerHolder = new ConsumerHolder<>(consumer);
if (consumers.add(newConsumerHolder)) {
int currentConsumerCount = consumers.size();
if (delayBeforeDispatch >= 0) {
dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis());
}
if (currentConsumerCount >= consumersBeforeDispatch) {
if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(false), BooleanUtil.toInt(true))) {
dispatchStartTimeUpdater.set(this, System.currentTimeMillis());
}
}
}
if (groupRebalance) {
groups.removeAll();
}
if (refCountForConsumers != null) {
@ -1423,9 +1452,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (consumerRemoved) {
consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis());
boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(consumers.size() != 0));
if (stopped) {
dispatchStartTimeUpdater.set(this, -1);
if (consumers.size() == 0) {
stopDispatch();
}
}
@ -1446,6 +1474,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
private void stopDispatch() {
boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(false));
if (stopped) {
dispatchStartTimeUpdater.set(this, -1);
}
}
private boolean checkConsumerDirectDeliver() {
if (consumers.isEmpty()) {
return false;

View File

@ -169,6 +169,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Boolean defaultGroupRebalance = null;
private Boolean defaultGroupRebalancePauseDispatch = null;
private Integer defaultGroupBuckets = null;
private SimpleString defaultGroupFirstKey = null;
@ -311,6 +313,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.defaultAddressRoutingType = other.defaultAddressRoutingType;
this.defaultConsumerWindowSize = other.defaultConsumerWindowSize;
this.defaultGroupRebalance = other.defaultGroupRebalance;
this.defaultGroupRebalancePauseDispatch = other.defaultGroupRebalancePauseDispatch;
this.defaultGroupBuckets = other.defaultGroupBuckets;
this.defaultGroupFirstKey = other.defaultGroupFirstKey;
this.defaultRingSize = other.defaultRingSize;
@ -839,6 +842,21 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
/**
* @return the defaultGroupRebalancePauseDispatch
*/
public boolean isDefaultGroupRebalancePauseDispatch() {
return defaultGroupRebalancePauseDispatch != null ? defaultGroupRebalancePauseDispatch : ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch();
}
/**
* @param defaultGroupRebalancePauseDispatch the defaultGroupBuckets to set
*/
public AddressSettings setDefaultGroupRebalancePauseDispatch(boolean defaultGroupRebalancePauseDispatch) {
this.defaultGroupRebalancePauseDispatch = defaultGroupRebalancePauseDispatch;
return this;
}
/**
* @return the defaultGroupBuckets
*/
@ -1053,6 +1071,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (defaultGroupRebalance == null) {
defaultGroupRebalance = merged.defaultGroupRebalance;
}
if (defaultGroupRebalancePauseDispatch == null) {
defaultGroupRebalancePauseDispatch = merged.defaultGroupRebalancePauseDispatch;
}
if (defaultGroupBuckets == null) {
defaultGroupBuckets = merged.defaultGroupBuckets;
}
@ -1294,6 +1315,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
enableMetrics = BufferHelper.readNullableBoolean(buffer);
}
if (buffer.readableBytes() > 0) {
defaultGroupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
}
}
@Override
@ -1356,7 +1382,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableBoolean(autoCreateExpiryResources) +
SimpleString.sizeofNullableString(expiryQueuePrefix) +
SimpleString.sizeofNullableString(expiryQueueSuffix) +
BufferHelper.sizeOfNullableBoolean(enableMetrics);
BufferHelper.sizeOfNullableBoolean(enableMetrics) +
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch);
}
@Override
@ -1480,6 +1507,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableLong(buffer, maxExpiryDelay);
BufferHelper.writeNullableBoolean(buffer, enableMetrics);
BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch);
}
/* (non-Javadoc)
@ -1539,6 +1569,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode());
result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
result = prime * result + ((defaultGroupRebalance == null) ? 0 : defaultGroupRebalance.hashCode());
result = prime * result + ((defaultGroupRebalancePauseDispatch == null) ? 0 : defaultGroupRebalancePauseDispatch.hashCode());
result = prime * result + ((defaultGroupBuckets == null) ? 0 : defaultGroupBuckets.hashCode());
result = prime * result + ((defaultGroupFirstKey == null) ? 0 : defaultGroupFirstKey.hashCode());
result = prime * result + ((defaultRingSize == null) ? 0 : defaultRingSize.hashCode());
@ -1825,6 +1856,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
} else if (!defaultGroupRebalance.equals(other.defaultGroupRebalance))
return false;
if (defaultGroupRebalancePauseDispatch == null) {
if (other.defaultGroupRebalancePauseDispatch != null)
return false;
} else if (!defaultGroupRebalancePauseDispatch.equals(other.defaultGroupRebalancePauseDispatch))
return false;
if (defaultGroupBuckets == null) {
if (other.defaultGroupBuckets != null)
return false;
@ -1996,6 +2033,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
defaultConsumerWindowSize +
", defaultGroupRebalance=" +
defaultGroupRebalance +
", defaultGroupRebalancePauseDispatch=" +
defaultGroupRebalancePauseDispatch +
", defaultGroupBuckets=" +
defaultGroupBuckets +
", defaultGroupFirstKey=" +

View File

@ -526,6 +526,7 @@
<xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
<xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
<xsd:attribute name="group-rebalance" type="xsd:boolean" use="optional"/>
<xsd:attribute name="group-rebalance-pause-dispatch" type="xsd:boolean" use="optional"/>
<xsd:attribute name="group-buckets" type="xsd:int" use="optional"/>
<xsd:attribute name="group-first-key" type="xsd:string" use="optional"/>
<xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
@ -3365,6 +3366,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="default-group-rebalance-pause-dispatch" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
whether to pause dispatch when rebalancing groups
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="default-group-buckets" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -51,6 +51,7 @@ public class QueueBindingEncodingTest extends Assert {
final boolean configurationManaged = RandomUtil.randomBoolean();
final long ringSize = RandomUtil.randomLong();
final boolean enabled = RandomUtil.randomBoolean();
final boolean groupRebalancePauseDispatch = RandomUtil.randomBoolean();
PersistentQueueBindingEncoding encoding = new PersistentQueueBindingEncoding(name,
address,
@ -62,6 +63,7 @@ public class QueueBindingEncodingTest extends Assert {
enabled,
exclusive,
groupRebalance,
groupRebalancePauseDispatch,
groupBuckets,
groupFirstKey,
lastValue,
@ -105,5 +107,8 @@ public class QueueBindingEncodingTest extends Assert {
assertEquals(routingType, decoding.getRoutingType());
assertEquals(configurationManaged, decoding.isConfigurationManaged());
assertEquals(ringSize, decoding.getRingSize());
assertEquals(groupRebalancePauseDispatch, decoding.isGroupRebalancePauseDispatch());
}
}

View File

@ -926,6 +926,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public boolean isGroupRebalancePauseDispatch() {
return false;
}
@Override
public void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach) {
}
@Override
public SimpleString getGroupFirstKey() {
return null;

View File

@ -518,6 +518,7 @@
<xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
<xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
<xsd:attribute name="group-rebalance" type="xsd:boolean" use="optional"/>
<xsd:attribute name="group-rebalance-pause-dispatch" type="xsd:boolean" use="optional"/>
<xsd:attribute name="group-buckets" type="xsd:int" use="optional"/>
<xsd:attribute name="group-first-key" type="xsd:string" use="optional"/>
<xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
@ -3158,6 +3159,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="default-group-rebalance-pause-dispatch" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
whether to pause dispatch when rebalancing groups
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="default-group-buckets" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -152,10 +152,13 @@ via the management API or managment console by invoking `resetAllGroups`
By setting `group-rebalance` to `true` at the queue level, every time a consumer is added it will trigger a rebalance/reset of the groups.
As noted above, when group rebalance is done, there is a risk you may have inflight messages being processed, by default the broker will continue to dispatch whilst rebalance is occuring. To ensure that inflight messages are processed before dispatch of new messages post rebalance,
to different consumers, you can set `group-rebalance-pause-dispatch` to `true` which will cause the dispatch to pause whilst rebalance occurs, until all inflight messages are processed.
```xml
<address name="foo.bar">
<multicast>
<queue name="orders1" group-rebalance="true"/>
<queue name="orders1" group-rebalance="true" group-rebalance-pause-dispatch="true"/>
</multicast>
</address>
```
@ -164,8 +167,8 @@ Or on auto-create when using the JMS Client by using address parameters when
creating the destination used by the consumer.
```java
Queue queue = session.createQueue("my.destination.name?group-rebalance=true");
Topic topic = session.createTopic("my.destination.name?group-rebalance=true");
Queue queue = session.createQueue("my.destination.name?group-rebalance=true&group-rebalance-pause-dispatch=true");
Topic topic = session.createTopic("my.destination.name?group-rebalance=true&group-rebalance-pause-dispatch=true");
```
Also the default for all queues under and address can be defaulted using the
@ -174,11 +177,13 @@ Also the default for all queues under and address can be defaulted using the
```xml
<address-setting match="my.address">
<default-group-rebalance>true</default-group-rebalance>
<default-group-rebalance-pause-dispatch>true</default-group-rebalance-pause-dispatch>
</address-setting>
```
By default, `default-group-rebalance` is `false` meaning this is disabled/off.
By default, `default-group-rebalance-pause-dispatch` is `false` meaning this is disabled/off.
#### Group Buckets

View File

@ -518,6 +518,155 @@ public class GroupingTest extends JMSTestBase {
ctx.close();
}
/**
* This tests ensures that when we have group rebalance and pause dispatch,
* the broker pauses dispatch of new messages to consumers whilst rebalance and awaits existing inflight messages to be handled before restarting dispatch with new reblanced group allocations,
* this allows us to provide a guarantee of message ordering even with rebalance, at the expense that during rebalance dispatch will pause till all consumers with inflight messages are handled.
*
* @throws Exception
*/
@Test
public void testGroupRebalancePauseDispatch() throws Exception {
ConnectionFactory fact = getCF();
Assume.assumeFalse("only makes sense withOUT auto-group", ((ActiveMQConnectionFactory) fact).isAutoGroup());
Assume.assumeTrue("only makes sense withOUT explicit group-id", ((ActiveMQConnectionFactory) fact).getGroupID() == null);
String testQueueName = getName() + "_group_rebalance";
server.createQueue(new QueueConfiguration(testQueueName).setRoutingType(RoutingType.ANYCAST).setGroupRebalance(true).setGroupRebalancePauseDispatch(true));
JMSContext ctx = addContext(getCF().createContext(JMSContext.SESSION_TRANSACTED));
Queue testQueue = ctx.createQueue(testQueueName);
final String groupID1 = "groupA";
final String groupID2 = "groupB";
final String groupID3 = "groupC";
JMSProducer producer1 = ctx.createProducer().setProperty("JMSXGroupID", groupID1);
JMSProducer producer2 = ctx.createProducer().setProperty("JMSXGroupID", groupID2);
JMSProducer producer3 = ctx.createProducer().setProperty("JMSXGroupID", groupID3);
JMSConsumer consumer1 = ctx.createConsumer(testQueue);
JMSConsumer consumer2 = ctx.createConsumer(testQueue);
ctx.start();
for (int j = 0; j < 10; j++) {
send(ctx, testQueue, groupID1, producer1, j);
}
for (int j = 10; j < 20; j++) {
send(ctx, testQueue, groupID2, producer2, j);
}
for (int j = 20; j < 30; j++) {
send(ctx, testQueue, groupID3, producer3, j);
}
ctx.commit();
//First set of msgs should go to the first consumer only
for (int j = 0; j < 10; j++) {
TextMessage tm = (TextMessage) consumer1.receive(10000);
assertNotNull(tm);
tm.acknowledge();
assertEquals("Message" + j, tm.getText());
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID1);
}
ctx.commit();
//Second set of msgs should go to the second consumers only
for (int j = 10; j < 20; j++) {
TextMessage tm = (TextMessage) consumer2.receive(10000);
assertNotNull(tm);
tm.acknowledge();
assertEquals("Message" + j, tm.getText());
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID2);
}
ctx.commit();
//Add new consumer but where third set we have not consumed so should inflight, that should cause rebalance
JMSConsumer consumer3 = ctx.createConsumer(testQueue);
//Send next set of messages
for (int j = 0; j < 10; j++) {
send(ctx, testQueue, groupID1, producer1, j);
}
for (int j = 10; j < 20; j++) {
send(ctx, testQueue, groupID2, producer2, j);
}
for (int j = 20; j < 30; j++) {
send(ctx, testQueue, groupID3, producer3, j);
}
ctx.commit();
//Ensure we dont get anything on the other consumers, whilst we rebalance and there is inflight messages. - e.g. ensure ordering guarentee.
assertNull(consumer2.receiveNoWait());
assertNull(consumer3.receiveNoWait());
//Ensure the inflight set of msgs should go to the first consumer only
for (int j = 20; j < 30; j++) {
TextMessage tm = (TextMessage) consumer1.receive(10000);
assertNotNull(tm);
tm.acknowledge();
assertEquals("Message" + j, tm.getText());
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID3);
}
ctx.commit();
//Now we cleared the "inflightm messages" expect that consumers 1,2 and 3 are rebalanced and the messages sent earlier are received.
//First set of msgs should go to the first consumer only
for (int j = 0; j < 10; j++) {
TextMessage tm = (TextMessage) consumer1.receive(10000);
assertNotNull(tm);
tm.acknowledge();
assertEquals("Message" + j, tm.getText());
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID1);
}
//Second set of msgs should go to the second consumers only
for (int j = 10; j < 20; j++) {
TextMessage tm = (TextMessage) consumer2.receive(10000);
assertNotNull(tm);
tm.acknowledge();
assertEquals("Message" + j, tm.getText());
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID2);
}
//Third set of msgs should now go to the third consumer now
for (int j = 20; j < 30; j++) {
TextMessage tm = (TextMessage) consumer3.receive(10000);
assertNotNull(tm);
tm.acknowledge();
assertEquals("Message" + j, tm.getText());
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID3);
}
ctx.commit();
ctx.close();
}
@Test
public void testGroupFirstKey() throws Exception {

View File

@ -95,6 +95,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Boolean) proxy.retrieveAttributeValue("groupRebalance");
}
@Override
public boolean isGroupRebalancePauseDispatch() {
return (Boolean) proxy.retrieveAttributeValue("groupRebalancePauseDispatch");
}
@Override
public int getGroupBuckets() {
return (Integer) proxy.retrieveAttributeValue("groupBuckets", Integer.class);

View File

@ -187,6 +187,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public boolean isGroupRebalancePauseDispatch() {
return false;
}
@Override
public void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach) {
}
@Override
public SimpleString getGroupFirstKey() {
return null;