ARTEMIS-2065 Can't change queue routing-type between restarts

This commit is contained in:
Justin Bertram 2018-08-30 13:21:07 -05:00
parent 4f1e74b7f5
commit 3827c54c05
6 changed files with 225 additions and 17 deletions

View File

@ -44,6 +44,7 @@ import io.netty.channel.Channel;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
@ -1898,12 +1899,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
void criticalSystemLog(Object component); void criticalSystemLog(Object component);
@LogMessage(level = Logger.Level.INFO) @LogMessage(level = Logger.Level.INFO)
@Message(id = 224076, value = "UnDeploying address {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224076, value = "Undeploying address {0}", format = Message.Format.MESSAGE_FORMAT)
void undeployAddress(SimpleString addressName); void undeployAddress(SimpleString addressName);
@LogMessage(level = Logger.Level.INFO) @LogMessage(level = Logger.Level.INFO)
@Message(id = 224077, value = "UnDeploying queue {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224077, value = "Undeploying {0} queue {1}", format = Message.Format.MESSAGE_FORMAT)
void undeployQueue(SimpleString queueName); void undeployQueue(RoutingType routingType, SimpleString queueName);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 224078, value = "The size of duplicate cache detection (<id_cache-size/>) appears to be too large {0}. It should be no greater than the number of messages that can be squeezed into conformation buffer (<confirmation-window-size/>) {1}.", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224078, value = "The size of duplicate cache detection (<id_cache-size/>) appears to be too large {0}. It should be no greater than the number of messages that can be squeezed into conformation buffer (<confirmation-window-size/>) {1}.", format = Message.Format.MESSAGE_FORMAT)

View File

@ -2601,6 +2601,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}, 0, dumpInfoInterval, TimeUnit.MILLISECONDS); }, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
} }
// Undeploy any addresses and queues not in config
undeployAddressesAndQueueNotInConfiguration();
// Deploy the rest of the stuff // Deploy the rest of the stuff
// Deploy predefined addresses // Deploy predefined addresses
@ -2609,9 +2612,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Deploy any predefined queues // Deploy any predefined queues
deployQueuesFromConfiguration(); deployQueuesFromConfiguration();
// Undeploy any addresses and queues not in config
undeployAddressesAndQueueNotInConfiguration();
// We need to call this here, this gives any dependent server a chance to deploy its own addresses // We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is fully activated // this needs to be done before clustering is fully activated
callActivateCallbacks(); callActivateCallbacks();
@ -2698,25 +2698,28 @@ public class ActiveMQServerImpl implements ActiveMQServer {
.map(CoreAddressConfiguration::getName) .map(CoreAddressConfiguration::getName)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
Set<String> queuesInConfig = configuration.getAddressConfigurations().stream() Set<String> queuesInConfig = new HashSet<>();
.map(CoreAddressConfiguration::getQueueConfigurations) for (CoreAddressConfiguration cac : configuration.getAddressConfigurations()) {
.flatMap(List::stream).map(CoreQueueConfiguration::getName) for (CoreQueueConfiguration cqc : cac.getQueueConfigurations()) {
.collect(Collectors.toSet()); // combine the routing-type and queue name as the unique identifier as it's possible to change the routing-type without changing the name
queuesInConfig.add(cqc.getRoutingType().toString() + cqc.getName());
}
}
for (SimpleString addressName : listAddressNames()) { for (SimpleString addressName : listAddressNames()) {
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString()); AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) { if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) {
for (Queue queue : listQueues(addressName)) { for (Queue queue : listQueues(addressName)) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
queue.deleteQueue(true); queue.deleteQueue(true);
} }
ActiveMQServerLogger.LOGGER.undeployAddress(addressName); ActiveMQServerLogger.LOGGER.undeployAddress(addressName);
removeAddressInfo(addressName, null); removeAddressInfo(addressName, null);
} else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) { } else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
for (Queue queue : listConfiguredQueues(addressName)) { for (Queue queue : listConfiguredQueues(addressName)) {
if (!queuesInConfig.contains(queue.getName().toString())) { if (!queuesInConfig.contains(queue.getRoutingType().toString() + queue.getName().toString())) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
queue.deleteQueue(true); queue.deleteQueue(true);
} }
} }
@ -3441,8 +3444,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses"); ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses");
deployAddressesFromConfiguration(config);
undeployAddressesAndQueueNotInConfiguration(config); undeployAddressesAndQueueNotInConfiguration(config);
deployAddressesFromConfiguration(config);
configuration.setAddressConfigurations(config.getAddressConfigurations()); configuration.setAddressConfigurations(config.getAddressConfigurations());
configuration.setQueueConfigurations(config.getQueueConfigurations()); configuration.setQueueConfigurations(config.getQueueConfigurations());
} }

View File

@ -32,6 +32,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
@ -196,9 +197,6 @@ public class RedeployTest extends ActiveMQTestBase {
} }
@Test @Test
public void testRedeployAddressQueue() throws Exception { public void testRedeployAddressQueue() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
@ -268,6 +266,46 @@ public class RedeployTest extends ActiveMQTestBase {
} }
} }
@Test
public void testRedeployChangeQueueRoutingType() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype.xml");
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype-updated.xml");
Files.copy(url1.openStream(), brokerXML);
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = new Runnable() {
@Override
public void run() {
latch.countDown();
}
};
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
try {
latch.await(10, TimeUnit.SECONDS);
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
latch.setCount(1);
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
} finally {
embeddedActiveMQ.stop();
}
}
/** /**

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;
public class ConfigChangeTest extends ActiveMQTestBase {
private ActiveMQServer server;
@Test
public void testChangeQueueRoutingTypeOnRestart() throws Exception {
internalTestChangeQueueRoutingTypeOnRestart(false);
}
@Test
public void testChangeQueueRoutingTypeOnRestartNegative() throws Exception {
internalTestChangeQueueRoutingTypeOnRestart(true);
}
public void internalTestChangeQueueRoutingTypeOnRestart(boolean negative) throws Exception {
// if negative == true then the queue's routing type should *not* change
Configuration configuration = createDefaultInVMConfig();
configuration.addAddressesSetting("#", new AddressSettings()
.setConfigDeleteQueues(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE)
.setConfigDeleteAddresses(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE));
List addressConfigurations = new ArrayList();
CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
.setName("myAddress")
.addRoutingType(RoutingType.ANYCAST)
.addQueueConfiguration(new CoreQueueConfiguration()
.setName("myQueue")
.setAddress("myAddress")
.setRoutingType(RoutingType.ANYCAST));
addressConfigurations.add(addressConfiguration);
configuration.setAddressConfigurations(addressConfigurations);
server = createServer(true, configuration);
server.start();
server.stop();
addressConfiguration = new CoreAddressConfiguration()
.setName("myAddress")
.addRoutingType(RoutingType.MULTICAST)
.addQueueConfiguration(new CoreQueueConfiguration()
.setName("myQueue")
.setAddress("myAddress")
.setRoutingType(RoutingType.MULTICAST));
addressConfigurations.clear();
addressConfigurations.add(addressConfiguration);
configuration.setAddressConfigurations(addressConfigurations);
server.start();
assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType());
assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType());
server.stop();
}
}

View File

@ -0,0 +1,40 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<address-settings>
<address-setting match="#">
<config-delete-queues>FORCE</config-delete-queues>
</address-setting>
</address-settings>
<addresses>
<address name="myAddress">
<anycast>
<queue name="myQueue"/>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,40 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<address-settings>
<address-setting match="#">
<config-delete-queues>FORCE</config-delete-queues>
</address-setting>
</address-settings>
<addresses>
<address name="myAddress">
<multicast>
<queue name="myQueue"/>
</multicast>
</address>
</addresses>
</core>
</configuration>