ARTEMIS-922 Addressing-related API clean-up

This commit is contained in:
Justin Bertram 2017-01-12 16:48:30 -06:00 committed by Clebert Suconic
parent 7b899cee9b
commit 0d1fa83181
62 changed files with 267 additions and 415 deletions

View File

@ -1068,8 +1068,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
if (queues.get(queueName) != null) { if (queues.get(queueName) != null) {
return false; return false;
} else { } else {
ActiveMQQueue activeMQQueue = ActiveMQDestination.createQueue(queueName);
// Convert from JMS selector to core filter // Convert from JMS selector to core filter
String coreFilterString = null; String coreFilterString = null;
@ -1077,11 +1075,12 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString); coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
} }
server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQQueue.getName())).addRoutingType(RoutingType.ANYCAST)); server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueName)).addRoutingType(RoutingType.ANYCAST));
server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), RoutingType.ANYCAST, SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false, autoCreated); AddressSettings as = server.getAddressSettingsRepository().getMatch(queueName);
server.createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(coreFilterString), null, durable, false, true, false, false, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
queues.put(queueName, activeMQQueue); queues.put(queueName, ActiveMQDestination.createQueue(queueName));
this.recoverregistryBindings(queueName, PersistedType.Queue); this.recoverregistryBindings(queueName, PersistedType.Queue);
@ -1108,12 +1107,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
return false; return false;
} else { } else {
ActiveMQTopic activeMQTopic = ActiveMQDestination.createTopic(topicName); ActiveMQTopic activeMQTopic = ActiveMQDestination.createTopic(topicName);
// We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQTopic.getAddress()), RoutingType.MULTICAST));
// checks when routing messages to a topic that
// does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
// subscriptions - core has no notion of a topic
// server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false, autoCreated);
server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQTopic.getAddress()), RoutingType.MULTICAST));
topics.put(topicName, activeMQTopic); topics.put(topicName, activeMQTopic);

View File

@ -31,7 +31,7 @@ public class FindDestinationTest extends MessageTestBase {
@Test @Test
public void testFindQueue() throws Exception { public void testFindQueue() throws Exception {
String testName = "testFindQueue"; String testName = "testFindQueue";
server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString(testName), RoutingType.MULTICAST)); server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(testName), RoutingType.MULTICAST));
server.getActiveMQServer().createQueue(new SimpleString(testName), RoutingType.MULTICAST, new SimpleString(testName), null, false, false); server.getActiveMQServer().createQueue(new SimpleString(testName), RoutingType.MULTICAST, new SimpleString(testName), null, false, false);
ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/" + testName)); ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/" + testName));
@ -62,7 +62,7 @@ public class FindDestinationTest extends MessageTestBase {
@Test @Test
public void testFindTopic() throws Exception { public void testFindTopic() throws Exception {
server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("testTopic"), RoutingType.MULTICAST)); server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString("testTopic"), RoutingType.MULTICAST));
server.getActiveMQServer().createQueue(new SimpleString("testTopic"), RoutingType.MULTICAST, new SimpleString("testTopic"), null, false, false); server.getActiveMQServer().createQueue(new SimpleString("testTopic"), RoutingType.MULTICAST, new SimpleString("testTopic"), null, false, false);
ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/topics/testTopic")); ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/topics/testTopic"));

View File

@ -67,7 +67,7 @@ public class RawAckTest {
consumerSessionFactory = serverLocator.createSessionFactory(); consumerSessionFactory = serverLocator.createSessionFactory();
SimpleString addr = SimpleString.toSimpleString("testQueue"); SimpleString addr = SimpleString.toSimpleString("testQueue");
activeMQServer.createAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST)); activeMQServer.addAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST));
activeMQServer.createQueue(addr, RoutingType.MULTICAST, addr, null, false, false); activeMQServer.createQueue(addr, RoutingType.MULTICAST, addr, null, false, false);
session = sessionFactory.createSession(true, true); session = sessionFactory.createSession(true, true);
producer = session.createProducer(addr); producer = session.createProducer(addr);

View File

@ -626,10 +626,10 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
set.add(RoutingType.valueOf(routingType)); set.add(RoutingType.valueOf(routingType));
} }
final AddressInfo addressInfo = new AddressInfo(new SimpleString(name), set); final AddressInfo addressInfo = new AddressInfo(new SimpleString(name), set);
if (server.createAddressInfo(addressInfo)) { if (server.addAddressInfo(addressInfo)) {
return AddressInfoTextFormatter.Long.format(addressInfo, new StringBuilder()).toString(); return AddressInfoTextFormatter.Long.format(addressInfo, new StringBuilder()).toString();
} else { } else {
return ""; throw ActiveMQMessageBundle.BUNDLE.addressAlreadyExists(addressInfo.getName());
} }
} finally { } finally {
blockOnIO(); blockOnIO();
@ -652,11 +652,10 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
routingTypeSet.add(RoutingType.valueOf(routingTypeName)); routingTypeSet.add(RoutingType.valueOf(routingTypeName));
} }
} }
final AddressInfo updatedAddressInfo = server.updateAddressInfo(name, routingTypeSet); if (!server.updateAddressInfo(SimpleString.toSimpleString(name), routingTypeSet)) {
if (updatedAddressInfo == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(name)); throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(name));
} }
return AddressInfoTextFormatter.Long.format(updatedAddressInfo, new StringBuilder()).toString(); return AddressInfoTextFormatter.Long.format(server.getAddressInfo(SimpleString.toSimpleString(name)), new StringBuilder()).toString();
} finally { } finally {
blockOnIO(); blockOnIO();
} }
@ -691,7 +690,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
SimpleString filter = filterStr == null ? null : new SimpleString(filterStr); SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
clearIO(); clearIO();
try { try {
server.deployQueue(SimpleString.toSimpleString(address), server.getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType(), new SimpleString(name), filter, durable, false); server.createQueue(SimpleString.toSimpleString(address), server.getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType(), new SimpleString(name), filter, durable, false);
} finally { } finally {
blockOnIO(); blockOnIO();
} }

View File

@ -64,14 +64,7 @@ public interface AddressManager {
*/ */
boolean addAddressInfo(AddressInfo addressInfo); boolean addAddressInfo(AddressInfo addressInfo);
AddressInfo updateAddressInfo(SimpleString addressName, AddressInfo updateAddressInfo(SimpleString addressName, Collection<RoutingType> routingTypes);
Collection<RoutingType> routingTypes) throws Exception;
/**
* @param addressInfo
* @return the same provided {@code addressInfo} if the address was added, another if it was updated
*/
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo removeAddressInfo(SimpleString address); AddressInfo removeAddressInfo(SimpleString address);

View File

@ -51,12 +51,6 @@ public interface PostOffice extends ActiveMQComponent {
*/ */
boolean addAddressInfo(AddressInfo addressInfo); boolean addAddressInfo(AddressInfo addressInfo);
/**
* @param addressInfo
* @return the same provided {@code addressInfo} if the address was added, another if it was updated
*/
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo removeAddressInfo(SimpleString address) throws Exception; AddressInfo removeAddressInfo(SimpleString address) throws Exception;
AddressInfo getAddressInfo(SimpleString address); AddressInfo getAddressInfo(SimpleString address);

View File

@ -440,23 +440,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
} }
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
synchronized (addressLock) {
final AddressInfo updatedAddressInfo = addressManager.addOrUpdateAddressInfo(addressInfo);
// only register address if it is newly added
final boolean isNew = updatedAddressInfo == addressInfo;
if (isNew) {
try {
managementService.registerAddress(addressInfo);
} catch (Exception e) {
e.printStackTrace();
}
}
return updatedAddressInfo;
}
}
@Override @Override
public QueueBinding updateQueue(SimpleString name, public QueueBinding updateQueue(SimpleString name,
RoutingType routingType, RoutingType routingType,

View File

@ -231,7 +231,7 @@ public class SimpleAddressManager implements AddressManager {
@Override @Override
public AddressInfo updateAddressInfo(SimpleString addressName, public AddressInfo updateAddressInfo(SimpleString addressName,
Collection<RoutingType> routingTypes) throws Exception { Collection<RoutingType> routingTypes) {
if (routingTypes == null) { if (routingTypes == null) {
return this.addressInfoMap.get(addressName); return this.addressInfoMap.get(addressName);
} else { } else {
@ -259,21 +259,6 @@ public class SimpleAddressManager implements AddressManager {
} }
} }
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
return this.addressInfoMap.compute(addressInfo.getName(), (name, oldAddressInfo) -> {
if (oldAddressInfo != null) {
final Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
validateRoutingTypes(name, routingTypes);
final Set<RoutingType> updatedRoutingTypes = EnumSet.copyOf(routingTypes);
oldAddressInfo.setRoutingTypes(updatedRoutingTypes);
return oldAddressInfo;
} else {
return addressInfo;
}
});
}
@Override @Override
public AddressInfo removeAddressInfo(SimpleString address) { public AddressInfo removeAddressInfo(SimpleString address) {
return addressInfoMap.remove(address); return addressInfoMap.remove(address);

View File

@ -267,15 +267,6 @@ public interface ActiveMQServer extends ActiveMQComponent {
*/ */
boolean waitForActivation(long timeout, TimeUnit unit) throws InterruptedException; boolean waitForActivation(long timeout, TimeUnit unit) throws InterruptedException;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
boolean autoCreated,
Integer maxConsumers,
Boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
/** /**
* Creates a transient queue. A queue that will exist as long as there are consumers. * Creates a transient queue. A queue that will exist as long as there are consumers.
* The queue will be deleted as soon as all the consumers are removed. * The queue will be deleted as soon as all the consumers are removed.
@ -290,57 +281,32 @@ public interface ActiveMQServer extends ActiveMQComponent {
* @throws NullPointerException if {@code address} is {@code null} * @throws NullPointerException if {@code address} is {@code null}
*/ */
void createSharedQueue(final SimpleString address, final RoutingType routingType, final SimpleString name, final SimpleString filterString, void createSharedQueue(final SimpleString address, final RoutingType routingType, final SimpleString name, final SimpleString filterString,
final SimpleString user, final SimpleString user, boolean durable) throws Exception;
boolean durable) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
boolean durable, boolean durable, boolean temporary) throws Exception;
boolean temporary) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
boolean durable, boolean temporary, int maxConsumers, boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
Boolean deleteOnNoConsumers, boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
boolean autoCreated, int maxConsumers, boolean deleteOnNoConsumers, boolean autoCreateAddress) throws Exception;
@Deprecated @Deprecated
Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception; Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
boolean durable,
boolean temporary,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
boolean autoCreated) throws Exception;
@Deprecated @Deprecated
Queue deployQueue(String address, String queue, String filter, boolean durable, boolean temporary) throws Exception; Queue deployQueue(String address, String queue, String filter, boolean durable, boolean temporary) throws Exception;
@Deprecated @Deprecated
Queue deployQueue(SimpleString address, SimpleString queue, SimpleString filter, boolean durable, boolean temporary) throws Exception; Queue deployQueue(SimpleString address, SimpleString queue, SimpleString filter, boolean durable, boolean temporary) throws Exception;
Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString resourceName, SimpleString filterString,
boolean durable,
boolean temporary) throws Exception;
Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
boolean durable,
boolean temporary,
boolean autoCreated) throws Exception;
Queue locateQueue(SimpleString queueName); Queue locateQueue(SimpleString queueName);
BindingQueryResult bindingQuery(SimpleString address) throws Exception; BindingQueryResult bindingQuery(SimpleString address) throws Exception;
@ -349,14 +315,6 @@ public interface ActiveMQServer extends ActiveMQComponent {
AddressQueryResult addressQuery(SimpleString name) throws Exception; AddressQueryResult addressQuery(SimpleString name) throws Exception;
Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
boolean durable,
boolean temporary,
boolean autoCreated,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
void destroyQueue(SimpleString queueName) throws Exception; void destroyQueue(SimpleString queueName) throws Exception;
void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception; void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception;
@ -406,22 +364,6 @@ public interface ActiveMQServer extends ActiveMQComponent {
void stop(boolean failoverOnServerShutdown) throws Exception; void stop(boolean failoverOnServerShutdown) throws Exception;
AddressInfo getAddressInfo(SimpleString address);
Queue createQueue(SimpleString addressName,
SimpleString queueName,
RoutingType routingType,
SimpleString filterString,
SimpleString user,
boolean durable,
boolean temporary,
boolean ignoreIfExists,
boolean transientQueue,
boolean autoCreated,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue updateQueue(String name, Queue updateQueue(String name,
RoutingType routingType, RoutingType routingType,
Integer maxConsumers, Integer maxConsumers,
@ -460,13 +402,45 @@ public interface ActiveMQServer extends ActiveMQComponent {
void removeClientConnection(String clientId); void removeClientConnection(String clientId);
AddressInfo updateAddressInfo(String name, Collection<RoutingType> routingTypes) throws Exception; AddressInfo getAddressInfo(SimpleString address);
boolean createAddressInfo(AddressInfo addressInfo) throws Exception; /**
* Updates an {@code AddressInfo} on the broker with the specified routing types.
*
* @param address the name of the {@code AddressInfo} to update
* @param routingTypes the routing types to update the {@code AddressInfo} with
* @return {@code true} if the {@code AddressInfo} was updated, {@code false} otherwise
* @throws Exception
*/
boolean updateAddressInfo(SimpleString address, Collection<RoutingType> routingTypes) throws Exception;
AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception; /**
* Add the {@code AddressInfo} to the broker
*
* @param addressInfo the {@code AddressInfo} to add
* @return {@code true} if the {@code AddressInfo} was added, {@code false} otherwise
* @throws Exception
*/
boolean addAddressInfo(AddressInfo addressInfo) throws Exception;
void removeAddressInfo(SimpleString address, SecurityAuth session) throws Exception; /**
* A convenience method to combine the functionality of {@code addAddressInfo} and {@code updateAddressInfo}. It will
* add the {@code AddressInfo} object to the broker if it doesn't exist or update it if it does.
*
* @param addressInfo the {@code AddressInfo} to add or the info used to update the existing {@code AddressInfo}
* @return the resulting {@code AddressInfo}
* @throws Exception
*/
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
/**
* Remove an {@code AddressInfo} from the broker.
*
* @param address the {@code AddressInfo} to remove
* @param auth authorization information; {@code null} is valid
* @throws Exception
*/
void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
String getInternalNamingPrefix(); String getInternalNamingPrefix();
} }

View File

@ -1503,68 +1503,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean durable, final boolean durable,
final boolean temporary) throws Exception { final boolean temporary) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString()); AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
return createQueue(address, routingType, queueName, filterString, null, durable, temporary, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses()); return createQueue(address, routingType, queueName, filterString, durable, temporary, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
}
@Override
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary) throws Exception {
return createQueue(address, getAddressSettingsRepository().getMatch(address.toString()).getDefaultQueueRoutingType(), queueName, filterString, durable, temporary);
} }
@Override @Override
public Queue createQueue(final SimpleString address, public Queue createQueue(final SimpleString address,
final RoutingType routingType, final RoutingType routingType,
final SimpleString queueName, final SimpleString queueName,
final SimpleString filterString, final SimpleString filter,
final boolean durable, final boolean durable,
final boolean temporary, final boolean temporary,
final int maxConsumers, final int maxConsumers,
final boolean deleteOnNoConsumers, final boolean deleteOnNoConsumers,
final boolean autoCreateAddress) throws Exception { final boolean autoCreateAddress) throws Exception {
return createQueue(address, queueName, routingType, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); return createQueue(address, routingType, queueName, filter, null, durable, temporary, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
public Queue createQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final SimpleString user,
final boolean durable,
final boolean temporary) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
return createQueue(address, routingType, queueName, filterString, user, durable, temporary, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
}
@Override
public Queue createQueue(SimpleString address,
RoutingType routingType,
SimpleString queueName,
SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception {
return createQueue(address, queueName, routingType, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
public Queue createQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final SimpleString user,
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
return createQueue(address, routingType, queueName, filterString, user, durable, temporary, autoCreated, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
} }
@Override @Override
@ -1579,7 +1531,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Integer maxConsumers, Integer maxConsumers,
Boolean deleteOnNoConsumers, Boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception { boolean autoCreateAddress) throws Exception {
return createQueue(address, queueName, routingType, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress); return createQueue(address, routingType, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Deprecated
@Override
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary) throws Exception {
return createQueue(address, getAddressSettingsRepository().getMatch(address.toString()).getDefaultQueueRoutingType(), queueName, filterString, durable, temporary);
} }
@Override @Override
@ -1602,7 +1564,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
} }
final Queue queue = createQueue(address, name, routingType, filterString, user, durable, !durable, true, !durable, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, true, !durable, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
if (!queue.getAddress().equals(address)) { if (!queue.getAddress().equals(address)) {
throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name); throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name);
@ -1635,15 +1597,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return (Queue) binding.getBindable(); return (Queue) binding.getBindable();
} }
@Deprecated
@Override @Override
public Queue deployQueue(final SimpleString address, public Queue deployQueue(final SimpleString address,
final SimpleString resourceName, final SimpleString resourceName,
final SimpleString filterString, final SimpleString filterString,
final boolean durable, final boolean durable,
final boolean temporary) throws Exception { final boolean temporary) throws Exception {
return deployQueue(address, getAddressSettingsRepository().getMatch(address.toString()).getDefaultQueueRoutingType(), resourceName, filterString, durable, temporary); return createQueue(address, getAddressSettingsRepository().getMatch(address.toString()).getDefaultQueueRoutingType(), resourceName, filterString, durable, temporary);
} }
@Deprecated
@Override @Override
public Queue deployQueue(final String address, public Queue deployQueue(final String address,
final String resourceName, final String resourceName,
@ -1653,45 +1617,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return deployQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(resourceName), SimpleString.toSimpleString(filterString), durable, temporary); return deployQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(resourceName), SimpleString.toSimpleString(filterString), durable, temporary);
} }
@Override
public Queue deployQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString resourceName,
final SimpleString filterString,
final boolean durable,
final boolean temporary) throws Exception {
return deployQueue(address, routingType, resourceName, filterString, durable, temporary, false);
}
@Override
public Queue deployQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
return deployQueue(address, routingType, queueName, filterString, durable, temporary, autoCreated, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
}
@Override
public Queue deployQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean autoCreated,
final int maxConsumers,
final boolean deleteOnNoConsumers,
final boolean autoCreateAddress) throws Exception {
ActiveMQServerLogger.LOGGER.deployQueue(queueName);
return createQueue(address, queueName, routingType, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override @Override
public void destroyQueue(final SimpleString queueName) throws Exception { public void destroyQueue(final SimpleString queueName) throws Exception {
// The session is passed as an argument to verify if the user has authorization to delete the queue // The session is passed as an argument to verify if the user has authorization to delete the queue
@ -2296,14 +2221,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployAddressesFromConfiguration() throws Exception { private void deployAddressesFromConfiguration() throws Exception {
for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) { for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) {
AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes()); AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
createOrUpdateAddressInfo(info); addOrUpdateAddressInfo(info);
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations()); deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
} }
} }
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception { private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) { for (CoreQueueConfiguration config : queues) {
deployQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers(), true); ActiveMQServerLogger.LOGGER.deployQueue(SimpleString.toSimpleString(config.getName()));
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), null, config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers(), true);
} }
} }
@ -2407,25 +2334,27 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
@Override @Override
public AddressInfo updateAddressInfo(String address, Collection<RoutingType> routingTypes) throws Exception { public boolean updateAddressInfo(SimpleString address, Collection<RoutingType> routingTypes) throws Exception {
final SimpleString addressName = new SimpleString(address); if (getAddressInfo(address) == null) {
//after the postOffice call, updatedAddressInfo could change further (concurrently)! return false;
final AddressInfo updatedAddressInfo = postOffice.updateAddressInfo(addressName, routingTypes);
if (updatedAddressInfo != null) {
//it change the address info without any lock!
final long txID = storageManager.generateID();
try {
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
storageManager.addAddressBinding(txID, updatedAddressInfo);
} finally {
storageManager.commitBindings(txID);
}
} }
return updatedAddressInfo;
//after the postOffice call, updatedAddressInfo could change further (concurrently)!
final AddressInfo updatedAddressInfo = postOffice.updateAddressInfo(address, routingTypes);
//it change the address info without any lock!
final long txID = storageManager.generateID();
try {
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
storageManager.addAddressBinding(txID, updatedAddressInfo);
} finally {
storageManager.commitBindings(txID);
}
return true;
} }
@Override @Override
public boolean createAddressInfo(AddressInfo addressInfo) throws Exception { public boolean addAddressInfo(AddressInfo addressInfo) throws Exception {
boolean result = postOffice.addAddressInfo(addressInfo); boolean result = postOffice.addAddressInfo(addressInfo);
if (result) { if (result) {
@ -2433,34 +2362,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
storageManager.addAddressBinding(txID, addressInfo); storageManager.addAddressBinding(txID, addressInfo);
storageManager.commitBindings(txID); storageManager.commitBindings(txID);
} else { } else {
throw ActiveMQMessageBundle.BUNDLE.addressAlreadyExists(addressInfo.getName()); result = false;
} }
return result; return result;
} }
@Override @Override
public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception { public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
final AddressInfo updatedAddressInfo = postOffice.addOrUpdateAddressInfo(addressInfo); if (!addAddressInfo(addressInfo)) {
final boolean isNew = updatedAddressInfo == addressInfo; updateAddressInfo(addressInfo.getName(), addressInfo.getRoutingTypes());
final long txID = storageManager.generateID();
if (isNew) {
storageManager.addAddressBinding(txID, addressInfo);
storageManager.commitBindings(txID);
} else {
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
storageManager.addAddressBinding(txID, updatedAddressInfo);
storageManager.commitBindings(txID);
} }
return updatedAddressInfo; return getAddressInfo(addressInfo.getName());
} }
@Override @Override
public void removeAddressInfo(final SimpleString address, final SecurityAuth session) throws Exception { public void removeAddressInfo(final SimpleString address, final SecurityAuth auth) throws Exception {
if (session != null) { if (auth != null) {
securityStore.check(address, CheckType.DELETE_ADDRESS, session); securityStore.check(address, CheckType.DELETE_ADDRESS, auth);
} }
AddressInfo addressInfo = getAddressInfo(address); AddressInfo addressInfo = getAddressInfo(address);
@ -2484,9 +2404,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
@Override @Override
public Queue createQueue(final SimpleString addressName, public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final RoutingType routingType, final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString, final SimpleString filterString,
final SimpleString user, final SimpleString user,
final boolean durable, final boolean durable,
@ -2513,28 +2433,28 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final long queueID = storageManager.generateID(); final long queueID = storageManager.generateID();
final QueueConfig.Builder queueConfigBuilder; final QueueConfig.Builder queueConfigBuilder;
if (addressName == null) { if (address == null) {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName); queueConfigBuilder = QueueConfig.builderWith(queueID, queueName);
} else { } else {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressName); queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, address);
} }
AddressInfo info = postOffice.getAddressInfo(addressName); AddressInfo info = postOffice.getAddressInfo(address);
if (autoCreateAddress) { if (autoCreateAddress) {
RoutingType rt = (routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType); RoutingType rt = (routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
if (info == null) { if (info == null) {
final AddressInfo addressInfo = new AddressInfo(addressName, rt); final AddressInfo addressInfo = new AddressInfo(address, rt);
addressInfo.setAutoCreated(true); addressInfo.setAutoCreated(true);
createAddressInfo(addressInfo); addAddressInfo(addressInfo);
} else if (!info.getRoutingTypes().contains(routingType)) { } else if (!info.getRoutingTypes().contains(routingType)) {
Set<RoutingType> routingTypes = new HashSet<>(); Set<RoutingType> routingTypes = new HashSet<>();
routingTypes.addAll(info.getRoutingTypes()); routingTypes.addAll(info.getRoutingTypes());
routingTypes.add(routingType); routingTypes.add(routingType);
updateAddressInfo(info.getName().toString(), routingTypes); updateAddressInfo(info.getName(), routingTypes);
} }
} else if (info == null) { } else if (info == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName); throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
} else if (!info.getRoutingTypes().contains(routingType)) { } else if (!info.getRoutingTypes().contains(routingType)) {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, info.getName().toString(), info.getRoutingTypes()); throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, info.getName().toString(), info.getRoutingTypes());
} }

View File

@ -30,8 +30,6 @@ public class AddressInfo {
private boolean autoCreated = false; private boolean autoCreated = false;
private boolean deletable = false;
private Set<RoutingType> routingTypes; private Set<RoutingType> routingTypes;
public AddressInfo(SimpleString name) { public AddressInfo(SimpleString name) {

View File

@ -591,7 +591,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean autoCreated) throws Exception { final boolean autoCreated) throws Exception {
Pair<SimpleString, Set<RoutingType>> art = getAddressAndRoutingTypes(address, routingTypes); Pair<SimpleString, Set<RoutingType>> art = getAddressAndRoutingTypes(address, routingTypes);
securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this); securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated)); server.addOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
return server.getAddressInfo(art.getA()); return server.getAddressInfo(art.getA());
} }
@ -601,7 +601,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean autoCreated) throws Exception { final boolean autoCreated) throws Exception {
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType); Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this); securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated)); server.addOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
return server.getAddressInfo(art.getA()); return server.getAddressInfo(art.getA());
} }

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry; import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -251,7 +252,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
if (coreQ == null) { if (coreQ == null) {
coreQ = new SimpleString(qname); coreQ = new SimpleString(qname);
try { try {
this.server.createQueue(coreQ, coreQ, null, false, false); this.server.createQueue(coreQ, RoutingType.MULTICAST, coreQ, null, false, false);
testQueues.put(qname, coreQ); testQueues.put(qname, coreQ);
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
//ignore //ignore

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl; import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -131,7 +132,7 @@ public class PagingLeakTest extends ActiveMQTestBase {
final int maxConsumed; final int maxConsumed;
Consumer(int sleepTime, String suffix, int maxConsumed) throws Exception { Consumer(int sleepTime, String suffix, int maxConsumed) throws Exception {
server.createQueue(address, address.concat(suffix), null, true, false); server.createQueue(address, RoutingType.MULTICAST, address.concat(suffix), null, true, false);
this.sleepTime = sleepTime; this.sleepTime = sleepTime;
locator = createInVMLocator(0); locator = createInVMLocator(0);

View File

@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRule;
@ -55,7 +56,7 @@ public class TimeoutXATest extends ActiveMQTestBase {
server.getConfiguration().setTransactionTimeoutScanPeriod(1100); server.getConfiguration().setTransactionTimeoutScanPeriod(1100);
server.getConfiguration().setJournalSyncNonTransactional(false); server.getConfiguration().setJournalSyncNonTransactional(false);
server.start(); server.start();
server.createQueue(SimpleString.toSimpleString("Queue1"), SimpleString.toSimpleString("Queue1"), null, true, false); server.createQueue(SimpleString.toSimpleString("Queue1"), RoutingType.ANYCAST, SimpleString.toSimpleString("Queue1"), null, true, false);
removingTXEntered0 = new CountDownLatch(1); removingTXEntered0 = new CountDownLatch(1);
removingTXAwait0 = new CountDownLatch(1); removingTXAwait0 = new CountDownLatch(1);

View File

@ -44,7 +44,7 @@ public class AddressConfigTest extends ActiveMQTestBase {
@Test @Test
public void persistAddressConfigTest() throws Exception { public void persistAddressConfigTest() throws Exception {
server.createQueue(SimpleString.toSimpleString("myAddress"), SimpleString.toSimpleString("myQueue"), null, true, false); server.createQueue(SimpleString.toSimpleString("myAddress"), RoutingType.MULTICAST, SimpleString.toSimpleString("myQueue"), null, true, false);
server.stop(); server.stop();
server.start(); server.start();
AddressInfo addressInfo = server.getAddressInfo(SimpleString.toSimpleString("myAddress")); AddressInfo addressInfo = server.getAddressInfo(SimpleString.toSimpleString("myAddress"));

View File

@ -73,9 +73,9 @@ public class AddressingTest extends ActiveMQTestBase {
AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress)); AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress));
addressInfo.addRoutingType(RoutingType.MULTICAST); addressInfo.addRoutingType(RoutingType.MULTICAST);
server.createOrUpdateAddressInfo(addressInfo); server.addOrUpdateAddressInfo(addressInfo);
Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false); Queue q1 = server.createQueue(new SimpleString(consumeAddress), RoutingType.MULTICAST, new SimpleString(consumeAddress + ".1"), null, true, false);
Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false); Queue q2 = server.createQueue(new SimpleString(consumeAddress), RoutingType.MULTICAST, new SimpleString(consumeAddress + ".2"), null, true, false);
ClientSession session = sessionFactory.createSession(); ClientSession session = sessionFactory.createSession();
session.start(); session.start();
@ -111,7 +111,7 @@ public class AddressingTest extends ActiveMQTestBase {
AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress)); AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress));
addressInfo.addRoutingType(RoutingType.ANYCAST); addressInfo.addRoutingType(RoutingType.ANYCAST);
server.createOrUpdateAddressInfo(addressInfo); server.addOrUpdateAddressInfo(addressInfo);
Queue q1 = server.createQueue(new SimpleString(consumeAddress), RoutingType.ANYCAST, new SimpleString(consumeAddress + ".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); Queue q1 = server.createQueue(new SimpleString(consumeAddress), RoutingType.ANYCAST, new SimpleString(consumeAddress + ".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
Queue q2 = server.createQueue(new SimpleString(consumeAddress), RoutingType.ANYCAST, new SimpleString(consumeAddress + ".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); Queue q2 = server.createQueue(new SimpleString(consumeAddress), RoutingType.ANYCAST, new SimpleString(consumeAddress + ".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
@ -145,7 +145,7 @@ public class AddressingTest extends ActiveMQTestBase {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.addRoutingType(RoutingType.ANYCAST); addressInfo.addRoutingType(RoutingType.ANYCAST);
server.createOrUpdateAddressInfo(addressInfo); server.addOrUpdateAddressInfo(addressInfo);
Queue q1 = server.createQueue(address, RoutingType.ANYCAST, address.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); Queue q1 = server.createQueue(address, RoutingType.ANYCAST, address.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
Queue q2 = server.createQueue(address, RoutingType.ANYCAST, address.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); Queue q2 = server.createQueue(address, RoutingType.ANYCAST, address.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
Queue q3 = server.createQueue(address, RoutingType.ANYCAST, address.concat(".3"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); Queue q3 = server.createQueue(address, RoutingType.ANYCAST, address.concat(".3"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
@ -199,8 +199,8 @@ public class AddressingTest extends ActiveMQTestBase {
for (String consumeAddress : testAddresses) { for (String consumeAddress : testAddresses) {
// For each address, create 2 Queues with the same address, assert both queues receive message // For each address, create 2 Queues with the same address, assert both queues receive message
Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false); Queue q1 = server.createQueue(new SimpleString(consumeAddress), RoutingType.MULTICAST, new SimpleString(consumeAddress + ".1"), null, true, false);
Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false); Queue q2 = server.createQueue(new SimpleString(consumeAddress), RoutingType.MULTICAST, new SimpleString(consumeAddress + ".2"), null, true, false);
ClientSession session = sessionFactory.createSession(); ClientSession session = sessionFactory.createSession();
session.start(); session.start();

View File

@ -60,7 +60,7 @@ public class AnycastTest extends ActiveMQTestBase {
addressInfo = new AddressInfo(baseAddress); addressInfo = new AddressInfo(baseAddress);
addressInfo.addRoutingType(RoutingType.ANYCAST); addressInfo.addRoutingType(RoutingType.ANYCAST);
server.createOrUpdateAddressInfo(addressInfo); server.addOrUpdateAddressInfo(addressInfo);
} }
@Test @Test

View File

@ -60,7 +60,7 @@ public class MulticastTest extends ActiveMQTestBase {
addressInfo = new AddressInfo(baseAddress); addressInfo = new AddressInfo(baseAddress);
addressInfo.addRoutingType(RoutingType.MULTICAST); addressInfo.addRoutingType(RoutingType.MULTICAST);
server.createOrUpdateAddressInfo(addressInfo); server.addOrUpdateAddressInfo(addressInfo);
} }
@Test @Test

View File

@ -56,7 +56,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST));
server.createQueue(new SimpleString(getTopicName()), RoutingType.MULTICAST, new SimpleString(getTopicName()), null, true, false); server.createQueue(new SimpleString(getTopicName()), RoutingType.MULTICAST, new SimpleString(getTopicName()), null, true, false);
} }

View File

@ -154,7 +154,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception { public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception {
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTestName()), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTestName()), RoutingType.ANYCAST));
server.createQueue(new SimpleString(getTestName()), RoutingType.ANYCAST, new SimpleString(getTestName()), null, true, false); server.createQueue(new SimpleString(getTestName()), RoutingType.ANYCAST, new SimpleString(getTestName()), null, true, false);
AmqpClient client = createAmqpClient(user1, password1); AmqpClient client = createAmqpClient(user1, password1);

View File

@ -43,7 +43,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, address, null, true, false); server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
sendMessages(1, address.toString()); sendMessages(1, address.toString());
@ -64,7 +64,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameNameMultipleQueues() throws Exception { public void testConsumeFromSingleQueueOnAddressSameNameMultipleQueues() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
server.createQueue(address, RoutingType.ANYCAST, address, null, true, false); server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
@ -86,7 +86,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressDifferentName() throws Exception { public void testConsumeFromSingleQueueOnAddressDifferentName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
sendMessages(1, address.toString()); sendMessages(1, address.toString());
@ -107,7 +107,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressDifferentNameMultipleQueues() throws Exception { public void testConsumeFromSingleQueueOnAddressDifferentNameMultipleQueues() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
server.createQueue(address, RoutingType.ANYCAST, queue2, null, true, false); server.createQueue(address, RoutingType.ANYCAST, queue2, null, true, false);
@ -129,7 +129,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testConsumeFromSingleQualifiedQueueOnAddressSameName() throws Exception { public void testConsumeFromSingleQualifiedQueueOnAddressSameName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
sendMessages(1, address.toString()); sendMessages(1, address.toString());
@ -150,7 +150,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testConsumeWhenOnlyMulticast() throws Exception { public void testConsumeWhenOnlyMulticast() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST)); server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
sendMessages(1, address.toString()); sendMessages(1, address.toString());
@ -208,7 +208,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.ANYCAST); addressInfo.getRoutingTypes().add(RoutingType.ANYCAST);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, address, null, true, false); server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();

View File

@ -42,7 +42,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST)); server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
server.createQueue(address, RoutingType.MULTICAST, address, null, true, false); server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
sendMessages(1, address.toString()); sendMessages(1, address.toString());
@ -63,7 +63,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testConsumeWhenOnlyAnycast() throws Exception { public void testConsumeWhenOnlyAnycast() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
sendMessages(1, address.toString()); sendMessages(1, address.toString());
@ -87,7 +87,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
addressInfo.getRoutingTypes().add(RoutingType.ANYCAST); addressInfo.getRoutingTypes().add(RoutingType.ANYCAST);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, address, null, true, false); server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();

View File

@ -41,7 +41,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedVolatileAddress() throws Exception { public void test2ConsumersOnSharedVolatileAddress() throws Exception {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId")); AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -69,7 +69,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedVolatileAddressBrokerDefined() throws Exception { public void test2ConsumersOnSharedVolatileAddressBrokerDefined() throws Exception {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false); server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
@ -98,7 +98,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedVolatileAddressNoReceiverClose() throws Exception { public void test2ConsumersOnSharedVolatileAddressNoReceiverClose() throws Exception {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId")); AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -124,7 +124,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedVolatileAddressGlobal() throws Exception { public void test2ConsumersOnSharedVolatileAddressGlobal() throws Exception {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect(false)); AmqpConnection connection = addConnection(client.connect(false));
@ -152,7 +152,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedDurableAddress() throws Exception { public void test2ConsumersOnSharedDurableAddress() throws Exception {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId")); AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -180,7 +180,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedDurableAddressReconnect() throws Exception { public void test2ConsumersOnSharedDurableAddressReconnect() throws Exception {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId")); AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -218,7 +218,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedDurableAddressReconnectwithNull() throws Exception { public void test2ConsumersOnSharedDurableAddressReconnectwithNull() throws Exception {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId")); AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -256,7 +256,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedDurableAddressGlobal() throws Exception { public void test2ConsumersOnSharedDurableAddressGlobal() throws Exception {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect(false)); AmqpConnection connection = addConnection(client.connect(false));
@ -284,7 +284,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnNonSharedDurableAddress() throws Exception { public void test2ConsumersOnNonSharedDurableAddress() throws Exception {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId")); AmqpConnection connection = addConnection(client.connect("myClientId"));

View File

@ -57,10 +57,10 @@ public class ProtonPubSubTest extends ProtonTestBase {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
server.createAddressInfo(new AddressInfo(ssPubAddress, RoutingType.MULTICAST)); server.addAddressInfo(new AddressInfo(ssPubAddress, RoutingType.MULTICAST));
server.createAddressInfo(new AddressInfo(ssprefixedPubAddress, RoutingType.MULTICAST)); server.addAddressInfo(new AddressInfo(ssprefixedPubAddress, RoutingType.MULTICAST));
server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true); server.createQueue(ssPubAddress, RoutingType.MULTICAST, ssPubAddress, new SimpleString("foo=bar"), false, true);
server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true); server.createQueue(ssprefixedPubAddress, RoutingType.MULTICAST, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
factory = new JmsConnectionFactory("amqp://localhost:5672"); factory = new JmsConnectionFactory("amqp://localhost:5672");
factory.setClientID("myClientID"); factory.setClientID("myClientID");
connection = factory.createConnection(); connection = factory.createConnection();

View File

@ -161,18 +161,17 @@ public class ProtonTest extends ProtonTestBase {
settings.put("#", addressSetting); settings.put("#", addressSetting);
} }
addressSetting.setAutoCreateQueues(false); addressSetting.setAutoCreateQueues(false);
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "1"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "1"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "2"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "2"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "3"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "3"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "4"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "4"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "5"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "5"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "6"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "6"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "7"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "7"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "8"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "8"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "9"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "9"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "10"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "10"), RoutingType.ANYCAST));
server.createQueue(new SimpleString(coreAddress), RoutingType.ANYCAST, new SimpleString(coreAddress), null, true, false); server.createQueue(new SimpleString(coreAddress), RoutingType.ANYCAST, new SimpleString(coreAddress), null, true, false);
server.createQueue(new SimpleString(coreAddress + "1"), RoutingType.ANYCAST, new SimpleString(coreAddress + "1"), null, true, false); server.createQueue(new SimpleString(coreAddress + "1"), RoutingType.ANYCAST, new SimpleString(coreAddress + "1"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "2"), RoutingType.ANYCAST, new SimpleString(coreAddress + "2"), null, true, false); server.createQueue(new SimpleString(coreAddress + "2"), RoutingType.ANYCAST, new SimpleString(coreAddress + "2"), null, true, false);
@ -184,7 +183,7 @@ public class ProtonTest extends ProtonTestBase {
server.createQueue(new SimpleString(coreAddress + "8"), RoutingType.ANYCAST, new SimpleString(coreAddress + "8"), null, true, false); server.createQueue(new SimpleString(coreAddress + "8"), RoutingType.ANYCAST, new SimpleString(coreAddress + "8"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "9"), RoutingType.ANYCAST, new SimpleString(coreAddress + "9"), null, true, false); server.createQueue(new SimpleString(coreAddress + "9"), RoutingType.ANYCAST, new SimpleString(coreAddress + "9"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "10"), RoutingType.ANYCAST, new SimpleString(coreAddress + "10"), null, true, false); server.createQueue(new SimpleString(coreAddress + "10"), RoutingType.ANYCAST, new SimpleString(coreAddress + "10"), null, true, false);
server.createAddressInfo(new AddressInfo(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST)); server.addAddressInfo(new AddressInfo(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST));
server.createQueue(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST, new SimpleString("amqp_testtopic"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST, new SimpleString("amqp_testtopic"), null, true, false);
/* server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false); /* server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "2"), new SimpleString("amqp_testtopic" + "2"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "2"), new SimpleString("amqp_testtopic" + "2"), null, true, false);
@ -870,7 +869,7 @@ public class ProtonTest extends ProtonTestBase {
@Test @Test
public void testClientIdIsSetInSubscriptionList() throws Exception { public void testClientIdIsSetInSubscriptionList() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST));
AmqpConnection amqpConnection = client.createConnection(); AmqpConnection amqpConnection = client.createConnection();
amqpConnection.setContainerId("testClient"); amqpConnection.setContainerId("testClient");
amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic"))); amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic")));
@ -899,7 +898,7 @@ public class ProtonTest extends ProtonTestBase {
String queueName = "TestQueueName"; String queueName = "TestQueueName";
String address = "TestAddress"; String address = "TestAddress";
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST));
server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false); server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);

View File

@ -106,7 +106,7 @@ public class AddressCommandTest extends JMSTestBase {
final String addressName = "address"; final String addressName = "address";
final SimpleString addressSimpleString = new SimpleString(addressName); final SimpleString addressSimpleString = new SimpleString(addressName);
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST)); final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST));
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
server.createQueue(addressSimpleString, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false); server.createQueue(addressSimpleString, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false);
final DeleteAddress deleteAddress = new DeleteAddress(); final DeleteAddress deleteAddress = new DeleteAddress();
@ -143,7 +143,7 @@ public class AddressCommandTest extends JMSTestBase {
// Create bindings // Create bindings
SimpleString address = new SimpleString("address"); SimpleString address = new SimpleString("address");
server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST)); server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
server.createQueue(address, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false); server.createQueue(address, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false);
server.createQueue(address, RoutingType.MULTICAST, new SimpleString("queue2"), null, true, false); server.createQueue(address, RoutingType.MULTICAST, new SimpleString("queue2"), null, true, false);
server.createQueue(address, RoutingType.MULTICAST, new SimpleString("queue3"), null, true, false); server.createQueue(address, RoutingType.MULTICAST, new SimpleString("queue3"), null, true, false);
@ -164,7 +164,7 @@ public class AddressCommandTest extends JMSTestBase {
public void testUpdateAddressRoutingTypes() throws Exception { public void testUpdateAddressRoutingTypes() throws Exception {
final String addressName = "address"; final String addressName = "address";
final SimpleString address = new SimpleString(addressName); final SimpleString address = new SimpleString(addressName);
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
final UpdateAddress updateAddress = new UpdateAddress(); final UpdateAddress updateAddress = new UpdateAddress();
updateAddress.setName(addressName); updateAddress.setName(addressName);
@ -192,7 +192,7 @@ public class AddressCommandTest extends JMSTestBase {
final String addressName = "address"; final String addressName = "address";
final SimpleString addressSimpleString = new SimpleString(addressName); final SimpleString addressSimpleString = new SimpleString(addressName);
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST)); final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST));
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
server.createQueue(addressSimpleString, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false); server.createQueue(addressSimpleString, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false);
final UpdateAddress updateAddress = new UpdateAddress(); final UpdateAddress updateAddress = new UpdateAddress();

View File

@ -92,7 +92,7 @@ public class QueueCommandTest extends JMSTestBase {
command.setAnycast(false); command.setAnycast(false);
command.setAddress(address); command.setAddress(address);
server.createOrUpdateAddressInfo(new AddressInfo(new SimpleString(address), RoutingType.MULTICAST)); server.addOrUpdateAddressInfo(new AddressInfo(new SimpleString(address), RoutingType.MULTICAST));
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(command); checkExecutionPassed(command);
@ -247,7 +247,7 @@ public class QueueCommandTest extends JMSTestBase {
final RoutingType oldRoutingType = RoutingType.MULTICAST; final RoutingType oldRoutingType = RoutingType.MULTICAST;
final boolean oldDeleteOnNoConsumers = false; final boolean oldDeleteOnNoConsumers = false;
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST)); final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST));
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false); server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
final int newMaxConsumers = 1; final int newMaxConsumers = 1;
@ -280,7 +280,7 @@ public class QueueCommandTest extends JMSTestBase {
final boolean oldDeleteOnNoConsumers = false; final boolean oldDeleteOnNoConsumers = false;
final Set<RoutingType> supportedRoutingTypes = EnumSet.of(oldRoutingType); final Set<RoutingType> supportedRoutingTypes = EnumSet.of(oldRoutingType);
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.copyOf(supportedRoutingTypes)); final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.copyOf(supportedRoutingTypes));
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false); server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
final RoutingType newRoutingType = RoutingType.ANYCAST; final RoutingType newRoutingType = RoutingType.ANYCAST;
@ -309,7 +309,7 @@ public class QueueCommandTest extends JMSTestBase {
final RoutingType oldRoutingType = RoutingType.MULTICAST; final RoutingType oldRoutingType = RoutingType.MULTICAST;
final boolean oldDeleteOnNoConsumers = false; final boolean oldDeleteOnNoConsumers = false;
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, oldRoutingType); final AddressInfo addressInfo = new AddressInfo(addressSimpleString, oldRoutingType);
server.createAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false); server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
server.locateQueue(queueNameString).addConsumer(new DummyServerConsumer()); server.locateQueue(queueNameString).addConsumer(new DummyServerConsumer());

View File

@ -49,7 +49,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
@Test @Test
public void testAutoDeleteAutoCreatedAddress() throws Exception { public void testAutoDeleteAutoCreatedAddress() throws Exception {
// auto-delete-addresses defaults to true // auto-delete-addresses defaults to true
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true); server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.getAddressInfo(addressA)); assertNotNull(server.getAddressInfo(addressA));
cf.createSession().createConsumer(queueA).close(); cf.createSession().createConsumer(queueA).close();
assertNull(server.getAddressInfo(addressA)); assertNull(server.getAddressInfo(addressA));
@ -58,7 +58,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
@Test @Test
public void testNegativeAutoDeleteAutoCreatedAddress() throws Exception { public void testNegativeAutoDeleteAutoCreatedAddress() throws Exception {
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteAddresses(false)); server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteAddresses(false));
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true); server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.getAddressInfo(addressA)); assertNotNull(server.getAddressInfo(addressA));
cf.createSession().createConsumer(queueA).close(); cf.createSession().createConsumer(queueA).close();
assertNotNull(server.getAddressInfo(addressA)); assertNotNull(server.getAddressInfo(addressA));

View File

@ -49,7 +49,7 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
@Test @Test
public void testAutoDeleteAutoCreatedQueue() throws Exception { public void testAutoDeleteAutoCreatedQueue() throws Exception {
// auto-delete-queues defaults to true // auto-delete-queues defaults to true
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true); server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.locateQueue(queueA)); assertNotNull(server.locateQueue(queueA));
cf.createSession().createConsumer(queueA).close(); cf.createSession().createConsumer(queueA).close();
assertNull(server.locateQueue(queueA)); assertNull(server.locateQueue(queueA));
@ -58,7 +58,7 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
@Test @Test
public void testNegativeAutoDeleteAutoCreatedQueue() throws Exception { public void testNegativeAutoDeleteAutoCreatedQueue() throws Exception {
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteQueues(false)); server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteQueues(false));
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true); server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.locateQueue(queueA)); assertNotNull(server.locateQueue(queueA));
cf.createSession().createConsumer(queueA).close(); cf.createSession().createConsumer(queueA).close();
assertNotNull(server.locateQueue(queueA)); assertNotNull(server.locateQueue(queueA));

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before; import org.junit.Before;
@ -73,7 +74,7 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase {
ClientProducer producer = session.createProducer(ADDRESS); ClientProducer producer = session.createProducer(ADDRESS);
// just to make it page forever // just to make it page forever
Queue serverQueue = server.createQueue(ADDRESS, SimpleString.toSimpleString("everPage"), null, true, false); Queue serverQueue = server.createQueue(ADDRESS, RoutingType.ANYCAST, SimpleString.toSimpleString("everPage"), null, true, false);
serverQueue.getPageSubscription().getPagingStore().startPaging(); serverQueue.getPageSubscription().getPagingStore().startPaging();
Consumer[] consumers = new Consumer[10]; Consumer[] consumers = new Consumer[10];

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
@ -73,11 +74,11 @@ public class ExpiryLargeMessageTest extends ActiveMQTestBase {
server.start(); server.start();
server.createQueue(EXPIRY, EXPIRY, null, true, false); server.createQueue(EXPIRY, RoutingType.ANYCAST, EXPIRY, null, true, false);
server.createQueue(DLQ, DLQ, null, true, false); server.createQueue(DLQ, RoutingType.ANYCAST, DLQ, null, true, false);
server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false); server.createQueue(MY_QUEUE, RoutingType.ANYCAST, MY_QUEUE, null, true, false);
ServerLocator locator = createInVMNonHALocator(); ServerLocator locator = createInVMNonHALocator();
@ -255,11 +256,11 @@ public class ExpiryLargeMessageTest extends ActiveMQTestBase {
server.start(); server.start();
server.createQueue(EXPIRY, EXPIRY, null, true, false); server.createQueue(EXPIRY, RoutingType.ANYCAST, EXPIRY, null, true, false);
server.createQueue(DLQ, DLQ, null, true, false); server.createQueue(DLQ, RoutingType.ANYCAST, DLQ, null, true, false);
server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false); server.createQueue(MY_QUEUE, RoutingType.ANYCAST, MY_QUEUE, null, true, false);
ServerLocator locator = createInVMNonHALocator(); ServerLocator locator = createInVMNonHALocator();

View File

@ -109,7 +109,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
@Test @Test
public void testHangOnDelivery() throws Exception { public void testHangOnDelivery() throws Exception {
queue = server.createQueue(QUEUE, QUEUE, null, true, false); queue = server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
try { try {
ClientSessionFactory factory = locator.createSessionFactory(); ClientSessionFactory factory = locator.createSessionFactory();
@ -288,7 +288,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
((ActiveMQServerImpl) server).replaceQueueFactory(queueFactory); ((ActiveMQServerImpl) server).replaceQueueFactory(queueFactory);
queue = server.createQueue(QUEUE, QUEUE, null, true, false); queue = server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
blocked.acquire(); blocked.acquire();
@ -316,7 +316,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
Assert.assertTrue(latchDelete.await(10, TimeUnit.SECONDS)); Assert.assertTrue(latchDelete.await(10, TimeUnit.SECONDS));
try { try {
server.createQueue(QUEUE, QUEUE, null, true, false); server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
} catch (Exception expected) { } catch (Exception expected) {
} }
@ -344,7 +344,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
*/ */
@Test @Test
public void testForceDuplicationOnBindings() throws Exception { public void testForceDuplicationOnBindings() throws Exception {
queue = server.createQueue(QUEUE, QUEUE, null, true, false); queue = server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
ClientSessionFactory factory = locator.createSessionFactory(); ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false, false); ClientSession session = factory.createSession(false, false, false);
@ -375,7 +375,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
// An exception during delivery shouldn't make the message disappear // An exception during delivery shouldn't make the message disappear
@Test @Test
public void testExceptionWhileDelivering() throws Exception { public void testExceptionWhileDelivering() throws Exception {
queue = server.createQueue(QUEUE, QUEUE, null, true, false); queue = server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
HangInterceptor hangInt = new HangInterceptor(); HangInterceptor hangInt = new HangInterceptor();
try { try {
@ -429,7 +429,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
try { try {
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
if (server.locateQueue(SimpleString.toSimpleString("tt")) == null) { if (server.locateQueue(SimpleString.toSimpleString("tt")) == null) {
server.createQueue(SimpleString.toSimpleString("tt"), SimpleString.toSimpleString("tt"), SimpleString.toSimpleString(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false); server.createQueue(SimpleString.toSimpleString("tt"), RoutingType.ANYCAST, SimpleString.toSimpleString("tt"), SimpleString.toSimpleString(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false);
} }
server.stop(); server.stop();

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@ -216,7 +217,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
SimpleString jmsAddress = new SimpleString("Test"); SimpleString jmsAddress = new SimpleString("Test");
server.createQueue(jmsAddress, jmsAddress, null, true, false); server.createQueue(jmsAddress, RoutingType.ANYCAST, jmsAddress, null, true, false);
final AtomicInteger unexpectedErrors = new AtomicInteger(0); final AtomicInteger unexpectedErrors = new AtomicInteger(0);
final AtomicInteger expectedErrors = new AtomicInteger(0); final AtomicInteger expectedErrors = new AtomicInteger(0);

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
@ -107,7 +108,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
server.createQueue(QUEUE, QUEUE, null, true, false).getPageSubscription().getPagingStore().startPaging(); server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false).getPageSubscription().getPagingStore().startPaging();
locator = createFactory(isNetty); locator = createFactory(isNetty);
} }

View File

@ -41,7 +41,7 @@ public class PendingDeliveriesTest extends ClientTestBase {
@Before @Before
public void createQueue() throws Exception { public void createQueue() throws Exception {
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST));
server.createQueue(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST, SimpleString.toSimpleString("queue1"), null, true, false); server.createQueue(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST, SimpleString.toSimpleString("queue1"), null, true, false);
} }

View File

@ -1896,7 +1896,7 @@ public class BridgeTest extends ActiveMQTestBase {
ActiveMQServer server = addServer(new ActiveMQServerImpl(config, null, null, null, serviceRegistry)); ActiveMQServer server = addServer(new ActiveMQServerImpl(config, null, null, null, serviceRegistry));
server.start(); server.start();
server.waitForActivation(100, TimeUnit.MILLISECONDS); server.waitForActivation(100, TimeUnit.MILLISECONDS);
server.deployQueue(ADDRESS, QUEUE, null, false, false); server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE, null, false, false);
List<String> connectors = new ArrayList<>(); List<String> connectors = new ArrayList<>();
connectors.add("in-vm"); connectors.add("in-vm");
server.deployBridge(new BridgeConfiguration().setName(BRIDGE).setQueueName(QUEUE.toString()).setForwardingAddress(ADDRESS.toString()).setStaticConnectors(connectors)); server.deployBridge(new BridgeConfiguration().setName(BRIDGE).setQueueName(QUEUE.toString()).setForwardingAddress(ADDRESS.toString()).setStaticConnectors(connectors));

View File

@ -547,7 +547,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
boolean defaultDeleteOnNoConsumers) throws Exception { boolean defaultDeleteOnNoConsumers) throws Exception {
AddressInfo addressInfo = new AddressInfo(new SimpleString(address)); AddressInfo addressInfo = new AddressInfo(new SimpleString(address));
addressInfo.addRoutingType(routingType); addressInfo.addRoutingType(routingType);
servers[node].createOrUpdateAddressInfo(addressInfo); servers[node].addOrUpdateAddressInfo(addressInfo);
} }
protected void deleteQueue(final int node, final String queueName) throws Exception { protected void deleteQueue(final int node, final String queueName) throws Exception {

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
@ -74,7 +75,7 @@ public class ExpireWhileLoadBalanceTest extends ClusterTestBase {
for (int i = 0; i <= 2; i++) { for (int i = 0; i <= 2; i++) {
createQueue(i, "queues.testaddress", "queue0", null, true); createQueue(i, "queues.testaddress", "queue0", null, true);
getServer(i).createQueue(expiryQueue, expiryQueue, null, true, false); getServer(i).createQueue(expiryQueue, RoutingType.ANYCAST, expiryQueue, null, true, false);
getServer(i).getAddressSettingsRepository().addMatch("queues.*", as); getServer(i).getAddressSettingsRepository().addMatch("queues.*", as);
} }

View File

@ -33,6 +33,7 @@ import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager; import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -67,7 +68,7 @@ public class AMQPToOpenwireTest extends ActiveMQTestBase {
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ"))); serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false); serverConfig.setSecurityEnabled(false);
coreQueue = new SimpleString(queueName); coreQueue = new SimpleString(queueName);
server.createQueue(coreQueue, coreQueue, null, false, false); server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false);
qpidfactory = new JmsConnectionFactory("amqp://localhost:61616"); qpidfactory = new JmsConnectionFactory("amqp://localhost:61616");
} }

View File

@ -1310,7 +1310,7 @@ public class DivertTest extends ActiveMQTestBase {
ActiveMQServer server = addServer(new ActiveMQServerImpl(null, null, null, null, serviceRegistry)); ActiveMQServer server = addServer(new ActiveMQServerImpl(null, null, null, null, serviceRegistry));
server.start(); server.start();
server.waitForActivation(100, TimeUnit.MILLISECONDS); server.waitForActivation(100, TimeUnit.MILLISECONDS);
server.deployQueue(ADDRESS, RoutingType.MULTICAST, SimpleString.toSimpleString("myQueue"), null, false, false); server.createQueue(ADDRESS, RoutingType.MULTICAST, SimpleString.toSimpleString("myQueue"), null, false, false);
server.deployDivert(new DivertConfiguration().setName(DIVERT).setAddress(ADDRESS.toString()).setForwardingAddress(ADDRESS.toString())); server.deployDivert(new DivertConfiguration().setName(DIVERT).setAddress(ADDRESS.toString()).setForwardingAddress(ADDRESS.toString()));
Collection<Binding> bindings = server.getPostOffice().getBindingsForAddress(ADDRESS).getBindings(); Collection<Binding> bindings = server.getPostOffice().getBindingsForAddress(ADDRESS).getBindings();
Divert divert = null; Divert divert = null;

View File

@ -1213,7 +1213,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ServerLocator locator = createInVMNonHALocator(); ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory factory = createSessionFactory(locator); ClientSessionFactory factory = createSessionFactory(locator);
ClientSession session = addClientSession(factory.createSession()); ClientSession session = addClientSession(factory.createSession());
server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false); server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
addClientConsumer(session.createConsumer(queueName)); addClientConsumer(session.createConsumer(queueName));
Thread.sleep(100); // We check the timestamp for the creation time. We need to make sure it's different Thread.sleep(100); // We check the timestamp for the creation time. We need to make sure it's different
@ -1337,7 +1337,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
@Test @Test
public void testListSessionsAsJSON() throws Exception { public void testListSessionsAsJSON() throws Exception {
SimpleString queueName = new SimpleString(UUID.randomUUID().toString()); SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false); server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
ActiveMQServerControl serverControl = createManagementControl(); ActiveMQServerControl serverControl = createManagementControl();
@ -1403,9 +1403,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
this.conf.clearConnectorConfigurations().addConnectorConfiguration("server2-connector", new TransportConfiguration(INVM_CONNECTOR_FACTORY, params)); this.conf.clearConnectorConfigurations().addConnectorConfiguration("server2-connector", new TransportConfiguration(INVM_CONNECTOR_FACTORY, params));
server2.start(); server2.start();
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false); server.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false);
server2.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server2.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server2.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false); server2.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false);
ServerLocator locator = createInVMNonHALocator(); ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(locator); ClientSessionFactory csf = createSessionFactory(locator);

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManage
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.Base64;
@ -1766,8 +1767,8 @@ public class QueueControlTest extends ManagementTestBase {
@Test @Test
public void testMoveMessagesBack() throws Exception { public void testMoveMessagesBack() throws Exception {
server.createQueue(new SimpleString("q1"), new SimpleString("q1"), null, true, false); server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, true, false);
server.createQueue(new SimpleString("q2"), new SimpleString("q2"), null, true, false); server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, true, false);
ServerLocator locator = createInVMNonHALocator(); ServerLocator locator = createInVMNonHALocator();
@ -1830,8 +1831,8 @@ public class QueueControlTest extends ManagementTestBase {
@Test @Test
public void testMoveMessagesBack2() throws Exception { public void testMoveMessagesBack2() throws Exception {
server.createQueue(new SimpleString("q1"), new SimpleString("q1"), null, true, false); server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, true, false);
server.createQueue(new SimpleString("q2"), new SimpleString("q2"), null, true, false); server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, true, false);
ServerLocator locator = createInVMNonHALocator(); ServerLocator locator = createInVMNonHALocator();
@ -2071,7 +2072,7 @@ public class QueueControlTest extends ManagementTestBase {
SimpleString testQueueName = new SimpleString("newQueue"); SimpleString testQueueName = new SimpleString("newQueue");
String testQueueName2 = "newQueue2"; String testQueueName2 = "newQueue2";
this.server.createQueue(testQueueName, testQueueName, null, false, false); this.server.createQueue(testQueueName, RoutingType.ANYCAST, testQueueName, null, false, false);
Notification notif = listener.getNotification(); Notification notif = listener.getNotification();

View File

@ -1679,7 +1679,7 @@ public class MQTTTest extends MQTTTestSupport {
public void testAnycastAddressWorksWithMQTT() throws Exception { public void testAnycastAddressWorksWithMQTT() throws Exception {
String anycastAddress = "foo/bar"; String anycastAddress = "foo/bar";
getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), RoutingType.ANYCAST)); getServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), RoutingType.ANYCAST));
String clientId = "testMqtt"; String clientId = "testMqtt";
Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)}; Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)};
@ -1717,7 +1717,7 @@ public class MQTTTest extends MQTTTestSupport {
routingTypeSet.add(RoutingType.ANYCAST); routingTypeSet.add(RoutingType.ANYCAST);
routingTypeSet.add(RoutingType.MULTICAST); routingTypeSet.add(RoutingType.MULTICAST);
getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), routingTypeSet)); getServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), routingTypeSet));
String clientId = "testMqtt"; String clientId = "testMqtt";
Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)}; Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)};

View File

@ -80,7 +80,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(); ClientSession session = sf.createSession();
try { try {
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false); Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
PageSubscriptionCounter counter = locateCounter(queue); PageSubscriptionCounter counter = locateCounter(queue);
@ -110,7 +110,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(); ClientSession session = sf.createSession();
try { try {
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false); Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
PageSubscriptionCounter counter = locateCounter(queue); PageSubscriptionCounter counter = locateCounter(queue);
@ -167,7 +167,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
try { try {
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false); Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
PageSubscriptionCounter counter = locateCounter(queue); PageSubscriptionCounter counter = locateCounter(queue);
@ -221,7 +221,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
@Test @Test
public void testRestartCounter() throws Exception { public void testRestartCounter() throws Exception {
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false); Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
PageSubscriptionCounter counter = locateCounter(queue); PageSubscriptionCounter counter = locateCounter(queue);
@ -274,7 +274,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
public void testPrepareCounter() throws Exception { public void testPrepareCounter() throws Exception {
Xid xid = newXID(); Xid xid = newXID();
Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false); Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
PageSubscriptionCounter counter = locateCounter(queue); PageSubscriptionCounter counter = locateCounter(queue);

View File

@ -95,7 +95,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false); ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false); server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS); ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@ -185,7 +185,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false); ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false); Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false); Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false);
@ -315,7 +315,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false); ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false); Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false); Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false);
@ -409,7 +409,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false); ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false); server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS); ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@ -494,7 +494,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false); ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
QueueImpl queue = (QueueImpl) server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false); QueueImpl queue = (QueueImpl) server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS); ClientProducer producer = session.createProducer(PagingTest.ADDRESS);

View File

@ -59,7 +59,7 @@ public class PagingReceiveTest extends ActiveMQTestBase {
super.setUp(); super.setUp();
server = internalCreateServer(); server = internalCreateServer();
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
Queue queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false); Queue queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
queue.getPageSubscription().getPagingStore().startPaging(); queue.getPageSubscription().getPagingStore().startPaging();

View File

@ -71,7 +71,7 @@ public class PagingSyncTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false); ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false); server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS); ClientProducer producer = session.createProducer(PagingTest.ADDRESS);

View File

@ -358,7 +358,7 @@ public class PagingTest extends ActiveMQTestBase {
sf = createSessionFactory(locator); sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true); ClientSession session = sf.createSession(false, true, true);
server.createAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST));
Queue queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false); Queue queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
queue.getPageSubscription().getPagingStore().startPaging(); queue.getPageSubscription().getPagingStore().startPaging();
@ -3611,7 +3611,7 @@ public class PagingTest extends ActiveMQTestBase {
server.start(); server.start();
try { try {
server.createAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST));
server.createQueue(PagingTest.ADDRESS, RoutingType.ANYCAST, PagingTest.ADDRESS, null, true, false); server.createQueue(PagingTest.ADDRESS, RoutingType.ANYCAST, PagingTest.ADDRESS, null, true, false);
final CountDownLatch pageUp = new CountDownLatch(0); final CountDownLatch pageUp = new CountDownLatch(0);
@ -3651,8 +3651,7 @@ public class PagingTest extends ActiveMQTestBase {
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start(); server.start();
// server.createAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST)); server.createQueue(PagingTest.ADDRESS, RoutingType.ANYCAST, PagingTest.ADDRESS, null, true, false);
server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
final CountDownLatch pageUp = new CountDownLatch(0); final CountDownLatch pageUp = new CountDownLatch(0);
final CountDownLatch pageDone = new CountDownLatch(1); final CountDownLatch pageDone = new CountDownLatch(1);

View File

@ -86,7 +86,7 @@ public class RestDeserializationTest extends RestTestBase {
@Test @Test
public void testWithoutBlackWhiteListTopic() throws Exception { public void testWithoutBlackWhiteListTopic() throws Exception {
jmsServer.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("ordersTopic"), RoutingType.MULTICAST)); jmsServer.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString("ordersTopic"), RoutingType.MULTICAST));
deployAndconfigureRESTService("rest-test.war"); deployAndconfigureRESTService("rest-test.war");

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin; import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -239,7 +240,7 @@ public class LegacyLDAPSecuritySettingPluginListenerTest extends AbstractLdapTes
server.getConfiguration().setSecurityInvalidationInterval(0); server.getConfiguration().setSecurityInvalidationInterval(0);
server.start(); server.start();
String queue = "queue2"; String queue = "queue2";
server.createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, false, false); server.createQueue(SimpleString.toSimpleString(queue), RoutingType.ANYCAST, SimpleString.toSimpleString(queue), null, false, false);
ClientSessionFactory cf = locator.createSessionFactory(); ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession("first", "secret", false, true, true, false, 0); ClientSession session = cf.createSession("first", "secret", false, true, true, false, 0);
ClientConsumer consumer; ClientConsumer consumer;
@ -281,7 +282,7 @@ public class LegacyLDAPSecuritySettingPluginListenerTest extends AbstractLdapTes
server.getConfiguration().setSecurityInvalidationInterval(0); server.getConfiguration().setSecurityInvalidationInterval(0);
server.start(); server.start();
String queue = "queue2"; String queue = "queue2";
server.createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, false, false); server.createQueue(SimpleString.toSimpleString(queue), RoutingType.ANYCAST, SimpleString.toSimpleString(queue), null, false, false);
ClientSessionFactory cf = locator.createSessionFactory(); ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession("first", "secret", false, true, true, false, 0); ClientSession session = cf.createSession("first", "secret", false, true, true, false, 0);
ClientProducer producer = session.createProducer(SimpleString.toSimpleString(queue)); ClientProducer producer = session.createProducer(SimpleString.toSimpleString(queue));

View File

@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin; import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -162,7 +163,7 @@ public class LegacyLDAPSecuritySettingPluginTest extends AbstractLdapTestUnit {
final SimpleString QUEUE = new SimpleString("queue2"); final SimpleString QUEUE = new SimpleString("queue2");
server.start(); server.start();
server.createQueue(ADDRESS, QUEUE, null, true, false); server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE, null, true, false);
ClientSessionFactory cf = locator.createSessionFactory(); ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession("second", "secret", false, true, true, false, 0); ClientSession session = cf.createSession("second", "secret", false, true, true, false, 0);

View File

@ -229,7 +229,7 @@ public class SecurityTest extends ActiveMQTestBase {
roles.add(new Role("programmers", false, false, false, false, false, false, false, false, false, false)); roles.add(new Role("programmers", false, false, false, false, false, false, false, false, false, false));
server.getConfiguration().putSecurityRoles("#", roles); server.getConfiguration().putSecurityRoles("#", roles);
server.start(); server.start();
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, DURABLE_QUEUE, null, true, false); server.createQueue(ADDRESS, RoutingType.ANYCAST, DURABLE_QUEUE, null, true, false);
server.createQueue(ADDRESS, RoutingType.ANYCAST, NON_DURABLE_QUEUE, null, false, false); server.createQueue(ADDRESS, RoutingType.ANYCAST, NON_DURABLE_QUEUE, null, false, false);
@ -318,7 +318,7 @@ public class SecurityTest extends ActiveMQTestBase {
bRoles.add(new Role(QUEUE_B.toString(), false, true, false, false, false, false, false, false, false, false)); bRoles.add(new Role(QUEUE_B.toString(), false, true, false, false, false, false, false, false, false, false));
server.getConfiguration().putSecurityRoles(ADDRESS.concat(".").concat(QUEUE_B).toString(), bRoles); server.getConfiguration().putSecurityRoles(ADDRESS.concat(".").concat(QUEUE_B).toString(), bRoles);
server.start(); server.start();
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE_A, null, true, false); server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE_A, null, true, false);
server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE_B, null, true, false); server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE_B, null, true, false);
@ -393,7 +393,7 @@ public class SecurityTest extends ActiveMQTestBase {
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(tc)); ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(tc));
ClientSessionFactory cf = createSessionFactory(locator); ClientSessionFactory cf = createSessionFactory(locator);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, DURABLE_QUEUE, null, true, false); server.createQueue(ADDRESS, RoutingType.ANYCAST, DURABLE_QUEUE, null, true, false);
server.createQueue(ADDRESS, RoutingType.ANYCAST, NON_DURABLE_QUEUE, null, false, false); server.createQueue(ADDRESS, RoutingType.ANYCAST, NON_DURABLE_QUEUE, null, false, false);

View File

@ -20,6 +20,7 @@ package org.apache.activemq.artemis.tests.integration.server;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -32,6 +33,7 @@ public class QueuePeristPauseTest extends ActiveMQTestBase {
server.start(); server.start();
Queue queue = server.createQueue(SimpleString.toSimpleString("q1"), Queue queue = server.createQueue(SimpleString.toSimpleString("q1"),
RoutingType.ANYCAST,
SimpleString.toSimpleString("q1"), SimpleString.toSimpleString("q1"),
null, true, false); null, true, false);

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport; import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
@ -183,7 +184,7 @@ public class CoreClientOverOneWaySSLTest extends ActiveMQTestBase {
@Test @Test
public void testOneWaySSLReloaded() throws Exception { public void testOneWaySSLReloaded() throws Exception {
createCustomSslServer(); createCustomSslServer();
server.createQueue(CoreClientOverOneWaySSLTest.QUEUE, CoreClientOverOneWaySSLTest.QUEUE, null, false, false); server.createQueue(CoreClientOverOneWaySSLTest.QUEUE, RoutingType.ANYCAST, CoreClientOverOneWaySSLTest.QUEUE, null, false, false);
String text = RandomUtil.randomString(); String text = RandomUtil.randomString();
// create a valid SSL connection and keep it for use later // create a valid SSL connection and keep it for use later

View File

@ -714,7 +714,7 @@ public class MessageProducerTest extends JMSTestCase {
supportedRoutingTypes.add(RoutingType.ANYCAST); supportedRoutingTypes.add(RoutingType.ANYCAST);
supportedRoutingTypes.add(RoutingType.MULTICAST); supportedRoutingTypes.add(RoutingType.MULTICAST);
servers.get(0).getActiveMQServer().createAddressInfo(new AddressInfo(addr, supportedRoutingTypes)); servers.get(0).getActiveMQServer().addAddressInfo(new AddressInfo(addr, supportedRoutingTypes));
servers.get(0).getActiveMQServer().createQueue(addr, RoutingType.ANYCAST, addr, null, false, false); servers.get(0).getActiveMQServer().createQueue(addr, RoutingType.ANYCAST, addr, null, false, false);
Connection pconn = createConnection(); Connection pconn = createConnection();

View File

@ -107,6 +107,7 @@
<artifactId>artemis-commons</artifactId> <artifactId>artemis-commons</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
<scope>test</scope> <scope>test</scope>
<type>test-jar</type>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties; import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -78,9 +79,9 @@ public class SendReceiveMultiThreadTest extends ActiveMQTestBase {
server.start(); server.start();
Queue queue = server.createQueue(SimpleString.toSimpleString("performanceQueue"), SimpleString.toSimpleString("performanceQueue"), null, true, false); Queue queue = server.createQueue(SimpleString.toSimpleString("performanceQueue"), RoutingType.ANYCAST, SimpleString.toSimpleString("performanceQueue"), null, true, false);
Queue queue2 = server.createQueue(SimpleString.toSimpleString("stationaryQueue"), SimpleString.toSimpleString("stationaryQueue"), null, true, false); Queue queue2 = server.createQueue(SimpleString.toSimpleString("stationaryQueue"), RoutingType.ANYCAST, SimpleString.toSimpleString("stationaryQueue"), null, true, false);
MyThread[] threads = new MyThread[NUMBER_OF_THREADS]; MyThread[] threads = new MyThread[NUMBER_OF_THREADS];

View File

@ -124,6 +124,7 @@
<artifactId>artemis-commons</artifactId> <artifactId>artemis-commons</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
<scope>test</scope> <scope>test</scope>
<type>test-jar</type>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.unit.UnitTestLogger; import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
@ -101,7 +102,7 @@ public class MultipleConsumersPageStressTest extends ActiveMQTestBase {
server = createServer(true, createDefaultInVMConfig(), 10024, 200024, settings); server = createServer(true, createDefaultInVMConfig(), 10024, 200024, settings);
server.start(); server.start();
pagedServerQueue = (QueueImpl) server.createQueue(ADDRESS, ADDRESS, null, true, false); pagedServerQueue = (QueueImpl) server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
} }

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContex
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
@ -797,7 +798,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
queueList.clear(); queueList.clear();
try { try {
queue = server.createQueue(ADDRESS, ADDRESS, null, true, false); queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
queue.pause(); queue.pause();
} catch (Exception ignored) { } catch (Exception ignored) {
} }

View File

@ -93,12 +93,6 @@ public class FakePostOffice implements PostOffice {
return false; return false;
} }
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
return null;
}
@Override @Override
public AddressInfo removeAddressInfo(SimpleString address) { public AddressInfo removeAddressInfo(SimpleString address) {
return null; return null;