This closes #2285
This commit is contained in:
commit
c811ccbeb5
|
@ -44,6 +44,7 @@ import io.netty.channel.Channel;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
|
@ -1898,12 +1899,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
void criticalSystemLog(Object component);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 224076, value = "UnDeploying address {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
@Message(id = 224076, value = "Undeploying address {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void undeployAddress(SimpleString addressName);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 224077, value = "UnDeploying queue {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void undeployQueue(SimpleString queueName);
|
||||
@Message(id = 224077, value = "Undeploying {0} queue {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void undeployQueue(RoutingType routingType, SimpleString queueName);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 224078, value = "The size of duplicate cache detection (<id_cache-size/>) appears to be too large {0}. It should be no greater than the number of messages that can be squeezed into conformation buffer (<confirmation-window-size/>) {1}.", format = Message.Format.MESSAGE_FORMAT)
|
||||
|
|
|
@ -2601,6 +2601,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
// Undeploy any addresses and queues not in config
|
||||
undeployAddressesAndQueueNotInConfiguration();
|
||||
|
||||
// Deploy the rest of the stuff
|
||||
|
||||
// Deploy predefined addresses
|
||||
|
@ -2609,9 +2612,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
// Deploy any predefined queues
|
||||
deployQueuesFromConfiguration();
|
||||
|
||||
// Undeploy any addresses and queues not in config
|
||||
undeployAddressesAndQueueNotInConfiguration();
|
||||
|
||||
// We need to call this here, this gives any dependent server a chance to deploy its own addresses
|
||||
// this needs to be done before clustering is fully activated
|
||||
callActivateCallbacks();
|
||||
|
@ -2698,25 +2698,28 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
.map(CoreAddressConfiguration::getName)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
Set<String> queuesInConfig = configuration.getAddressConfigurations().stream()
|
||||
.map(CoreAddressConfiguration::getQueueConfigurations)
|
||||
.flatMap(List::stream).map(CoreQueueConfiguration::getName)
|
||||
.collect(Collectors.toSet());
|
||||
Set<String> queuesInConfig = new HashSet<>();
|
||||
for (CoreAddressConfiguration cac : configuration.getAddressConfigurations()) {
|
||||
for (CoreQueueConfiguration cqc : cac.getQueueConfigurations()) {
|
||||
// combine the routing-type and queue name as the unique identifier as it's possible to change the routing-type without changing the name
|
||||
queuesInConfig.add(cqc.getRoutingType().toString() + cqc.getName());
|
||||
}
|
||||
}
|
||||
|
||||
for (SimpleString addressName : listAddressNames()) {
|
||||
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
|
||||
|
||||
if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) {
|
||||
for (Queue queue : listQueues(addressName)) {
|
||||
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
|
||||
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
|
||||
queue.deleteQueue(true);
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.undeployAddress(addressName);
|
||||
removeAddressInfo(addressName, null);
|
||||
} else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
|
||||
for (Queue queue : listConfiguredQueues(addressName)) {
|
||||
if (!queuesInConfig.contains(queue.getName().toString())) {
|
||||
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
|
||||
if (!queuesInConfig.contains(queue.getRoutingType().toString() + queue.getName().toString())) {
|
||||
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
|
||||
queue.deleteQueue(true);
|
||||
}
|
||||
}
|
||||
|
@ -3441,8 +3444,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
|
||||
ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses");
|
||||
deployAddressesFromConfiguration(config);
|
||||
undeployAddressesAndQueueNotInConfiguration(config);
|
||||
deployAddressesFromConfiguration(config);
|
||||
configuration.setAddressConfigurations(config.getAddressConfigurations());
|
||||
configuration.setQueueConfigurations(config.getQueueConfigurations());
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.security.Role;
|
||||
|
@ -196,9 +197,6 @@ public class RedeployTest extends ActiveMQTestBase {
|
|||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testRedeployAddressQueue() throws Exception {
|
||||
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||
|
@ -268,6 +266,46 @@ public class RedeployTest extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedeployChangeQueueRoutingType() throws Exception {
|
||||
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype.xml");
|
||||
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype-updated.xml");
|
||||
Files.copy(url1.openStream(), brokerXML);
|
||||
|
||||
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
|
||||
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
|
||||
embeddedActiveMQ.start();
|
||||
|
||||
final ReusableLatch latch = new ReusableLatch(1);
|
||||
|
||||
Runnable tick = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
|
||||
|
||||
try {
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
|
||||
Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
|
||||
|
||||
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
|
||||
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
|
||||
latch.setCount(1);
|
||||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
|
||||
Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
|
||||
} finally {
|
||||
embeddedActiveMQ.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.persistence;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ConfigChangeTest extends ActiveMQTestBase {
|
||||
|
||||
private ActiveMQServer server;
|
||||
|
||||
@Test
|
||||
public void testChangeQueueRoutingTypeOnRestart() throws Exception {
|
||||
internalTestChangeQueueRoutingTypeOnRestart(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChangeQueueRoutingTypeOnRestartNegative() throws Exception {
|
||||
internalTestChangeQueueRoutingTypeOnRestart(true);
|
||||
}
|
||||
|
||||
public void internalTestChangeQueueRoutingTypeOnRestart(boolean negative) throws Exception {
|
||||
// if negative == true then the queue's routing type should *not* change
|
||||
|
||||
Configuration configuration = createDefaultInVMConfig();
|
||||
configuration.addAddressesSetting("#", new AddressSettings()
|
||||
.setConfigDeleteQueues(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE)
|
||||
.setConfigDeleteAddresses(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE));
|
||||
|
||||
List addressConfigurations = new ArrayList();
|
||||
CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
|
||||
.setName("myAddress")
|
||||
.addRoutingType(RoutingType.ANYCAST)
|
||||
.addQueueConfiguration(new CoreQueueConfiguration()
|
||||
.setName("myQueue")
|
||||
.setAddress("myAddress")
|
||||
.setRoutingType(RoutingType.ANYCAST));
|
||||
addressConfigurations.add(addressConfiguration);
|
||||
configuration.setAddressConfigurations(addressConfigurations);
|
||||
server = createServer(true, configuration);
|
||||
server.start();
|
||||
server.stop();
|
||||
|
||||
addressConfiguration = new CoreAddressConfiguration()
|
||||
.setName("myAddress")
|
||||
.addRoutingType(RoutingType.MULTICAST)
|
||||
.addQueueConfiguration(new CoreQueueConfiguration()
|
||||
.setName("myQueue")
|
||||
.setAddress("myAddress")
|
||||
.setRoutingType(RoutingType.MULTICAST));
|
||||
addressConfigurations.clear();
|
||||
addressConfigurations.add(addressConfiguration);
|
||||
configuration.setAddressConfigurations(addressConfigurations);
|
||||
|
||||
server.start();
|
||||
assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType());
|
||||
assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType());
|
||||
server.stop();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
<address-settings>
|
||||
<address-setting match="#">
|
||||
<config-delete-queues>FORCE</config-delete-queues>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="myAddress">
|
||||
<anycast>
|
||||
<queue name="myQueue"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,40 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
<address-settings>
|
||||
<address-setting match="#">
|
||||
<config-delete-queues>FORCE</config-delete-queues>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="myAddress">
|
||||
<multicast>
|
||||
<queue name="myQueue"/>
|
||||
</multicast>
|
||||
</address>
|
||||
</addresses>
|
||||
</core>
|
||||
</configuration>
|
Loading…
Reference in New Issue