ARTEMIS-2103 - use the full openwire consumer queue for the mapped virtual topic queue binding, fix and test
This commit is contained in:
parent
714a3f862e
commit
b812bfdbed
|
@ -657,8 +657,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
fqqn.append(paths[i]);
|
fqqn.append(paths[i]);
|
||||||
}
|
}
|
||||||
fqqn.append(CompositeAddress.SEPARATOR);
|
fqqn.append(CompositeAddress.SEPARATOR);
|
||||||
// consumer queue
|
// consumer queue - the full vt queue
|
||||||
for (int i = 0; i < filterPathTerminus; i++) {
|
for (int i = 0; i < paths.length; i++) {
|
||||||
if (i > 0) {
|
if (i > 0) {
|
||||||
fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
|
fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,7 @@ For example, a default 5.x consumer destination for topic `VirtualTopic.Orders`
|
||||||
would be replaced with an Artemis FQQN comprised of the address and queue.
|
would be replaced with an Artemis FQQN comprised of the address and queue.
|
||||||
```
|
```
|
||||||
...
|
...
|
||||||
Queue subscriptionQueue = session.createQueue("VirtualTopic.Orders::Consumer.A");
|
Queue subscriptionQueue = session.createQueue("VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders");
|
||||||
session.createConsumer(subscriptionQueue);
|
session.createConsumer(subscriptionQueue);
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ E.g: For the default 5.x virtual topic consumer prefix of ```Consumer.*.``` the
|
||||||
However, there is a caveat because this value needs to be encoded in a uri for the xml configuration. Any unsafe url characters
|
However, there is a caveat because this value needs to be encoded in a uri for the xml configuration. Any unsafe url characters
|
||||||
, in this case: ```> ;``` need to be escaped with their hex code point representation; leading to a value of ```Consumer.*.%3E%3B2```.
|
, in this case: ```> ;``` need to be escaped with their hex code point representation; leading to a value of ```Consumer.*.%3E%3B2```.
|
||||||
In this way a consumer destination of ```Consumer.A.VirtualTopic.Orders``` will be transformed into a FQQN of
|
In this way a consumer destination of ```Consumer.A.VirtualTopic.Orders``` will be transformed into a FQQN of
|
||||||
```VirtualTopic.Orders::Consumer.A```.
|
```VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders```.
|
||||||
|
|
||||||
|
|
||||||
Durable topic subscribers in a network of brokers
|
Durable topic subscribers in a network of brokers
|
||||||
|
|
|
@ -85,7 +85,7 @@ The two parameters are configured on an OpenWire `acceptor`, e.g.:
|
||||||
|
|
||||||
For existing OpenWire consumers of virtual topic destinations it is possible to
|
For existing OpenWire consumers of virtual topic destinations it is possible to
|
||||||
configure a mapping function that will translate the virtual topic consumer
|
configure a mapping function that will translate the virtual topic consumer
|
||||||
destination into a FQQN address. This address then represents the consumer as a
|
destination into a FQQN address. This address will then represents the consumer as a
|
||||||
multicast binding to an address representing the virtual topic.
|
multicast binding to an address representing the virtual topic.
|
||||||
|
|
||||||
The configuration string property `virtualTopicConsumerWildcards` has two parts
|
The configuration string property `virtualTopicConsumerWildcards` has two parts
|
||||||
|
@ -103,7 +103,7 @@ this transforms to `Consumer.*.%3E%3B2` when the url significant characters
|
||||||
```
|
```
|
||||||
|
|
||||||
This will translate `Consumer.A.VirtualTopic.Orders` into a FQQN of
|
This will translate `Consumer.A.VirtualTopic.Orders` into a FQQN of
|
||||||
`VirtualTopic.Orders::Consumer.A` using the int component `2` of the
|
`VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders` using the int component `2` of the
|
||||||
configuration to identify the consumer queue as the first two paths of the
|
configuration to identify the consumer queue as the first two paths of the
|
||||||
destination. `virtualTopicConsumerWildcards` is multi valued using a `,`
|
destination. `virtualTopicConsumerWildcards` is multi valued using a `,`
|
||||||
separator.
|
separator.
|
||||||
|
|
|
@ -18,7 +18,7 @@ Address.
|
||||||
The example sends a message to a topic (using openwire protocol) and an openwire consumer listens on the backing queue
|
The example sends a message to a topic (using openwire protocol) and an openwire consumer listens on the backing queue
|
||||||
using the ActiveMQ 5.x virtual topic naming convention. Due to the acceptor url parameter `virtualTopicConsumerWildcards`,
|
using the ActiveMQ 5.x virtual topic naming convention. Due to the acceptor url parameter `virtualTopicConsumerWildcards`,
|
||||||
(see below), Artemis maps the consumer consuming from `Consumer.A.VirtualTopic.Orders` to actually consume from
|
(see below), Artemis maps the consumer consuming from `Consumer.A.VirtualTopic.Orders` to actually consume from
|
||||||
FQQN of `VirtualTopic.Orders::Consumer.A`
|
FQQN of `VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders`
|
||||||
|
|
||||||
|
|
||||||
```xml
|
```xml
|
||||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
* The example sends a message to a topic (using openwire protocol) and an openwire consumer listens on the backing queue
|
* The example sends a message to a topic (using openwire protocol) and an openwire consumer listens on the backing queue
|
||||||
* using the ActiveMQ 5.x virtual topic naming convention. Due to the acceptor parameter virtualTopicConsumerWildcards
|
* using the ActiveMQ 5.x virtual topic naming convention. Due to the acceptor parameter virtualTopicConsumerWildcards
|
||||||
* Artemis maps the consumer consuming from "Consumer.A.VirtualTopic.Orders" to actually consume from
|
* Artemis maps the consumer consuming from "Consumer.A.VirtualTopic.Orders" to actually consume from
|
||||||
* FQQN "VirtualTopic.Orders::Consumer.A"
|
* FQQN "VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders"
|
||||||
*/
|
*/
|
||||||
public class VirtualTopicMappingExample {
|
public class VirtualTopicMappingExample {
|
||||||
|
|
||||||
|
|
|
@ -84,4 +84,142 @@ public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTwoTopicSubsSameNameAutoVirtualTopicFQQN() throws Exception {
|
||||||
|
Connection connection = null;
|
||||||
|
|
||||||
|
SimpleString topic1 = new SimpleString("VirtualTopic.Orders1");
|
||||||
|
SimpleString topic2 = new SimpleString("VirtualTopic.Orders2");
|
||||||
|
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
|
||||||
|
activeMQConnectionFactory.setWatchTopicAdvisories(false);
|
||||||
|
connection = activeMQConnectionFactory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination1 = session.createTopic(topic1.toString());
|
||||||
|
Destination destination2 = session.createTopic(topic2.toString());
|
||||||
|
|
||||||
|
MessageConsumer messageConsumer1 = session.createConsumer(session.createQueue("Consumer.A." + topic1.toString()));
|
||||||
|
MessageConsumer messageConsumer2 = session.createConsumer(session.createQueue("Consumer.A." + topic2.toString()));
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(null);
|
||||||
|
TextMessage message = session.createTextMessage("This is a text message to 1");
|
||||||
|
producer.send(destination1, message);
|
||||||
|
message = session.createTextMessage("This is a text message to 2");
|
||||||
|
producer.send(destination2, message);
|
||||||
|
|
||||||
|
|
||||||
|
TextMessage messageReceived1 = (TextMessage) messageConsumer1.receive(2000);
|
||||||
|
TextMessage messageReceived2 = (TextMessage) messageConsumer2.receive(2000);
|
||||||
|
|
||||||
|
assertNotNull(messageReceived1);
|
||||||
|
assertNotNull(messageReceived2);
|
||||||
|
|
||||||
|
String text = messageReceived1.getText();
|
||||||
|
assertEquals("This is a text message to 1", text);
|
||||||
|
|
||||||
|
text = messageReceived2.getText();
|
||||||
|
assertEquals("This is a text message to 2", text);
|
||||||
|
|
||||||
|
messageConsumer1.close();
|
||||||
|
messageConsumer2.close();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoVirtualTopicWildcardFQQN() throws Exception {
|
||||||
|
Connection connection = null;
|
||||||
|
|
||||||
|
SimpleString topicA = new SimpleString("VirtualTopic.Orders.A");
|
||||||
|
SimpleString topicB = new SimpleString("VirtualTopic.Orders.B");
|
||||||
|
SimpleString topic = new SimpleString("VirtualTopic.Orders.>");
|
||||||
|
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
|
||||||
|
activeMQConnectionFactory.setWatchTopicAdvisories(false);
|
||||||
|
connection = activeMQConnectionFactory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createTopic(topicA.toString() + "," + topicB.toString());
|
||||||
|
|
||||||
|
MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("Consumer.A." + topic.toString()));
|
||||||
|
// MessageConsumer messageConsumerB = session.createConsumer(session.createQueue("Consumer.B." + topic.toString()));
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
TextMessage message = session.createTextMessage("This is a text message");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
|
||||||
|
TextMessage messageReceivedB = (TextMessage) messageConsumerA.receive(2000);
|
||||||
|
|
||||||
|
assertTrue((messageReceivedA != null && messageReceivedB != null));
|
||||||
|
String text = messageReceivedA.getText();
|
||||||
|
assertEquals("This is a text message", text);
|
||||||
|
|
||||||
|
messageConsumerA.close();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoVirtualTopicWildcardStarFQQN() throws Exception {
|
||||||
|
Connection connection = null;
|
||||||
|
|
||||||
|
SimpleString topicA = new SimpleString("VirtualTopic.Orders.A");
|
||||||
|
SimpleString topicB = new SimpleString("VirtualTopic.Orders.B");
|
||||||
|
SimpleString topic = new SimpleString("VirtualTopic.Orders.*");
|
||||||
|
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
|
||||||
|
activeMQConnectionFactory.setWatchTopicAdvisories(false);
|
||||||
|
connection = activeMQConnectionFactory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createTopic(topicA.toString() + "," + topicB.toString());
|
||||||
|
|
||||||
|
MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("Consumer.A." + topic.toString()));
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
TextMessage message = session.createTextMessage("This is a text message");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
|
||||||
|
TextMessage messageReceivedB = (TextMessage) messageConsumerA.receive(2000);
|
||||||
|
|
||||||
|
assertTrue((messageReceivedA != null && messageReceivedB != null));
|
||||||
|
String text = messageReceivedA.getText();
|
||||||
|
assertEquals("This is a text message", text);
|
||||||
|
|
||||||
|
messageConsumerA.close();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue