mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-20 17:05:51 +00:00
ARTEMIS-1609 Add distinct name/address for JMS destinations
* add methods to JMSServerManager to be able to create JMS queues and topics that have distincts names (as returned by the JMS API) and addresses (as used by Artemis Core API). * add constructors to ActiveMQQueue and ActiveMQTopic to specify JMS name distinct from their core address This allows to emulate Artemis 1.x naming conventions where a JMS queue would have a name 'foo'and and an address 'jms.queue.foo' (the same applying for JMS topic as well).
This commit is contained in:
parent
1aa7b5c038
commit
86175d677a
@ -198,6 +198,10 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
|
||||
return new ActiveMQQueue(address);
|
||||
}
|
||||
|
||||
public static ActiveMQQueue createQueue(final String address, final String name) {
|
||||
return new ActiveMQQueue(address, name);
|
||||
}
|
||||
|
||||
public static ActiveMQTopic createTopic(final String address) {
|
||||
return new ActiveMQTopic(address);
|
||||
}
|
||||
@ -206,6 +210,10 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
|
||||
return new ActiveMQTopic(address);
|
||||
}
|
||||
|
||||
public static ActiveMQTopic createTopic(final String address, final String name) {
|
||||
return new ActiveMQTopic(address, name);
|
||||
}
|
||||
|
||||
public static ActiveMQTemporaryQueue createTemporaryQueue(final String address, final ActiveMQSession session) {
|
||||
return new ActiveMQTemporaryQueue(address, session);
|
||||
}
|
||||
|
@ -47,6 +47,11 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
|
||||
super(address, TYPE.QUEUE, null);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public ActiveMQQueue(final String address, final String name) {
|
||||
super(address, name, TYPE.QUEUE, null);
|
||||
}
|
||||
|
||||
public ActiveMQQueue(final String address, boolean temporary) {
|
||||
super(address, temporary ? TYPE.TEMP_QUEUE : TYPE.QUEUE, null);
|
||||
}
|
||||
|
@ -46,8 +46,13 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
|
||||
super(address, TYPE.TOPIC, null);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public ActiveMQTopic(final String address, final String name) {
|
||||
super(address, name, TYPE.TOPIC, null);
|
||||
}
|
||||
|
||||
public ActiveMQTopic(final String address, boolean temporary) {
|
||||
super(address, TYPE.TOPIC, null);
|
||||
this(address, temporary, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -60,6 +60,24 @@ public interface JMSServerManager extends ActiveMQComponent {
|
||||
boolean durable,
|
||||
String... bindings) throws Exception;
|
||||
|
||||
/**
|
||||
* Creates a JMS Queue.
|
||||
*
|
||||
* @param queueName The name of the core queue to create
|
||||
* @param jmsQueueName the name of this JMS queue
|
||||
* @param selectorString
|
||||
* @param durable
|
||||
* @return true if the queue is created or if it existed and was added to
|
||||
* the Binding Registry
|
||||
* @throws Exception if problems were encountered creating the queue.
|
||||
*/
|
||||
boolean createQueue(boolean storeConfig,
|
||||
String queueName,
|
||||
String jmsQueueName,
|
||||
String selectorString,
|
||||
boolean durable,
|
||||
String... bindings) throws Exception;
|
||||
|
||||
boolean addTopicToBindingRegistry(String topicName, String binding) throws Exception;
|
||||
|
||||
boolean addQueueToBindingRegistry(String queueName, String binding) throws Exception;
|
||||
@ -69,23 +87,46 @@ public interface JMSServerManager extends ActiveMQComponent {
|
||||
/**
|
||||
* Creates a JMS Topic
|
||||
*
|
||||
* @param address the core addres of thetopic
|
||||
* @param bindings the names of the binding for the Binding Registry or BindingRegistry
|
||||
* @return true if the topic was created or if it existed and was added to
|
||||
* the Binding Registry
|
||||
* @throws Exception if a problem occurred creating the topic
|
||||
*/
|
||||
boolean createTopic(boolean storeConfig, String address, String... bindings) throws Exception;
|
||||
|
||||
/**
|
||||
* Creates a JMS Topic
|
||||
*
|
||||
* @param address the core addres of thetopic
|
||||
* @param topicName the name of the topic
|
||||
* @param bindings the names of the binding for the Binding Registry or BindingRegistry
|
||||
* @return true if the topic was created or if it existed and was added to
|
||||
* the Binding Registry
|
||||
* @throws Exception if a problem occurred creating the topic
|
||||
*/
|
||||
boolean createTopic(boolean storeConfig, String topicName, String... bindings) throws Exception;
|
||||
boolean createTopic(String address, boolean storeConfig, String topicName, String... bindings) throws Exception;
|
||||
|
||||
/**
|
||||
* @param storeConfig
|
||||
* @param address
|
||||
* @param autoCreated
|
||||
* @param bindings
|
||||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
boolean createTopic(boolean storeConfig, String address, boolean autoCreated, String... bindings) throws Exception;
|
||||
|
||||
/**
|
||||
* @param storeConfig
|
||||
* @param address
|
||||
* @param topicName
|
||||
* @param autoCreated
|
||||
* @param bindings
|
||||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
boolean createTopic(boolean storeConfig, String topicName, boolean autoCreated, String... bindings) throws Exception;
|
||||
boolean createTopic(boolean storeConfig, String address, String topicName, boolean autoCreated, String... bindings) throws Exception;
|
||||
|
||||
/**
|
||||
* Remove the topic from the Binding Registry or BindingRegistry.
|
||||
|
@ -117,8 +117,10 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||
|
||||
private BindingRegistry registry;
|
||||
|
||||
// keys are the core addresses of the JMS queues
|
||||
private final Map<String, ActiveMQQueue> queues = new HashMap<>();
|
||||
|
||||
// keys are the core addresses of the topics
|
||||
private final Map<String, ActiveMQTopic> topics = new HashMap<>();
|
||||
|
||||
private final Map<String, ActiveMQConnectionFactory> connectionFactories = new HashMap<>();
|
||||
@ -465,11 +467,17 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||
final String selectorString,
|
||||
final boolean durable,
|
||||
final String... bindings) throws Exception {
|
||||
return internalCreateJMSQueue(storeConfig, queueName, selectorString, durable, false, bindings);
|
||||
return internalCreateJMSQueue(storeConfig, queueName, queueName, selectorString, durable, false, bindings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean createQueue(boolean storeConfig, String queueName, String jmsQueueName, String selectorString, boolean durable, String... bindings) throws Exception {
|
||||
return internalCreateJMSQueue(storeConfig, queueName, jmsQueueName, selectorString, durable, false, bindings);
|
||||
}
|
||||
|
||||
protected boolean internalCreateJMSQueue(final boolean storeConfig,
|
||||
final String queueName,
|
||||
final String jmsQueueName,
|
||||
final String selectorString,
|
||||
final boolean durable,
|
||||
final boolean autoCreated,
|
||||
@ -489,7 +497,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||
public void runException() throws Exception {
|
||||
checkBindings(bindings);
|
||||
|
||||
if (internalCreateQueue(queueName, selectorString, durable, autoCreated)) {
|
||||
if (internalCreateQueue(queueName, jmsQueueName, selectorString, durable, autoCreated)) {
|
||||
|
||||
ActiveMQDestination destination = queues.get(queueName);
|
||||
if (destination == null) {
|
||||
@ -528,32 +536,46 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||
|
||||
@Override
|
||||
public synchronized boolean createTopic(final boolean storeConfig,
|
||||
final String topicName,
|
||||
final String address,
|
||||
final String... bindings) throws Exception {
|
||||
return createTopic(storeConfig, topicName, false, bindings);
|
||||
return createTopic(storeConfig, address, false, bindings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean createTopic(String address, boolean storeConfig, String topicName, String... bindings) throws Exception {
|
||||
return createTopic(storeConfig, address, topicName, false, bindings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean createTopic(final boolean storeConfig,
|
||||
final String topicName,
|
||||
final String address,
|
||||
final boolean autoCreated,
|
||||
final String... bindings) throws Exception {
|
||||
if (active && topics.get(topicName) != null) {
|
||||
return createTopic(storeConfig, address, address, autoCreated, bindings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean createTopic(final boolean storeConfig,
|
||||
final String address,
|
||||
final String topicName,
|
||||
final boolean autoCreated,
|
||||
final String... bindings) throws Exception {
|
||||
if (active && topics.get(address) != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
runAfterActive(new WrappedRunnable() {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "createTopic for " + topicName;
|
||||
return "createTopic for " + address;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runException() throws Exception {
|
||||
checkBindings(bindings);
|
||||
|
||||
if (internalCreateTopic(topicName, autoCreated)) {
|
||||
ActiveMQDestination destination = topics.get(topicName);
|
||||
if (internalCreateTopic(address, topicName, autoCreated)) {
|
||||
ActiveMQDestination destination = topics.get(address);
|
||||
|
||||
if (destination == null) {
|
||||
// sanity check. internalCreateQueue should already have done this check
|
||||
@ -571,17 +593,17 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||
}
|
||||
|
||||
String[] usedBindings = bindingsToAdd.toArray(new String[bindingsToAdd.size()]);
|
||||
addToBindings(topicBindings, topicName, usedBindings);
|
||||
addToBindings(topicBindings, address, usedBindings);
|
||||
|
||||
if (storeConfig) {
|
||||
storage.storeDestination(new PersistedDestination(PersistedType.Topic, topicName));
|
||||
storage.addBindings(PersistedType.Topic, topicName, usedBindings);
|
||||
storage.storeDestination(new PersistedDestination(PersistedType.Topic, address));
|
||||
storage.addBindings(PersistedType.Topic, address, usedBindings);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
sendNotification(JMSNotificationType.TOPIC_CREATED, topicName);
|
||||
sendNotification(JMSNotificationType.TOPIC_CREATED, address);
|
||||
return true;
|
||||
|
||||
}
|
||||
@ -1056,10 +1078,11 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||
private synchronized boolean internalCreateQueue(final String queueName,
|
||||
final String selectorString,
|
||||
final boolean durable) throws Exception {
|
||||
return internalCreateQueue(queueName, selectorString, durable, false);
|
||||
return internalCreateQueue(queueName, queueName, selectorString, durable, false);
|
||||
}
|
||||
|
||||
private synchronized boolean internalCreateQueue(final String queueName,
|
||||
final String jmsQueueName,
|
||||
final String selectorString,
|
||||
final boolean durable,
|
||||
final boolean autoCreated) throws Exception {
|
||||
@ -1078,7 +1101,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||
AddressSettings as = server.getAddressSettingsRepository().getMatch(queueName);
|
||||
server.createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(coreFilterString), null, durable, false, true, false, false, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isAutoCreateAddresses());
|
||||
|
||||
queues.put(queueName, ActiveMQDestination.createQueue(queueName));
|
||||
// create the JMS queue with the logical name jmsQueueName and keeps queueName for its *core* queue name
|
||||
queues.put(queueName, ActiveMQDestination.createQueue(queueName, jmsQueueName));
|
||||
|
||||
this.recoverregistryBindings(queueName, PersistedType.Queue);
|
||||
|
||||
@ -1090,21 +1114,23 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||
* Performs the internal creation without activating any storage.
|
||||
* The storage load will call this method
|
||||
*
|
||||
* @param topicName
|
||||
* @param address
|
||||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
private synchronized boolean internalCreateTopic(final String topicName) throws Exception {
|
||||
return internalCreateTopic(topicName, false);
|
||||
private synchronized boolean internalCreateTopic(final String address) throws Exception {
|
||||
return internalCreateTopic(address, address, false);
|
||||
}
|
||||
|
||||
private synchronized boolean internalCreateTopic(final String topicName,
|
||||
private synchronized boolean internalCreateTopic(final String address,
|
||||
final String topicName,
|
||||
final boolean autoCreated) throws Exception {
|
||||
|
||||
if (topics.get(topicName) != null) {
|
||||
if (topics.get(address) != null) {
|
||||
return false;
|
||||
} else {
|
||||
ActiveMQTopic activeMQTopic = ActiveMQDestination.createTopic(topicName);
|
||||
// Create the JMS topic with topicName as the logical name of the topic *and* address as its address
|
||||
ActiveMQTopic activeMQTopic = ActiveMQDestination.createTopic(address, topicName);
|
||||
server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQTopic.getAddress()), RoutingType.MULTICAST));
|
||||
|
||||
topics.put(topicName, activeMQTopic);
|
||||
|
Loading…
x
Reference in New Issue
Block a user