This commit is contained in:
Clebert Suconic 2021-03-22 19:15:02 -04:00
commit 04fcecb579
7 changed files with 146 additions and 30 deletions

View File

@ -80,6 +80,8 @@ import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.SELECTOR_AWARE_OPTION;
public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
@ -133,7 +135,29 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
private final Map<DestinationFilter, Integer> vtConsumerDestinationMatchers = new HashMap<>();
protected class VirtualTopicConfig {
public int filterPathTerminus;
public boolean selectorAware;
public VirtualTopicConfig(String[] configuration) {
filterPathTerminus = Integer.valueOf(configuration[1]);
// optional config
for (int i = 2; i < configuration.length; i++) {
String[] optionPair = configuration[i].split("=");
consumeOption(optionPair);
}
}
private void consumeOption(String[] optionPair) {
if (optionPair.length == 2) {
if (SELECTOR_AWARE_OPTION.equals(optionPair[0])) {
selectorAware = Boolean.valueOf(optionPair[1]);
}
}
}
}
private final Map<DestinationFilter, VirtualTopicConfig> vtConsumerDestinationMatchers = new HashMap<>();
protected final LRUCache<ActiveMQDestination, ActiveMQDestination> vtDestMapCache = new LRUCache();
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
@ -622,8 +646,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
public void setVirtualTopicConsumerWildcards(String virtualTopicConsumerWildcards) {
for (String filter : virtualTopicConsumerWildcards.split(",")) {
String[] wildcardLimitPair = filter.split(";");
vtConsumerDestinationMatchers.put(DestinationFilter.parseFilter(new ActiveMQQueue(wildcardLimitPair[0])), Integer.valueOf(wildcardLimitPair[1]));
String[] configuration = filter.split(";");
vtConsumerDestinationMatchers.put(DestinationFilter.parseFilter(new ActiveMQQueue(configuration[0])), new VirtualTopicConfig(configuration));
}
}
@ -646,15 +670,15 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
return mappedDestination;
}
for (Map.Entry<DestinationFilter, Integer> candidate : vtConsumerDestinationMatchers.entrySet()) {
for (Map.Entry<DestinationFilter, VirtualTopicConfig> candidate : vtConsumerDestinationMatchers.entrySet()) {
if (candidate.getKey().matches(destination)) {
// convert to matching FQQN
String[] paths = DestinationPath.getDestinationPaths(destination);
StringBuilder fqqn = new StringBuilder();
int filterPathTerminus = candidate.getValue();
VirtualTopicConfig virtualTopicConfig = candidate.getValue();
// address - ie: topic
for (int i = filterPathTerminus; i < paths.length; i++) {
if (i > filterPathTerminus) {
for (int i = virtualTopicConfig.filterPathTerminus; i < paths.length; i++) {
if (i > virtualTopicConfig.filterPathTerminus) {
fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
}
fqqn.append(paths[i]);
@ -667,7 +691,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
fqqn.append(paths[i]);
}
mappedDestination = new ActiveMQQueue(fqqn.toString());
mappedDestination = new ActiveMQQueue(fqqn.toString() + ( virtualTopicConfig.selectorAware ? "?" + SELECTOR_AWARE_OPTION + "=true" : "" ));
break;
}
}

View File

@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
@ -181,7 +182,7 @@ public class AMQSession implements SessionCallback {
openWireDest = protocolManager.virtualTopicConsumerToFQQN(openWireDest);
SimpleString queueName = new SimpleString(convertWildcard(openWireDest));
if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {
if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary(), OpenWireUtil.extractFilterStringOrNull(info, openWireDest))) {
throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
}
}
@ -223,6 +224,10 @@ public class AMQSession implements SessionCallback {
}
private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception {
return checkAutoCreateQueue(queueName, isTemporary, null);
}
private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary, String filter) throws Exception {
boolean hasQueue = true;
if (!connection.containsKnownDestination(queueName)) {
@ -245,7 +250,7 @@ public class AMQSession implements SessionCallback {
routingTypeToUse = as.getDefaultAddressRoutingType();
}
}
coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true));
coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true).setFilterString(filter));
connection.addKnownDestination(queueName);
} else {
hasQueue = false;

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
@ -37,6 +38,19 @@ public class OpenWireUtil {
public static final WildcardConfiguration OPENWIRE_WILDCARD = new OpenWireWildcardConfiguration();
public static final String SELECTOR_AWARE_OPTION = "selectorAware";
public static String extractFilterStringOrNull(final ConsumerInfo info, final ActiveMQDestination openWireDest) {
if (info.getSelector() != null) {
if (openWireDest.getOptions() != null) {
if (Boolean.valueOf(openWireDest.getOptions().get(SELECTOR_AWARE_OPTION))) {
return info.getSelector();
}
}
}
return null;
}
/**
* We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
* destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was

View File

@ -2,7 +2,7 @@ Virtual Topics
==============
Virtual Topics (a specialisation of virtual destinations) in ActiveMQ 5.x typically address two different but related
problems. Lets take each in turn:
problems. Let's take each in turn:
Shared access to a JMS durable topic subscription
-------------------------------------------------
@ -20,7 +20,7 @@ JMS 2.0 adds the possibility of shared subscriptions with new API's that are ful
Fully Qualified Queue name (FQQN)
---------------------------------
Secondly, Artemis uses a queue per topic subscriber model internally and it is possibly to directly address the
Secondly, Artemis uses a queue per topic subscriber model internally, and it is possibly to directly address the
subscription queue using its Fully Qualified Queue name (FQQN).
For example, a default 5.x consumer destination for topic `VirtualTopic.Orders` subscription `A`:
@ -42,7 +42,7 @@ If OpenWire clients cannot be modified, Artemis supports a virtual topic wildcar
mechanism on the OpenWire protocol handler that will automatically convert the consumer destination into the
corresponding FQQN.
The format is a comma separated list of strings pairs, delimited with a ';'. Each pair identifies a filter to match
the virtual topic consumer destination and an int that specifies the number of path matches that terminate the consumer
the virtual topic consumer destination, and an int that specifies the number of path matches that terminate the consumer
queue identity.
E.g: For the default 5.x virtual topic consumer prefix of ```Consumer.*.``` the parameter ```virtualTopicConsumerWildcards``` should be: ```Consumer.*.>;2```.
@ -55,8 +55,8 @@ In this way a consumer destination of ```Consumer.A.VirtualTopic.Orders``` will
Durable topic subscribers in a network of brokers
-------------------------------------------------
The store and forward network bridges in 5.x create a durable subscriber per destination. As demand migrates across a
network, duplicate durable subs get created on each node in the network but they do not migrate. The end result can
network, duplicate durable subs get created on each node in the network, but they do not migrate. The end result can
result in duplicate message storage and ultimately duplicate delivery, which is not good.
When durable subscribers map to virtual topic subscriber queues, the queues can migrate and the problem can be avoided.
When durable subscribers map to virtual topic subscriber queues, the queues can migrate, and the problem can be avoided.
In Artemis, because a durable sub is modeled as a queue, this problem does not arise.

View File

@ -35,7 +35,7 @@ are:
- `useKeepAlive`
Whether or not to send a KeepAliveInfo on an idle connection to prevent it
Indicates whether to send a KeepAliveInfo on an idle connection to prevent it
from timing out. Enabled by default. Disabling the keep alive will still make
connections time out if no data was received on the connection for the
specified amount of time.
@ -64,13 +64,13 @@ broker side.
- `supportAdvisory`
Whether or not the broker supports advisory messages. If the value is true,
Indicates whether the broker supports advisory messages. If the value is true,
advisory addresses/queues will be created. If the value is false, no advisory
addresses/queues are created. Default value is `true`.
- `suppressInternalManagementObjects`
Whether or not the advisory addresses/queues, if any, will be registered to
Indicates whether advisory addresses/queues, if any, will be registered to
management service (e.g. JMX registry). If set to true, no advisory
addresses/queues will be registered. If set to false, those are registered and
will be displayed on the management console. Default value is `true`.
@ -88,12 +88,14 @@ configure a mapping function that will translate the virtual topic consumer
destination into a FQQN address. This address will then represents the consumer as a
multicast binding to an address representing the virtual topic.
The configuration string property `virtualTopicConsumerWildcards` has two parts
separated by a `;`. The first is the 5.x style destination filter that
The configuration string list property `virtualTopicConsumerWildcards` has parts
separated by a `;`. The first is the classic style destination filter that
identifies the destination as belonging to a virtual topic. The second
identifies the number of `paths` that identify the consumer queue such that it
can be parsed from the destination. For example, the default 5.x virtual topic
with consumer prefix of `Consumer.*.`, would require a
can be parsed from the destination. Any subsequent parts are additional configuration
parameters for that mapping.
For example, the default virtual topic with consumer prefix of `Consumer.*.`, would require a
`virtualTopicConsumerWildcards` filter of `Consumer.*.>;2`. As a url parameter
this transforms to `Consumer.*.%3E%3B2` when the url significant characters
`>;` are escaped with their hex code points. In an `acceptor` url it would be:
@ -105,8 +107,13 @@ this transforms to `Consumer.*.%3E%3B2` when the url significant characters
This will translate `Consumer.A.VirtualTopic.Orders` into a FQQN of
`VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders` using the int component `2` of the
configuration to identify the consumer queue as the first two paths of the
destination. `virtualTopicConsumerWildcards` is multi valued using a `,`
destination. `virtualTopicConsumerWildcards` is multi valued using a `,`
separator.
### selectorAware
The mappings support an optional parameter, `selectorAware` which when true, transfers any selector information from the
OpenWire consumer into a queue filter of any auto-created subscription queue. Note: the selector/filter is persisted with
the queue binding in the normal way, such that it works independent of connected consumers.
Please see Virtual Topic Mapping example contained in the OpenWire
[examples](examples.md).

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.openwire;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -27,8 +28,6 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class OpenWireProtocolManagerTest extends ActiveMQTestBase {
@ -36,7 +35,7 @@ public class OpenWireProtocolManagerTest extends ActiveMQTestBase {
LRUCache lruCacheRef;
@Test
public void testVtAutoConversion() throws Exception {
public void testVtAutoConversion() {
underTest = new OpenWireProtocolManager(null, new DummyServer()) {
@Override
public ActiveMQDestination virtualTopicConsumerToFQQN(ActiveMQDestination destination) {
@ -49,17 +48,24 @@ public class OpenWireProtocolManagerTest extends ActiveMQTestBase {
final int maxCacheSize = 10;
underTest.setVirtualTopicConsumerLruCacheMax(10);
underTest.setVirtualTopicConsumerWildcards("A.>;1,B.*.>;2,C.*.*.*.EE;3");
underTest.setVirtualTopicConsumerWildcards("A.>;1;selectorAware=true,B.*.>;2,C.*.*.*.EE;3;selectorAware=false");
ActiveMQDestination A = new org.apache.activemq.command.ActiveMQQueue("A.SomeTopic");
assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic::A.SomeTopic"), underTest.virtualTopicConsumerToFQQN(A));
ActiveMQDestination checkOption = underTest.virtualTopicConsumerToFQQN(A);
assertNotNull(checkOption.getOptions());
assertTrue(Boolean.parseBoolean(checkOption.getOptions().get(OpenWireUtil.SELECTOR_AWARE_OPTION)));
ActiveMQDestination B = new org.apache.activemq.command.ActiveMQQueue("B.b.SomeTopic.B");
assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.B::B.b.SomeTopic.B"), underTest.virtualTopicConsumerToFQQN(B));
ActiveMQDestination C = new org.apache.activemq.command.ActiveMQQueue("C.c.c.SomeTopic.EE");
assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.EE::C.c.c.SomeTopic.EE"), underTest.virtualTopicConsumerToFQQN(C));
checkOption = underTest.virtualTopicConsumerToFQQN(C);
assertNull(checkOption.getOptions());
for (int i = 0; i < maxCacheSize; i++) {
ActiveMQDestination identity = new org.apache.activemq.command.ActiveMQQueue("Identity" + i);
assertEquals(identity, underTest.virtualTopicConsumerToFQQN(identity));

View File

@ -25,23 +25,27 @@ import javax.jms.TextMessage;
import java.util.Set;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Test;
public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase {
@Override
protected void extraServerConfig(Configuration serverConfig) {
Set<TransportConfiguration> acceptors = server.getConfiguration().getAcceptorConfigurations();
Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
for (TransportConfiguration tc : acceptors) {
if (tc.getName().equals("netty")) {
tc.getExtraParams().put("virtualTopicConsumerWildcards", "Consumer.*.>;2");
tc.getExtraParams().put("virtualTopicConsumerWildcards", "Consumer.*.>;2,C.*.>;2;selectorAware=true");
tc.getExtraParams().put("virtualTopicConsumerLruCacheMax", "10000");
}
}
serverConfig.setJMXManagementEnabled(true);
}
@Test
@ -51,6 +55,7 @@ public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase {
SimpleString topic = new SimpleString("VirtualTopic.Orders");
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoDeleteQueues(false);
try {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
@ -222,4 +227,59 @@ public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase {
}
}
}
@Test
public void testSelectorAwareVT() throws Exception {
Connection connection = null;
SimpleString topic = new SimpleString("SVT.Orders.A");
this.server.getAddressSettingsRepository().getMatch("SVT.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("SVT.#").setAutoCreateAddresses(true);
try {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
activeMQConnectionFactory.setWatchTopicAdvisories(false);
connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topic.toString());
MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("C.A." + topic.toString()), "stuff = 'A'");
MessageConsumer messageConsumerB = session.createConsumer(session.createQueue("C.B." + topic.toString()), "stuff = 'B'");
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a text message");
for (String stuffValue : new String[] {"A", "B", "C"}) {
message.setStringProperty("stuff", stuffValue);
producer.send(message);
}
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
assertTrue((messageReceivedA != null && messageReceivedB != null));
String text = messageReceivedA.getText();
assertEquals("This is a text message", text);
assertEquals("A", messageReceivedA.getStringProperty("stuff"));
assertEquals("B", messageReceivedB.getStringProperty("stuff"));
// verify C message got dropped
final QueueControl queueControlA = ManagementControlHelper.createQueueControl(topic, SimpleString.toSimpleString("C.A." + topic.toString()), RoutingType.MULTICAST, mbeanServer);
Wait.assertEquals(0, () -> queueControlA.countMessages());
final QueueControl queueControlB = ManagementControlHelper.createQueueControl(topic, SimpleString.toSimpleString("C.B." + topic.toString()), RoutingType.MULTICAST, mbeanServer);
Wait.assertEquals(0, () -> queueControlB.countMessages());
} finally {
if (connection != null) {
connection.close();
}
}
}
}