ARTEMIS-1421 duplicate queue possible in XML

This commit is contained in:
Justin Bertram 2017-09-13 10:01:39 -05:00 committed by Clebert Suconic
parent 481f753589
commit be3a66f016
5 changed files with 71 additions and 17 deletions

View File

@ -121,8 +121,8 @@ public interface ActiveMQMessageBundle {
@Message(id = 119018, value = "Binding already exists {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 119018, value = "Binding already exists {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQQueueExistsException bindingAlreadyExists(Binding binding); ActiveMQQueueExistsException bindingAlreadyExists(Binding binding);
@Message(id = 119019, value = "Queue already exists {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 119019, value = "Queue {0} already exists on address {1}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQQueueExistsException queueAlreadyExists(SimpleString queueName); ActiveMQQueueExistsException queueAlreadyExists(SimpleString queueName, SimpleString addressName);
@Message(id = 119020, value = "Invalid filter: {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 119020, value = "Invalid filter: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQInvalidFilterExpressionException invalidFilter(@Cause Throwable e, SimpleString filter); ActiveMQInvalidFilterExpressionException invalidFilter(@Cause Throwable e, SimpleString filter);

View File

@ -94,8 +94,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void serverStopped(String version, SimpleString nodeId, String uptime); void serverStopped(String version, SimpleString nodeId, String uptime);
@LogMessage(level = Logger.Level.INFO) @LogMessage(level = Logger.Level.INFO)
@Message(id = 221003, value = "Deploying queue {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 221003, value = "Deploying queue {0} on address {1}", format = Message.Format.MESSAGE_FORMAT)
void deployQueue(SimpleString queueName); void deployQueue(String queueName, String addressName);
@LogMessage(level = Logger.Level.INFO) @LogMessage(level = Logger.Level.INFO)
@Message(id = 221004, value = "{0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 221004, value = "{0}", format = Message.Format.MESSAGE_FORMAT)

View File

@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -2511,20 +2512,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception { private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) { for (CoreQueueConfiguration config : queues) {
addOrUpdateQueue(config); SimpleString queueName = SimpleString.toSimpleString(config.getName());
} ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress());
}
private Queue addOrUpdateQueue(CoreQueueConfiguration config) throws Exception { // determine if there is an address::queue match; update it if so
SimpleString queueName = SimpleString.toSimpleString(config.getName()); if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
ActiveMQServerLogger.LOGGER.deployQueue(queueName); updateQueue(config.getName(), config.getRoutingType(), config.getMaxConsumers(), config.getPurgeOnNoConsumers());
Queue queue = updateQueue(config.getName(), config.getRoutingType(), config.getMaxConsumers(), config.getPurgeOnNoConsumers()); } else {
if (queue == null) { // if the address::queue doesn't exist then create it
queue = createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), try {
queueName, SimpleString.toSimpleString(config.getFilterString()), null, createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true); queueName, SimpleString.toSimpleString(config.getFilterString()),null,
config.isDurable(),false,false,false,false,config.getMaxConsumers(),config.getPurgeOnNoConsumers(),true);
} catch (ActiveMQQueueExistsException e) {
// the queue may exist on a *different* address
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
}
}
} }
return queue;
} }
private void deployQueuesFromConfiguration() throws Exception { private void deployQueuesFromConfiguration() throws Exception {
@ -2701,7 +2706,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (ignoreIfExists) { if (ignoreIfExists) {
return binding.getQueue(); return binding.getQueue();
} else { } else {
throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueName); throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueName, binding.getAddress());
} }
} }

View File

@ -22,12 +22,14 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager; import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser; import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
import org.junit.Assert; import org.junit.Assert;
@ -72,6 +74,18 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
deploymentManager.readConfiguration(); deploymentManager.readConfiguration();
} }
@Test
public void testDuplicateQueue() throws Exception {
String filename = "duplicateQueue.xml";
FileConfiguration fc = new FileConfiguration();
FileDeploymentManager deploymentManager = new FileDeploymentManager(filename);
deploymentManager.addDeployable(fc);
deploymentManager.readConfiguration();
ActiveMQServer server = addServer((ActiveMQServer) deploymentManager.buildService(null, null).get("core"));
server.start();
assertEquals(0, server.locateQueue(SimpleString.toSimpleString("q")).getMaxConsumers());
}
@Test @Test
public void testParsingClusterConnectionURIs() throws Exception { public void testParsingClusterConnectionURIs() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser(); FileConfigurationParser parser = new FileConfigurationParser();

View File

@ -0,0 +1,35 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<addresses>
<address name="a">
<anycast>
<queue name="q" max-consumers="0"/>
</anycast>
</address>
<address name="b">
<anycast>
<queue name="q" max-consumers="1"/>
</anycast>
</address>
</addresses>
</core>
</configuration>