ARTEMIS-3709 Add group-rebalance-pause-dispatch attribute to queueType

This commit is contained in:
Domenico Francesco Bruscino 2022-03-16 09:40:29 +01:00 committed by clebertsuconic
parent 842ac1df5d
commit 90af0b3ea9
6 changed files with 56 additions and 5 deletions

View File

@ -145,7 +145,8 @@ public class QueueConfiguration implements Serializable {
* <li>user: {@link #USER}
* <li>max-consumers: {@link #MAX_CONSUMERS}
* <li>exclusive: {@link #EXCLUSIVE}
* <li>group-rebalance: {@link #GROUP_BUCKETS}
* <li>group-rebalance: {@link #GROUP_REBALANCE}
* <li>group-rebalance-pause-dispatch: {@link #GROUP_REBALANCE_PAUSE_DISPATCH}
* <li>group-buckets: {@link #GROUP_BUCKETS}
* <li>group-first-key: {@link #GROUP_FIRST_KEY}
* <li>last-value: {@link #LAST_VALUE}
@ -194,6 +195,8 @@ public class QueueConfiguration implements Serializable {
setExclusive(Boolean.valueOf(value));
} else if (key.equals(GROUP_REBALANCE)) {
setGroupRebalance(Boolean.valueOf(value));
} else if (key.equals(GROUP_REBALANCE_PAUSE_DISPATCH)) {
setGroupRebalancePauseDispatch(Boolean.valueOf(value));
} else if (key.equals(GROUP_BUCKETS)) {
setGroupBuckets(Integer.valueOf(value));
} else if (key.equals(GROUP_FIRST_KEY)) {

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
import org.junit.Assert;
import org.junit.Test;
public class QueueConfigurationTest {
@Test
public void testSetGroupRebalancePauseDispatch() {
QueueConfiguration queueConfiguration = new QueueConfiguration("TEST");
Assert.assertEquals(null, queueConfiguration.isGroupRebalancePauseDispatch());
queueConfiguration.setGroupRebalancePauseDispatch(true);
Assert.assertEquals(true, queueConfiguration.isGroupRebalancePauseDispatch());
queueConfiguration.setGroupRebalancePauseDispatch(false);
Assert.assertEquals(false, queueConfiguration.isGroupRebalancePauseDispatch());
queueConfiguration.set(QueueConfiguration.GROUP_REBALANCE_PAUSE_DISPATCH, Boolean.toString(true));
Assert.assertEquals(true, queueConfiguration.isGroupRebalancePauseDispatch());
queueConfiguration.set(QueueConfiguration.GROUP_REBALANCE_PAUSE_DISPATCH, Boolean.toString(false));
Assert.assertEquals(false, queueConfiguration.isGroupRebalancePauseDispatch());
}
}

View File

@ -4525,6 +4525,7 @@
<xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
<xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
<xsd:attribute name="group-rebalance" type="xsd:boolean" use="optional"/>
<xsd:attribute name="group-rebalance-pause-dispatch" type="xsd:boolean" use="optional"/>
<xsd:attribute name="group-buckets" type="xsd:int" use="optional"/>
<xsd:attribute name="group-first-key" type="xsd:string" use="optional"/>
<xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>

View File

@ -580,6 +580,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("addr1", queueConfiguration.getAddress().toString());
// If null, then default will be taken from address-settings (which defaults to ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers())
assertEquals(null, queueConfiguration.getMaxConsumers());
assertEquals(null, queueConfiguration.isGroupRebalancePauseDispatch());
// Addr 1 Queue 2
queueConfiguration = addressConfiguration.getQueueConfigs().get(1);
@ -591,6 +592,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, queueConfiguration.getMaxConsumers().intValue());
assertFalse(queueConfiguration.isPurgeOnNoConsumers());
assertEquals("addr1", queueConfiguration.getAddress().toString());
assertEquals(true, queueConfiguration.isGroupRebalancePauseDispatch());
// Addr 2
addressConfiguration = conf.getAddressConfigurations().get(1);
@ -609,6 +611,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(10, queueConfiguration.getMaxConsumers().intValue());
assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queueConfiguration.isPurgeOnNoConsumers());
assertEquals("addr2", queueConfiguration.getAddress().toString());
assertEquals(null, queueConfiguration.isGroupRebalancePauseDispatch());
// Addr 2 Queue 2
queueConfiguration = addressConfiguration.getQueueConfigs().get(1);
@ -620,6 +623,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(null, queueConfiguration.getMaxConsumers());
assertTrue(queueConfiguration.isPurgeOnNoConsumers());
assertEquals("addr2", queueConfiguration.getAddress().toString());
assertEquals(true, queueConfiguration.isGroupRebalancePauseDispatch());
// Addr 3
addressConfiguration = conf.getAddressConfigurations().get(2);

View File

@ -587,7 +587,7 @@
<durable>${falseProp}</durable>
<filter string="color='blue'"/>
</queue>
<queue name="q2" max-consumers="-1" purge-on-no-consumers="${falseProp}">
<queue name="q2" max-consumers="-1" purge-on-no-consumers="${falseProp}" group-rebalance-pause-dispatch="true">
<durable>${trueProp}</durable>
<filter string="color='green'"/>
</queue>
@ -598,7 +598,7 @@
<queue name="q3" max-consumers="10" >
<filter string="color='red'"/>
</queue>
<queue name="q4" purge-on-no-consumers="${trueProp}">
<queue name="q4" purge-on-no-consumers="${trueProp}" group-rebalance-pause-dispatch="true">
<durable>${trueProp}</durable>
</queue>
</multicast>

View File

@ -21,7 +21,7 @@
<durable>${falseProp}</durable>
<filter string="color='blue'"/>
</queue>
<queue name="q2" max-consumers="-1" purge-on-no-consumers="${falseProp}">
<queue name="q2" max-consumers="-1" purge-on-no-consumers="${falseProp}" group-rebalance-pause-dispatch="true">
<durable>${trueProp}</durable>
<filter string="color='green'"/>
</queue>
@ -32,7 +32,7 @@
<queue name="q3" max-consumers="10" >
<filter string="color='red'"/>
</queue>
<queue name="q4" purge-on-no-consumers="${trueProp}">
<queue name="q4" purge-on-no-consumers="${trueProp}" group-rebalance-pause-dispatch="true">
<durable>${trueProp}</durable>
</queue>
</multicast>