This commit is contained in:
Howard Gao 2018-01-22 20:32:06 +08:00
commit 701ab1feba
6 changed files with 305 additions and 2 deletions

View File

@ -49,14 +49,18 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.impl.LRUCache;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@ -70,6 +74,8 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.DestinationPath;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.state.ProducerState;
@ -128,6 +134,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
private final Map<DestinationFilter, Integer> vtConsumerDestinationMatchers = new HashMap<>();
protected final LRUCache<ActiveMQDestination, ActiveMQDestination> vtDestMapCache = new LRUCache();
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory;
this.server = server;
@ -607,4 +616,70 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
public void setSuppressInternalManagementObjects(boolean suppressInternalManagementObjects) {
this.suppressInternalManagementObjects = suppressInternalManagementObjects;
}
public void setVirtualTopicConsumerWildcards(String virtualTopicConsumerWildcards) {
for (String filter : virtualTopicConsumerWildcards.split(",")) {
String[] wildcardLimitPair = filter.split(";");
vtConsumerDestinationMatchers.put(DestinationFilter.parseFilter(new ActiveMQQueue(wildcardLimitPair[0])), Integer.valueOf(wildcardLimitPair[1]));
}
}
public void setVirtualTopicConsumerLruCacheMax(int max) {
vtDestMapCache.setMaxCacheSize(max);
}
public ActiveMQDestination virtualTopicConsumerToFQQN(final ActiveMQDestination destination) {
if (vtConsumerDestinationMatchers.isEmpty()) {
return destination;
}
ActiveMQDestination mappedDestination = null;
synchronized (vtDestMapCache) {
mappedDestination = vtDestMapCache.get(destination);
}
if (mappedDestination != null) {
return mappedDestination;
}
for (Map.Entry<DestinationFilter, Integer> candidate : vtConsumerDestinationMatchers.entrySet()) {
if (candidate.getKey().matches(destination)) {
// convert to matching FQQN
String[] paths = DestinationPath.getDestinationPaths(destination);
StringBuilder fqqn = new StringBuilder();
int filterPathTerminus = candidate.getValue();
// address - ie: topic
for (int i = filterPathTerminus; i < paths.length; i++) {
if (i > filterPathTerminus) {
fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
}
fqqn.append(paths[i]);
}
fqqn.append(CompositeAddress.SEPARATOR);
// consumer queue
for (int i = 0; i < filterPathTerminus; i++) {
if (i > 0) {
fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
}
fqqn.append(paths[i]);
}
mappedDestination = new ActiveMQQueue(fqqn.toString());
break;
}
}
if (mappedDestination == null) {
// cache the identity mapping
mappedDestination = destination;
}
synchronized (vtDestMapCache) {
ActiveMQDestination existing = vtDestMapCache.put(destination, mappedDestination);
if (existing != null) {
// some one beat us to the put, revert
vtDestMapCache.put(destination, existing);
mappedDestination = existing;
}
}
return mappedDestination;
}
}

View File

@ -173,6 +173,7 @@ public class AMQSession implements SessionCallback {
isInternalAddress = connection.isSuppressInternalManagementObjects();
}
if (openWireDest.isQueue()) {
openWireDest = protocolManager.virtualTopicConsumerToFQQN(openWireDest);
SimpleString queueName = new SimpleString(convertWildcard(openWireDest.getPhysicalName()));
if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {

View File

@ -11,12 +11,19 @@ component must be unique to a connection on the broker. This means that the subs
not possible to load balance the stream of messages across consumers and quick failover is difficult because the
existing connection state on the broker needs to be first disposed.
With virtual topics, each subscription's stream of messages is redirected to a queue.
In Artemis there are two alternatives, the new JMS 2.0 api or direct access to a subscription queue via the FQQN.
JMS2.0 adds the possibility of shared subscriptions with new API's that are fully supported in Artemis.
JMS 2.0 shared subscriptions
----------------------------
JMS 2.0 adds the possibility of shared subscriptions with new API's that are fully supported in Artemis.
Fully Qualified Queue name (FQQN)
---------------------------------
Secondly, Artemis uses a queue per topic subscriber model internally and it is possibly to directly address the
subscription queue using it's Fully Qualified Queue name (FQQN).
For example, a default 5.x consumer for topic `VirtualTopic.Orders` subscription `A`:
For example, a default 5.x consumer destination for topic `VirtualTopic.Orders` subscription `A`:
```
...
Queue subscriptionQueue = session.createQueue("Consumer.A.VirtualTopic.Orders");
@ -30,6 +37,19 @@ would be replaced with an Artemis FQQN comprised of the address and queue.
session.createConsumer(subscriptionQueue);
```
This does require modification to the destination name used by consumers which is not ideal.
If OpenWire clients cannot be modified, Artemis supports a virtual topic wildcard filter
mechanism on the openwire protocol handler that will automatically convert the consumer destination into the
corresponding FQQN.
The format is a comma separated list of strings pairs, delimited with a ';'. Each pair identifies a filter to match
the virtual topic consumer destination and an int that specifies the number of path matches that terminate the consumer
queue identity.
E.g: For the default 5.x virtual topic consumer prefix of ```Consumer.*.``` the url parameter ```virtualTopicConsumerWildcards``` should be: ```Consumer.*.>;2```.
In this way a consumer destination of ```Consumer.A.VirtualTopic.Orders``` will be transformed into a FQQN of
```VirtualTopic.Orders::Consumer.A```.
Durable topic subscribers in a network of brokers
-------------------------------------------------
The store and forward network bridges in 5.x create a durable subscriber per destination. As demand migrates across a

View File

@ -204,6 +204,25 @@ The two parameters are configured on openwire acceptors, via URLs or API. For ex
<acceptor name="artemis">tcp://127.0.0.1:61616?protocols=CORE,AMQP,OPENWIRE;supportAdvisory=true;suppressInternalManagementObjects=false</acceptor>
### Virtual Topic Consumer Destination Translation
For existing Openwire consumers of virtual topic destinations it is possible to configure a mapping function
that will translate the virtual topic consumer destination into a FQQN address. This address then represents
the consumer as a multicast binding to an address representing the virtual topic.
The configuration string property ```virtualTopicConsumerWildcards``` has two parts seperated by a ```;```.
The first is the 5.x style destination filter that identifies the destination as belonging to a virtual topic.
The second identifies the number of ```paths``` that identify the consumer queue such that it can be parsed from the
destination.
For example, the default 5.x virtual topic with consumer prefix of ```Consumer.*.```, would require a
```virtualTopicConsumerWildcards``` filter of:
<acceptor name="artemis">tcp://127.0.0.1:61616?protocols=OPENWIRE;virtualTopicConsumerWildcards=Consumer.*.>;2</acceptor>
This will translate ```Consumer.A.VirtualTopic.Orders``` into a FQQN of ```VirtualTopic.Orders::Consumer.A``` using the
int component ```2``` of the configuration to identify the consumer queue as the first two paths of the destination.
```virtualTopicConsumerWildcards``` is multi valued using a ```,``` separator.
## MQTT
MQTT is a light weight, client to server, publish / subscribe messaging protocol. MQTT has been specifically

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.selector.impl.LRUCache;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class OpenWireProtocolManagerTest {
OpenWireProtocolManager underTest;
LRUCache lruCacheRef;
@Test
public void testVtAutoConversion() throws Exception {
underTest = new OpenWireProtocolManager(null, new DummyServer()) {
@Override
public ActiveMQDestination virtualTopicConsumerToFQQN(ActiveMQDestination destination) {
if (lruCacheRef == null) {
lruCacheRef = vtDestMapCache;
}
return super.virtualTopicConsumerToFQQN(destination);
}
};
final int maxCacheSize = 10;
underTest.setVirtualTopicConsumerLruCacheMax(10);
underTest.setVirtualTopicConsumerWildcards("A.>;1,B.*.>;2,C.*.*.*.EE;3");
ActiveMQDestination A = new org.apache.activemq.command.ActiveMQQueue("A.SomeTopic");
assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic::A"), underTest.virtualTopicConsumerToFQQN(A));
ActiveMQDestination B = new org.apache.activemq.command.ActiveMQQueue("B.b.SomeTopic.B");
assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.B::B.b"), underTest.virtualTopicConsumerToFQQN(B));
ActiveMQDestination C = new org.apache.activemq.command.ActiveMQQueue("C.c.c.SomeTopic.EE");
assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.EE::C.c.c"), underTest.virtualTopicConsumerToFQQN(C));
for (int i = 0; i < maxCacheSize; i++) {
ActiveMQDestination identity = new org.apache.activemq.command.ActiveMQQueue("Identity" + i);
assertEquals(identity, underTest.virtualTopicConsumerToFQQN(identity));
}
assertFalse(lruCacheRef.containsKey(A));
}
static final class DummyServer extends ActiveMQServerImpl {
@Override
public ClusterManager getClusterManager() {
return new ClusterManager(getExecutorFactory(), this, null, null, null, null, null, false);
}
@Override
public ExecutorFactory getExecutorFactory() {
return new ExecutorFactory() {
@Override
public ArtemisExecutor getExecutor() {
return null;
}
};
}
@Override
public Activation getActivation() {
return new Activation() {
@Override
public void close(boolean permanently, boolean restarting) throws Exception {
}
@Override
public void run() {
}
};
}
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Set;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.junit.Test;
public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase {
@Override
protected void extraServerConfig(Configuration serverConfig) {
Set<TransportConfiguration> acceptors = server.getConfiguration().getAcceptorConfigurations();
for (TransportConfiguration tc : acceptors) {
if (tc.getName().equals("netty")) {
tc.getExtraParams().put("virtualTopicConsumerWildcards", "Consumer.*.>;2");
tc.getExtraParams().put("virtualTopicConsumerLruCacheMax", "10000");
}
}
}
@Test
public void testAutoVirtualTopicFQQN() throws Exception {
Connection connection = null;
SimpleString topic = new SimpleString("VirtualTopic.Orders");
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
try {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
activeMQConnectionFactory.setWatchTopicAdvisories(false);
connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topic.toString());
MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("Consumer.A." + topic.toString()));
MessageConsumer messageConsumerB = session.createConsumer(session.createQueue("Consumer.B." + topic.toString()));
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a text message");
producer.send(message);
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
assertTrue((messageReceivedA != null && messageReceivedB != null));
String text = messageReceivedA.getText();
assertEquals("This is a text message", text);
messageConsumerA.close();
messageConsumerB.close();
} finally {
if (connection != null) {
connection.close();
}
}
}
}