ARTEMIS-4190 Fix config-delete-queues when address changes
This commit is contained in:
parent
bb08a573eb
commit
7a0bf52ed8
|
@ -216,6 +216,7 @@ import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static java.util.stream.Collectors.groupingBy;
|
||||
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
|
||||
|
||||
/**
|
||||
|
@ -3547,10 +3548,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
.map(CoreAddressConfiguration::getName)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
Set<SimpleString> queuesInConfig = configuration.getAddressConfigurations().stream()
|
||||
Map<SimpleString, List<QueueConfiguration>> queuesInConfig = configuration.getAddressConfigurations().stream()
|
||||
.map(CoreAddressConfiguration::getQueueConfigs)
|
||||
.flatMap(List::stream).map(QueueConfiguration::getName)
|
||||
.collect(Collectors.toSet());
|
||||
.flatMap(List::stream).collect(groupingBy(QueueConfiguration::getName));
|
||||
|
||||
for (SimpleString addressName : listAddressNames()) {
|
||||
AddressInfo addressInfo = getAddressInfo(addressName);
|
||||
|
@ -3574,7 +3574,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
} else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
|
||||
for (Queue queue : listConfiguredQueues(addressName)) {
|
||||
if (!queuesInConfig.contains(queue.getName())) {
|
||||
List<QueueConfiguration> queueConfigsInConfig = queuesInConfig.get(queue.getName());
|
||||
if (queueConfigsInConfig == null || !queueConfigsInConfig.stream().anyMatch(
|
||||
queueConfiguration -> queueConfiguration.getAddress().equals(addressName))) {
|
||||
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
|
||||
try {
|
||||
queue.deleteQueue(true);
|
||||
|
|
|
@ -1243,6 +1243,52 @@ public class RedeployTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRedeployChangeAddressQueueRoutingType() throws Exception {
|
||||
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-address-queue-routingtype.xml");
|
||||
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-address-queue-routingtype-updated.xml");
|
||||
Files.copy(url1.openStream(), brokerXML);
|
||||
|
||||
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
|
||||
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
|
||||
embeddedActiveMQ.start();
|
||||
|
||||
final ReusableLatch latch = new ReusableLatch(1);
|
||||
|
||||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown);
|
||||
|
||||
try {
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "TEST.QUEUE.0"));
|
||||
Assert.assertTrue(getAddressInfo(embeddedActiveMQ, "TEST.QUEUE.0").getRoutingTypes().contains(RoutingType.ANYCAST));
|
||||
Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "TEST.QUEUE.0.1").getRoutingType());
|
||||
Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "TEST.QUEUE.0.2").getRoutingType());
|
||||
Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "TEST.QUEUE.0.3").getRoutingType());
|
||||
Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "TEST.QUEUE.0.4").getRoutingType());
|
||||
|
||||
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
|
||||
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
|
||||
latch.setCount(1);
|
||||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown);
|
||||
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
|
||||
|
||||
AddressInfo testQueue0AddressInfo = getAddressInfo(embeddedActiveMQ, "TEST.QUEUE.0");
|
||||
Assert.assertNotNull(testQueue0AddressInfo);
|
||||
Assert.assertTrue(testQueue0AddressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
|
||||
Assert.assertTrue(testQueue0AddressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
|
||||
Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "TEST.QUEUE.0.1").getRoutingType());
|
||||
Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "TEST.QUEUE.0.2").getRoutingType());
|
||||
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "TEST.QUEUE.0.3"));
|
||||
Assert.assertTrue(getAddressInfo(embeddedActiveMQ, "TEST.QUEUE.0.3").getRoutingTypes().contains(RoutingType.ANYCAST));
|
||||
Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "TEST.QUEUE.0.3").getRoutingType());
|
||||
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "TEST.QUEUE.0.4"));
|
||||
Assert.assertTrue(getAddressInfo(embeddedActiveMQ, "TEST.QUEUE.0.4").getRoutingTypes().contains(RoutingType.MULTICAST));
|
||||
Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "TEST.QUEUE.0.4").getRoutingType());
|
||||
} finally {
|
||||
embeddedActiveMQ.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simulates Stop and Start that occurs when network health checker stops the server when network is detected unhealthy
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
<security-enabled>false</security-enabled>
|
||||
<persistence-enabled>false</persistence-enabled>
|
||||
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||
|
||||
<acceptors>
|
||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
<address-settings>
|
||||
<address-setting match="#">
|
||||
<config-delete-addresses>FORCE</config-delete-addresses>
|
||||
<config-delete-queues>FORCE</config-delete-queues>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="TEST.QUEUE.0">
|
||||
<anycast>
|
||||
<queue name="TEST.QUEUE.0.1"/>
|
||||
</anycast>
|
||||
<multicast>
|
||||
<queue name="TEST.QUEUE.0.2"/>
|
||||
</multicast>
|
||||
</address>
|
||||
<address name="TEST.QUEUE.0.3">
|
||||
<anycast>
|
||||
<queue name="TEST.QUEUE.0.3"/>
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="TEST.QUEUE.0.4">
|
||||
<multicast>
|
||||
<queue name="TEST.QUEUE.0.4"/>
|
||||
</multicast>
|
||||
</address>
|
||||
</addresses>
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,55 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
<security-enabled>false</security-enabled>
|
||||
<persistence-enabled>false</persistence-enabled>
|
||||
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||
|
||||
<acceptors>
|
||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
<address-settings>
|
||||
<address-setting match="#">
|
||||
<config-delete-addresses>FORCE</config-delete-addresses>
|
||||
<config-delete-queues>FORCE</config-delete-queues>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="TEST.QUEUE.0">
|
||||
<anycast>
|
||||
<queue name="TEST.QUEUE.0.1"/>
|
||||
<queue name="TEST.QUEUE.0.2"/>
|
||||
<queue name="TEST.QUEUE.0.3"/>
|
||||
<queue name="TEST.QUEUE.0.4"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
</core>
|
||||
</configuration>
|
Loading…
Reference in New Issue