ARTEMIS-2480 - Reloading configuration can kill broker
https://issues.apache.org/jira/browse/ARTEMIS-2480
This commit is contained in:
parent
02d3384b87
commit
42327a490a
|
@ -2029,4 +2029,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 224101, value = "Apache ActiveMQ Artemis is using a scheduled pool without remove on cancel policy, so a cancelled task could be not automatically removed from the work queue, it may also cause unbounded retention of cancelled tasks.", format = Message.Format.MESSAGE_FORMAT)
|
||||
void scheduledPoolWithNoRemoveOnCancelPolicy();
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 224102, value = "unable to undeploy address {0} : reason {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void unableToUndeployAddress(SimpleString addressName, String reason);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 224103, value = "unable to undeploy queue {0} : reason {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void unableToUndeployQueue(SimpleString queueName, String reason);
|
||||
}
|
||||
|
|
|
@ -3019,15 +3019,27 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) {
|
||||
for (Queue queue : listQueues(addressName)) {
|
||||
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
|
||||
queue.deleteQueue(true);
|
||||
try {
|
||||
queue.deleteQueue(true);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.unableToUndeployQueue(addressName, e.getMessage());
|
||||
}
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.undeployAddress(addressName);
|
||||
removeAddressInfo(addressName, null);
|
||||
try {
|
||||
removeAddressInfo(addressName, null);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.unableToUndeployAddress(addressName, e.getMessage());
|
||||
}
|
||||
} else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
|
||||
for (Queue queue : listConfiguredQueues(addressName)) {
|
||||
if (!queuesInConfig.contains(queue.getName().toString())) {
|
||||
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
|
||||
queue.deleteQueue(true);
|
||||
try {
|
||||
queue.deleteQueue(true);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.unableToUndeployQueue(addressName, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,11 +42,14 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.security.Role;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
|
||||
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
|
@ -55,6 +58,81 @@ import org.junit.Test;
|
|||
|
||||
public class RedeployTest extends ActiveMQTestBase {
|
||||
|
||||
@Test
|
||||
/*
|
||||
* This tests that the broker doesnt fall over when it tries to delete any autocreated addresses/queues in a clustered environment
|
||||
* If the undeploy fails then bridges etc can stop working, we need to make sure if undeploy fails on anything the broker is still live
|
||||
* */
|
||||
public void testRedeployAutoCreateAddress() throws Exception {
|
||||
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-test-autocreateaddress.xml");
|
||||
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-test-autocreateaddress-reload.xml");
|
||||
Files.copy(url1.openStream(), brokerXML);
|
||||
|
||||
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
|
||||
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
|
||||
embeddedActiveMQ.start();
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
Session session = connection.createSession();
|
||||
Queue queue = session.createQueue("autoQueue");
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.send(session.createTextMessage("text"));
|
||||
connection.start();
|
||||
MessageConsumer consumer = session.createConsumer(session.createQueue("autoQueue"));
|
||||
Assert.assertNotNull("Address wasn't autocreated accordingly", consumer.receive(5000));
|
||||
}
|
||||
|
||||
// this simulates a remote queue or other type being added that wouldnt get deleted, its not valid to have this happen but it can happen when addresses and queues are auto created in a clustered env
|
||||
embeddedActiveMQ.getActiveMQServer().getPostOffice().addBinding(new RemoteQueueBindingImpl(5L,
|
||||
new SimpleString("autoQueue"),
|
||||
new SimpleString("uniqueName"),
|
||||
new SimpleString("routingName"),
|
||||
6L,
|
||||
null,
|
||||
new FakeQueue(new SimpleString("foo"), 6L),
|
||||
new SimpleString("bridge"),
|
||||
1,
|
||||
MessageLoadBalancingType.OFF));
|
||||
|
||||
final ReusableLatch latch = new ReusableLatch(1);
|
||||
|
||||
Runnable tick = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
|
||||
|
||||
try {
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
|
||||
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
|
||||
latch.setCount(1);
|
||||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
Assert.assertTrue(tryConsume());
|
||||
|
||||
factory = new ActiveMQConnectionFactory();
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
Session session = connection.createSession();
|
||||
Queue queue = session.createQueue("autoQueue");
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.send(session.createTextMessage("text"));
|
||||
connection.start();
|
||||
MessageConsumer consumer = session.createConsumer(session.createQueue("autoQueue"));
|
||||
Assert.assertNotNull("autoQueue redeployed accordingly", consumer.receive(5000));
|
||||
}
|
||||
|
||||
} finally {
|
||||
embeddedActiveMQ.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedeploy() throws Exception {
|
||||
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
|
||||
<name>0.0.0.0</name>
|
||||
|
||||
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||
|
||||
<persistence-enabled>false</persistence-enabled>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<!-- this could be ASYNCIO or NIO
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
||||
<!--
|
||||
This value was determined through a calculation.
|
||||
Your system could perform 25 writes per millisecond
|
||||
on the current journal configuration.
|
||||
That translates as a sync write every 40000 nanoseconds
|
||||
-->
|
||||
<journal-buffer-timeout>40000</journal-buffer-timeout>
|
||||
|
||||
|
||||
<acceptors>
|
||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||
|
||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
|
||||
|
||||
<!-- STOMP Acceptor. -->
|
||||
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
|
||||
|
||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
|
||||
|
||||
<!-- MQTT Acceptor -->
|
||||
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="a"/>
|
||||
<permission type="deleteNonDurableQueue" roles="a"/>
|
||||
<permission type="createDurableQueue" roles="a"/>
|
||||
<permission type="deleteDurableQueue" roles="a"/>
|
||||
<permission type="browse" roles="a"/>
|
||||
<permission type="send" roles="a"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="a"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<max-size-bytes>10Mb</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>BLOCK</address-full-policy>
|
||||
<config-delete-addresses>FORCE</config-delete-addresses>
|
||||
<config-delete-queues>FORCE</config-delete-queues>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,112 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
|
||||
<name>0.0.0.0</name>
|
||||
|
||||
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||
|
||||
<persistence-enabled>false</persistence-enabled>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<!-- this could be ASYNCIO or NIO
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
||||
<!--
|
||||
This value was determined through a calculation.
|
||||
Your system could perform 25 writes per millisecond
|
||||
on the current journal configuration.
|
||||
That translates as a sync write every 40000 nanoseconds
|
||||
-->
|
||||
<journal-buffer-timeout>40000</journal-buffer-timeout>
|
||||
|
||||
|
||||
<acceptors>
|
||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||
|
||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
|
||||
|
||||
<!-- STOMP Acceptor. -->
|
||||
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
|
||||
|
||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
|
||||
|
||||
<!-- MQTT Acceptor -->
|
||||
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="a"/>
|
||||
<permission type="deleteNonDurableQueue" roles="a"/>
|
||||
<permission type="createDurableQueue" roles="a"/>
|
||||
<permission type="deleteDurableQueue" roles="a"/>
|
||||
<permission type="browse" roles="a"/>
|
||||
<permission type="send" roles="a"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="a"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-delete-addresses>false</auto-delete-addresses>
|
||||
<auto-delete-queues>false</auto-delete-queues>
|
||||
<auto-delete-created-queues>false</auto-delete-created-queues>
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<max-size-bytes>10Mb</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>BLOCK</address-full-policy>
|
||||
<config-delete-addresses>FORCE</config-delete-addresses>
|
||||
<config-delete-queues>FORCE</config-delete-queues>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
</core>
|
||||
</configuration>
|
Loading…
Reference in New Issue