ARTEMIS-2064 make address & queue deployment more robust
Any failure to deploy an address or queue will short-circuit the broker initialization process preventing any other addresses or queues from being deployed as well as other critical resources like acceptors, etc.
This commit is contained in:
parent
611cedf893
commit
b0d30d4da5
|
@ -65,4 +65,14 @@ public class CoreAddressConfiguration implements Serializable {
|
||||||
public List<CoreQueueConfiguration> getQueueConfigurations() {
|
public List<CoreQueueConfiguration> getQueueConfigurations() {
|
||||||
return queueConfigurations;
|
return queueConfigurations;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "CoreAddressConfiguration[" +
|
||||||
|
"name=" + name +
|
||||||
|
", routingTypes=" + routingTypes +
|
||||||
|
", queueConfigurations=" + queueConfigurations +
|
||||||
|
"]";
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -201,6 +201,7 @@ public class CoreQueueConfiguration implements Serializable {
|
||||||
result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode());
|
result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode());
|
||||||
result = prime * result + ((consumersBeforeDispatch == null) ? 0 : consumersBeforeDispatch.hashCode());
|
result = prime * result + ((consumersBeforeDispatch == null) ? 0 : consumersBeforeDispatch.hashCode());
|
||||||
result = prime * result + ((delayBeforeDispatch == null) ? 0 : delayBeforeDispatch.hashCode());
|
result = prime * result + ((delayBeforeDispatch == null) ? 0 : delayBeforeDispatch.hashCode());
|
||||||
|
result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,6 +266,29 @@ public class CoreQueueConfiguration implements Serializable {
|
||||||
} else if (!delayBeforeDispatch.equals(other.delayBeforeDispatch)) {
|
} else if (!delayBeforeDispatch.equals(other.delayBeforeDispatch)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (routingType == null) {
|
||||||
|
if (other.routingType != null)
|
||||||
|
return false;
|
||||||
|
} else if (!routingType.equals(other.routingType)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "CoreQueueConfiguration[" +
|
||||||
|
"name=" + name +
|
||||||
|
", address=" + address +
|
||||||
|
", routingType=" + routingType +
|
||||||
|
", durable=" + durable +
|
||||||
|
", filterString=" + filterString +
|
||||||
|
", maxConsumers=" + maxConsumers +
|
||||||
|
", purgeOnNoConsumers=" + purgeOnNoConsumers +
|
||||||
|
", exclusive=" + exclusive +
|
||||||
|
", lastValue=" + lastValue +
|
||||||
|
", consumersBeforeDispatch=" + consumersBeforeDispatch +
|
||||||
|
", delayBeforeDispatch=" + delayBeforeDispatch +
|
||||||
|
"]";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.Reader;
|
import java.io.Reader;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
@ -150,10 +149,9 @@ public class LegacyJMSConfiguration implements Deployable {
|
||||||
*/
|
*/
|
||||||
public void parseTopicConfiguration(final Node node) throws Exception {
|
public void parseTopicConfiguration(final Node node) throws Exception {
|
||||||
String topicName = node.getAttributes().getNamedItem(NAME_ATTR).getNodeValue();
|
String topicName = node.getAttributes().getNamedItem(NAME_ATTR).getNodeValue();
|
||||||
List<CoreAddressConfiguration> coreAddressConfigurations = configuration.getAddressConfigurations();
|
configuration.addAddressConfiguration(new CoreAddressConfiguration()
|
||||||
coreAddressConfigurations.add(new CoreAddressConfiguration()
|
.setName(topicName)
|
||||||
.setName(topicName)
|
.addRoutingType(RoutingType.MULTICAST));
|
||||||
.addRoutingType(RoutingType.MULTICAST));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -173,22 +171,22 @@ public class LegacyJMSConfiguration implements Deployable {
|
||||||
for (int i = 0; i < children.getLength(); i++) {
|
for (int i = 0; i < children.getLength(); i++) {
|
||||||
Node child = children.item(i);
|
Node child = children.item(i);
|
||||||
|
|
||||||
if (QUEUE_SELECTOR_NODE_NAME.equals(children.item(i).getNodeName())) {
|
if (QUEUE_SELECTOR_NODE_NAME.equals(child.getNodeName())) {
|
||||||
Node selectorNode = children.item(i);
|
Node selectorNode = child;
|
||||||
Node attNode = selectorNode.getAttributes().getNamedItem("string");
|
Node attNode = selectorNode.getAttributes().getNamedItem("string");
|
||||||
selectorString = attNode.getNodeValue();
|
selectorString = attNode.getNodeValue();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
List<CoreAddressConfiguration> coreAddressConfigurations = configuration.getAddressConfigurations();
|
configuration.addAddressConfiguration(new CoreAddressConfiguration()
|
||||||
coreAddressConfigurations.add(new CoreAddressConfiguration()
|
.setName(queueName)
|
||||||
.setName(queueName)
|
.addRoutingType(RoutingType.ANYCAST)
|
||||||
.addRoutingType(RoutingType.ANYCAST)
|
.addQueueConfiguration(new CoreQueueConfiguration()
|
||||||
.addQueueConfiguration(new CoreQueueConfiguration()
|
.setAddress(queueName)
|
||||||
.setAddress(queueName)
|
.setName(queueName)
|
||||||
.setName(queueName)
|
.setFilterString(selectorString)
|
||||||
.setFilterString(selectorString)
|
.setDurable(durable)
|
||||||
.setRoutingType(RoutingType.ANYCAST)));
|
.setRoutingType(RoutingType.ANYCAST)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -770,8 +770,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
Element node = (Element) elements.item(0);
|
Element node = (Element) elements.item(0);
|
||||||
NodeList list = node.getElementsByTagName("address");
|
NodeList list = node.getElementsByTagName("address");
|
||||||
for (int i = 0; i < list.getLength(); i++) {
|
for (int i = 0; i < list.getLength(); i++) {
|
||||||
CoreAddressConfiguration addrConfig = parseAddressConfiguration(list.item(i));
|
config.addAddressConfiguration(parseAddressConfiguration(list.item(i)));
|
||||||
config.getAddressConfigurations().add(addrConfig);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1593,6 +1593,16 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
||||||
@Message(id = 22273, value = "Address \"{0}\" is full. Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 22273, value = "Address \"{0}\" is full. Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void bridgeAddressFull(String addressName, String bridgeName);
|
void bridgeAddressFull(String addressName, String bridgeName);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
|
@Message(id = 222274, value = "Failed to deploy address {0}: {1}",
|
||||||
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void problemDeployingAddress(String addressName, String message);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
|
@Message(id = 222275, value = "Failed to deploy queue {0}: {1}",
|
||||||
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void problemDeployingQueue(String queueName, String message);
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.ERROR)
|
@LogMessage(level = Logger.Level.ERROR)
|
||||||
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void initializationError(@Cause Throwable e);
|
void initializationError(@Cause Throwable e);
|
||||||
|
|
|
@ -2742,39 +2742,43 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
private void deployAddressesFromConfiguration(Configuration configuration) throws Exception {
|
private void deployAddressesFromConfiguration(Configuration configuration) throws Exception {
|
||||||
for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) {
|
for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) {
|
||||||
AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
|
try {
|
||||||
addOrUpdateAddressInfo(info);
|
ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString());
|
||||||
ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString());
|
AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
|
||||||
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
|
addOrUpdateAddressInfo(info);
|
||||||
|
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
|
||||||
|
} catch (Exception e) {
|
||||||
|
ActiveMQServerLogger.LOGGER.problemDeployingAddress(config.getName(), e.getMessage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
|
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
|
||||||
for (CoreQueueConfiguration config : queues) {
|
for (CoreQueueConfiguration config : queues) {
|
||||||
SimpleString queueName = SimpleString.toSimpleString(config.getName());
|
try {
|
||||||
ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress(), config.getRoutingType().toString());
|
SimpleString queueName = SimpleString.toSimpleString(config.getName());
|
||||||
AddressSettings as = addressSettingsRepository.getMatch(config.getAddress());
|
ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress(), config.getRoutingType().toString());
|
||||||
// determine if there is an address::queue match; update it if so
|
AddressSettings as = addressSettingsRepository.getMatch(config.getAddress());
|
||||||
int maxConsumers = config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers();
|
// determine if there is an address::queue match; update it if so
|
||||||
boolean isExclusive = config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive();
|
int maxConsumers = config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers();
|
||||||
boolean isLastValue = config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue();
|
boolean isExclusive = config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive();
|
||||||
int consumersBeforeDispatch = config.getConsumersBeforeDispatch() == null ? as.getDefaultConsumersBeforeDispatch() : config.getConsumersBeforeDispatch();
|
boolean isLastValue = config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue();
|
||||||
long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch();
|
int consumersBeforeDispatch = config.getConsumersBeforeDispatch() == null ? as.getDefaultConsumersBeforeDispatch() : config.getConsumersBeforeDispatch();
|
||||||
|
long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch();
|
||||||
|
|
||||||
if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
|
if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
|
||||||
updateQueue(config.getName(), config.getRoutingType(), maxConsumers, config.getPurgeOnNoConsumers(),
|
updateQueue(config.getName(), config.getRoutingType(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser());
|
||||||
isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser());
|
} else {
|
||||||
} else {
|
// if the address::queue doesn't exist then create it
|
||||||
// if the address::queue doesn't exist then create it
|
try {
|
||||||
try {
|
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true);
|
||||||
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
|
} catch (ActiveMQQueueExistsException e) {
|
||||||
queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()),
|
// the queue may exist on a *different* address
|
||||||
config.isDurable(),false,false,false,false, maxConsumers, config.getPurgeOnNoConsumers(),
|
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
|
||||||
isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true);
|
}
|
||||||
} catch (ActiveMQQueueExistsException e) {
|
|
||||||
// the queue may exist on a *different* address
|
|
||||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
|
|
||||||
}
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
ActiveMQServerLogger.LOGGER.problemDeployingQueue(config.getName(), e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.extras.byteman;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.jboss.byteman.contrib.bmunit.BMRule;
|
||||||
|
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
|
@RunWith(BMUnitRunner.class)
|
||||||
|
public class AddressDeploymentFailedTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@BMRule(name = "blow up address deployment",
|
||||||
|
targetClass = "org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl",
|
||||||
|
targetMethod = "addOrUpdateAddressInfo(AddressInfo)",
|
||||||
|
targetLocation = "EXIT",
|
||||||
|
action = "throw new IllegalStateException(\"test exception\")")
|
||||||
|
public void testAddressDeploymentFailure() throws Exception {
|
||||||
|
ActiveMQServer server = createServer(false, createDefaultNettyConfig());
|
||||||
|
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(UUID.randomUUID().toString()).addRoutingType(RoutingType.ANYCAST));
|
||||||
|
server.start();
|
||||||
|
assertTrue(server.getRemotingService().isStarted());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.extras.byteman;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.jboss.byteman.contrib.bmunit.BMRule;
|
||||||
|
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
|
@RunWith(BMUnitRunner.class)
|
||||||
|
public class QueueDeploymentFailedTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@BMRule(name = "blow up queue deployment",
|
||||||
|
targetClass = "org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl",
|
||||||
|
targetMethod = "createQueue(SimpleString,RoutingType,SimpleString,SimpleString,SimpleString,boolean,boolean,boolean,boolean,boolean,int,boolean,boolean,boolean,int,long,boolean",
|
||||||
|
targetLocation = "EXIT",
|
||||||
|
action = "throw new IllegalStateException(\"test exception\")")
|
||||||
|
public void testQueueDeploymentFailure() throws Exception {
|
||||||
|
ActiveMQServer server = createServer(false, createDefaultNettyConfig());
|
||||||
|
String address = UUID.randomUUID().toString();
|
||||||
|
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(address).addRoutingType(RoutingType.ANYCAST).addQueueConfiguration(new CoreQueueConfiguration().setName(UUID.randomUUID().toString()).setRoutingType(RoutingType.ANYCAST).setAddress(address)));
|
||||||
|
server.start();
|
||||||
|
assertTrue(server.getRemotingService().isStarted());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue