ARTEMIS-1361 - Support Updating queue config from broker.xml at runtime

Add support to update Queue config via reload using existing updateQueue method at runtime.
Add/extend unit test cases to include testing reload of queue config.
This commit is contained in:
Michael Andre Pearce 2017-08-22 09:16:48 +01:00 committed by Clebert Suconic
parent 035890e2ab
commit 9fbbb7c416
4 changed files with 38 additions and 3 deletions

View File

@ -2511,12 +2511,22 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception { private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) { for (CoreQueueConfiguration config : queues) {
ActiveMQServerLogger.LOGGER.deployQueue(SimpleString.toSimpleString(config.getName())); addOrUpdateQueue(config);
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), null, config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true);
} }
} }
private Queue addOrUpdateQueue(CoreQueueConfiguration config) throws Exception {
SimpleString queueName = SimpleString.toSimpleString(config.getName());
ActiveMQServerLogger.LOGGER.deployQueue(queueName);
Queue queue = updateQueue(config.getName(), config.getRoutingType(), config.getMaxConsumers(), config.getPurgeOnNoConsumers());
if (queue == null) {
queue = createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
queueName, SimpleString.toSimpleString(config.getFilterString()), null,
config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true);
}
return queue;
}
private void deployQueuesFromConfiguration() throws Exception { private void deployQueuesFromConfiguration() throws Exception {
deployQueuesFromListCoreQueueConfiguration(configuration.getQueueConfigurations()); deployQueuesFromListCoreQueueConfiguration(configuration.getQueueConfigurations());
} }

View File

@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
@ -147,6 +148,10 @@ public class RedeployTest extends ActiveMQTestBase {
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1")); Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2")); Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_change"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(10, getQueue(embeddedJMS, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(false, getQueue(embeddedJMS, "config_test_queue_change_queue").isPurgeOnNoConsumers());
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING); Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000); brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
@ -164,6 +169,11 @@ public class RedeployTest extends ActiveMQTestBase {
Assert.assertNotNull(getAddressInfo(embeddedJMS, "permanent_test_queue_removal")); Assert.assertNotNull(getAddressInfo(embeddedJMS, "permanent_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1")); Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2")); Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_change"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(1, getQueue(embeddedJMS, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(true, getQueue(embeddedJMS, "config_test_queue_change_queue").isPurgeOnNoConsumers());
} finally { } finally {
embeddedJMS.stop(); embeddedJMS.stop();
} }
@ -173,6 +183,11 @@ public class RedeployTest extends ActiveMQTestBase {
return embeddedJMS.getActiveMQServer().getPostOffice().getAddressInfo(SimpleString.toSimpleString(address)); return embeddedJMS.getActiveMQServer().getPostOffice().getAddressInfo(SimpleString.toSimpleString(address));
} }
private org.apache.activemq.artemis.core.server.Queue getQueue(EmbeddedJMS embeddedJMS, String queueName) throws Exception {
QueueBinding queueBinding = (QueueBinding) embeddedJMS.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString(queueName));
return queueBinding == null ? null : queueBinding.getQueue();
}
private List<String> listQueuesNamesForAddress(EmbeddedJMS embeddedJMS, String address) throws Exception { private List<String> listQueuesNamesForAddress(EmbeddedJMS embeddedJMS, String address) throws Exception {
return embeddedJMS.getActiveMQServer().getPostOffice().listQueuesForAddress(SimpleString.toSimpleString(address)).stream().map( return embeddedJMS.getActiveMQServer().getPostOffice().listQueuesForAddress(SimpleString.toSimpleString(address)).stream().map(
org.apache.activemq.artemis.core.server.Queue::getName).map(SimpleString::toString).collect(Collectors.toList()); org.apache.activemq.artemis.core.server.Queue::getName).map(SimpleString::toString).collect(Collectors.toList());

View File

@ -123,6 +123,11 @@ under the License.
<queue name="permanent_test_queue_removal_queue_1"/> <queue name="permanent_test_queue_removal_queue_1"/>
</multicast> </multicast>
</address> </address>
<address name="config_test_queue_change">
<multicast>
<queue name="config_test_queue_change_queue" max-consumers="1" purge-on-no-consumers="true" />
</multicast>
</address>
</addresses> </addresses>
</core> </core>
</configuration> </configuration>

View File

@ -142,6 +142,11 @@ under the License.
<queue name="permanent_test_address_removal_queue"/> <queue name="permanent_test_address_removal_queue"/>
</multicast> </multicast>
</address> </address>
<address name="config_test_queue_change">
<multicast>
<queue name="config_test_queue_change_queue" max-consumers="10" purge-on-no-consumers="false" />
</multicast>
</address>
</addresses> </addresses>
</core> </core>
</configuration> </configuration>