NO-JIRA de-duplicate createQueue()
There were two different but nearly identical implementations of createQueue(). I consolidated these into a single method. There should be no semantic differences.
This commit is contained in:
parent
5c47e71d4f
commit
03c45d6479
|
@ -1739,18 +1739,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
@Override
|
||||
public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
|
||||
AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
|
||||
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
|
||||
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, boolean autoCreateAddress) throws Exception {
|
||||
AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
|
||||
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
|
||||
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception {
|
||||
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
|
||||
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false);
|
||||
}
|
||||
|
||||
|
||||
|
@ -2847,7 +2847,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
} else {
|
||||
// if the address::queue doesn't exist then create it
|
||||
try {
|
||||
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isLastValue, lastValueKey, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, true, true);
|
||||
createQueue(new AddressInfo(SimpleString.toSimpleString(config.getAddress())).addRoutingType(config.getRoutingType()), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isLastValue, lastValueKey, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, true, true);
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// the queue may exist on a *different* address
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
|
||||
|
@ -3043,7 +3043,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
final boolean nonDestructive,
|
||||
final int consumersBeforeDispatch,
|
||||
final long delayBeforeDispatch,
|
||||
final boolean autoCreateAddress) throws Exception {
|
||||
final boolean autoCreateAddress,
|
||||
final boolean configurationManaged) throws Exception {
|
||||
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
|
||||
if (binding != null) {
|
||||
if (ignoreIfExists) {
|
||||
|
@ -3101,6 +3102,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
.nonDestructive(nonDestructive)
|
||||
.consumersBeforeDispatch(consumersBeforeDispatch)
|
||||
.delayBeforeDispatch(delayBeforeDispatch)
|
||||
.configurationManaged(configurationManaged)
|
||||
.build();
|
||||
|
||||
if (hasBrokerQueuePlugins()) {
|
||||
|
@ -3178,138 +3180,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
final int consumersBeforeDispatch,
|
||||
final long delayBeforeDispatch,
|
||||
final boolean autoCreateAddress) throws Exception {
|
||||
return createQueue(address, routingType, queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false);
|
||||
}
|
||||
|
||||
private Queue createQueue(final SimpleString address,
|
||||
final RoutingType routingType,
|
||||
final SimpleString queueName,
|
||||
final SimpleString filterString,
|
||||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean ignoreIfExists,
|
||||
final boolean transientQueue,
|
||||
final boolean autoCreated,
|
||||
final int maxConsumers,
|
||||
final boolean purgeOnNoConsumers,
|
||||
final boolean exclusive,
|
||||
final boolean lastValue,
|
||||
final SimpleString lastValueKey,
|
||||
final boolean nonDestructive,
|
||||
final int consumersBeforeDispatch,
|
||||
final long delayBeforeDispatch,
|
||||
final boolean autoCreateAddress,
|
||||
final boolean configurationManaged) throws Exception {
|
||||
|
||||
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
|
||||
if (binding != null) {
|
||||
if (ignoreIfExists) {
|
||||
return binding.getQueue();
|
||||
} else {
|
||||
throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueName, binding.getAddress());
|
||||
}
|
||||
}
|
||||
|
||||
final Filter filter = FilterImpl.createFilter(filterString);
|
||||
|
||||
final long txID = storageManager.generateID();
|
||||
final long queueID = storageManager.generateID();
|
||||
|
||||
final QueueConfig.Builder queueConfigBuilder;
|
||||
|
||||
final SimpleString addressToUse = address == null ? queueName : address;
|
||||
|
||||
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressToUse);
|
||||
|
||||
AddressInfo info = postOffice.getAddressInfo(addressToUse);
|
||||
|
||||
if (autoCreateAddress) {
|
||||
RoutingType rt = (routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
|
||||
if (info == null) {
|
||||
final AddressInfo addressInfo = new AddressInfo(addressToUse, rt);
|
||||
addressInfo.setAutoCreated(true);
|
||||
addAddressInfo(addressInfo);
|
||||
} else if (!info.getRoutingTypes().contains(routingType)) {
|
||||
EnumSet<RoutingType> routingTypes = EnumSet.copyOf(info.getRoutingTypes());
|
||||
routingTypes.add(routingType);
|
||||
updateAddressInfo(info.getName(), routingTypes);
|
||||
}
|
||||
} else if (info == null) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressToUse);
|
||||
} else if (!info.getRoutingTypes().contains(routingType)) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, info.getName().toString(), info.getRoutingTypes());
|
||||
}
|
||||
|
||||
final QueueConfig queueConfig = queueConfigBuilder
|
||||
.filter(filter)
|
||||
.pagingManager(pagingManager)
|
||||
.user(user)
|
||||
.durable(durable)
|
||||
.temporary(temporary)
|
||||
.autoCreated(autoCreated).routingType(routingType)
|
||||
.maxConsumers(maxConsumers)
|
||||
.purgeOnNoConsumers(purgeOnNoConsumers)
|
||||
.exclusive(exclusive)
|
||||
.lastValue(lastValue)
|
||||
.lastValueKey(lastValueKey)
|
||||
.nonDestructive(nonDestructive)
|
||||
.consumersBeforeDispatch(consumersBeforeDispatch)
|
||||
.delayBeforeDispatch(delayBeforeDispatch)
|
||||
.configurationManaged(configurationManaged)
|
||||
.build();
|
||||
|
||||
if (hasBrokerQueuePlugins()) {
|
||||
callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfig));
|
||||
}
|
||||
|
||||
final Queue queue = queueFactory.createQueueWith(queueConfig);
|
||||
|
||||
if (transientQueue) {
|
||||
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
|
||||
} else {
|
||||
queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName()));
|
||||
}
|
||||
|
||||
final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
|
||||
|
||||
if (queue.isDurable()) {
|
||||
storageManager.addQueueBinding(txID, localQueueBinding);
|
||||
}
|
||||
|
||||
try {
|
||||
postOffice.addBinding(localQueueBinding);
|
||||
if (queue.isDurable()) {
|
||||
storageManager.commitBindings(txID);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
if (durable) {
|
||||
storageManager.rollbackBindings(txID);
|
||||
}
|
||||
final PageSubscription pageSubscription = queue.getPageSubscription();
|
||||
try {
|
||||
queue.close();
|
||||
} finally {
|
||||
if (pageSubscription != null) {
|
||||
pageSubscription.destroy();
|
||||
}
|
||||
}
|
||||
} catch (Throwable ignored) {
|
||||
logger.debug(ignored.getMessage(), ignored);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
managementService.registerQueue(queue, queue.getAddress(), storageManager);
|
||||
|
||||
if (hasBrokerQueuePlugins()) {
|
||||
callBrokerQueuePlugins(plugin -> plugin.afterCreateQueue(queue));
|
||||
}
|
||||
|
||||
callPostQueueCreationCallbacks(queue.getName());
|
||||
|
||||
return queue;
|
||||
return createQueue(new AddressInfo(address).addRoutingType(routingType), queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
|
|
Loading…
Reference in New Issue