This closes #909
This commit is contained in:
commit
52e4fc4a53
|
@ -22,10 +22,10 @@ package org.apache.activemq.artemis.api.core;
|
||||||
public final class ActiveMQAddressDoesNotExistException extends ActiveMQException {
|
public final class ActiveMQAddressDoesNotExistException extends ActiveMQException {
|
||||||
|
|
||||||
public ActiveMQAddressDoesNotExistException() {
|
public ActiveMQAddressDoesNotExistException() {
|
||||||
super(ActiveMQExceptionType.ADDRESS_EXISTS);
|
super(ActiveMQExceptionType.ADDRESS_DOES_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ActiveMQAddressDoesNotExistException(String msg) {
|
public ActiveMQAddressDoesNotExistException(String msg) {
|
||||||
super(ActiveMQExceptionType.ADDRESS_EXISTS, msg);
|
super(ActiveMQExceptionType.ADDRESS_DOES_NOT_EXIST, msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -406,7 +406,7 @@ public interface ActiveMQMessageBundle {
|
||||||
@Message(id = 119206, value = "Queue {0} has invalid max consumer setting: {1}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 119206, value = "Queue {0} has invalid max consumer setting: {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
IllegalArgumentException invalidMaxConsumers(String queueName, int value);
|
IllegalArgumentException invalidMaxConsumers(String queueName, int value);
|
||||||
|
|
||||||
@Message(id = 119207, value = "Can not create queue with delivery mode: {0}, Supported delivery modes for address: {1} are {2}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 119207, value = "Can not create queue with routing type: {0}, Supported routing types for address: {1} are {2}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
IllegalArgumentException invalidRoutingTypeForAddress(RoutingType routingType,
|
IllegalArgumentException invalidRoutingTypeForAddress(RoutingType routingType,
|
||||||
String address,
|
String address,
|
||||||
Set<RoutingType> supportedRoutingTypes);
|
Set<RoutingType> supportedRoutingTypes);
|
||||||
|
|
|
@ -2512,30 +2512,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
|
defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
|
||||||
AddressInfo info = postOffice.getAddressInfo(addressName);
|
AddressInfo info = postOffice.getAddressInfo(addressName);
|
||||||
|
|
||||||
boolean addressAlreadyExists = true;
|
|
||||||
|
|
||||||
if (autoCreateAddress) {
|
if (autoCreateAddress) {
|
||||||
if (info == null || !info.getRoutingTypes().contains(routingType)) {
|
if (info == null || !info.getRoutingTypes().contains(routingType)) {
|
||||||
createOrUpdateAddressInfo(defaultAddressInfo.setAutoCreated(true));
|
createOrUpdateAddressInfo(defaultAddressInfo.setAutoCreated(true));
|
||||||
addressAlreadyExists = false;
|
|
||||||
}
|
}
|
||||||
} else if (info == null) {
|
} else if (info == null) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
|
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
|
||||||
|
} else if (!info.getRoutingTypes().contains(routingType)) {
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, info.getName().toString(), info.getRoutingTypes());
|
||||||
}
|
}
|
||||||
|
|
||||||
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).deleteOnNoConsumers(deleteOnNoConsumers).build();
|
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).deleteOnNoConsumers(deleteOnNoConsumers).build();
|
||||||
|
|
||||||
final Queue queue = queueFactory.createQueueWith(queueConfig);
|
final Queue queue = queueFactory.createQueueWith(queueConfig);
|
||||||
|
|
||||||
AddressInfo addressInfo = postOffice.getAddressInfo(queue.getAddress());
|
|
||||||
if (addressInfo == null) {
|
|
||||||
createAddressInfo(new AddressInfo(queue.getAddress()));
|
|
||||||
} else {
|
|
||||||
if (!addressInfo.getRoutingTypes().contains(routingType)) {
|
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, addressInfo.getName().toString(), addressInfo.getRoutingTypes());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (transientQueue) {
|
if (transientQueue) {
|
||||||
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
|
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
|
||||||
} else if (queue.isAutoCreated()) {
|
} else if (queue.isAutoCreated()) {
|
||||||
|
@ -2546,9 +2536,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
if (queue.isDurable()) {
|
if (queue.isDurable()) {
|
||||||
storageManager.addQueueBinding(txID, localQueueBinding);
|
storageManager.addQueueBinding(txID, localQueueBinding);
|
||||||
if (!addressAlreadyExists) {
|
|
||||||
storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -540,7 +540,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
|
|
||||||
server.checkQueueCreationLimit(getUsername());
|
server.checkQueueCreationLimit(getUsername());
|
||||||
|
|
||||||
Queue queue = server.createQueue(art.getA(), art.getB(), name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, true);
|
Queue queue = server.createQueue(art.getA(), art.getB(), name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateAddresses());
|
||||||
|
|
||||||
if (temporary) {
|
if (temporary) {
|
||||||
// Temporary queue in core simply means the queue will be deleted if
|
// Temporary queue in core simply means the queue will be deleted if
|
||||||
|
|
|
@ -0,0 +1,107 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class CreateQueueTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
public final SimpleString addressA = new SimpleString("addressA");
|
||||||
|
public final SimpleString addressB = new SimpleString("addressB");
|
||||||
|
public final SimpleString queueA = new SimpleString("queueA");
|
||||||
|
public final SimpleString queueB = new SimpleString("queueB");
|
||||||
|
public final SimpleString queueC = new SimpleString("queueC");
|
||||||
|
public final SimpleString queueD = new SimpleString("queueD");
|
||||||
|
|
||||||
|
private ServerLocator locator;
|
||||||
|
private ActiveMQServer server;
|
||||||
|
private ClientSessionFactory cf;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
locator = createInVMNonHALocator();
|
||||||
|
server = createServer(false);
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
cf = createSessionFactory(locator);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnsupportedRoutingType() throws Exception {
|
||||||
|
ClientSession sendSession = cf.createSession(false, true, true);
|
||||||
|
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoCreateAddresses(false));
|
||||||
|
server.getAddressSettingsRepository().addMatch(addressB.toString(), new AddressSettings().setAutoCreateAddresses(false));
|
||||||
|
|
||||||
|
Set<RoutingType> routingTypes = new HashSet<>();
|
||||||
|
routingTypes.add(RoutingType.ANYCAST);
|
||||||
|
sendSession.createAddress(addressA, routingTypes, false);
|
||||||
|
try {
|
||||||
|
sendSession.createQueue(addressA, RoutingType.MULTICAST, queueA);
|
||||||
|
fail("Creating a queue here should fail since the queue routing type differs from what is supported on the address.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertTrue(e instanceof ActiveMQException);
|
||||||
|
ActiveMQException ae = (ActiveMQException) e;
|
||||||
|
assertEquals(ActiveMQExceptionType.INTERNAL_ERROR, ae.getType());
|
||||||
|
}
|
||||||
|
|
||||||
|
routingTypes = new HashSet<>();
|
||||||
|
routingTypes.add(RoutingType.MULTICAST);
|
||||||
|
sendSession.createAddress(addressB, routingTypes, false);
|
||||||
|
try {
|
||||||
|
sendSession.createQueue(addressB, RoutingType.ANYCAST, queueB);
|
||||||
|
fail("Creating a queue here should fail since the queue routing type differs from what is supported on the address.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertTrue(e instanceof ActiveMQException);
|
||||||
|
ActiveMQException ae = (ActiveMQException) e;
|
||||||
|
assertEquals(ActiveMQExceptionType.INTERNAL_ERROR, ae.getType());
|
||||||
|
}
|
||||||
|
sendSession.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddressDoesNotExist() throws Exception {
|
||||||
|
ClientSession sendSession = cf.createSession(false, true, true);
|
||||||
|
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoCreateAddresses(false));
|
||||||
|
Set<RoutingType> routingTypes = new HashSet<>();
|
||||||
|
routingTypes.add(RoutingType.ANYCAST);
|
||||||
|
try {
|
||||||
|
sendSession.createQueue(addressA, RoutingType.MULTICAST, queueA);
|
||||||
|
fail("Creating a queue here should fail since the queue's address doesn't exist and auto-create-addresses = false.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertTrue(e instanceof ActiveMQException);
|
||||||
|
ActiveMQException ae = (ActiveMQException) e;
|
||||||
|
assertEquals(ActiveMQExceptionType.ADDRESS_DOES_NOT_EXIST, ae.getType());
|
||||||
|
}
|
||||||
|
sendSession.close();
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,6 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.client;
|
package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
@ -24,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -32,9 +37,11 @@ import org.junit.Test;
|
||||||
public class RoutingTest extends ActiveMQTestBase {
|
public class RoutingTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
public final SimpleString addressA = new SimpleString("addressA");
|
public final SimpleString addressA = new SimpleString("addressA");
|
||||||
|
public final SimpleString addressB = new SimpleString("addressB");
|
||||||
public final SimpleString queueA = new SimpleString("queueA");
|
public final SimpleString queueA = new SimpleString("queueA");
|
||||||
public final SimpleString queueB = new SimpleString("queueB");
|
public final SimpleString queueB = new SimpleString("queueB");
|
||||||
public final SimpleString queueC = new SimpleString("queueC");
|
public final SimpleString queueC = new SimpleString("queueC");
|
||||||
|
public final SimpleString queueD = new SimpleString("queueD");
|
||||||
|
|
||||||
private ServerLocator locator;
|
private ServerLocator locator;
|
||||||
private ActiveMQServer server;
|
private ActiveMQServer server;
|
||||||
|
@ -216,4 +223,61 @@ public class RoutingTest extends ActiveMQTestBase {
|
||||||
sendSession.close();
|
sendSession.close();
|
||||||
session.close();
|
session.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAnycastMessageRoutingExclusivity() throws Exception {
|
||||||
|
ClientSession sendSession = cf.createSession(false, true, true);
|
||||||
|
Set<RoutingType> routingTypes = new HashSet<>();
|
||||||
|
routingTypes.add(RoutingType.ANYCAST);
|
||||||
|
routingTypes.add(RoutingType.MULTICAST);
|
||||||
|
sendSession.createAddress(addressA, routingTypes, false);
|
||||||
|
sendSession.createQueue(addressA, RoutingType.ANYCAST, queueA);
|
||||||
|
sendSession.createQueue(addressA, RoutingType.ANYCAST, queueB);
|
||||||
|
sendSession.createQueue(addressA, RoutingType.MULTICAST, queueC);
|
||||||
|
ClientProducer p = sendSession.createProducer(addressA);
|
||||||
|
ClientMessage message = sendSession.createMessage(false);
|
||||||
|
message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
|
||||||
|
p.send(message);
|
||||||
|
sendSession.close();
|
||||||
|
assertEquals(1, server.locateQueue(queueA).getMessageCount() + server.locateQueue(queueB).getMessageCount());
|
||||||
|
assertEquals(0, server.locateQueue(queueC).getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMulticastMessageRoutingExclusivity() throws Exception {
|
||||||
|
ClientSession sendSession = cf.createSession(false, true, true);
|
||||||
|
Set<RoutingType> routingTypes = new HashSet<>();
|
||||||
|
routingTypes.add(RoutingType.ANYCAST);
|
||||||
|
routingTypes.add(RoutingType.MULTICAST);
|
||||||
|
sendSession.createAddress(addressA, routingTypes, false);
|
||||||
|
sendSession.createQueue(addressA, RoutingType.ANYCAST, queueA);
|
||||||
|
sendSession.createQueue(addressA, RoutingType.MULTICAST, queueB);
|
||||||
|
sendSession.createQueue(addressA, RoutingType.MULTICAST, queueC);
|
||||||
|
ClientProducer p = sendSession.createProducer(addressA);
|
||||||
|
ClientMessage message = sendSession.createMessage(false);
|
||||||
|
message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
|
||||||
|
p.send(message);
|
||||||
|
sendSession.close();
|
||||||
|
assertEquals(0, server.locateQueue(queueA).getMessageCount());
|
||||||
|
assertEquals(2, server.locateQueue(queueB).getMessageCount() + server.locateQueue(queueC).getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAmbiguousMessageRouting() throws Exception {
|
||||||
|
ClientSession sendSession = cf.createSession(false, true, true);
|
||||||
|
Set<RoutingType> routingTypes = new HashSet<>();
|
||||||
|
routingTypes.add(RoutingType.ANYCAST);
|
||||||
|
routingTypes.add(RoutingType.MULTICAST);
|
||||||
|
sendSession.createAddress(addressA, routingTypes, false);
|
||||||
|
sendSession.createQueue(addressA, RoutingType.ANYCAST, queueA);
|
||||||
|
sendSession.createQueue(addressA, RoutingType.ANYCAST, queueB);
|
||||||
|
sendSession.createQueue(addressA, RoutingType.MULTICAST, queueC);
|
||||||
|
sendSession.createQueue(addressA, RoutingType.MULTICAST, queueD);
|
||||||
|
ClientProducer p = sendSession.createProducer(addressA);
|
||||||
|
ClientMessage message = sendSession.createMessage(false);
|
||||||
|
p.send(message);
|
||||||
|
sendSession.close();
|
||||||
|
assertEquals(1, server.locateQueue(queueA).getMessageCount() + server.locateQueue(queueB).getMessageCount());
|
||||||
|
assertEquals(2, server.locateQueue(queueC).getMessageCount() + server.locateQueue(queueD).getMessageCount());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue