[ARTEMIS-1030] add support for auto mapping openwire virtual topic consumer destinations to FQQN
This commit is contained in:
parent
6428a897c3
commit
82e4f465ee
|
@ -49,14 +49,18 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||||
|
import org.apache.activemq.artemis.selector.impl.LRUCache;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
|
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||||
import org.apache.activemq.artemis.utils.DataConstants;
|
import org.apache.activemq.artemis.utils.DataConstants;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.command.BrokerId;
|
import org.apache.activemq.command.BrokerId;
|
||||||
import org.apache.activemq.command.BrokerInfo;
|
import org.apache.activemq.command.BrokerInfo;
|
||||||
|
@ -70,6 +74,8 @@ import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.command.ProducerId;
|
import org.apache.activemq.command.ProducerId;
|
||||||
import org.apache.activemq.command.ProducerInfo;
|
import org.apache.activemq.command.ProducerInfo;
|
||||||
import org.apache.activemq.command.WireFormatInfo;
|
import org.apache.activemq.command.WireFormatInfo;
|
||||||
|
import org.apache.activemq.filter.DestinationFilter;
|
||||||
|
import org.apache.activemq.filter.DestinationPath;
|
||||||
import org.apache.activemq.openwire.OpenWireFormat;
|
import org.apache.activemq.openwire.OpenWireFormat;
|
||||||
import org.apache.activemq.openwire.OpenWireFormatFactory;
|
import org.apache.activemq.openwire.OpenWireFormatFactory;
|
||||||
import org.apache.activemq.state.ProducerState;
|
import org.apache.activemq.state.ProducerState;
|
||||||
|
@ -128,6 +134,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
|
|
||||||
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
|
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
|
||||||
|
|
||||||
|
private final Map<DestinationFilter, Integer> vtConsumerDestinationMatchers = new HashMap<>();
|
||||||
|
protected final LRUCache<ActiveMQDestination, ActiveMQDestination> vtDestMapCache = new LRUCache();
|
||||||
|
|
||||||
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
|
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
|
||||||
this.factory = factory;
|
this.factory = factory;
|
||||||
this.server = server;
|
this.server = server;
|
||||||
|
@ -607,4 +616,70 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
public void setSuppressInternalManagementObjects(boolean suppressInternalManagementObjects) {
|
public void setSuppressInternalManagementObjects(boolean suppressInternalManagementObjects) {
|
||||||
this.suppressInternalManagementObjects = suppressInternalManagementObjects;
|
this.suppressInternalManagementObjects = suppressInternalManagementObjects;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setVirtualTopicConsumerWildcards(String virtualTopicConsumerWildcards) {
|
||||||
|
for (String filter : virtualTopicConsumerWildcards.split(",")) {
|
||||||
|
String[] wildcardLimitPair = filter.split(";");
|
||||||
|
vtConsumerDestinationMatchers.put(DestinationFilter.parseFilter(new ActiveMQQueue(wildcardLimitPair[0])), Integer.valueOf(wildcardLimitPair[1]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setVirtualTopicConsumerLruCacheMax(int max) {
|
||||||
|
vtDestMapCache.setMaxCacheSize(max);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ActiveMQDestination virtualTopicConsumerToFQQN(final ActiveMQDestination destination) {
|
||||||
|
|
||||||
|
if (vtConsumerDestinationMatchers.isEmpty()) {
|
||||||
|
return destination;
|
||||||
|
}
|
||||||
|
|
||||||
|
ActiveMQDestination mappedDestination = null;
|
||||||
|
synchronized (vtDestMapCache) {
|
||||||
|
mappedDestination = vtDestMapCache.get(destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mappedDestination != null) {
|
||||||
|
return mappedDestination;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Map.Entry<DestinationFilter, Integer> candidate : vtConsumerDestinationMatchers.entrySet()) {
|
||||||
|
if (candidate.getKey().matches(destination)) {
|
||||||
|
// convert to matching FQQN
|
||||||
|
String[] paths = DestinationPath.getDestinationPaths(destination);
|
||||||
|
StringBuilder fqqn = new StringBuilder();
|
||||||
|
int filterPathTerminus = candidate.getValue();
|
||||||
|
// address - ie: topic
|
||||||
|
for (int i = filterPathTerminus; i < paths.length; i++) {
|
||||||
|
if (i > filterPathTerminus) {
|
||||||
|
fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
|
||||||
|
}
|
||||||
|
fqqn.append(paths[i]);
|
||||||
|
}
|
||||||
|
fqqn.append(CompositeAddress.SEPARATOR);
|
||||||
|
// consumer queue
|
||||||
|
for (int i = 0; i < filterPathTerminus; i++) {
|
||||||
|
if (i > 0) {
|
||||||
|
fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
|
||||||
|
}
|
||||||
|
fqqn.append(paths[i]);
|
||||||
|
}
|
||||||
|
mappedDestination = new ActiveMQQueue(fqqn.toString());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (mappedDestination == null) {
|
||||||
|
// cache the identity mapping
|
||||||
|
mappedDestination = destination;
|
||||||
|
}
|
||||||
|
synchronized (vtDestMapCache) {
|
||||||
|
ActiveMQDestination existing = vtDestMapCache.put(destination, mappedDestination);
|
||||||
|
if (existing != null) {
|
||||||
|
// some one beat us to the put, revert
|
||||||
|
vtDestMapCache.put(destination, existing);
|
||||||
|
mappedDestination = existing;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return mappedDestination;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -173,6 +173,7 @@ public class AMQSession implements SessionCallback {
|
||||||
isInternalAddress = connection.isSuppressInternalManagementObjects();
|
isInternalAddress = connection.isSuppressInternalManagementObjects();
|
||||||
}
|
}
|
||||||
if (openWireDest.isQueue()) {
|
if (openWireDest.isQueue()) {
|
||||||
|
openWireDest = protocolManager.virtualTopicConsumerToFQQN(openWireDest);
|
||||||
SimpleString queueName = new SimpleString(convertWildcard(openWireDest.getPhysicalName()));
|
SimpleString queueName = new SimpleString(convertWildcard(openWireDest.getPhysicalName()));
|
||||||
|
|
||||||
if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {
|
if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {
|
||||||
|
|
|
@ -11,12 +11,19 @@ component must be unique to a connection on the broker. This means that the subs
|
||||||
not possible to load balance the stream of messages across consumers and quick failover is difficult because the
|
not possible to load balance the stream of messages across consumers and quick failover is difficult because the
|
||||||
existing connection state on the broker needs to be first disposed.
|
existing connection state on the broker needs to be first disposed.
|
||||||
With virtual topics, each subscription's stream of messages is redirected to a queue.
|
With virtual topics, each subscription's stream of messages is redirected to a queue.
|
||||||
|
|
||||||
|
In Artemis there are two alternatives, the new JMS 2.0 api or direct access to a subscription queue via the FQQN.
|
||||||
|
|
||||||
JMS2.0 adds the possibility of shared subscriptions with new API's that are fully supported in Artemis.
|
JMS 2.0 shared subscriptions
|
||||||
|
----------------------------
|
||||||
|
JMS 2.0 adds the possibility of shared subscriptions with new API's that are fully supported in Artemis.
|
||||||
|
|
||||||
|
Fully Qualified Queue name (FQQN)
|
||||||
|
---------------------------------
|
||||||
Secondly, Artemis uses a queue per topic subscriber model internally and it is possibly to directly address the
|
Secondly, Artemis uses a queue per topic subscriber model internally and it is possibly to directly address the
|
||||||
subscription queue using it's Fully Qualified Queue name (FQQN).
|
subscription queue using it's Fully Qualified Queue name (FQQN).
|
||||||
|
|
||||||
For example, a default 5.x consumer for topic `VirtualTopic.Orders` subscription `A`:
|
For example, a default 5.x consumer destination for topic `VirtualTopic.Orders` subscription `A`:
|
||||||
```
|
```
|
||||||
...
|
...
|
||||||
Queue subscriptionQueue = session.createQueue("Consumer.A.VirtualTopic.Orders");
|
Queue subscriptionQueue = session.createQueue("Consumer.A.VirtualTopic.Orders");
|
||||||
|
@ -30,6 +37,19 @@ would be replaced with an Artemis FQQN comprised of the address and queue.
|
||||||
session.createConsumer(subscriptionQueue);
|
session.createConsumer(subscriptionQueue);
|
||||||
```
|
```
|
||||||
|
|
||||||
|
This does require modification to the destination name used by consumers which is not ideal.
|
||||||
|
If OpenWire clients cannot be modified, Artemis supports a virtual topic wildcard filter
|
||||||
|
mechanism on the openwire protocol handler that will automatically convert the consumer destination into the
|
||||||
|
corresponding FQQN.
|
||||||
|
The format is a comma separated list of strings pairs, delimited with a ';'. Each pair identifies a filter to match
|
||||||
|
the virtual topic consumer destination and an int that specifies the number of path matches that terminate the consumer
|
||||||
|
queue identity.
|
||||||
|
|
||||||
|
E.g: For the default 5.x virtual topic consumer prefix of ```Consumer.*.``` the url parameter ```virtualTopicConsumerWildcards``` should be: ```Consumer.*.>;2```.
|
||||||
|
In this way a consumer destination of ```Consumer.A.VirtualTopic.Orders``` will be transformed into a FQQN of
|
||||||
|
```VirtualTopic.Orders::Consumer.A```.
|
||||||
|
|
||||||
|
|
||||||
Durable topic subscribers in a network of brokers
|
Durable topic subscribers in a network of brokers
|
||||||
-------------------------------------------------
|
-------------------------------------------------
|
||||||
The store and forward network bridges in 5.x create a durable subscriber per destination. As demand migrates across a
|
The store and forward network bridges in 5.x create a durable subscriber per destination. As demand migrates across a
|
||||||
|
|
|
@ -204,6 +204,25 @@ The two parameters are configured on openwire acceptors, via URLs or API. For ex
|
||||||
|
|
||||||
<acceptor name="artemis">tcp://127.0.0.1:61616?protocols=CORE,AMQP,OPENWIRE;supportAdvisory=true;suppressInternalManagementObjects=false</acceptor>
|
<acceptor name="artemis">tcp://127.0.0.1:61616?protocols=CORE,AMQP,OPENWIRE;supportAdvisory=true;suppressInternalManagementObjects=false</acceptor>
|
||||||
|
|
||||||
|
### Virtual Topic Consumer Destination Translation
|
||||||
|
|
||||||
|
For existing Openwire consumers of virtual topic destinations it is possible to configure a mapping function
|
||||||
|
that will translate the virtual topic consumer destination into a FQQN address. This address then represents
|
||||||
|
the consumer as a multicast binding to an address representing the virtual topic.
|
||||||
|
|
||||||
|
The configuration string property ```virtualTopicConsumerWildcards``` has two parts seperated by a ```;```.
|
||||||
|
The first is the 5.x style destination filter that identifies the destination as belonging to a virtual topic.
|
||||||
|
The second identifies the number of ```paths``` that identify the consumer queue such that it can be parsed from the
|
||||||
|
destination.
|
||||||
|
For example, the default 5.x virtual topic with consumer prefix of ```Consumer.*.```, would require a
|
||||||
|
```virtualTopicConsumerWildcards``` filter of:
|
||||||
|
|
||||||
|
<acceptor name="artemis">tcp://127.0.0.1:61616?protocols=OPENWIRE;virtualTopicConsumerWildcards=Consumer.*.>;2</acceptor>
|
||||||
|
|
||||||
|
This will translate ```Consumer.A.VirtualTopic.Orders``` into a FQQN of ```VirtualTopic.Orders::Consumer.A``` using the
|
||||||
|
int component ```2``` of the configuration to identify the consumer queue as the first two paths of the destination.
|
||||||
|
```virtualTopicConsumerWildcards``` is multi valued using a ```,``` separator.
|
||||||
|
|
||||||
## MQTT
|
## MQTT
|
||||||
|
|
||||||
MQTT is a light weight, client to server, publish / subscribe messaging protocol. MQTT has been specifically
|
MQTT is a light weight, client to server, publish / subscribe messaging protocol. MQTT has been specifically
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.protocol.openwire;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.Activation;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
|
import org.apache.activemq.artemis.selector.impl.LRUCache;
|
||||||
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
|
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
|
||||||
|
public class OpenWireProtocolManagerTest {
|
||||||
|
|
||||||
|
OpenWireProtocolManager underTest;
|
||||||
|
LRUCache lruCacheRef;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVtAutoConversion() throws Exception {
|
||||||
|
underTest = new OpenWireProtocolManager(null, new DummyServer()) {
|
||||||
|
@Override
|
||||||
|
public ActiveMQDestination virtualTopicConsumerToFQQN(ActiveMQDestination destination) {
|
||||||
|
if (lruCacheRef == null) {
|
||||||
|
lruCacheRef = vtDestMapCache;
|
||||||
|
}
|
||||||
|
return super.virtualTopicConsumerToFQQN(destination);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
final int maxCacheSize = 10;
|
||||||
|
underTest.setVirtualTopicConsumerLruCacheMax(10);
|
||||||
|
underTest.setVirtualTopicConsumerWildcards("A.>;1,B.*.>;2,C.*.*.*.EE;3");
|
||||||
|
|
||||||
|
ActiveMQDestination A = new org.apache.activemq.command.ActiveMQQueue("A.SomeTopic");
|
||||||
|
assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic::A"), underTest.virtualTopicConsumerToFQQN(A));
|
||||||
|
|
||||||
|
ActiveMQDestination B = new org.apache.activemq.command.ActiveMQQueue("B.b.SomeTopic.B");
|
||||||
|
assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.B::B.b"), underTest.virtualTopicConsumerToFQQN(B));
|
||||||
|
|
||||||
|
ActiveMQDestination C = new org.apache.activemq.command.ActiveMQQueue("C.c.c.SomeTopic.EE");
|
||||||
|
assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.EE::C.c.c"), underTest.virtualTopicConsumerToFQQN(C));
|
||||||
|
|
||||||
|
for (int i = 0; i < maxCacheSize; i++) {
|
||||||
|
ActiveMQDestination identity = new org.apache.activemq.command.ActiveMQQueue("Identity" + i);
|
||||||
|
assertEquals(identity, underTest.virtualTopicConsumerToFQQN(identity));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertFalse(lruCacheRef.containsKey(A));
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class DummyServer extends ActiveMQServerImpl {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ClusterManager getClusterManager() {
|
||||||
|
return new ClusterManager(getExecutorFactory(), this, null, null, null, null, null, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ExecutorFactory getExecutorFactory() {
|
||||||
|
return new ExecutorFactory() {
|
||||||
|
@Override
|
||||||
|
public ArtemisExecutor getExecutor() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Activation getActivation() {
|
||||||
|
return new Activation() {
|
||||||
|
@Override
|
||||||
|
public void close(boolean permanently, boolean restarting) throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,87 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void extraServerConfig(Configuration serverConfig) {
|
||||||
|
Set<TransportConfiguration> acceptors = server.getConfiguration().getAcceptorConfigurations();
|
||||||
|
for (TransportConfiguration tc : acceptors) {
|
||||||
|
if (tc.getName().equals("netty")) {
|
||||||
|
tc.getExtraParams().put("virtualTopicConsumerWildcards", "Consumer.*.>;2");
|
||||||
|
tc.getExtraParams().put("virtualTopicConsumerLruCacheMax", "10000");
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoVirtualTopicFQQN() throws Exception {
|
||||||
|
Connection connection = null;
|
||||||
|
|
||||||
|
SimpleString topic = new SimpleString("VirtualTopic.Orders");
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
|
||||||
|
activeMQConnectionFactory.setWatchTopicAdvisories(false);
|
||||||
|
connection = activeMQConnectionFactory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createTopic(topic.toString());
|
||||||
|
|
||||||
|
MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("Consumer.A." + topic.toString()));
|
||||||
|
MessageConsumer messageConsumerB = session.createConsumer(session.createQueue("Consumer.B." + topic.toString()));
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
TextMessage message = session.createTextMessage("This is a text message");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
|
||||||
|
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
|
||||||
|
|
||||||
|
assertTrue((messageReceivedA != null && messageReceivedB != null));
|
||||||
|
String text = messageReceivedA.getText();
|
||||||
|
assertEquals("This is a text message", text);
|
||||||
|
|
||||||
|
messageConsumerA.close();
|
||||||
|
messageConsumerB.close();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue