ARTEMIS-1747 - Fix Configuration change loss when network Issue

Add Test Case to stop and restart server after config reload and check state, this re-creates network health check issue where config changes are lost when network health check de-activates the server and then re-activates.

Add fix to update the held configuration thats used when initialisation steps during start are done.
This commit is contained in:
Michael André Pearce 2018-03-14 22:13:48 +00:00 committed by Clebert Suconic
parent 4f1c94898f
commit c7622d58c0
4 changed files with 444 additions and 0 deletions

View File

@ -3107,17 +3107,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Configuration config = new FileConfigurationParser().parseMainConfig(uri.openStream()); Configuration config = new FileConfigurationParser().parseMainConfig(uri.openStream());
ActiveMQServerLogger.LOGGER.reloadingConfiguration("security"); ActiveMQServerLogger.LOGGER.reloadingConfiguration("security");
securityRepository.swap(config.getSecurityRoles().entrySet()); securityRepository.swap(config.getSecurityRoles().entrySet());
configuration.setSecurityRoles(config.getSecurityRoles());
ActiveMQServerLogger.LOGGER.reloadingConfiguration("address settings"); ActiveMQServerLogger.LOGGER.reloadingConfiguration("address settings");
addressSettingsRepository.swap(config.getAddressesSettings().entrySet()); addressSettingsRepository.swap(config.getAddressesSettings().entrySet());
configuration.setAddressesSettings(config.getAddressesSettings());
ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts"); ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts");
for (DivertConfiguration divertConfig : config.getDivertConfigurations()) { for (DivertConfiguration divertConfig : config.getDivertConfigurations()) {
if (postOffice.getBinding(new SimpleString(divertConfig.getName())) == null) { if (postOffice.getBinding(new SimpleString(divertConfig.getName())) == null) {
deployDivert(divertConfig); deployDivert(divertConfig);
} }
} }
ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses"); ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses");
deployAddressesFromConfiguration(config); deployAddressesFromConfiguration(config);
undeployAddressesAndQueueNotInConfiguration(config); undeployAddressesAndQueueNotInConfiguration(config);
configuration.setAddressConfigurations(config.getAddressConfigurations());
configuration.setQueueConfigurations(config.getQueueConfigurations());
} }
} }
} }

View File

@ -28,12 +28,15 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -179,6 +182,117 @@ public class RedeployTest extends ActiveMQTestBase {
} }
} }
/**
* Simulates Stop and Start that occurs when network health checker stops the server when network is detected unhealthy
* and re-starts the broker once detected that it is healthy again.
*
* @throws Exception for anything un-expected, test will fail.
*/
@Test
public void testRedeployStopAndRestart() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-original.xml");
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-changed.xml");
Files.copy(url1.openStream(), brokerXML);
EmbeddedJMS embeddedJMS = new EmbeddedJMS();
embeddedJMS.setConfigResourcePath(brokerXML.toUri().toString());
embeddedJMS.start();
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = latch::countDown;
embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick);
try {
latch.await(10, TimeUnit.SECONDS);
Assert.assertEquals(getSecurityRoles(embeddedJMS, "security_address").size(), 1);
Assert.assertEquals(getSecurityRoles(embeddedJMS, "security_address").iterator().next().getName(), "b");
Assert.assertEquals(getAddressSettings(embeddedJMS, "address_settings_address").getDeadLetterAddress(), SimpleString.toSimpleString("OriginalDLQ"));
Assert.assertEquals(getAddressSettings(embeddedJMS, "address_settings_address").getExpiryAddress(), SimpleString.toSimpleString("OriginalExpiryQueue"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_address_removal_no_queue"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_1"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_2"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_change"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(10, getQueue(embeddedJMS, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(false, getQueue(embeddedJMS, "config_test_queue_change_queue").isPurgeOnNoConsumers());
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
latch.setCount(1);
embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
//Assert that the security settings change applied
Assert.assertEquals(getSecurityRoles(embeddedJMS, "security_address").size(), 1);
Assert.assertEquals(getSecurityRoles(embeddedJMS, "security_address").iterator().next().getName(), "c");
//Assert that the address settings change applied
Assert.assertEquals(getAddressSettings(embeddedJMS, "address_settings_address").getDeadLetterAddress(), SimpleString.toSimpleString("NewDLQ"));
Assert.assertEquals(getAddressSettings(embeddedJMS, "address_settings_address").getExpiryAddress(), SimpleString.toSimpleString("NewExpiryQueue"));
//Assert the address and queue changes applied
Assert.assertNull(getAddressInfo(embeddedJMS, "config_test_address_removal_no_queue"));
Assert.assertNull(getAddressInfo(embeddedJMS, "config_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_1"));
Assert.assertFalse(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_2"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_change"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(1, getQueue(embeddedJMS, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(true, getQueue(embeddedJMS, "config_test_queue_change_queue").isPurgeOnNoConsumers());
} finally {
embeddedJMS.stop();
}
try {
embeddedJMS.start();
//Assert that the security settings changes persist a stop and start server (e.g. like what occurs if network health check stops the node), but JVM remains up.
Assert.assertEquals(getSecurityRoles(embeddedJMS, "security_address").size(), 1);
Assert.assertEquals(getSecurityRoles(embeddedJMS, "security_address").iterator().next().getName(), "c");
//Assert that the address settings changes persist a stop and start server (e.g. like what occurs if network health check stops the node), but JVM remains up.
Assert.assertEquals(getAddressSettings(embeddedJMS, "address_settings_address").getDeadLetterAddress(), SimpleString.toSimpleString("NewDLQ"));
Assert.assertEquals(getAddressSettings(embeddedJMS, "address_settings_address").getExpiryAddress(), SimpleString.toSimpleString("NewExpiryQueue"));
//Assert that the address and queue changes persist a stop and start server (e.g. like what occurs if network health check stops the node), but JVM remains up.
Assert.assertNull(getAddressInfo(embeddedJMS, "config_test_address_removal_no_queue"));
Assert.assertNull(getAddressInfo(embeddedJMS, "config_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_1"));
Assert.assertFalse(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_2"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_change"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(1, getQueue(embeddedJMS, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(true, getQueue(embeddedJMS, "config_test_queue_change_queue").isPurgeOnNoConsumers());
} finally {
embeddedJMS.stop();
}
}
private AddressSettings getAddressSettings(EmbeddedJMS embeddedJMS, String address) {
return embeddedJMS.getActiveMQServer().getAddressSettingsRepository().getMatch(address);
}
private Set<Role> getSecurityRoles(EmbeddedJMS embeddedJMS, String address) {
return embeddedJMS.getActiveMQServer().getSecurityRepository().getMatch(address);
}
private AddressInfo getAddressInfo(EmbeddedJMS embeddedJMS, String address) { private AddressInfo getAddressInfo(EmbeddedJMS embeddedJMS, String address) {
return embeddedJMS.getActiveMQServer().getPostOffice().getAddressInfo(SimpleString.toSimpleString(address)); return embeddedJMS.getActiveMQServer().getPostOffice().getAddressInfo(SimpleString.toSimpleString(address));
} }

View File

@ -0,0 +1,155 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<name>0.0.0.0</name>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<!--
This value was determined through a calculation.
Your system could perform 25 writes per millisecond
on the current journal configuration.
That translates as a sync write every 40000 nanoseconds
-->
<journal-buffer-timeout>40000</journal-buffer-timeout>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="a"/>
<permission type="deleteNonDurableQueue" roles="a"/>
<permission type="createDurableQueue" roles="a"/>
<permission type="deleteDurableQueue" roles="a"/>
<permission type="browse" roles="a"/>
<permission type="send" roles="a"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="a"/>
</security-setting>
<security-setting match="security_#">
<permission type="createNonDurableQueue" roles="c"/>
<permission type="deleteNonDurableQueue" roles="c"/>
<permission type="createDurableQueue" roles="c"/>
<permission type="deleteDurableQueue" roles="c"/>
<permission type="browse" roles="c"/>
<permission type="send" roles="c"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="c"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
<address-setting match="config_#">
<auto-create-queues>false</auto-create-queues>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
<address-setting match="address_settings_#">
<auto-create-queues>false</auto-create-queues>
<dead-letter-address>NewDLQ</dead-letter-address>
<expiry-address>NewExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>
<wildcard-addresses>
<delimiter>_</delimiter>
</wildcard-addresses>
<addresses>
<address name="config_test_queue_removal">
<multicast>
<queue name="config_test_queue_removal_queue_1"/>
</multicast>
</address>
<address name="config_test_queue_change">
<multicast>
<queue name="config_test_queue_change_queue" max-consumers="1" purge-on-no-consumers="true" />
</multicast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,168 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<name>0.0.0.0</name>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<!--
This value was determined through a calculation.
Your system could perform 25 writes per millisecond
on the current journal configuration.
That translates as a sync write every 40000 nanoseconds
-->
<journal-buffer-timeout>40000</journal-buffer-timeout>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="a"/>
<permission type="deleteNonDurableQueue" roles="a"/>
<permission type="createDurableQueue" roles="a"/>
<permission type="deleteDurableQueue" roles="a"/>
<permission type="browse" roles="a"/>
<permission type="send" roles="a"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="a"/>
</security-setting>
<security-setting match="security_#">
<permission type="createNonDurableQueue" roles="b"/>
<permission type="deleteNonDurableQueue" roles="b"/>
<permission type="createDurableQueue" roles="b"/>
<permission type="deleteDurableQueue" roles="b"/>
<permission type="browse" roles="b"/>
<permission type="send" roles="b"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="b"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<auto-create-queues>false</auto-create-queues>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
<address-setting match="config_#">
<auto-create-queues>false</auto-create-queues>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
<address-setting match="address_settings_#">
<auto-create-queues>false</auto-create-queues>
<dead-letter-address>OriginalDLQ</dead-letter-address>
<expiry-address>OriginalExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>
<wildcard-addresses>
<delimiter>_</delimiter>
</wildcard-addresses>
<addresses>
<address name="config_test_queue_removal">
<multicast>
<queue name="config_test_queue_removal_queue_1"/>
<queue name="config_test_queue_removal_queue_2"/>
</multicast>
</address>
<address name="config_test_address_removal">
<multicast>
<queue name="config_test_address_removal_queue"/>
</multicast>
</address>
<address name="config_test_address_removal_no_queue">
<multicast>
</multicast>
</address>
<address name="config_test_queue_change">
<multicast>
<queue name="config_test_queue_change_queue" max-consumers="10" purge-on-no-consumers="false" />
</multicast>
</address>
</addresses>
</core>
</configuration>