This commit is contained in:
Clebert Suconic 2017-09-13 18:26:29 -04:00
commit 01134e2348
5 changed files with 71 additions and 17 deletions

View File

@ -121,8 +121,8 @@ public interface ActiveMQMessageBundle {
@Message(id = 119018, value = "Binding already exists {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQQueueExistsException bindingAlreadyExists(Binding binding);
@Message(id = 119019, value = "Queue already exists {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQQueueExistsException queueAlreadyExists(SimpleString queueName);
@Message(id = 119019, value = "Queue {0} already exists on address {1}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQQueueExistsException queueAlreadyExists(SimpleString queueName, SimpleString addressName);
@Message(id = 119020, value = "Invalid filter: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQInvalidFilterExpressionException invalidFilter(@Cause Throwable e, SimpleString filter);

View File

@ -94,8 +94,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void serverStopped(String version, SimpleString nodeId, String uptime);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 221003, value = "Deploying queue {0}", format = Message.Format.MESSAGE_FORMAT)
void deployQueue(SimpleString queueName);
@Message(id = 221003, value = "Deploying queue {0} on address {1}", format = Message.Format.MESSAGE_FORMAT)
void deployQueue(String queueName, String addressName);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 221004, value = "{0}", format = Message.Format.MESSAGE_FORMAT)

View File

@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -2511,20 +2512,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) {
addOrUpdateQueue(config);
}
}
private Queue addOrUpdateQueue(CoreQueueConfiguration config) throws Exception {
SimpleString queueName = SimpleString.toSimpleString(config.getName());
ActiveMQServerLogger.LOGGER.deployQueue(queueName);
Queue queue = updateQueue(config.getName(), config.getRoutingType(), config.getMaxConsumers(), config.getPurgeOnNoConsumers());
if (queue == null) {
queue = createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
queueName, SimpleString.toSimpleString(config.getFilterString()), null,
config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true);
ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress());
// determine if there is an address::queue match; update it if so
if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
updateQueue(config.getName(), config.getRoutingType(), config.getMaxConsumers(), config.getPurgeOnNoConsumers());
} else {
// if the address::queue doesn't exist then create it
try {
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
queueName, SimpleString.toSimpleString(config.getFilterString()),null,
config.isDurable(),false,false,false,false,config.getMaxConsumers(),config.getPurgeOnNoConsumers(),true);
} catch (ActiveMQQueueExistsException e) {
// the queue may exist on a *different* address
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
}
}
}
return queue;
}
private void deployQueuesFromConfiguration() throws Exception {
@ -2701,7 +2706,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (ignoreIfExists) {
return binding.getQueue();
} else {
throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueName);
throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueName, binding.getAddress());
}
}

View File

@ -22,12 +22,14 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
import org.junit.Assert;
@ -72,6 +74,18 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
deploymentManager.readConfiguration();
}
@Test
public void testDuplicateQueue() throws Exception {
String filename = "duplicateQueue.xml";
FileConfiguration fc = new FileConfiguration();
FileDeploymentManager deploymentManager = new FileDeploymentManager(filename);
deploymentManager.addDeployable(fc);
deploymentManager.readConfiguration();
ActiveMQServer server = addServer((ActiveMQServer) deploymentManager.buildService(null, null).get("core"));
server.start();
assertEquals(0, server.locateQueue(SimpleString.toSimpleString("q")).getMaxConsumers());
}
@Test
public void testParsingClusterConnectionURIs() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();

View File

@ -0,0 +1,35 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<addresses>
<address name="a">
<anycast>
<queue name="q" max-consumers="0"/>
</anycast>
</address>
<address name="b">
<anycast>
<queue name="q" max-consumers="1"/>
</anycast>
</address>
</addresses>
</core>
</configuration>