ARTEMIS-2065 Change routing-type isnt destructive.

Revert previous fix
Keep original ConfigChangeTest
Apply new non-destructive fix.
Enhance tests to ensure messages in queues are not lost either on reload when running or when config changed on-restart (e.g. queue i not destroyed)
This commit is contained in:
Michael André Pearce 2018-09-06 18:41:21 +01:00 committed by Clebert Suconic
parent 0adee3c33f
commit dbfdc18f49
6 changed files with 107 additions and 60 deletions

View File

@ -44,7 +44,6 @@ import io.netty.channel.Channel;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration;
@ -1903,8 +1902,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void undeployAddress(SimpleString addressName);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224077, value = "Undeploying {0} queue {1}", format = Message.Format.MESSAGE_FORMAT)
void undeployQueue(RoutingType routingType, SimpleString queueName);
@Message(id = 224077, value = "Undeploying queue {0}", format = Message.Format.MESSAGE_FORMAT)
void undeployQueue(SimpleString queueName);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224078, value = "The size of duplicate cache detection (<id_cache-size/>) appears to be too large {0}. It should be no greater than the number of messages that can be squeezed into conformation buffer (<confirmation-window-size/>) {1}.", format = Message.Format.MESSAGE_FORMAT)

View File

@ -2601,9 +2601,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
}
// Undeploy any addresses and queues not in config
undeployAddressesAndQueueNotInConfiguration();
// Deploy the rest of the stuff
// Deploy predefined addresses
@ -2612,6 +2609,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Deploy any predefined queues
deployQueuesFromConfiguration();
// Undeploy any addresses and queues not in config
undeployAddressesAndQueueNotInConfiguration();
// We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is fully activated
callActivateCallbacks();
@ -2695,31 +2695,28 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void undeployAddressesAndQueueNotInConfiguration(Configuration configuration) throws Exception {
Set<String> addressesInConfig = configuration.getAddressConfigurations().stream()
.map(CoreAddressConfiguration::getName)
.collect(Collectors.toSet());
.map(CoreAddressConfiguration::getName)
.collect(Collectors.toSet());
Set<String> queuesInConfig = new HashSet<>();
for (CoreAddressConfiguration cac : configuration.getAddressConfigurations()) {
for (CoreQueueConfiguration cqc : cac.getQueueConfigurations()) {
// combine the routing-type and queue name as the unique identifier as it's possible to change the routing-type without changing the name
queuesInConfig.add(cqc.getRoutingType().toString() + cqc.getName());
}
}
Set<String> queuesInConfig = configuration.getAddressConfigurations().stream()
.map(CoreAddressConfiguration::getQueueConfigurations)
.flatMap(List::stream).map(CoreQueueConfiguration::getName)
.collect(Collectors.toSet());
for (SimpleString addressName : listAddressNames()) {
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) {
for (Queue queue : listQueues(addressName)) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
queue.deleteQueue(true);
}
ActiveMQServerLogger.LOGGER.undeployAddress(addressName);
removeAddressInfo(addressName, null);
} else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
for (Queue queue : listConfiguredQueues(addressName)) {
if (!queuesInConfig.contains(queue.getRoutingType().toString() + queue.getName().toString())) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
if (!queuesInConfig.contains(queue.getName().toString())) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
queue.deleteQueue(true);
}
}
@ -2747,15 +2744,38 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) {
try {
ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString());
AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
addOrUpdateAddressInfo(info);
SimpleString address = SimpleString.toSimpleString(config.getName());
AddressInfo tobe = new AddressInfo(address, config.getRoutingTypes());
//During this stage until all queues re-configured we combine the current (if exists) with to-be routing types to allow changes in queues
AddressInfo current = getAddressInfo(address);
AddressInfo merged = new AddressInfo(address, tobe.getRoutingType());
if (current != null) {
merged.getRoutingTypes().addAll(current.getRoutingTypes());
}
addOrUpdateAddressInfo(merged);
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
//Now all queues updated we apply the actual address info expected tobe.
addOrUpdateAddressInfo(tobe);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.problemDeployingAddress(config.getName(), e.getMessage());
}
}
}
private AddressInfo mergedRoutingTypes(SimpleString address, AddressInfo... addressInfos) {
EnumSet<RoutingType> mergedRoutingTypes = EnumSet.noneOf(RoutingType.class);
for (AddressInfo addressInfo : addressInfos) {
if (addressInfo != null) {
mergedRoutingTypes.addAll(addressInfo.getRoutingTypes());
}
}
return new AddressInfo(address, mergedRoutingTypes);
}
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) {
try {

View File

@ -17,12 +17,6 @@
package org.apache.activemq.artemis.tests.integration.jms;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
@ -32,6 +26,16 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@ -289,9 +293,14 @@ public class RedeployTest extends ActiveMQTestBase {
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
try (JMSContext context = connectionFactory.createContext()) {
context.createProducer().send(context.createQueue("myAddress"), "hello");
}
latch.await(10, TimeUnit.SECONDS);
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
@ -300,7 +309,14 @@ public class RedeployTest extends ActiveMQTestBase {
latch.await(10, TimeUnit.SECONDS);
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
//Ensures the queue isnt detroyed by checking message sent before change is consumable after (e.g. no message loss)
try (JMSContext context = connectionFactory.createContext()) {
Message message = context.createSharedDurableConsumer(context.createTopic("myAddress"), "myQueue").receive();
assertEquals("hello", ((TextMessage) message).getText());
}
} finally {
embeddedActiveMQ.stop();
}

View File

@ -20,6 +20,10 @@ package org.apache.activemq.artemis.tests.integration.persistence;
import java.util.ArrayList;
import java.util.List;
import javax.jms.ConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
@ -27,7 +31,7 @@ import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;
@ -37,21 +41,8 @@ public class ConfigChangeTest extends ActiveMQTestBase {
@Test
public void testChangeQueueRoutingTypeOnRestart() throws Exception {
internalTestChangeQueueRoutingTypeOnRestart(false);
}
@Test
public void testChangeQueueRoutingTypeOnRestartNegative() throws Exception {
internalTestChangeQueueRoutingTypeOnRestart(true);
}
public void internalTestChangeQueueRoutingTypeOnRestart(boolean negative) throws Exception {
// if negative == true then the queue's routing type should *not* change
Configuration configuration = createDefaultInVMConfig();
configuration.addAddressesSetting("#", new AddressSettings()
.setConfigDeleteQueues(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE)
.setConfigDeleteAddresses(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE));
configuration.addAddressesSetting("#", new AddressSettings());
List addressConfigurations = new ArrayList();
CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
@ -65,6 +56,14 @@ public class ConfigChangeTest extends ActiveMQTestBase {
configuration.setAddressConfigurations(addressConfigurations);
server = createServer(true, configuration);
server.start();
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
try (JMSContext context = connectionFactory.createContext()) {
context.createProducer().send(context.createQueue("myAddress"), "hello");
}
server.stop();
addressConfiguration = new CoreAddressConfiguration()
@ -77,10 +76,16 @@ public class ConfigChangeTest extends ActiveMQTestBase {
addressConfigurations.clear();
addressConfigurations.add(addressConfiguration);
configuration.setAddressConfigurations(addressConfigurations);
server.start();
assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType());
assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType());
assertEquals(RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType());
assertEquals(RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType());
//Ensures the queue isnt detroyed by checking message sent before change is consumable after (e.g. no message loss)
try (JMSContext context = connectionFactory.createContext()) {
Message message = context.createSharedDurableConsumer(context.createTopic("myAddress"), "myQueue").receive();
assertEquals("hello", ((TextMessage) message).getText());
}
server.stop();
}
}

View File

@ -23,17 +23,21 @@ under the License.
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<address-settings>
<address-setting match="#">
<config-delete-queues>FORCE</config-delete-queues>
</address-setting>
</address-settings>
<security-enabled>false</security-enabled>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
</acceptors>
<addresses>
<address name="myAddress">
<anycast>
<multicast>
<queue name="myQueue"/>
</anycast>
</multicast>
</address>
</addresses>
</core>

View File

@ -23,17 +23,20 @@ under the License.
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<address-settings>
<address-setting match="#">
<config-delete-queues>FORCE</config-delete-queues>
</address-setting>
</address-settings>
<security-enabled>false</security-enabled>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
</acceptors>
<addresses>
<address name="myAddress">
<multicast>
<anycast>
<queue name="myQueue"/>
</multicast>
</anycast>
</address>
</addresses>
</core>