This closes #963

This commit is contained in:
Clebert Suconic 2017-01-16 14:40:43 -05:00
commit bc8d673e89
62 changed files with 267 additions and 415 deletions

View File

@ -1068,8 +1068,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
if (queues.get(queueName) != null) {
return false;
} else {
ActiveMQQueue activeMQQueue = ActiveMQDestination.createQueue(queueName);
// Convert from JMS selector to core filter
String coreFilterString = null;
@ -1077,11 +1075,12 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
}
server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQQueue.getName())).addRoutingType(RoutingType.ANYCAST));
server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueName)).addRoutingType(RoutingType.ANYCAST));
server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), RoutingType.ANYCAST, SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false, autoCreated);
AddressSettings as = server.getAddressSettingsRepository().getMatch(queueName);
server.createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(coreFilterString), null, durable, false, true, false, false, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
queues.put(queueName, activeMQQueue);
queues.put(queueName, ActiveMQDestination.createQueue(queueName));
this.recoverregistryBindings(queueName, PersistedType.Queue);
@ -1108,12 +1107,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
return false;
} else {
ActiveMQTopic activeMQTopic = ActiveMQDestination.createTopic(topicName);
// We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS
// checks when routing messages to a topic that
// does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
// subscriptions - core has no notion of a topic
// server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false, autoCreated);
server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQTopic.getAddress()), RoutingType.MULTICAST));
server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQTopic.getAddress()), RoutingType.MULTICAST));
topics.put(topicName, activeMQTopic);

View File

@ -31,7 +31,7 @@ public class FindDestinationTest extends MessageTestBase {
@Test
public void testFindQueue() throws Exception {
String testName = "testFindQueue";
server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString(testName), RoutingType.MULTICAST));
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(testName), RoutingType.MULTICAST));
server.getActiveMQServer().createQueue(new SimpleString(testName), RoutingType.MULTICAST, new SimpleString(testName), null, false, false);
ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/" + testName));
@ -62,7 +62,7 @@ public class FindDestinationTest extends MessageTestBase {
@Test
public void testFindTopic() throws Exception {
server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("testTopic"), RoutingType.MULTICAST));
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString("testTopic"), RoutingType.MULTICAST));
server.getActiveMQServer().createQueue(new SimpleString("testTopic"), RoutingType.MULTICAST, new SimpleString("testTopic"), null, false, false);
ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/topics/testTopic"));

View File

@ -67,7 +67,7 @@ public class RawAckTest {
consumerSessionFactory = serverLocator.createSessionFactory();
SimpleString addr = SimpleString.toSimpleString("testQueue");
activeMQServer.createAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST));
activeMQServer.addAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST));
activeMQServer.createQueue(addr, RoutingType.MULTICAST, addr, null, false, false);
session = sessionFactory.createSession(true, true);
producer = session.createProducer(addr);

View File

@ -626,10 +626,10 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
set.add(RoutingType.valueOf(routingType));
}
final AddressInfo addressInfo = new AddressInfo(new SimpleString(name), set);
if (server.createAddressInfo(addressInfo)) {
if (server.addAddressInfo(addressInfo)) {
return AddressInfoTextFormatter.Long.format(addressInfo, new StringBuilder()).toString();
} else {
return "";
throw ActiveMQMessageBundle.BUNDLE.addressAlreadyExists(addressInfo.getName());
}
} finally {
blockOnIO();
@ -652,11 +652,10 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
routingTypeSet.add(RoutingType.valueOf(routingTypeName));
}
}
final AddressInfo updatedAddressInfo = server.updateAddressInfo(name, routingTypeSet);
if (updatedAddressInfo == null) {
if (!server.updateAddressInfo(SimpleString.toSimpleString(name), routingTypeSet)) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(name));
}
return AddressInfoTextFormatter.Long.format(updatedAddressInfo, new StringBuilder()).toString();
return AddressInfoTextFormatter.Long.format(server.getAddressInfo(SimpleString.toSimpleString(name)), new StringBuilder()).toString();
} finally {
blockOnIO();
}
@ -691,7 +690,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
clearIO();
try {
server.deployQueue(SimpleString.toSimpleString(address), server.getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType(), new SimpleString(name), filter, durable, false);
server.createQueue(SimpleString.toSimpleString(address), server.getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType(), new SimpleString(name), filter, durable, false);
} finally {
blockOnIO();
}

View File

@ -64,14 +64,7 @@ public interface AddressManager {
*/
boolean addAddressInfo(AddressInfo addressInfo);
AddressInfo updateAddressInfo(SimpleString addressName,
Collection<RoutingType> routingTypes) throws Exception;
/**
* @param addressInfo
* @return the same provided {@code addressInfo} if the address was added, another if it was updated
*/
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo updateAddressInfo(SimpleString addressName, Collection<RoutingType> routingTypes);
AddressInfo removeAddressInfo(SimpleString address);

View File

@ -51,12 +51,6 @@ public interface PostOffice extends ActiveMQComponent {
*/
boolean addAddressInfo(AddressInfo addressInfo);
/**
* @param addressInfo
* @return the same provided {@code addressInfo} if the address was added, another if it was updated
*/
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo removeAddressInfo(SimpleString address) throws Exception;
AddressInfo getAddressInfo(SimpleString address);

View File

@ -440,23 +440,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
synchronized (addressLock) {
final AddressInfo updatedAddressInfo = addressManager.addOrUpdateAddressInfo(addressInfo);
// only register address if it is newly added
final boolean isNew = updatedAddressInfo == addressInfo;
if (isNew) {
try {
managementService.registerAddress(addressInfo);
} catch (Exception e) {
e.printStackTrace();
}
}
return updatedAddressInfo;
}
}
@Override
public QueueBinding updateQueue(SimpleString name,
RoutingType routingType,

View File

@ -231,7 +231,7 @@ public class SimpleAddressManager implements AddressManager {
@Override
public AddressInfo updateAddressInfo(SimpleString addressName,
Collection<RoutingType> routingTypes) throws Exception {
Collection<RoutingType> routingTypes) {
if (routingTypes == null) {
return this.addressInfoMap.get(addressName);
} else {
@ -259,21 +259,6 @@ public class SimpleAddressManager implements AddressManager {
}
}
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
return this.addressInfoMap.compute(addressInfo.getName(), (name, oldAddressInfo) -> {
if (oldAddressInfo != null) {
final Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
validateRoutingTypes(name, routingTypes);
final Set<RoutingType> updatedRoutingTypes = EnumSet.copyOf(routingTypes);
oldAddressInfo.setRoutingTypes(updatedRoutingTypes);
return oldAddressInfo;
} else {
return addressInfo;
}
});
}
@Override
public AddressInfo removeAddressInfo(SimpleString address) {
return addressInfoMap.remove(address);

View File

@ -267,15 +267,6 @@ public interface ActiveMQServer extends ActiveMQComponent {
*/
boolean waitForActivation(long timeout, TimeUnit unit) throws InterruptedException;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
boolean autoCreated,
Integer maxConsumers,
Boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
/**
* Creates a transient queue. A queue that will exist as long as there are consumers.
* The queue will be deleted as soon as all the consumers are removed.
@ -290,57 +281,32 @@ public interface ActiveMQServer extends ActiveMQComponent {
* @throws NullPointerException if {@code address} is {@code null}
*/
void createSharedQueue(final SimpleString address, final RoutingType routingType, final SimpleString name, final SimpleString filterString,
final SimpleString user,
boolean durable) throws Exception;
final SimpleString user, boolean durable) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
boolean durable,
boolean temporary) throws Exception;
boolean durable, boolean temporary) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
boolean durable, boolean temporary, int maxConsumers, boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
Boolean deleteOnNoConsumers, boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
boolean autoCreated, int maxConsumers, boolean deleteOnNoConsumers, boolean autoCreateAddress) throws Exception;
@Deprecated
Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
boolean durable,
boolean temporary,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
boolean autoCreated) throws Exception;
@Deprecated
Queue deployQueue(String address, String queue, String filter, boolean durable, boolean temporary) throws Exception;
@Deprecated
Queue deployQueue(SimpleString address, SimpleString queue, SimpleString filter, boolean durable, boolean temporary) throws Exception;
Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString resourceName, SimpleString filterString,
boolean durable,
boolean temporary) throws Exception;
Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
boolean durable,
boolean temporary,
boolean autoCreated) throws Exception;
Queue locateQueue(SimpleString queueName);
BindingQueryResult bindingQuery(SimpleString address) throws Exception;
@ -349,14 +315,6 @@ public interface ActiveMQServer extends ActiveMQComponent {
AddressQueryResult addressQuery(SimpleString name) throws Exception;
Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
boolean durable,
boolean temporary,
boolean autoCreated,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
void destroyQueue(SimpleString queueName) throws Exception;
void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception;
@ -406,22 +364,6 @@ public interface ActiveMQServer extends ActiveMQComponent {
void stop(boolean failoverOnServerShutdown) throws Exception;
AddressInfo getAddressInfo(SimpleString address);
Queue createQueue(SimpleString addressName,
SimpleString queueName,
RoutingType routingType,
SimpleString filterString,
SimpleString user,
boolean durable,
boolean temporary,
boolean ignoreIfExists,
boolean transientQueue,
boolean autoCreated,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue updateQueue(String name,
RoutingType routingType,
Integer maxConsumers,
@ -460,13 +402,45 @@ public interface ActiveMQServer extends ActiveMQComponent {
void removeClientConnection(String clientId);
AddressInfo updateAddressInfo(String name, Collection<RoutingType> routingTypes) throws Exception;
AddressInfo getAddressInfo(SimpleString address);
boolean createAddressInfo(AddressInfo addressInfo) throws Exception;
/**
* Updates an {@code AddressInfo} on the broker with the specified routing types.
*
* @param address the name of the {@code AddressInfo} to update
* @param routingTypes the routing types to update the {@code AddressInfo} with
* @return {@code true} if the {@code AddressInfo} was updated, {@code false} otherwise
* @throws Exception
*/
boolean updateAddressInfo(SimpleString address, Collection<RoutingType> routingTypes) throws Exception;
AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
/**
* Add the {@code AddressInfo} to the broker
*
* @param addressInfo the {@code AddressInfo} to add
* @return {@code true} if the {@code AddressInfo} was added, {@code false} otherwise
* @throws Exception
*/
boolean addAddressInfo(AddressInfo addressInfo) throws Exception;
void removeAddressInfo(SimpleString address, SecurityAuth session) throws Exception;
/**
* A convenience method to combine the functionality of {@code addAddressInfo} and {@code updateAddressInfo}. It will
* add the {@code AddressInfo} object to the broker if it doesn't exist or update it if it does.
*
* @param addressInfo the {@code AddressInfo} to add or the info used to update the existing {@code AddressInfo}
* @return the resulting {@code AddressInfo}
* @throws Exception
*/
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
/**
* Remove an {@code AddressInfo} from the broker.
*
* @param address the {@code AddressInfo} to remove
* @param auth authorization information; {@code null} is valid
* @throws Exception
*/
void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
String getInternalNamingPrefix();
}

View File

@ -1503,68 +1503,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean durable,
final boolean temporary) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
return createQueue(address, routingType, queueName, filterString, null, durable, temporary, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
}
@Override
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary) throws Exception {
return createQueue(address, getAddressSettingsRepository().getMatch(address.toString()).getDefaultQueueRoutingType(), queueName, filterString, durable, temporary);
return createQueue(address, routingType, queueName, filterString, durable, temporary, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
}
@Override
public Queue createQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final SimpleString filter,
final boolean durable,
final boolean temporary,
final int maxConsumers,
final boolean deleteOnNoConsumers,
final boolean autoCreateAddress) throws Exception {
return createQueue(address, queueName, routingType, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
public Queue createQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final SimpleString user,
final boolean durable,
final boolean temporary) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
return createQueue(address, routingType, queueName, filterString, user, durable, temporary, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
}
@Override
public Queue createQueue(SimpleString address,
RoutingType routingType,
SimpleString queueName,
SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception {
return createQueue(address, queueName, routingType, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
public Queue createQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final SimpleString user,
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
return createQueue(address, routingType, queueName, filterString, user, durable, temporary, autoCreated, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
return createQueue(address, routingType, queueName, filter, null, durable, temporary, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
@ -1579,7 +1531,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Integer maxConsumers,
Boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception {
return createQueue(address, queueName, routingType, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
return createQueue(address, routingType, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Deprecated
@Override
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary) throws Exception {
return createQueue(address, getAddressSettingsRepository().getMatch(address.toString()).getDefaultQueueRoutingType(), queueName, filterString, durable, temporary);
}
@Override
@ -1602,7 +1564,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
final Queue queue = createQueue(address, name, routingType, filterString, user, durable, !durable, true, !durable, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, true, !durable, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
if (!queue.getAddress().equals(address)) {
throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name);
@ -1635,15 +1597,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return (Queue) binding.getBindable();
}
@Deprecated
@Override
public Queue deployQueue(final SimpleString address,
final SimpleString resourceName,
final SimpleString filterString,
final boolean durable,
final boolean temporary) throws Exception {
return deployQueue(address, getAddressSettingsRepository().getMatch(address.toString()).getDefaultQueueRoutingType(), resourceName, filterString, durable, temporary);
return createQueue(address, getAddressSettingsRepository().getMatch(address.toString()).getDefaultQueueRoutingType(), resourceName, filterString, durable, temporary);
}
@Deprecated
@Override
public Queue deployQueue(final String address,
final String resourceName,
@ -1653,45 +1617,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return deployQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(resourceName), SimpleString.toSimpleString(filterString), durable, temporary);
}
@Override
public Queue deployQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString resourceName,
final SimpleString filterString,
final boolean durable,
final boolean temporary) throws Exception {
return deployQueue(address, routingType, resourceName, filterString, durable, temporary, false);
}
@Override
public Queue deployQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
return deployQueue(address, routingType, queueName, filterString, durable, temporary, autoCreated, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
}
@Override
public Queue deployQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean autoCreated,
final int maxConsumers,
final boolean deleteOnNoConsumers,
final boolean autoCreateAddress) throws Exception {
ActiveMQServerLogger.LOGGER.deployQueue(queueName);
return createQueue(address, queueName, routingType, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
public void destroyQueue(final SimpleString queueName) throws Exception {
// The session is passed as an argument to verify if the user has authorization to delete the queue
@ -2296,14 +2221,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployAddressesFromConfiguration() throws Exception {
for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) {
AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
createOrUpdateAddressInfo(info);
addOrUpdateAddressInfo(info);
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
}
}
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) {
deployQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers(), true);
ActiveMQServerLogger.LOGGER.deployQueue(SimpleString.toSimpleString(config.getName()));
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), null, config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers(), true);
}
}
@ -2407,25 +2334,27 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public AddressInfo updateAddressInfo(String address, Collection<RoutingType> routingTypes) throws Exception {
final SimpleString addressName = new SimpleString(address);
//after the postOffice call, updatedAddressInfo could change further (concurrently)!
final AddressInfo updatedAddressInfo = postOffice.updateAddressInfo(addressName, routingTypes);
if (updatedAddressInfo != null) {
//it change the address info without any lock!
final long txID = storageManager.generateID();
try {
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
storageManager.addAddressBinding(txID, updatedAddressInfo);
} finally {
storageManager.commitBindings(txID);
}
public boolean updateAddressInfo(SimpleString address, Collection<RoutingType> routingTypes) throws Exception {
if (getAddressInfo(address) == null) {
return false;
}
return updatedAddressInfo;
//after the postOffice call, updatedAddressInfo could change further (concurrently)!
final AddressInfo updatedAddressInfo = postOffice.updateAddressInfo(address, routingTypes);
//it change the address info without any lock!
final long txID = storageManager.generateID();
try {
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
storageManager.addAddressBinding(txID, updatedAddressInfo);
} finally {
storageManager.commitBindings(txID);
}
return true;
}
@Override
public boolean createAddressInfo(AddressInfo addressInfo) throws Exception {
public boolean addAddressInfo(AddressInfo addressInfo) throws Exception {
boolean result = postOffice.addAddressInfo(addressInfo);
if (result) {
@ -2433,34 +2362,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
storageManager.addAddressBinding(txID, addressInfo);
storageManager.commitBindings(txID);
} else {
throw ActiveMQMessageBundle.BUNDLE.addressAlreadyExists(addressInfo.getName());
result = false;
}
return result;
}
@Override
public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
final AddressInfo updatedAddressInfo = postOffice.addOrUpdateAddressInfo(addressInfo);
final boolean isNew = updatedAddressInfo == addressInfo;
final long txID = storageManager.generateID();
if (isNew) {
storageManager.addAddressBinding(txID, addressInfo);
storageManager.commitBindings(txID);
} else {
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
storageManager.addAddressBinding(txID, updatedAddressInfo);
storageManager.commitBindings(txID);
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
if (!addAddressInfo(addressInfo)) {
updateAddressInfo(addressInfo.getName(), addressInfo.getRoutingTypes());
}
return updatedAddressInfo;
return getAddressInfo(addressInfo.getName());
}
@Override
public void removeAddressInfo(final SimpleString address, final SecurityAuth session) throws Exception {
if (session != null) {
securityStore.check(address, CheckType.DELETE_ADDRESS, session);
public void removeAddressInfo(final SimpleString address, final SecurityAuth auth) throws Exception {
if (auth != null) {
securityStore.check(address, CheckType.DELETE_ADDRESS, auth);
}
AddressInfo addressInfo = getAddressInfo(address);
@ -2484,9 +2404,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public Queue createQueue(final SimpleString addressName,
final SimpleString queueName,
public Queue createQueue(final SimpleString address,
final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final SimpleString user,
final boolean durable,
@ -2513,28 +2433,28 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final long queueID = storageManager.generateID();
final QueueConfig.Builder queueConfigBuilder;
if (addressName == null) {
if (address == null) {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName);
} else {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressName);
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, address);
}
AddressInfo info = postOffice.getAddressInfo(addressName);
AddressInfo info = postOffice.getAddressInfo(address);
if (autoCreateAddress) {
RoutingType rt = (routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
if (info == null) {
final AddressInfo addressInfo = new AddressInfo(addressName, rt);
final AddressInfo addressInfo = new AddressInfo(address, rt);
addressInfo.setAutoCreated(true);
createAddressInfo(addressInfo);
addAddressInfo(addressInfo);
} else if (!info.getRoutingTypes().contains(routingType)) {
Set<RoutingType> routingTypes = new HashSet<>();
routingTypes.addAll(info.getRoutingTypes());
routingTypes.add(routingType);
updateAddressInfo(info.getName().toString(), routingTypes);
updateAddressInfo(info.getName(), routingTypes);
}
} else if (info == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
} else if (!info.getRoutingTypes().contains(routingType)) {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, info.getName().toString(), info.getRoutingTypes());
}

View File

@ -30,8 +30,6 @@ public class AddressInfo {
private boolean autoCreated = false;
private boolean deletable = false;
private Set<RoutingType> routingTypes;
public AddressInfo(SimpleString name) {

View File

@ -591,7 +591,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean autoCreated) throws Exception {
Pair<SimpleString, Set<RoutingType>> art = getAddressAndRoutingTypes(address, routingTypes);
securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
server.addOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
return server.getAddressInfo(art.getA());
}
@ -601,7 +601,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean autoCreated) throws Exception {
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
server.addOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
return server.getAddressInfo(art.getA());
}

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -251,7 +252,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
if (coreQ == null) {
coreQ = new SimpleString(qname);
try {
this.server.createQueue(coreQ, coreQ, null, false, false);
this.server.createQueue(coreQ, RoutingType.MULTICAST, coreQ, null, false, false);
testQueues.put(qname, coreQ);
} catch (ActiveMQQueueExistsException e) {
//ignore

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -131,7 +132,7 @@ public class PagingLeakTest extends ActiveMQTestBase {
final int maxConsumed;
Consumer(int sleepTime, String suffix, int maxConsumed) throws Exception {
server.createQueue(address, address.concat(suffix), null, true, false);
server.createQueue(address, RoutingType.MULTICAST, address.concat(suffix), null, true, false);
this.sleepTime = sleepTime;
locator = createInVMLocator(0);

View File

@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
@ -55,7 +56,7 @@ public class TimeoutXATest extends ActiveMQTestBase {
server.getConfiguration().setTransactionTimeoutScanPeriod(1100);
server.getConfiguration().setJournalSyncNonTransactional(false);
server.start();
server.createQueue(SimpleString.toSimpleString("Queue1"), SimpleString.toSimpleString("Queue1"), null, true, false);
server.createQueue(SimpleString.toSimpleString("Queue1"), RoutingType.ANYCAST, SimpleString.toSimpleString("Queue1"), null, true, false);
removingTXEntered0 = new CountDownLatch(1);
removingTXAwait0 = new CountDownLatch(1);

View File

@ -44,7 +44,7 @@ public class AddressConfigTest extends ActiveMQTestBase {
@Test
public void persistAddressConfigTest() throws Exception {
server.createQueue(SimpleString.toSimpleString("myAddress"), SimpleString.toSimpleString("myQueue"), null, true, false);
server.createQueue(SimpleString.toSimpleString("myAddress"), RoutingType.MULTICAST, SimpleString.toSimpleString("myQueue"), null, true, false);
server.stop();
server.start();
AddressInfo addressInfo = server.getAddressInfo(SimpleString.toSimpleString("myAddress"));

View File

@ -73,9 +73,9 @@ public class AddressingTest extends ActiveMQTestBase {
AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress));
addressInfo.addRoutingType(RoutingType.MULTICAST);
server.createOrUpdateAddressInfo(addressInfo);
Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false);
Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false);
server.addOrUpdateAddressInfo(addressInfo);
Queue q1 = server.createQueue(new SimpleString(consumeAddress), RoutingType.MULTICAST, new SimpleString(consumeAddress + ".1"), null, true, false);
Queue q2 = server.createQueue(new SimpleString(consumeAddress), RoutingType.MULTICAST, new SimpleString(consumeAddress + ".2"), null, true, false);
ClientSession session = sessionFactory.createSession();
session.start();
@ -111,7 +111,7 @@ public class AddressingTest extends ActiveMQTestBase {
AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress));
addressInfo.addRoutingType(RoutingType.ANYCAST);
server.createOrUpdateAddressInfo(addressInfo);
server.addOrUpdateAddressInfo(addressInfo);
Queue q1 = server.createQueue(new SimpleString(consumeAddress), RoutingType.ANYCAST, new SimpleString(consumeAddress + ".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
Queue q2 = server.createQueue(new SimpleString(consumeAddress), RoutingType.ANYCAST, new SimpleString(consumeAddress + ".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
@ -145,7 +145,7 @@ public class AddressingTest extends ActiveMQTestBase {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.addRoutingType(RoutingType.ANYCAST);
server.createOrUpdateAddressInfo(addressInfo);
server.addOrUpdateAddressInfo(addressInfo);
Queue q1 = server.createQueue(address, RoutingType.ANYCAST, address.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
Queue q2 = server.createQueue(address, RoutingType.ANYCAST, address.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
Queue q3 = server.createQueue(address, RoutingType.ANYCAST, address.concat(".3"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
@ -199,8 +199,8 @@ public class AddressingTest extends ActiveMQTestBase {
for (String consumeAddress : testAddresses) {
// For each address, create 2 Queues with the same address, assert both queues receive message
Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false);
Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false);
Queue q1 = server.createQueue(new SimpleString(consumeAddress), RoutingType.MULTICAST, new SimpleString(consumeAddress + ".1"), null, true, false);
Queue q2 = server.createQueue(new SimpleString(consumeAddress), RoutingType.MULTICAST, new SimpleString(consumeAddress + ".2"), null, true, false);
ClientSession session = sessionFactory.createSession();
session.start();

View File

@ -60,7 +60,7 @@ public class AnycastTest extends ActiveMQTestBase {
addressInfo = new AddressInfo(baseAddress);
addressInfo.addRoutingType(RoutingType.ANYCAST);
server.createOrUpdateAddressInfo(addressInfo);
server.addOrUpdateAddressInfo(addressInfo);
}
@Test

View File

@ -60,7 +60,7 @@ public class MulticastTest extends ActiveMQTestBase {
addressInfo = new AddressInfo(baseAddress);
addressInfo.addRoutingType(RoutingType.MULTICAST);
server.createOrUpdateAddressInfo(addressInfo);
server.addOrUpdateAddressInfo(addressInfo);
}
@Test

View File

@ -56,7 +56,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
@Override
public void setUp() throws Exception {
super.setUp();
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST));
server.createQueue(new SimpleString(getTopicName()), RoutingType.MULTICAST, new SimpleString(getTopicName()), null, true, false);
}

View File

@ -154,7 +154,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception {
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTestName()), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTestName()), RoutingType.ANYCAST));
server.createQueue(new SimpleString(getTestName()), RoutingType.ANYCAST, new SimpleString(getTestName()), null, true, false);
AmqpClient client = createAmqpClient(user1, password1);

View File

@ -43,7 +43,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
sendMessages(1, address.toString());
@ -64,7 +64,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameNameMultipleQueues() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
@ -86,7 +86,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressDifferentName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
sendMessages(1, address.toString());
@ -107,7 +107,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressDifferentNameMultipleQueues() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
server.createQueue(address, RoutingType.ANYCAST, queue2, null, true, false);
@ -129,7 +129,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeFromSingleQualifiedQueueOnAddressSameName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
sendMessages(1, address.toString());
@ -150,7 +150,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeWhenOnlyMulticast() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
sendMessages(1, address.toString());
@ -208,7 +208,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.ANYCAST);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
AmqpClient client = createAmqpClient();

View File

@ -42,7 +42,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
sendMessages(1, address.toString());
@ -63,7 +63,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeWhenOnlyAnycast() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
sendMessages(1, address.toString());
@ -87,7 +87,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
addressInfo.getRoutingTypes().add(RoutingType.ANYCAST);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
AmqpClient client = createAmqpClient();

View File

@ -41,7 +41,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedVolatileAddress() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -69,7 +69,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedVolatileAddressBrokerDefined() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false);
AmqpClient client = createAmqpClient();
@ -98,7 +98,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedVolatileAddressNoReceiverClose() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -124,7 +124,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedVolatileAddressGlobal() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect(false));
@ -152,7 +152,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedDurableAddress() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -180,7 +180,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedDurableAddressReconnect() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -218,7 +218,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedDurableAddressReconnectwithNull() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -256,7 +256,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnSharedDurableAddressGlobal() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect(false));
@ -284,7 +284,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
public void test2ConsumersOnNonSharedDurableAddress() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));

View File

@ -57,10 +57,10 @@ public class ProtonPubSubTest extends ProtonTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
server.createAddressInfo(new AddressInfo(ssPubAddress, RoutingType.MULTICAST));
server.createAddressInfo(new AddressInfo(ssprefixedPubAddress, RoutingType.MULTICAST));
server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
server.addAddressInfo(new AddressInfo(ssPubAddress, RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(ssprefixedPubAddress, RoutingType.MULTICAST));
server.createQueue(ssPubAddress, RoutingType.MULTICAST, ssPubAddress, new SimpleString("foo=bar"), false, true);
server.createQueue(ssprefixedPubAddress, RoutingType.MULTICAST, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
factory = new JmsConnectionFactory("amqp://localhost:5672");
factory.setClientID("myClientID");
connection = factory.createConnection();

View File

@ -161,18 +161,17 @@ public class ProtonTest extends ProtonTestBase {
settings.put("#", addressSetting);
}
addressSetting.setAutoCreateQueues(false);
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "1"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "2"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "3"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "4"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "5"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "6"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "7"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "8"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "9"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "10"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "1"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "2"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "3"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "4"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "5"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "6"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "7"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "8"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "9"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "10"), RoutingType.ANYCAST));
server.createQueue(new SimpleString(coreAddress), RoutingType.ANYCAST, new SimpleString(coreAddress), null, true, false);
server.createQueue(new SimpleString(coreAddress + "1"), RoutingType.ANYCAST, new SimpleString(coreAddress + "1"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "2"), RoutingType.ANYCAST, new SimpleString(coreAddress + "2"), null, true, false);
@ -184,7 +183,7 @@ public class ProtonTest extends ProtonTestBase {
server.createQueue(new SimpleString(coreAddress + "8"), RoutingType.ANYCAST, new SimpleString(coreAddress + "8"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "9"), RoutingType.ANYCAST, new SimpleString(coreAddress + "9"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "10"), RoutingType.ANYCAST, new SimpleString(coreAddress + "10"), null, true, false);
server.createAddressInfo(new AddressInfo(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST));
server.createQueue(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST, new SimpleString("amqp_testtopic"), null, true, false);
/* server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "2"), new SimpleString("amqp_testtopic" + "2"), null, true, false);
@ -870,7 +869,7 @@ public class ProtonTest extends ProtonTestBase {
@Test
public void testClientIdIsSetInSubscriptionList() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST));
AmqpConnection amqpConnection = client.createConnection();
amqpConnection.setContainerId("testClient");
amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic")));
@ -899,7 +898,7 @@ public class ProtonTest extends ProtonTestBase {
String queueName = "TestQueueName";
String address = "TestAddress";
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST));
server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);

View File

@ -106,7 +106,7 @@ public class AddressCommandTest extends JMSTestBase {
final String addressName = "address";
final SimpleString addressSimpleString = new SimpleString(addressName);
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST));
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
server.createQueue(addressSimpleString, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false);
final DeleteAddress deleteAddress = new DeleteAddress();
@ -143,7 +143,7 @@ public class AddressCommandTest extends JMSTestBase {
// Create bindings
SimpleString address = new SimpleString("address");
server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
server.createQueue(address, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false);
server.createQueue(address, RoutingType.MULTICAST, new SimpleString("queue2"), null, true, false);
server.createQueue(address, RoutingType.MULTICAST, new SimpleString("queue3"), null, true, false);
@ -164,7 +164,7 @@ public class AddressCommandTest extends JMSTestBase {
public void testUpdateAddressRoutingTypes() throws Exception {
final String addressName = "address";
final SimpleString address = new SimpleString(addressName);
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
final UpdateAddress updateAddress = new UpdateAddress();
updateAddress.setName(addressName);
@ -192,7 +192,7 @@ public class AddressCommandTest extends JMSTestBase {
final String addressName = "address";
final SimpleString addressSimpleString = new SimpleString(addressName);
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST));
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
server.createQueue(addressSimpleString, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false);
final UpdateAddress updateAddress = new UpdateAddress();

View File

@ -92,7 +92,7 @@ public class QueueCommandTest extends JMSTestBase {
command.setAnycast(false);
command.setAddress(address);
server.createOrUpdateAddressInfo(new AddressInfo(new SimpleString(address), RoutingType.MULTICAST));
server.addOrUpdateAddressInfo(new AddressInfo(new SimpleString(address), RoutingType.MULTICAST));
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(command);
@ -247,7 +247,7 @@ public class QueueCommandTest extends JMSTestBase {
final RoutingType oldRoutingType = RoutingType.MULTICAST;
final boolean oldDeleteOnNoConsumers = false;
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST));
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
final int newMaxConsumers = 1;
@ -280,7 +280,7 @@ public class QueueCommandTest extends JMSTestBase {
final boolean oldDeleteOnNoConsumers = false;
final Set<RoutingType> supportedRoutingTypes = EnumSet.of(oldRoutingType);
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.copyOf(supportedRoutingTypes));
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
final RoutingType newRoutingType = RoutingType.ANYCAST;
@ -309,7 +309,7 @@ public class QueueCommandTest extends JMSTestBase {
final RoutingType oldRoutingType = RoutingType.MULTICAST;
final boolean oldDeleteOnNoConsumers = false;
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, oldRoutingType);
server.createAddressInfo(addressInfo);
server.addAddressInfo(addressInfo);
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
server.locateQueue(queueNameString).addConsumer(new DummyServerConsumer());

View File

@ -49,7 +49,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
@Test
public void testAutoDeleteAutoCreatedAddress() throws Exception {
// auto-delete-addresses defaults to true
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true);
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.getAddressInfo(addressA));
cf.createSession().createConsumer(queueA).close();
assertNull(server.getAddressInfo(addressA));
@ -58,7 +58,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
@Test
public void testNegativeAutoDeleteAutoCreatedAddress() throws Exception {
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteAddresses(false));
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true);
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.getAddressInfo(addressA));
cf.createSession().createConsumer(queueA).close();
assertNotNull(server.getAddressInfo(addressA));

View File

@ -49,7 +49,7 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
@Test
public void testAutoDeleteAutoCreatedQueue() throws Exception {
// auto-delete-queues defaults to true
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true);
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.locateQueue(queueA));
cf.createSession().createConsumer(queueA).close();
assertNull(server.locateQueue(queueA));
@ -58,7 +58,7 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
@Test
public void testNegativeAutoDeleteAutoCreatedQueue() throws Exception {
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteQueues(false));
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true);
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.locateQueue(queueA));
cf.createSession().createConsumer(queueA).close();
assertNotNull(server.locateQueue(queueA));

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
@ -73,7 +74,7 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase {
ClientProducer producer = session.createProducer(ADDRESS);
// just to make it page forever
Queue serverQueue = server.createQueue(ADDRESS, SimpleString.toSimpleString("everPage"), null, true, false);
Queue serverQueue = server.createQueue(ADDRESS, RoutingType.ANYCAST, SimpleString.toSimpleString("everPage"), null, true, false);
serverQueue.getPageSubscription().getPagingStore().startPaging();
Consumer[] consumers = new Consumer[10];

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
@ -73,11 +74,11 @@ public class ExpiryLargeMessageTest extends ActiveMQTestBase {
server.start();
server.createQueue(EXPIRY, EXPIRY, null, true, false);
server.createQueue(EXPIRY, RoutingType.ANYCAST, EXPIRY, null, true, false);
server.createQueue(DLQ, DLQ, null, true, false);
server.createQueue(DLQ, RoutingType.ANYCAST, DLQ, null, true, false);
server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
server.createQueue(MY_QUEUE, RoutingType.ANYCAST, MY_QUEUE, null, true, false);
ServerLocator locator = createInVMNonHALocator();
@ -255,11 +256,11 @@ public class ExpiryLargeMessageTest extends ActiveMQTestBase {
server.start();
server.createQueue(EXPIRY, EXPIRY, null, true, false);
server.createQueue(EXPIRY, RoutingType.ANYCAST, EXPIRY, null, true, false);
server.createQueue(DLQ, DLQ, null, true, false);
server.createQueue(DLQ, RoutingType.ANYCAST, DLQ, null, true, false);
server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
server.createQueue(MY_QUEUE, RoutingType.ANYCAST, MY_QUEUE, null, true, false);
ServerLocator locator = createInVMNonHALocator();

View File

@ -109,7 +109,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
@Test
public void testHangOnDelivery() throws Exception {
queue = server.createQueue(QUEUE, QUEUE, null, true, false);
queue = server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
try {
ClientSessionFactory factory = locator.createSessionFactory();
@ -288,7 +288,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
((ActiveMQServerImpl) server).replaceQueueFactory(queueFactory);
queue = server.createQueue(QUEUE, QUEUE, null, true, false);
queue = server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
blocked.acquire();
@ -316,7 +316,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
Assert.assertTrue(latchDelete.await(10, TimeUnit.SECONDS));
try {
server.createQueue(QUEUE, QUEUE, null, true, false);
server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
} catch (Exception expected) {
}
@ -344,7 +344,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
*/
@Test
public void testForceDuplicationOnBindings() throws Exception {
queue = server.createQueue(QUEUE, QUEUE, null, true, false);
queue = server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false, false);
@ -375,7 +375,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
// An exception during delivery shouldn't make the message disappear
@Test
public void testExceptionWhileDelivering() throws Exception {
queue = server.createQueue(QUEUE, QUEUE, null, true, false);
queue = server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
HangInterceptor hangInt = new HangInterceptor();
try {
@ -429,7 +429,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
try {
for (int i = 0; i < 5; i++) {
if (server.locateQueue(SimpleString.toSimpleString("tt")) == null) {
server.createQueue(SimpleString.toSimpleString("tt"), SimpleString.toSimpleString("tt"), SimpleString.toSimpleString(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false);
server.createQueue(SimpleString.toSimpleString("tt"), RoutingType.ANYCAST, SimpleString.toSimpleString("tt"), SimpleString.toSimpleString(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false);
}
server.stop();

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@ -216,7 +217,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
SimpleString jmsAddress = new SimpleString("Test");
server.createQueue(jmsAddress, jmsAddress, null, true, false);
server.createQueue(jmsAddress, RoutingType.ANYCAST, jmsAddress, null, true, false);
final AtomicInteger unexpectedErrors = new AtomicInteger(0);
final AtomicInteger expectedErrors = new AtomicInteger(0);

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
@ -107,7 +108,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
server.createQueue(QUEUE, QUEUE, null, true, false).getPageSubscription().getPagingStore().startPaging();
server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false).getPageSubscription().getPagingStore().startPaging();
locator = createFactory(isNetty);
}

View File

@ -41,7 +41,7 @@ public class PendingDeliveriesTest extends ClientTestBase {
@Before
public void createQueue() throws Exception {
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST));
server.createQueue(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST, SimpleString.toSimpleString("queue1"), null, true, false);
}

View File

@ -1896,7 +1896,7 @@ public class BridgeTest extends ActiveMQTestBase {
ActiveMQServer server = addServer(new ActiveMQServerImpl(config, null, null, null, serviceRegistry));
server.start();
server.waitForActivation(100, TimeUnit.MILLISECONDS);
server.deployQueue(ADDRESS, QUEUE, null, false, false);
server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE, null, false, false);
List<String> connectors = new ArrayList<>();
connectors.add("in-vm");
server.deployBridge(new BridgeConfiguration().setName(BRIDGE).setQueueName(QUEUE.toString()).setForwardingAddress(ADDRESS.toString()).setStaticConnectors(connectors));

View File

@ -547,7 +547,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
boolean defaultDeleteOnNoConsumers) throws Exception {
AddressInfo addressInfo = new AddressInfo(new SimpleString(address));
addressInfo.addRoutingType(routingType);
servers[node].createOrUpdateAddressInfo(addressInfo);
servers[node].addOrUpdateAddressInfo(addressInfo);
}
protected void deleteQueue(final int node, final String queueName) throws Exception {

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
@ -74,7 +75,7 @@ public class ExpireWhileLoadBalanceTest extends ClusterTestBase {
for (int i = 0; i <= 2; i++) {
createQueue(i, "queues.testaddress", "queue0", null, true);
getServer(i).createQueue(expiryQueue, expiryQueue, null, true, false);
getServer(i).createQueue(expiryQueue, RoutingType.ANYCAST, expiryQueue, null, true, false);
getServer(i).getAddressSettingsRepository().addMatch("queues.*", as);
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -67,7 +68,7 @@ public class AMQPToOpenwireTest extends ActiveMQTestBase {
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false);
coreQueue = new SimpleString(queueName);
server.createQueue(coreQueue, coreQueue, null, false, false);
server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false);
qpidfactory = new JmsConnectionFactory("amqp://localhost:61616");
}

View File

@ -1310,7 +1310,7 @@ public class DivertTest extends ActiveMQTestBase {
ActiveMQServer server = addServer(new ActiveMQServerImpl(null, null, null, null, serviceRegistry));
server.start();
server.waitForActivation(100, TimeUnit.MILLISECONDS);
server.deployQueue(ADDRESS, RoutingType.MULTICAST, SimpleString.toSimpleString("myQueue"), null, false, false);
server.createQueue(ADDRESS, RoutingType.MULTICAST, SimpleString.toSimpleString("myQueue"), null, false, false);
server.deployDivert(new DivertConfiguration().setName(DIVERT).setAddress(ADDRESS.toString()).setForwardingAddress(ADDRESS.toString()));
Collection<Binding> bindings = server.getPostOffice().getBindingsForAddress(ADDRESS).getBindings();
Divert divert = null;

View File

@ -1213,7 +1213,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory factory = createSessionFactory(locator);
ClientSession session = addClientSession(factory.createSession());
server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
addClientConsumer(session.createConsumer(queueName));
Thread.sleep(100); // We check the timestamp for the creation time. We need to make sure it's different
@ -1337,7 +1337,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
@Test
public void testListSessionsAsJSON() throws Exception {
SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
ActiveMQServerControl serverControl = createManagementControl();
@ -1403,9 +1403,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
this.conf.clearConnectorConfigurations().addConnectorConfiguration("server2-connector", new TransportConfiguration(INVM_CONNECTOR_FACTORY, params));
server2.start();
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false);
server2.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server2.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server2.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false);
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(locator);

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManage
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.utils.Base64;
@ -1766,8 +1767,8 @@ public class QueueControlTest extends ManagementTestBase {
@Test
public void testMoveMessagesBack() throws Exception {
server.createQueue(new SimpleString("q1"), new SimpleString("q1"), null, true, false);
server.createQueue(new SimpleString("q2"), new SimpleString("q2"), null, true, false);
server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, true, false);
server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, true, false);
ServerLocator locator = createInVMNonHALocator();
@ -1830,8 +1831,8 @@ public class QueueControlTest extends ManagementTestBase {
@Test
public void testMoveMessagesBack2() throws Exception {
server.createQueue(new SimpleString("q1"), new SimpleString("q1"), null, true, false);
server.createQueue(new SimpleString("q2"), new SimpleString("q2"), null, true, false);
server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, true, false);
server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, true, false);
ServerLocator locator = createInVMNonHALocator();
@ -2071,7 +2072,7 @@ public class QueueControlTest extends ManagementTestBase {
SimpleString testQueueName = new SimpleString("newQueue");
String testQueueName2 = "newQueue2";
this.server.createQueue(testQueueName, testQueueName, null, false, false);
this.server.createQueue(testQueueName, RoutingType.ANYCAST, testQueueName, null, false, false);
Notification notif = listener.getNotification();

View File

@ -1679,7 +1679,7 @@ public class MQTTTest extends MQTTTestSupport {
public void testAnycastAddressWorksWithMQTT() throws Exception {
String anycastAddress = "foo/bar";
getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), RoutingType.ANYCAST));
getServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), RoutingType.ANYCAST));
String clientId = "testMqtt";
Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)};
@ -1717,7 +1717,7 @@ public class MQTTTest extends MQTTTestSupport {
routingTypeSet.add(RoutingType.ANYCAST);
routingTypeSet.add(RoutingType.MULTICAST);
getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), routingTypeSet));
getServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), routingTypeSet));
String clientId = "testMqtt";
Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)};

View File

@ -80,7 +80,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
ClientSession session = sf.createSession();
try {
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
PageSubscriptionCounter counter = locateCounter(queue);
@ -110,7 +110,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
ClientSession session = sf.createSession();
try {
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
PageSubscriptionCounter counter = locateCounter(queue);
@ -167,7 +167,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
try {
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
PageSubscriptionCounter counter = locateCounter(queue);
@ -221,7 +221,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
@Test
public void testRestartCounter() throws Exception {
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
PageSubscriptionCounter counter = locateCounter(queue);
@ -274,7 +274,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
public void testPrepareCounter() throws Exception {
Xid xid = newXID();
Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
PageSubscriptionCounter counter = locateCounter(queue);

View File

@ -95,7 +95,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@ -185,7 +185,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false);
@ -315,7 +315,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false);
@ -409,7 +409,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@ -494,7 +494,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
QueueImpl queue = (QueueImpl) server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);

View File

@ -59,7 +59,7 @@ public class PagingReceiveTest extends ActiveMQTestBase {
super.setUp();
server = internalCreateServer();
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
Queue queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
queue.getPageSubscription().getPagingStore().startPaging();

View File

@ -71,7 +71,7 @@ public class PagingSyncTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, false, false);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);

View File

@ -358,7 +358,7 @@ public class PagingTest extends ActiveMQTestBase {
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
server.createAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST));
Queue queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
queue.getPageSubscription().getPagingStore().startPaging();
@ -3611,7 +3611,7 @@ public class PagingTest extends ActiveMQTestBase {
server.start();
try {
server.createAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST));
server.createQueue(PagingTest.ADDRESS, RoutingType.ANYCAST, PagingTest.ADDRESS, null, true, false);
final CountDownLatch pageUp = new CountDownLatch(0);
@ -3651,8 +3651,7 @@ public class PagingTest extends ActiveMQTestBase {
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
// server.createAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST));
server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
server.createQueue(PagingTest.ADDRESS, RoutingType.ANYCAST, PagingTest.ADDRESS, null, true, false);
final CountDownLatch pageUp = new CountDownLatch(0);
final CountDownLatch pageDone = new CountDownLatch(1);

View File

@ -86,7 +86,7 @@ public class RestDeserializationTest extends RestTestBase {
@Test
public void testWithoutBlackWhiteListTopic() throws Exception {
jmsServer.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("ordersTopic"), RoutingType.MULTICAST));
jmsServer.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString("ordersTopic"), RoutingType.MULTICAST));
deployAndconfigureRESTService("rest-test.war");

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -239,7 +240,7 @@ public class LegacyLDAPSecuritySettingPluginListenerTest extends AbstractLdapTes
server.getConfiguration().setSecurityInvalidationInterval(0);
server.start();
String queue = "queue2";
server.createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, false, false);
server.createQueue(SimpleString.toSimpleString(queue), RoutingType.ANYCAST, SimpleString.toSimpleString(queue), null, false, false);
ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession("first", "secret", false, true, true, false, 0);
ClientConsumer consumer;
@ -281,7 +282,7 @@ public class LegacyLDAPSecuritySettingPluginListenerTest extends AbstractLdapTes
server.getConfiguration().setSecurityInvalidationInterval(0);
server.start();
String queue = "queue2";
server.createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, false, false);
server.createQueue(SimpleString.toSimpleString(queue), RoutingType.ANYCAST, SimpleString.toSimpleString(queue), null, false, false);
ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession("first", "secret", false, true, true, false, 0);
ClientProducer producer = session.createProducer(SimpleString.toSimpleString(queue));

View File

@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -162,7 +163,7 @@ public class LegacyLDAPSecuritySettingPluginTest extends AbstractLdapTestUnit {
final SimpleString QUEUE = new SimpleString("queue2");
server.start();
server.createQueue(ADDRESS, QUEUE, null, true, false);
server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE, null, true, false);
ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession("second", "secret", false, true, true, false, 0);

View File

@ -229,7 +229,7 @@ public class SecurityTest extends ActiveMQTestBase {
roles.add(new Role("programmers", false, false, false, false, false, false, false, false, false, false));
server.getConfiguration().putSecurityRoles("#", roles);
server.start();
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, DURABLE_QUEUE, null, true, false);
server.createQueue(ADDRESS, RoutingType.ANYCAST, NON_DURABLE_QUEUE, null, false, false);
@ -318,7 +318,7 @@ public class SecurityTest extends ActiveMQTestBase {
bRoles.add(new Role(QUEUE_B.toString(), false, true, false, false, false, false, false, false, false, false));
server.getConfiguration().putSecurityRoles(ADDRESS.concat(".").concat(QUEUE_B).toString(), bRoles);
server.start();
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE_A, null, true, false);
server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE_B, null, true, false);
@ -393,7 +393,7 @@ public class SecurityTest extends ActiveMQTestBase {
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(tc));
ClientSessionFactory cf = createSessionFactory(locator);
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(ADDRESS, RoutingType.ANYCAST, DURABLE_QUEUE, null, true, false);
server.createQueue(ADDRESS, RoutingType.ANYCAST, NON_DURABLE_QUEUE, null, false, false);

View File

@ -20,6 +20,7 @@ package org.apache.activemq.artemis.tests.integration.server;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
@ -32,6 +33,7 @@ public class QueuePeristPauseTest extends ActiveMQTestBase {
server.start();
Queue queue = server.createQueue(SimpleString.toSimpleString("q1"),
RoutingType.ANYCAST,
SimpleString.toSimpleString("q1"),
null, true, false);

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
@ -183,7 +184,7 @@ public class CoreClientOverOneWaySSLTest extends ActiveMQTestBase {
@Test
public void testOneWaySSLReloaded() throws Exception {
createCustomSslServer();
server.createQueue(CoreClientOverOneWaySSLTest.QUEUE, CoreClientOverOneWaySSLTest.QUEUE, null, false, false);
server.createQueue(CoreClientOverOneWaySSLTest.QUEUE, RoutingType.ANYCAST, CoreClientOverOneWaySSLTest.QUEUE, null, false, false);
String text = RandomUtil.randomString();
// create a valid SSL connection and keep it for use later

View File

@ -714,7 +714,7 @@ public class MessageProducerTest extends JMSTestCase {
supportedRoutingTypes.add(RoutingType.ANYCAST);
supportedRoutingTypes.add(RoutingType.MULTICAST);
servers.get(0).getActiveMQServer().createAddressInfo(new AddressInfo(addr, supportedRoutingTypes));
servers.get(0).getActiveMQServer().addAddressInfo(new AddressInfo(addr, supportedRoutingTypes));
servers.get(0).getActiveMQServer().createQueue(addr, RoutingType.ANYCAST, addr, null, false, false);
Connection pconn = createConnection();

View File

@ -107,6 +107,7 @@
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -78,9 +79,9 @@ public class SendReceiveMultiThreadTest extends ActiveMQTestBase {
server.start();
Queue queue = server.createQueue(SimpleString.toSimpleString("performanceQueue"), SimpleString.toSimpleString("performanceQueue"), null, true, false);
Queue queue = server.createQueue(SimpleString.toSimpleString("performanceQueue"), RoutingType.ANYCAST, SimpleString.toSimpleString("performanceQueue"), null, true, false);
Queue queue2 = server.createQueue(SimpleString.toSimpleString("stationaryQueue"), SimpleString.toSimpleString("stationaryQueue"), null, true, false);
Queue queue2 = server.createQueue(SimpleString.toSimpleString("stationaryQueue"), RoutingType.ANYCAST, SimpleString.toSimpleString("stationaryQueue"), null, true, false);
MyThread[] threads = new MyThread[NUMBER_OF_THREADS];

View File

@ -124,6 +124,7 @@
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
@ -101,7 +102,7 @@ public class MultipleConsumersPageStressTest extends ActiveMQTestBase {
server = createServer(true, createDefaultInVMConfig(), 10024, 200024, settings);
server.start();
pagedServerQueue = (QueueImpl) server.createQueue(ADDRESS, ADDRESS, null, true, false);
pagedServerQueue = (QueueImpl) server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
}

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContex
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
@ -797,7 +798,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
queueList.clear();
try {
queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
queue.pause();
} catch (Exception ignored) {
}

View File

@ -93,12 +93,6 @@ public class FakePostOffice implements PostOffice {
return false;
}
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
return null;
}
@Override
public AddressInfo removeAddressInfo(SimpleString address) {
return null;