https://issues.apache.org/jira/browse/ARTEMIS-815 - support wildcard address configuration in mqtt layer
https://issues.apache.org/jira/browse/ARTEMIS-815 - support wildcard address configuration in mqtt layer - remove old swap method https://issues.apache.org/jira/browse/ARTEMIS-815 - added tests for mqtt-openwire integration and fixed openwire layer https://issues.apache.org/jira/browse/ARTEMIS-815 - remove unused imports
This commit is contained in:
parent
f660783df5
commit
21b64b3e4f
|
@ -98,6 +98,18 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
|
|||
this.data = data;
|
||||
}
|
||||
|
||||
public SimpleString(final char c) {
|
||||
data = new byte[2];
|
||||
|
||||
byte low = (byte) (c & 0xFF); // low byte
|
||||
|
||||
data[0] = low;
|
||||
|
||||
byte high = (byte) (c >> 8 & 0xFF); // high byte
|
||||
|
||||
data[1] = high;
|
||||
}
|
||||
|
||||
// CharSequence implementation
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -78,7 +78,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
|
||||
this.connectionEntry = entry;
|
||||
this.connection = connection;
|
||||
this.session = new MQTTSession(this, connection, protocolManager);
|
||||
this.session = new MQTTSession(this, connection, protocolManager, server.getConfiguration().getWildcardConfiguration());
|
||||
}
|
||||
|
||||
void stop(boolean error) {
|
||||
|
|
|
@ -219,7 +219,7 @@ public class MQTTPublishManager {
|
|||
}
|
||||
|
||||
private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
|
||||
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString());
|
||||
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
|
||||
|
||||
ByteBuf payload;
|
||||
switch (message.getType()) {
|
||||
|
|
|
@ -43,7 +43,7 @@ public class MQTTRetainMessageManager {
|
|||
* the retained queue and the previous retain message consumed to remove it from the queue.
|
||||
*/
|
||||
void handleRetainedMessage(ServerMessage message, String address, boolean reset) throws Exception {
|
||||
SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address));
|
||||
SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
|
||||
|
||||
Queue queue = session.getServer().locateQueue(retainAddress);
|
||||
if (queue == null) {
|
||||
|
@ -70,7 +70,7 @@ public class MQTTRetainMessageManager {
|
|||
// Queue to add the retained messages to
|
||||
|
||||
// The address filter that matches all retained message queues.
|
||||
String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address);
|
||||
String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration());
|
||||
BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
|
||||
|
||||
// Iterate over all matching retain queues and add the head message to the original queue.
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.Map;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||
|
@ -55,13 +56,18 @@ public class MQTTSession {
|
|||
|
||||
private MQTTProtocolManager protocolManager;
|
||||
|
||||
|
||||
private boolean isClean;
|
||||
|
||||
private WildcardConfiguration wildcardConfiguration;
|
||||
|
||||
public MQTTSession(MQTTProtocolHandler protocolHandler,
|
||||
MQTTConnection connection,
|
||||
MQTTProtocolManager protocolManager) throws Exception {
|
||||
MQTTProtocolManager protocolManager,
|
||||
WildcardConfiguration wildcardConfiguration) throws Exception {
|
||||
this.protocolHandler = protocolHandler;
|
||||
this.protocolManager = protocolManager;
|
||||
this.wildcardConfiguration = wildcardConfiguration;
|
||||
|
||||
this.connection = connection;
|
||||
|
||||
|
@ -181,4 +187,12 @@ public class MQTTSession {
|
|||
mqttPublishManager.clean();
|
||||
state.clear();
|
||||
}
|
||||
|
||||
public WildcardConfiguration getWildcardConfiguration() {
|
||||
return wildcardConfiguration;
|
||||
}
|
||||
|
||||
public void setWildcardConfiguration(WildcardConfiguration wildcardConfiguration) {
|
||||
this.wildcardConfiguration = wildcardConfiguration;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
|
||||
public class MQTTSessionState {
|
||||
|
@ -98,9 +99,9 @@ public class MQTTSessionState {
|
|||
return subscriptions.values();
|
||||
}
|
||||
|
||||
boolean addSubscription(MqttTopicSubscription subscription) {
|
||||
boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration) {
|
||||
synchronized (subscriptions) {
|
||||
addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName()), new ConcurrentHashMap<Long, Integer>());
|
||||
addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName(), wildcardConfiguration), new ConcurrentHashMap<Long, Integer>());
|
||||
|
||||
MqttTopicSubscription existingSubscription = subscriptions.get(subscription.topicName());
|
||||
if (existingSubscription != null) {
|
||||
|
|
|
@ -68,7 +68,7 @@ public class MQTTSubscriptionManager {
|
|||
|
||||
synchronized void start() throws Exception {
|
||||
for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) {
|
||||
String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName());
|
||||
String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName(), session.getWildcardConfiguration());
|
||||
Queue q = createQueueForSubscription(coreAddress, subscription.qualityOfService().value());
|
||||
createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
|
||||
}
|
||||
|
@ -164,9 +164,9 @@ public class MQTTSubscriptionManager {
|
|||
int qos = subscription.qualityOfService().value();
|
||||
String topic = subscription.topicName();
|
||||
|
||||
String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic);
|
||||
String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
|
||||
|
||||
session.getSessionState().addSubscription(subscription);
|
||||
session.getSessionState().addSubscription(subscription, session.getWildcardConfiguration());
|
||||
|
||||
Queue q = createQueueForSubscription(coreAddress, qos);
|
||||
|
||||
|
@ -186,7 +186,8 @@ public class MQTTSubscriptionManager {
|
|||
|
||||
// FIXME: Do we need this synchronzied?
|
||||
private synchronized void removeSubscription(String address) throws Exception {
|
||||
String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address);
|
||||
String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address, session.getWildcardConfiguration());
|
||||
|
||||
SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
|
||||
session.getSessionState().removeSubscription(address);
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription;
|
|||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
||||
|
||||
|
@ -65,44 +66,31 @@ public class MQTTUtil {
|
|||
|
||||
public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000;
|
||||
|
||||
public static String convertMQTTAddressFilterToCore(String filter) {
|
||||
return swapMQTTAndCoreWildCards(filter);
|
||||
public static String convertMQTTAddressFilterToCore(String filter, WildcardConfiguration wildcardConfiguration) {
|
||||
return MQTT_WILDCARD.convert(filter, wildcardConfiguration);
|
||||
}
|
||||
|
||||
public static class MQTTWildcardConfiguration extends WildcardConfiguration {
|
||||
public MQTTWildcardConfiguration() {
|
||||
setDelimiter('/');
|
||||
setSingleWord('+');
|
||||
setAnyWords('#');
|
||||
}
|
||||
}
|
||||
|
||||
public static final WildcardConfiguration MQTT_WILDCARD = new MQTTWildcardConfiguration();
|
||||
|
||||
private static final MQTTLogger logger = MQTTLogger.LOGGER;
|
||||
|
||||
public static String convertCoreAddressFilterToMQTT(String filter) {
|
||||
public static String convertCoreAddressFilterToMQTT(String filter, WildcardConfiguration wildcardConfiguration) {
|
||||
if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
|
||||
filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
|
||||
}
|
||||
return swapMQTTAndCoreWildCards(filter);
|
||||
return wildcardConfiguration.convert(filter, MQTT_WILDCARD);
|
||||
}
|
||||
|
||||
public static String convertMQTTAddressFilterToCoreRetain(String filter) {
|
||||
return MQTT_RETAIN_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
|
||||
}
|
||||
|
||||
public static String swapMQTTAndCoreWildCards(String filter) {
|
||||
char[] topicFilter = filter.toCharArray();
|
||||
for (int i = 0; i < topicFilter.length; i++) {
|
||||
switch (topicFilter[i]) {
|
||||
case '/':
|
||||
topicFilter[i] = '.';
|
||||
break;
|
||||
case '.':
|
||||
topicFilter[i] = '/';
|
||||
break;
|
||||
case '*':
|
||||
topicFilter[i] = '+';
|
||||
break;
|
||||
case '+':
|
||||
topicFilter[i] = '*';
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
return String.valueOf(topicFilter);
|
||||
public static String convertMQTTAddressFilterToCoreRetain(String filter, WildcardConfiguration wildcardConfiguration) {
|
||||
return MQTT_RETAIN_ADDRESS_PREFIX + MQTT_WILDCARD.convert(filter, wildcardConfiguration);
|
||||
}
|
||||
|
||||
private static ServerMessage createServerMessage(MQTTSession session,
|
||||
|
@ -124,7 +112,7 @@ public class MQTTUtil {
|
|||
boolean retain,
|
||||
int qos,
|
||||
ByteBuf payload) {
|
||||
String coreAddress = convertMQTTAddressFilterToCore(topic);
|
||||
String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
|
||||
ServerMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
|
||||
|
||||
// FIXME does this involve a copy?
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
|
@ -81,7 +80,7 @@ public class AMQConsumer {
|
|||
|
||||
SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
|
||||
|
||||
String physicalName = OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName());
|
||||
String physicalName = session.convertWildcard(openwireDestination.getPhysicalName());
|
||||
|
||||
SimpleString address;
|
||||
|
||||
|
@ -97,7 +96,7 @@ public class AMQConsumer {
|
|||
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
|
||||
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
|
||||
} else {
|
||||
SimpleString queueName = new SimpleString(OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName()));
|
||||
SimpleString queueName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
|
||||
try {
|
||||
session.getCoreServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
|||
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
||||
|
@ -57,6 +56,8 @@ import org.apache.activemq.openwire.OpenWireFormat;
|
|||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
|
||||
|
||||
public class AMQSession implements SessionCallback {
|
||||
private final Logger logger = Logger.getLogger(AMQSession.class);
|
||||
|
||||
|
@ -152,7 +153,7 @@ public class AMQSession implements SessionCallback {
|
|||
|
||||
for (ActiveMQDestination openWireDest : dests) {
|
||||
if (openWireDest.isQueue()) {
|
||||
SimpleString queueName = new SimpleString(OpenWireUtil.convertWildcard(openWireDest.getPhysicalName()));
|
||||
SimpleString queueName = new SimpleString(convertWildcard(openWireDest.getPhysicalName()));
|
||||
|
||||
if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {
|
||||
throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
|
||||
|
@ -405,6 +406,10 @@ public class AMQSession implements SessionCallback {
|
|||
}
|
||||
}
|
||||
|
||||
public String convertWildcard(String physicalName) {
|
||||
return OPENWIRE_WILDCARD.convert(physicalName, server.getConfiguration().getWildcardConfiguration());
|
||||
}
|
||||
|
||||
public ServerSession getCoreSession() {
|
||||
return this.coreSession;
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.util;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
|
@ -29,6 +30,16 @@ import org.apache.activemq.util.ByteSequence;
|
|||
|
||||
public class OpenWireUtil {
|
||||
|
||||
public static class OpenWireWildcardConfiguration extends WildcardConfiguration {
|
||||
public OpenWireWildcardConfiguration() {
|
||||
setDelimiter('.');
|
||||
setSingleWord('*');
|
||||
setAnyWords('>');
|
||||
}
|
||||
}
|
||||
|
||||
public static final WildcardConfiguration OPENWIRE_WILDCARD = new OpenWireWildcardConfiguration();
|
||||
|
||||
public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length);
|
||||
|
||||
|
@ -52,16 +63,6 @@ public class OpenWireUtil {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
*This util converts amq wildcards to compatible core wildcards
|
||||
*The conversion is like this:
|
||||
*AMQ * wildcard --> Core * wildcard (no conversion)
|
||||
*AMQ > wildcard --> Core # wildcard
|
||||
*/
|
||||
public static String convertWildcard(String physicalName) {
|
||||
return physicalName.replaceAll("(\\.>)+", ".#");
|
||||
}
|
||||
|
||||
public static XidImpl toXID(TransactionId xaXid) {
|
||||
return toXID((XATransactionId) xaXid);
|
||||
}
|
||||
|
|
|
@ -57,8 +57,17 @@ public class WildcardConfiguration implements Serializable {
|
|||
return result;
|
||||
}
|
||||
|
||||
public boolean isEnabled() {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "WildcardConfiguration{" +
|
||||
"anyWords=" + anyWords +
|
||||
", enabled=" + enabled +
|
||||
", singleWord=" + singleWord +
|
||||
", delimiter=" + delimiter +
|
||||
'}';
|
||||
}
|
||||
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
|
@ -90,4 +99,10 @@ public class WildcardConfiguration implements Serializable {
|
|||
this.singleWord = singleWord;
|
||||
}
|
||||
|
||||
public String convert(String filter, WildcardConfiguration to) {
|
||||
return filter.replace(getDelimiter(), to.getDelimiter())
|
||||
.replace(getSingleWord(), to.getSingleWord())
|
||||
.replace(getAnyWords(), to.getAnyWords());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -93,15 +93,15 @@ public class AddressImpl implements Address {
|
|||
for (; matchPos < add.getAddressParts().length; ) {
|
||||
if (pos >= addressParts.length) {
|
||||
// test for # as last address part
|
||||
return pos + 1 == add.getAddressParts().length && add.getAddressParts()[pos].equals(WildcardAddressManager.ANY_WORDS_SIMPLESTRING);
|
||||
return pos + 1 == add.getAddressParts().length && add.getAddressParts()[pos].equals(new SimpleString(wildcardConfiguration.getAnyWords()));
|
||||
}
|
||||
SimpleString curr = addressParts[pos];
|
||||
SimpleString next = addressParts.length > pos + 1 ? addressParts[pos + 1] : null;
|
||||
SimpleString currMatch = add.getAddressParts()[matchPos];
|
||||
if (currMatch.equals(WildcardAddressManager.SINGLE_WORD_SIMPLESTRING)) {
|
||||
if (currMatch.equals(new SimpleString(wildcardConfiguration.getSingleWord()))) {
|
||||
pos++;
|
||||
matchPos++;
|
||||
} else if (currMatch.equals(WildcardAddressManager.ANY_WORDS_SIMPLESTRING)) {
|
||||
} else if (currMatch.equals(new SimpleString(wildcardConfiguration.getAnyWords()))) {
|
||||
if (matchPos == addressParts.length - 1) {
|
||||
pos++;
|
||||
matchPos++;
|
||||
|
|
|
@ -34,16 +34,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
|
|||
*/
|
||||
public class WildcardAddressManager extends SimpleAddressManager {
|
||||
|
||||
static final char SINGLE_WORD = '*';
|
||||
|
||||
static final char ANY_WORDS = '#';
|
||||
|
||||
static final char DELIM = '.';
|
||||
|
||||
static final SimpleString SINGLE_WORD_SIMPLESTRING = new SimpleString("*");
|
||||
|
||||
static final SimpleString ANY_WORDS_SIMPLESTRING = new SimpleString("#");
|
||||
|
||||
/**
|
||||
* These are all the addresses, we use this so we can link back from the actual address to its linked wilcard addresses
|
||||
* or vice versa
|
||||
|
@ -175,7 +165,6 @@ public class WildcardAddressManager extends SimpleAddressManager {
|
|||
if (actualAddress.matches(destAdd)) {
|
||||
destAdd.addLinkedAddress(actualAddress);
|
||||
actualAddress.addLinkedAddress(destAdd);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
public class MQTTOpenwireTest extends MQTTTestSupport {
|
||||
|
||||
protected static final int NUM_MESSAGES = 1;
|
||||
|
||||
@Override
|
||||
public void configureBroker() throws Exception {
|
||||
super.configureBroker();
|
||||
WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
|
||||
wildcardConfiguration.setDelimiter('.');
|
||||
wildcardConfiguration.setSingleWord('*');
|
||||
wildcardConfiguration.setAnyWords('>');
|
||||
server.getConfiguration().setWildCardConfiguration(wildcardConfiguration);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createJMSConnection() throws Exception {
|
||||
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWildcards() throws Exception {
|
||||
doTestSendJMSReceiveMQTT("foo.bar", "foo/+");
|
||||
doTestSendJMSReceiveMQTT("foo.bar", "foo/#");
|
||||
doTestSendJMSReceiveMQTT("foo.bar.har", "foo/#");
|
||||
doTestSendJMSReceiveMQTT("foo.bar.har", "foo/+/+");
|
||||
doTestSendMQTTReceiveJMS("foo/bah", "foo.*");
|
||||
doTestSendMQTTReceiveJMS("foo/bah", "foo.>");
|
||||
doTestSendMQTTReceiveJMS("foo/bah/hah", "foo.*.*");
|
||||
doTestSendMQTTReceiveJMS("foo/bah/har", "foo.>");
|
||||
}
|
||||
|
||||
public void doTestSendMQTTReceiveJMS(String mqttTopic, String jmsDestination) throws Exception {
|
||||
final MQTTClientProvider provider = getMQTTClientProvider();
|
||||
initializeConnection(provider);
|
||||
|
||||
ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
|
||||
|
||||
try {
|
||||
// MUST set to true to receive retained messages
|
||||
activeMQConnection.setUseRetroactiveConsumer(true);
|
||||
activeMQConnection.start();
|
||||
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Topic jmsTopic = s.createTopic(jmsDestination);
|
||||
MessageConsumer consumer = s.createConsumer(jmsTopic);
|
||||
|
||||
// send retained message
|
||||
final String RETAINED = "RETAINED";
|
||||
provider.publish(mqttTopic, RETAINED.getBytes(), AT_LEAST_ONCE, true);
|
||||
|
||||
// check whether we received retained message on JMS subscribe
|
||||
ActiveMQMessage message = (ActiveMQMessage) consumer.receive(2000);
|
||||
assertNotNull("Should get retained message " + mqttTopic + "->" + jmsDestination, message);
|
||||
ByteSequence bs = message.getContent();
|
||||
assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
|
||||
|
||||
for (int i = 0; i < 1; i++) {
|
||||
String payload = "Test Message: " + i;
|
||||
provider.publish(mqttTopic, payload.getBytes(), AT_LEAST_ONCE);
|
||||
message = (ActiveMQMessage) consumer.receive(1000);
|
||||
assertNotNull("Should get a message " + mqttTopic + "->" + jmsDestination, message);
|
||||
bs = message.getContent();
|
||||
assertEquals(payload, new String(bs.data, bs.offset, bs.length));
|
||||
}
|
||||
} finally {
|
||||
activeMQConnection.close();
|
||||
provider.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
public void doTestSendJMSReceiveMQTT(String jmsDestination, String mqttTopic) throws Exception {
|
||||
final MQTTClientProvider provider = getMQTTClientProvider();
|
||||
initializeConnection(provider);
|
||||
|
||||
ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
|
||||
try {
|
||||
activeMQConnection.setUseRetroactiveConsumer(true);
|
||||
activeMQConnection.start();
|
||||
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Topic jmsTopic = s.createTopic(jmsDestination);
|
||||
MessageProducer producer = s.createProducer(jmsTopic);
|
||||
|
||||
final String RETAINED = "RETAINED";
|
||||
provider.subscribe(mqttTopic, AT_MOST_ONCE);
|
||||
|
||||
// send retained message from JMS
|
||||
TextMessage sendMessage = s.createTextMessage(RETAINED);
|
||||
// mark the message to be retained
|
||||
sendMessage.setBooleanProperty("ActiveMQ.Retain", true);
|
||||
// MQTT QoS can be set using MQTTProtocolConverter.QOS_PROPERTY_NAME property
|
||||
sendMessage.setIntProperty("ActiveMQ.MQTT.QoS", 0);
|
||||
producer.send(sendMessage);
|
||||
|
||||
byte[] message = provider.receive(2000);
|
||||
assertNotNull("Should get retained message " + jmsDestination + "->" + mqttTopic, message);
|
||||
assertEquals(RETAINED, new String(message));
|
||||
|
||||
for (int i = 0; i < 1; i++) {
|
||||
String payload = "This is Test Message: " + i;
|
||||
sendMessage = s.createTextMessage(payload);
|
||||
producer.send(sendMessage);
|
||||
message = provider.receive(1000);
|
||||
assertNotNull("Should get a message " + jmsDestination + "->" + mqttTopic, message);
|
||||
|
||||
assertEquals(payload, new String(message));
|
||||
}
|
||||
} finally {
|
||||
activeMQConnection.close();
|
||||
provider.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -67,8 +67,6 @@ public class MQTTTest extends MQTTTestSupport {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
|
||||
|
||||
private static final int NUM_MESSAGES = 250;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.net.ssl.KeyManager;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.TrustManager;
|
||||
|
@ -58,18 +59,20 @@ import static java.util.Collections.singletonList;
|
|||
|
||||
public class MQTTTestSupport extends ActiveMQTestBase {
|
||||
|
||||
private ActiveMQServer server;
|
||||
protected ActiveMQServer server;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
|
||||
|
||||
protected int port = 1883;
|
||||
protected ActiveMQConnectionFactory cf;
|
||||
protected ConnectionFactory cf;
|
||||
protected LinkedList<Throwable> exceptions = new LinkedList<>();
|
||||
protected boolean persistent;
|
||||
protected String protocolConfig;
|
||||
protected String protocolScheme;
|
||||
protected boolean useSSL;
|
||||
|
||||
protected static final int NUM_MESSAGES = 250;
|
||||
|
||||
public static final int AT_MOST_ONCE = 0;
|
||||
public static final int AT_LEAST_ONCE = 1;
|
||||
public static final int EXACTLY_ONCE = 2;
|
||||
|
@ -80,7 +83,6 @@ public class MQTTTestSupport extends ActiveMQTestBase {
|
|||
public MQTTTestSupport() {
|
||||
this.protocolScheme = "mqtt";
|
||||
this.useSSL = false;
|
||||
cf = new ActiveMQConnectionFactory(false, new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY));
|
||||
}
|
||||
|
||||
public File basedir() throws IOException {
|
||||
|
@ -110,6 +112,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
|
|||
|
||||
exceptions.clear();
|
||||
startBroker();
|
||||
createJMSConnection();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -125,7 +128,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
|
|||
super.tearDown();
|
||||
}
|
||||
|
||||
public void startBroker() throws Exception {
|
||||
public void configureBroker() throws Exception {
|
||||
// TODO Add SSL
|
||||
super.setUp();
|
||||
server = createServerForMQTT();
|
||||
|
@ -137,10 +140,18 @@ public class MQTTTestSupport extends ActiveMQTestBase {
|
|||
addressSettings.setAutoCreateAddresses(true);
|
||||
|
||||
server.getAddressSettingsRepository().addMatch("#", addressSettings);
|
||||
}
|
||||
|
||||
public void startBroker() throws Exception {
|
||||
configureBroker();
|
||||
server.start();
|
||||
server.waitForActivation(10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void createJMSConnection() throws Exception {
|
||||
cf = new ActiveMQConnectionFactory(false, new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY));
|
||||
}
|
||||
|
||||
private ActiveMQServer createServerForMQTT() throws Exception {
|
||||
Configuration defaultConfig = createDefaultConfig(true).setIncomingInterceptorClassNames(singletonList(MQTTIncomingInterceptor.class.getName())).setOutgoingInterceptorClassNames(singletonList(MQTTOutoingInterceptor.class.getName()));
|
||||
AddressSettings addressSettings = new AddressSettings();
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -26,15 +27,15 @@ public class OpenWireUtilTest {
|
|||
@Test
|
||||
public void testWildcardConversion() throws Exception {
|
||||
String amqTarget = "TEST.ONE.>";
|
||||
String coreTarget = OpenWireUtil.convertWildcard(amqTarget);
|
||||
String coreTarget = OpenWireUtil.OPENWIRE_WILDCARD.convert(amqTarget, new WildcardConfiguration());
|
||||
assertEquals("TEST.ONE.#", coreTarget);
|
||||
|
||||
amqTarget = "TEST.*.ONE";
|
||||
coreTarget = OpenWireUtil.convertWildcard(amqTarget);
|
||||
coreTarget = OpenWireUtil.OPENWIRE_WILDCARD.convert(amqTarget, new WildcardConfiguration());
|
||||
assertEquals("TEST.*.ONE", coreTarget);
|
||||
|
||||
amqTarget = "a.*.>.>";
|
||||
coreTarget = OpenWireUtil.convertWildcard(amqTarget);
|
||||
assertEquals("a.*.#", coreTarget);
|
||||
coreTarget = OpenWireUtil.OPENWIRE_WILDCARD.convert(amqTarget, new WildcardConfiguration());
|
||||
assertEquals("a.*.#.#", coreTarget);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue