This commit is contained in:
Francesco Nigro 2019-09-11 17:37:11 +02:00
commit f3b984dbf7
5 changed files with 322 additions and 3 deletions

View File

@ -2029,4 +2029,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 224101, value = "Apache ActiveMQ Artemis is using a scheduled pool without remove on cancel policy, so a cancelled task could be not automatically removed from the work queue, it may also cause unbounded retention of cancelled tasks.", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224101, value = "Apache ActiveMQ Artemis is using a scheduled pool without remove on cancel policy, so a cancelled task could be not automatically removed from the work queue, it may also cause unbounded retention of cancelled tasks.", format = Message.Format.MESSAGE_FORMAT)
void scheduledPoolWithNoRemoveOnCancelPolicy(); void scheduledPoolWithNoRemoveOnCancelPolicy();
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224102, value = "unable to undeploy address {0} : reason {1}", format = Message.Format.MESSAGE_FORMAT)
void unableToUndeployAddress(SimpleString addressName, String reason);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224103, value = "unable to undeploy queue {0} : reason {1}", format = Message.Format.MESSAGE_FORMAT)
void unableToUndeployQueue(SimpleString queueName, String reason);
} }

View File

@ -3019,15 +3019,27 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) { if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) {
for (Queue queue : listQueues(addressName)) { for (Queue queue : listQueues(addressName)) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
queue.deleteQueue(true); try {
queue.deleteQueue(true);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToUndeployQueue(addressName, e.getMessage());
}
} }
ActiveMQServerLogger.LOGGER.undeployAddress(addressName); ActiveMQServerLogger.LOGGER.undeployAddress(addressName);
removeAddressInfo(addressName, null); try {
removeAddressInfo(addressName, null);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToUndeployAddress(addressName, e.getMessage());
}
} else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) { } else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
for (Queue queue : listConfiguredQueues(addressName)) { for (Queue queue : listConfiguredQueues(addressName)) {
if (!queuesInConfig.contains(queue.getName().toString())) { if (!queuesInConfig.contains(queue.getName().toString())) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
queue.deleteQueue(true); try {
queue.deleteQueue(true);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToUndeployQueue(addressName, e.getMessage());
}
} }
} }
} }

View File

@ -42,11 +42,14 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
@ -55,6 +58,81 @@ import org.junit.Test;
public class RedeployTest extends ActiveMQTestBase { public class RedeployTest extends ActiveMQTestBase {
@Test
/*
* This tests that the broker doesnt fall over when it tries to delete any autocreated addresses/queues in a clustered environment
* If the undeploy fails then bridges etc can stop working, we need to make sure if undeploy fails on anything the broker is still live
* */
public void testRedeployAutoCreateAddress() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-test-autocreateaddress.xml");
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-test-autocreateaddress-reload.xml");
Files.copy(url1.openStream(), brokerXML);
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession();
Queue queue = session.createQueue("autoQueue");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("text"));
connection.start();
MessageConsumer consumer = session.createConsumer(session.createQueue("autoQueue"));
Assert.assertNotNull("Address wasn't autocreated accordingly", consumer.receive(5000));
}
// this simulates a remote queue or other type being added that wouldnt get deleted, its not valid to have this happen but it can happen when addresses and queues are auto created in a clustered env
embeddedActiveMQ.getActiveMQServer().getPostOffice().addBinding(new RemoteQueueBindingImpl(5L,
new SimpleString("autoQueue"),
new SimpleString("uniqueName"),
new SimpleString("routingName"),
6L,
null,
new FakeQueue(new SimpleString("foo"), 6L),
new SimpleString("bridge"),
1,
MessageLoadBalancingType.OFF));
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = new Runnable() {
@Override
public void run() {
latch.countDown();
}
};
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
try {
latch.await(10, TimeUnit.SECONDS);
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
latch.setCount(1);
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
Assert.assertTrue(tryConsume());
factory = new ActiveMQConnectionFactory();
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession();
Queue queue = session.createQueue("autoQueue");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("text"));
connection.start();
MessageConsumer consumer = session.createConsumer(session.createQueue("autoQueue"));
Assert.assertNotNull("autoQueue redeployed accordingly", consumer.receive(5000));
}
} finally {
embeddedActiveMQ.stop();
}
}
@Test @Test
public void testRedeploy() throws Exception { public void testRedeploy() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");

View File

@ -0,0 +1,109 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<name>0.0.0.0</name>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<!--
This value was determined through a calculation.
Your system could perform 25 writes per millisecond
on the current journal configuration.
That translates as a sync write every 40000 nanoseconds
-->
<journal-buffer-timeout>40000</journal-buffer-timeout>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="a"/>
<permission type="deleteNonDurableQueue" roles="a"/>
<permission type="createDurableQueue" roles="a"/>
<permission type="deleteDurableQueue" roles="a"/>
<permission type="browse" roles="a"/>
<permission type="send" roles="a"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="a"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<auto-create-addresses>true</auto-create-addresses>
<auto-create-queues>true</auto-create-queues>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
<config-delete-addresses>FORCE</config-delete-addresses>
<config-delete-queues>FORCE</config-delete-queues>
</address-setting>
</address-settings>
</core>
</configuration>

View File

@ -0,0 +1,112 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<name>0.0.0.0</name>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<!--
This value was determined through a calculation.
Your system could perform 25 writes per millisecond
on the current journal configuration.
That translates as a sync write every 40000 nanoseconds
-->
<journal-buffer-timeout>40000</journal-buffer-timeout>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="a"/>
<permission type="deleteNonDurableQueue" roles="a"/>
<permission type="createDurableQueue" roles="a"/>
<permission type="deleteDurableQueue" roles="a"/>
<permission type="browse" roles="a"/>
<permission type="send" roles="a"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="a"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<auto-create-addresses>true</auto-create-addresses>
<auto-create-queues>true</auto-create-queues>
<auto-delete-addresses>false</auto-delete-addresses>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-created-queues>false</auto-delete-created-queues>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
<config-delete-addresses>FORCE</config-delete-addresses>
<config-delete-queues>FORCE</config-delete-queues>
</address-setting>
</address-settings>
</core>
</configuration>