From 42327a490ad262df99828830c24be395be2c66ed Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Tue, 10 Sep 2019 09:45:25 +0100 Subject: [PATCH] ARTEMIS-2480 - Reloading configuration can kill broker https://issues.apache.org/jira/browse/ARTEMIS-2480 --- .../core/server/ActiveMQServerLogger.java | 8 ++ .../core/server/impl/ActiveMQServerImpl.java | 18 ++- .../tests/integration/jms/RedeployTest.java | 78 ++++++++++++ .../reload-test-autocreateaddress-reload.xml | 109 +++++++++++++++++ .../reload-test-autocreateaddress.xml | 112 ++++++++++++++++++ 5 files changed, 322 insertions(+), 3 deletions(-) create mode 100644 tests/integration-tests/src/test/resources/reload-test-autocreateaddress-reload.xml create mode 100644 tests/integration-tests/src/test/resources/reload-test-autocreateaddress.xml diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 8f760148d9..4952fce92a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -2029,4 +2029,12 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.WARN) @Message(id = 224101, value = "Apache ActiveMQ Artemis is using a scheduled pool without remove on cancel policy, so a cancelled task could be not automatically removed from the work queue, it may also cause unbounded retention of cancelled tasks.", format = Message.Format.MESSAGE_FORMAT) void scheduledPoolWithNoRemoveOnCancelPolicy(); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 224102, value = "unable to undeploy address {0} : reason {1}", format = Message.Format.MESSAGE_FORMAT) + void unableToUndeployAddress(SimpleString addressName, String reason); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 224103, value = "unable to undeploy queue {0} : reason {1}", format = Message.Format.MESSAGE_FORMAT) + void unableToUndeployQueue(SimpleString queueName, String reason); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 9b5c3cb188..881b01ec4e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -3019,15 +3019,27 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) { for (Queue queue : listQueues(addressName)) { ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); - queue.deleteQueue(true); + try { + queue.deleteQueue(true); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.unableToUndeployQueue(addressName, e.getMessage()); + } } ActiveMQServerLogger.LOGGER.undeployAddress(addressName); - removeAddressInfo(addressName, null); + try { + removeAddressInfo(addressName, null); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.unableToUndeployAddress(addressName, e.getMessage()); + } } else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) { for (Queue queue : listConfiguredQueues(addressName)) { if (!queuesInConfig.contains(queue.getName().toString())) { ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); - queue.deleteQueue(true); + try { + queue.deleteQueue(true); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.unableToUndeployQueue(addressName, e.getMessage()); + } } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java index 643d10361c..e94f12b4eb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java @@ -42,11 +42,14 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl; import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ReusableLatch; @@ -55,6 +58,81 @@ import org.junit.Test; public class RedeployTest extends ActiveMQTestBase { + @Test + /* + * This tests that the broker doesnt fall over when it tries to delete any autocreated addresses/queues in a clustered environment + * If the undeploy fails then bridges etc can stop working, we need to make sure if undeploy fails on anything the broker is still live + * */ + public void testRedeployAutoCreateAddress() throws Exception { + Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); + URL url1 = RedeployTest.class.getClassLoader().getResource("reload-test-autocreateaddress.xml"); + URL url2 = RedeployTest.class.getClassLoader().getResource("reload-test-autocreateaddress-reload.xml"); + Files.copy(url1.openStream(), brokerXML); + + EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ(); + embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString()); + embeddedActiveMQ.start(); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(); + Queue queue = session.createQueue("autoQueue"); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("text")); + connection.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue("autoQueue")); + Assert.assertNotNull("Address wasn't autocreated accordingly", consumer.receive(5000)); + } + + // this simulates a remote queue or other type being added that wouldnt get deleted, its not valid to have this happen but it can happen when addresses and queues are auto created in a clustered env + embeddedActiveMQ.getActiveMQServer().getPostOffice().addBinding(new RemoteQueueBindingImpl(5L, + new SimpleString("autoQueue"), + new SimpleString("uniqueName"), + new SimpleString("routingName"), + 6L, + null, + new FakeQueue(new SimpleString("foo"), 6L), + new SimpleString("bridge"), + 1, + MessageLoadBalancingType.OFF)); + + final ReusableLatch latch = new ReusableLatch(1); + + Runnable tick = new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }; + + embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick); + + try { + latch.await(10, TimeUnit.SECONDS); + Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING); + brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000); + latch.setCount(1); + embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick); + latch.await(10, TimeUnit.SECONDS); + + Assert.assertTrue(tryConsume()); + + factory = new ActiveMQConnectionFactory(); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(); + Queue queue = session.createQueue("autoQueue"); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("text")); + connection.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue("autoQueue")); + Assert.assertNotNull("autoQueue redeployed accordingly", consumer.receive(5000)); + } + + } finally { + embeddedActiveMQ.stop(); + } + } + @Test public void testRedeploy() throws Exception { Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); diff --git a/tests/integration-tests/src/test/resources/reload-test-autocreateaddress-reload.xml b/tests/integration-tests/src/test/resources/reload-test-autocreateaddress-reload.xml new file mode 100644 index 0000000000..23a703bf00 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-test-autocreateaddress-reload.xml @@ -0,0 +1,109 @@ + + + + + + + + 0.0.0.0 + + 100 + + false + + false + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + 2 + + -1 + + + 40000 + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576 + + + tcp://0.0.0.0:5672?protocols=AMQP + + + tcp://0.0.0.0:61613?protocols=STOMP + + + tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP + + + tcp://0.0.0.0:1883?protocols=MQTT + + + + + + + + + + + + + + + + + + + + + true + true + DLQ + ExpiryQueue + 0 + 10Mb + 10 + BLOCK + FORCE + FORCE + + + + diff --git a/tests/integration-tests/src/test/resources/reload-test-autocreateaddress.xml b/tests/integration-tests/src/test/resources/reload-test-autocreateaddress.xml new file mode 100644 index 0000000000..92ce8758a5 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-test-autocreateaddress.xml @@ -0,0 +1,112 @@ + + + + + + + + 0.0.0.0 + + 100 + + false + + false + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + 2 + + -1 + + + 40000 + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576 + + + tcp://0.0.0.0:5672?protocols=AMQP + + + tcp://0.0.0.0:61613?protocols=STOMP + + + tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP + + + tcp://0.0.0.0:1883?protocols=MQTT + + + + + + + + + + + + + + + + + + + + + true + true + false + false + false + DLQ + ExpiryQueue + 0 + 10Mb + 10 + BLOCK + FORCE + FORCE + + + +