From 21b64b3e4f4fc6dbeaa30ce610ecefa15110100a Mon Sep 17 00:00:00 2001 From: Dejan Bosanac Date: Tue, 17 Jan 2017 15:44:28 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/ARTEMIS-815 - support wildcard address configuration in mqtt layer https://issues.apache.org/jira/browse/ARTEMIS-815 - support wildcard address configuration in mqtt layer - remove old swap method https://issues.apache.org/jira/browse/ARTEMIS-815 - added tests for mqtt-openwire integration and fixed openwire layer https://issues.apache.org/jira/browse/ARTEMIS-815 - remove unused imports --- .../artemis/api/core/SimpleString.java | 12 ++ .../protocol/mqtt/MQTTProtocolHandler.java | 2 +- .../protocol/mqtt/MQTTPublishManager.java | 2 +- .../mqtt/MQTTRetainMessageManager.java | 4 +- .../core/protocol/mqtt/MQTTSession.java | 16 +- .../core/protocol/mqtt/MQTTSessionState.java | 5 +- .../mqtt/MQTTSubscriptionManager.java | 9 +- .../artemis/core/protocol/mqtt/MQTTUtil.java | 48 +++--- .../protocol/openwire/amq/AMQConsumer.java | 5 +- .../protocol/openwire/amq/AMQSession.java | 9 +- .../protocol/openwire/util/OpenWireUtil.java | 21 +-- .../core/config/WildcardConfiguration.java | 17 ++- .../core/postoffice/impl/AddressImpl.java | 6 +- .../impl/WildcardAddressManager.java | 11 -- .../mqtt/imported/MQTTOpenwireTest.java | 141 ++++++++++++++++++ .../integration/mqtt/imported/MQTTTest.java | 2 - .../mqtt/imported/MQTTTestSupport.java | 19 ++- .../openwire/OpenWireUtilTest.java | 9 +- 18 files changed, 257 insertions(+), 81 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTOpenwireTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java index decd189fdb..b7f70c6583 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java @@ -98,6 +98,18 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl this.data = data; } + public SimpleString(final char c) { + data = new byte[2]; + + byte low = (byte) (c & 0xFF); // low byte + + data[0] = low; + + byte high = (byte) (c >> 8 & 0xFF); // high byte + + data[1] = high; + } + // CharSequence implementation // --------------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index cfa944adaa..b3587a3a2d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -78,7 +78,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception { this.connectionEntry = entry; this.connection = connection; - this.session = new MQTTSession(this, connection, protocolManager); + this.session = new MQTTSession(this, connection, protocolManager, server.getConfiguration().getWildcardConfiguration()); } void stop(boolean error) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 2e5a1e92b5..26886c6db9 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -219,7 +219,7 @@ public class MQTTPublishManager { } private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) { - String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString()); + String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration()); ByteBuf payload; switch (message.getType()) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index 008bcd8526..27423d8452 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -43,7 +43,7 @@ public class MQTTRetainMessageManager { * the retained queue and the previous retain message consumed to remove it from the queue. */ void handleRetainedMessage(ServerMessage message, String address, boolean reset) throws Exception { - SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address)); + SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration())); Queue queue = session.getServer().locateQueue(retainAddress); if (queue == null) { @@ -70,7 +70,7 @@ public class MQTTRetainMessageManager { // Queue to add the retained messages to // The address filter that matches all retained message queues. - String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address); + String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()); BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress)); // Iterate over all matching retain queues and add the head message to the original queue. diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index cf0b4e67e8..c96beba6c0 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -55,13 +56,18 @@ public class MQTTSession { private MQTTProtocolManager protocolManager; + private boolean isClean; + private WildcardConfiguration wildcardConfiguration; + public MQTTSession(MQTTProtocolHandler protocolHandler, MQTTConnection connection, - MQTTProtocolManager protocolManager) throws Exception { + MQTTProtocolManager protocolManager, + WildcardConfiguration wildcardConfiguration) throws Exception { this.protocolHandler = protocolHandler; this.protocolManager = protocolManager; + this.wildcardConfiguration = wildcardConfiguration; this.connection = connection; @@ -181,4 +187,12 @@ public class MQTTSession { mqttPublishManager.clean(); state.clear(); } + + public WildcardConfiguration getWildcardConfiguration() { + return wildcardConfiguration; + } + + public void setWildcardConfiguration(WildcardConfiguration wildcardConfiguration) { + this.wildcardConfiguration = wildcardConfiguration; + } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index 9e18bc536f..9458f8b080 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.server.ServerMessage; public class MQTTSessionState { @@ -98,9 +99,9 @@ public class MQTTSessionState { return subscriptions.values(); } - boolean addSubscription(MqttTopicSubscription subscription) { + boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration) { synchronized (subscriptions) { - addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName()), new ConcurrentHashMap()); + addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName(), wildcardConfiguration), new ConcurrentHashMap()); MqttTopicSubscription existingSubscription = subscriptions.get(subscription.topicName()); if (existingSubscription != null) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index e012a26a07..c9e7a942fe 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -68,7 +68,7 @@ public class MQTTSubscriptionManager { synchronized void start() throws Exception { for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) { - String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName()); + String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName(), session.getWildcardConfiguration()); Queue q = createQueueForSubscription(coreAddress, subscription.qualityOfService().value()); createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value()); } @@ -164,9 +164,9 @@ public class MQTTSubscriptionManager { int qos = subscription.qualityOfService().value(); String topic = subscription.topicName(); - String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic); + String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration()); - session.getSessionState().addSubscription(subscription); + session.getSessionState().addSubscription(subscription, session.getWildcardConfiguration()); Queue q = createQueueForSubscription(coreAddress, qos); @@ -186,7 +186,8 @@ public class MQTTSubscriptionManager { // FIXME: Do we need this synchronzied? private synchronized void removeSubscription(String address) throws Exception { - String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address); + String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address, session.getWildcardConfiguration()); + SimpleString internalQueueName = getQueueNameForTopic(internalAddress); session.getSessionState().removeSubscription(address); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 4819006274..7bc6b84dfd 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -27,6 +27,7 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; @@ -65,44 +66,31 @@ public class MQTTUtil { public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000; - public static String convertMQTTAddressFilterToCore(String filter) { - return swapMQTTAndCoreWildCards(filter); + public static String convertMQTTAddressFilterToCore(String filter, WildcardConfiguration wildcardConfiguration) { + return MQTT_WILDCARD.convert(filter, wildcardConfiguration); } + public static class MQTTWildcardConfiguration extends WildcardConfiguration { + public MQTTWildcardConfiguration() { + setDelimiter('/'); + setSingleWord('+'); + setAnyWords('#'); + } + } + + public static final WildcardConfiguration MQTT_WILDCARD = new MQTTWildcardConfiguration(); + private static final MQTTLogger logger = MQTTLogger.LOGGER; - public static String convertCoreAddressFilterToMQTT(String filter) { + public static String convertCoreAddressFilterToMQTT(String filter, WildcardConfiguration wildcardConfiguration) { if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) { filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length()); } - return swapMQTTAndCoreWildCards(filter); + return wildcardConfiguration.convert(filter, MQTT_WILDCARD); } - public static String convertMQTTAddressFilterToCoreRetain(String filter) { - return MQTT_RETAIN_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter); - } - - public static String swapMQTTAndCoreWildCards(String filter) { - char[] topicFilter = filter.toCharArray(); - for (int i = 0; i < topicFilter.length; i++) { - switch (topicFilter[i]) { - case '/': - topicFilter[i] = '.'; - break; - case '.': - topicFilter[i] = '/'; - break; - case '*': - topicFilter[i] = '+'; - break; - case '+': - topicFilter[i] = '*'; - break; - default: - break; - } - } - return String.valueOf(topicFilter); + public static String convertMQTTAddressFilterToCoreRetain(String filter, WildcardConfiguration wildcardConfiguration) { + return MQTT_RETAIN_ADDRESS_PREFIX + MQTT_WILDCARD.convert(filter, wildcardConfiguration); } private static ServerMessage createServerMessage(MQTTSession session, @@ -124,7 +112,7 @@ public class MQTTUtil { boolean retain, int qos, ByteBuf payload) { - String coreAddress = convertMQTTAddressFilterToCore(topic); + String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration()); ServerMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos); // FIXME does this involve a copy? diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index ed2783f248..77a1a4a095 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; -import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; @@ -81,7 +80,7 @@ public class AMQConsumer { SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); - String physicalName = OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName()); + String physicalName = session.convertWildcard(openwireDestination.getPhysicalName()); SimpleString address; @@ -97,7 +96,7 @@ public class AMQConsumer { serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); } else { - SimpleString queueName = new SimpleString(OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName())); + SimpleString queueName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName())); try { session.getCoreServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); } catch (ActiveMQQueueExistsException e) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 5e4304ff6d..7cdd070ef2 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -29,7 +29,6 @@ import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; -import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; @@ -57,6 +56,8 @@ import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.wireformat.WireFormat; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD; + public class AMQSession implements SessionCallback { private final Logger logger = Logger.getLogger(AMQSession.class); @@ -152,7 +153,7 @@ public class AMQSession implements SessionCallback { for (ActiveMQDestination openWireDest : dests) { if (openWireDest.isQueue()) { - SimpleString queueName = new SimpleString(OpenWireUtil.convertWildcard(openWireDest.getPhysicalName())); + SimpleString queueName = new SimpleString(convertWildcard(openWireDest.getPhysicalName())); if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) { throw new InvalidDestinationException("Destination doesn't exist: " + queueName); @@ -405,6 +406,10 @@ public class AMQSession implements SessionCallback { } } + public String convertWildcard(String physicalName) { + return OPENWIRE_WILDCARD.convert(physicalName, server.getConfiguration().getWildcardConfiguration()); + } + public ServerSession getCoreSession() { return this.coreSession; } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java index 04bd6a3114..5355c63f87 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.util; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.command.ActiveMQDestination; @@ -29,6 +30,16 @@ import org.apache.activemq.util.ByteSequence; public class OpenWireUtil { + public static class OpenWireWildcardConfiguration extends WildcardConfiguration { + public OpenWireWildcardConfiguration() { + setDelimiter('.'); + setSingleWord('*'); + setAnyWords('>'); + } + } + + public static final WildcardConfiguration OPENWIRE_WILDCARD = new OpenWireWildcardConfiguration(); + public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) { ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length); @@ -52,16 +63,6 @@ public class OpenWireUtil { } } - /* - *This util converts amq wildcards to compatible core wildcards - *The conversion is like this: - *AMQ * wildcard --> Core * wildcard (no conversion) - *AMQ > wildcard --> Core # wildcard - */ - public static String convertWildcard(String physicalName) { - return physicalName.replaceAll("(\\.>)+", ".#"); - } - public static XidImpl toXID(TransactionId xaXid) { return toXID((XATransactionId) xaXid); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java index 10c9cf2297..bd9046bb52 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java @@ -57,8 +57,17 @@ public class WildcardConfiguration implements Serializable { return result; } - public boolean isEnabled() { + @Override + public String toString() { + return "WildcardConfiguration{" + + "anyWords=" + anyWords + + ", enabled=" + enabled + + ", singleWord=" + singleWord + + ", delimiter=" + delimiter + + '}'; + } + public boolean isEnabled() { return enabled; } @@ -90,4 +99,10 @@ public class WildcardConfiguration implements Serializable { this.singleWord = singleWord; } + public String convert(String filter, WildcardConfiguration to) { + return filter.replace(getDelimiter(), to.getDelimiter()) + .replace(getSingleWord(), to.getSingleWord()) + .replace(getAnyWords(), to.getAnyWords()); + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java index 107ce619ac..ea78e4f748 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java @@ -93,15 +93,15 @@ public class AddressImpl implements Address { for (; matchPos < add.getAddressParts().length; ) { if (pos >= addressParts.length) { // test for # as last address part - return pos + 1 == add.getAddressParts().length && add.getAddressParts()[pos].equals(WildcardAddressManager.ANY_WORDS_SIMPLESTRING); + return pos + 1 == add.getAddressParts().length && add.getAddressParts()[pos].equals(new SimpleString(wildcardConfiguration.getAnyWords())); } SimpleString curr = addressParts[pos]; SimpleString next = addressParts.length > pos + 1 ? addressParts[pos + 1] : null; SimpleString currMatch = add.getAddressParts()[matchPos]; - if (currMatch.equals(WildcardAddressManager.SINGLE_WORD_SIMPLESTRING)) { + if (currMatch.equals(new SimpleString(wildcardConfiguration.getSingleWord()))) { pos++; matchPos++; - } else if (currMatch.equals(WildcardAddressManager.ANY_WORDS_SIMPLESTRING)) { + } else if (currMatch.equals(new SimpleString(wildcardConfiguration.getAnyWords()))) { if (matchPos == addressParts.length - 1) { pos++; matchPos++; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java index 5ca1b02980..516cf3773d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java @@ -34,16 +34,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction; */ public class WildcardAddressManager extends SimpleAddressManager { - static final char SINGLE_WORD = '*'; - - static final char ANY_WORDS = '#'; - - static final char DELIM = '.'; - - static final SimpleString SINGLE_WORD_SIMPLESTRING = new SimpleString("*"); - - static final SimpleString ANY_WORDS_SIMPLESTRING = new SimpleString("#"); - /** * These are all the addresses, we use this so we can link back from the actual address to its linked wilcard addresses * or vice versa @@ -175,7 +165,6 @@ public class WildcardAddressManager extends SimpleAddressManager { if (actualAddress.matches(destAdd)) { destAdd.addLinkedAddress(actualAddress); actualAddress.addLinkedAddress(destAdd); - } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTOpenwireTest.java new file mode 100644 index 0000000000..9d3f8fb220 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTOpenwireTest.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.mqtt.imported; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.util.ByteSequence; +import org.junit.Test; + +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +public class MQTTOpenwireTest extends MQTTTestSupport { + + protected static final int NUM_MESSAGES = 1; + + @Override + public void configureBroker() throws Exception { + super.configureBroker(); + WildcardConfiguration wildcardConfiguration = new WildcardConfiguration(); + wildcardConfiguration.setDelimiter('.'); + wildcardConfiguration.setSingleWord('*'); + wildcardConfiguration.setAnyWords('>'); + server.getConfiguration().setWildCardConfiguration(wildcardConfiguration); + } + + @Override + public void createJMSConnection() throws Exception { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true"); + } + + @Test + public void testWildcards() throws Exception { + doTestSendJMSReceiveMQTT("foo.bar", "foo/+"); + doTestSendJMSReceiveMQTT("foo.bar", "foo/#"); + doTestSendJMSReceiveMQTT("foo.bar.har", "foo/#"); + doTestSendJMSReceiveMQTT("foo.bar.har", "foo/+/+"); + doTestSendMQTTReceiveJMS("foo/bah", "foo.*"); + doTestSendMQTTReceiveJMS("foo/bah", "foo.>"); + doTestSendMQTTReceiveJMS("foo/bah/hah", "foo.*.*"); + doTestSendMQTTReceiveJMS("foo/bah/har", "foo.>"); + } + + public void doTestSendMQTTReceiveJMS(String mqttTopic, String jmsDestination) throws Exception { + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); + + try { + // MUST set to true to receive retained messages + activeMQConnection.setUseRetroactiveConsumer(true); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Topic jmsTopic = s.createTopic(jmsDestination); + MessageConsumer consumer = s.createConsumer(jmsTopic); + + // send retained message + final String RETAINED = "RETAINED"; + provider.publish(mqttTopic, RETAINED.getBytes(), AT_LEAST_ONCE, true); + + // check whether we received retained message on JMS subscribe + ActiveMQMessage message = (ActiveMQMessage) consumer.receive(2000); + assertNotNull("Should get retained message " + mqttTopic + "->" + jmsDestination, message); + ByteSequence bs = message.getContent(); + assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length)); + + for (int i = 0; i < 1; i++) { + String payload = "Test Message: " + i; + provider.publish(mqttTopic, payload.getBytes(), AT_LEAST_ONCE); + message = (ActiveMQMessage) consumer.receive(1000); + assertNotNull("Should get a message " + mqttTopic + "->" + jmsDestination, message); + bs = message.getContent(); + assertEquals(payload, new String(bs.data, bs.offset, bs.length)); + } + } finally { + activeMQConnection.close(); + provider.disconnect(); + } + } + + public void doTestSendJMSReceiveMQTT(String jmsDestination, String mqttTopic) throws Exception { + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); + try { + activeMQConnection.setUseRetroactiveConsumer(true); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Topic jmsTopic = s.createTopic(jmsDestination); + MessageProducer producer = s.createProducer(jmsTopic); + + final String RETAINED = "RETAINED"; + provider.subscribe(mqttTopic, AT_MOST_ONCE); + + // send retained message from JMS + TextMessage sendMessage = s.createTextMessage(RETAINED); + // mark the message to be retained + sendMessage.setBooleanProperty("ActiveMQ.Retain", true); + // MQTT QoS can be set using MQTTProtocolConverter.QOS_PROPERTY_NAME property + sendMessage.setIntProperty("ActiveMQ.MQTT.QoS", 0); + producer.send(sendMessage); + + byte[] message = provider.receive(2000); + assertNotNull("Should get retained message " + jmsDestination + "->" + mqttTopic, message); + assertEquals(RETAINED, new String(message)); + + for (int i = 0; i < 1; i++) { + String payload = "This is Test Message: " + i; + sendMessage = s.createTextMessage(payload); + producer.send(sendMessage); + message = provider.receive(1000); + assertNotNull("Should get a message " + jmsDestination + "->" + mqttTopic, message); + + assertEquals(payload, new String(message)); + } + } finally { + activeMQConnection.close(); + provider.disconnect(); + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index d359f2e6aa..a26a046b2c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -67,8 +67,6 @@ public class MQTTTest extends MQTTTestSupport { private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class); - private static final int NUM_MESSAGES = 250; - @Override @Before public void setUp() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index b578f97924..a45f06d88b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported; +import javax.jms.ConnectionFactory; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -58,18 +59,20 @@ import static java.util.Collections.singletonList; public class MQTTTestSupport extends ActiveMQTestBase { - private ActiveMQServer server; + protected ActiveMQServer server; private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class); protected int port = 1883; - protected ActiveMQConnectionFactory cf; + protected ConnectionFactory cf; protected LinkedList exceptions = new LinkedList<>(); protected boolean persistent; protected String protocolConfig; protected String protocolScheme; protected boolean useSSL; + protected static final int NUM_MESSAGES = 250; + public static final int AT_MOST_ONCE = 0; public static final int AT_LEAST_ONCE = 1; public static final int EXACTLY_ONCE = 2; @@ -80,7 +83,6 @@ public class MQTTTestSupport extends ActiveMQTestBase { public MQTTTestSupport() { this.protocolScheme = "mqtt"; this.useSSL = false; - cf = new ActiveMQConnectionFactory(false, new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY)); } public File basedir() throws IOException { @@ -110,6 +112,7 @@ public class MQTTTestSupport extends ActiveMQTestBase { exceptions.clear(); startBroker(); + createJMSConnection(); } @Override @@ -125,7 +128,7 @@ public class MQTTTestSupport extends ActiveMQTestBase { super.tearDown(); } - public void startBroker() throws Exception { + public void configureBroker() throws Exception { // TODO Add SSL super.setUp(); server = createServerForMQTT(); @@ -137,10 +140,18 @@ public class MQTTTestSupport extends ActiveMQTestBase { addressSettings.setAutoCreateAddresses(true); server.getAddressSettingsRepository().addMatch("#", addressSettings); + } + + public void startBroker() throws Exception { + configureBroker(); server.start(); server.waitForActivation(10, TimeUnit.SECONDS); } + public void createJMSConnection() throws Exception { + cf = new ActiveMQConnectionFactory(false, new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY)); + } + private ActiveMQServer createServerForMQTT() throws Exception { Configuration defaultConfig = createDefaultConfig(true).setIncomingInterceptorClassNames(singletonList(MQTTIncomingInterceptor.class.getName())).setOutgoingInterceptorClassNames(singletonList(MQTTOutoingInterceptor.class.getName())); AddressSettings addressSettings = new AddressSettings(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java index 4f2696dd48..5c92937cc8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.openwire; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.junit.Test; @@ -26,15 +27,15 @@ public class OpenWireUtilTest { @Test public void testWildcardConversion() throws Exception { String amqTarget = "TEST.ONE.>"; - String coreTarget = OpenWireUtil.convertWildcard(amqTarget); + String coreTarget = OpenWireUtil.OPENWIRE_WILDCARD.convert(amqTarget, new WildcardConfiguration()); assertEquals("TEST.ONE.#", coreTarget); amqTarget = "TEST.*.ONE"; - coreTarget = OpenWireUtil.convertWildcard(amqTarget); + coreTarget = OpenWireUtil.OPENWIRE_WILDCARD.convert(amqTarget, new WildcardConfiguration()); assertEquals("TEST.*.ONE", coreTarget); amqTarget = "a.*.>.>"; - coreTarget = OpenWireUtil.convertWildcard(amqTarget); - assertEquals("a.*.#", coreTarget); + coreTarget = OpenWireUtil.OPENWIRE_WILDCARD.convert(amqTarget, new WildcardConfiguration()); + assertEquals("a.*.#.#", coreTarget); } }